Падение программы с (exit code: 0xc0000005, STATUS_ACCESS_VIOLATION)
Это код.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::alloc::{alloc, dealloc, Layout};
use std::ptr;
use criterion::{criterion_group, criterion_main, Criterion, black_box};
/// Мой "потокобезопасный" пул
pub struct ConcurrentPool {
memory: *mut u8,
next: AtomicUsize,
object_size: usize,
capacity: usize,
}
/// Позволяю "ConcurrentPool" распределять данными через потоки
unsafe impl Send for ConcurrentPool {}
unsafe impl Sync for ConcurrentPool {}
impl ConcurrentPool {
pub fn new(object_size: usize, capacity: usize) -> Arc<Self> {
let layout = Layout::array::<u8>(object_size * capacity).unwrap();
let memory = unsafe { alloc(layout) };
Arc::new(Self {
memory,
next: AtomicUsize::new(capacity),
object_size,
capacity,
})
}
#[inline(always)]
pub fn allocate(&self) -> Option<*mut u8> {
let idx = self.next.fetch_sub(1, Ordering::Relaxed);
if idx == 0 {
None
} else {
Some(unsafe { self.memory.add((idx - 1) * self.object_size) })
}
}
}
impl Drop for ConcurrentPool {
fn drop(&mut self) {
unsafe {
dealloc(self.memory, Layout::array::<u8>(self.object_size * self.capacity).unwrap())
};
}
}
/// Бенчмарки
const OBJECT_SIZE: usize = 1024; // 1KB
const OBJECTS_PER_THREAD: usize = 250;
const THREADS: usize = 4;
fn bench_standard_box(c: &mut Criterion) {
c.bench_function("Standard Box 4 threads", |b| {
b.iter(|| {
let mut handles = Vec::with_capacity(THREADS);
for _ in 0..THREADS {
handles.push(std::thread::spawn(move || {
let mut sum = 0;
for i in 0..OBJECTS_PER_THREAD {
let obj = Box::new([i as u8; OBJECT_SIZE]);
sum += obj[0] as usize;
black_box(obj);
}
sum
}));
}
let total: usize = handles.into_iter()
.map(|h| h.join().unwrap())
.sum();
black_box(total);
})
});
}
fn bench_concurrent_pool(c: &mut Criterion) {
let pool = ConcurrentPool::new(OBJECT_SIZE, THREADS * OBJECTS_PER_THREAD);
c.bench_function("ConcurrentPool 4 threads", |b| {
b.iter(|| {
let mut handles = Vec::with_capacity(THREADS);
for _ in 0..THREADS {
let pool = Arc::clone(&pool);
handles.push(std::thread::spawn(move || {
let mut sum = 0;
for i in 0..OBJECTS_PER_THREAD {
if let Some(ptr) = pool.allocate() {
unsafe { ptr::write_bytes(ptr, i as u8, OBJECT_SIZE) };
sum += unsafe { *ptr } as usize;
}
}
sum
}));
}
let total: usize = handles.into_iter()
.map(|h| h.join().unwrap())
.sum();
black_box(total);
})
});
}
criterion_group! {
name = benches;
config = Criterion::default()
.sample_size(1000)
.measurement_time(std::time::Duration::from_secs(10));
targets = bench_standard_box, bench_concurrent_pool
}
criterion_main!(benches);
Программа, доходя до теста с brench_concurrent_box падает с сообщением exit code: 0xc0000005, STATUS_ACCESS_VIOLATION. Я понимаю, что ошибка где-то в unsafe, но никак не могу найти.
Я пробовал сохранять layout: Layout в структуре, есть подозрения на метод new(), но одно могу сказать точно: unsafe в Rust страшнее всякого C.
Ответы (1 шт):
Ошибка в том, что вы в b.iter(..) переиспользуете пул и pub fn allocate(&self) -> Option<*mut u8> реализован с ошибкой. b.iter(..) пройдет первый цикл правильно, затем у нас после этого закончится пул - next будет равен 0. Потом при вызове allocate мы в idx получим 18446744073709551615 (это usize::MAX) при вычитание (fetch_sub), поэтому проверка на 0 игнорируется.Затем идем к указательной арифметике, где к базовому указателю памяти прибавляем 18446744073709549568 (это (idx - 1) * 1024, в данном случае произошло переполнение). Поэтому мы получаем невалидный указатель на данные и соответственно:
Ошибка "Status Access Violation" (Статус нарушения доступа) в Windows означает, что программа или процесс пытается получить доступ к области памяти, к которой у него нет прав.
Решение это сделать функцию для вычитания с проверкой на 0. Вот пример кода:
fn fetch_saturating_dec(value: &AtomicUsize) -> Option<usize> {
let mut current = value.load(Ordering::Relaxed);
loop {
if current == 0 {
return None;
}
match value.compare_exchange_weak(current, current - 1, Ordering::AcqRel, Ordering::Relaxed)
{
Ok(value) => return Some(value),
Err(value) => current = value,
}
}
}
Весь код, который я тестировал:
#![feature(test)]
use std::{
alloc::{Layout, alloc, dealloc},
ptr::{read, write_bytes},
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
};
extern crate test;
pub struct ConcurrentPool {
memory: *mut u8,
next: AtomicUsize,
object_size: usize,
capacity: usize,
}
/// Позволяю "ConcurrentPool" распределять данными через потоки
unsafe impl Send for ConcurrentPool {}
unsafe impl Sync for ConcurrentPool {}
impl ConcurrentPool {
pub fn new(object_size: usize, capacity: usize) -> Arc<Self> {
let layout = Layout::array::<u8>(object_size * capacity).unwrap();
let memory = unsafe { alloc(layout) };
Arc::new(Self {
memory,
next: AtomicUsize::new(capacity),
object_size,
capacity,
})
}
#[inline(always)]
pub fn allocate(&self) -> Option<*mut u8> {
fetch_saturating_dec(&self.next)
.map(|idx| unsafe { self.memory.add(idx * self.object_size) })
}
}
impl Drop for ConcurrentPool {
fn drop(&mut self) {
unsafe {
dealloc(
self.memory,
Layout::array::<u8>(self.object_size * self.capacity).unwrap(),
)
};
}
}
fn fetch_saturating_dec(value: &AtomicUsize) -> Option<usize> {
let mut current = value.load(Ordering::Relaxed);
loop {
if current == 0 {
return None;
}
match value.compare_exchange_weak(current, current - 1, Ordering::AcqRel, Ordering::Relaxed)
{
Ok(value) => return Some(value),
Err(value) => current = value,
}
}
}
const OBJECT_SIZE: usize = 1024; // 1KB
const OBJECTS_PER_THREAD: usize = 250;
const THREADS: usize = 4;
fn main() {
let pool = ConcurrentPool::new(OBJECT_SIZE, THREADS * OBJECTS_PER_THREAD);
for i in 0..2 {
let mut handles = Vec::with_capacity(THREADS);
for _ in 0..THREADS {
let pool = Arc::clone(&pool);
handles.push(std::thread::spawn(move || {
let mut sum = 0;
for i in 0..OBJECTS_PER_THREAD {
if let Some(ptr) = pool.allocate() {
unsafe { write_bytes(ptr, i as u8, OBJECT_SIZE) };
sum += unsafe { read::<u8>(ptr) } as usize;
}
}
sum
}));
}
let total = handles
.into_iter()
.map(|handle| handle.join().unwrap())
.sum::<usize>();
println!("total = {total}");
if i == 0 {
assert_eq!(total, 124500);
} else if i == 1 {
// no memory
assert_eq!(total, 0);
}
}
}
#[cfg(test)]
mod tests {
use std::ptr::{read, write_bytes};
use test::Bencher;
use super::*;
#[bench]
fn bench_standard_box(b: &mut Bencher) {
b.iter(|| {
let mut handles = Vec::with_capacity(THREADS);
for _ in 0..THREADS {
handles.push(std::thread::spawn(move || {
let mut sum = 0;
for i in 0..OBJECTS_PER_THREAD {
let obj = test::black_box(Box::new([i as u8; OBJECT_SIZE]));
sum += obj[0] as usize;
}
sum
}));
}
test::black_box(
handles
.into_iter()
.map(|handle| handle.join().unwrap())
.sum::<usize>(),
);
});
}
#[bench]
fn bench_concurrent_pool(b: &mut Bencher) {
let pool = ConcurrentPool::new(OBJECT_SIZE, THREADS * OBJECTS_PER_THREAD);
b.iter(|| {
let mut handles = Vec::with_capacity(THREADS);
for _ in 0..THREADS {
let pool = Arc::clone(&pool);
handles.push(std::thread::spawn(move || {
let mut sum = 0;
for i in 0..OBJECTS_PER_THREAD {
if let Some(ptr) = pool.allocate() {
unsafe { write_bytes(ptr, i as u8, OBJECT_SIZE) };
sum += unsafe { read::<u8>(ptr) } as usize;
}
}
sum
}));
}
test::black_box(
handles
.into_iter()
.map(|handle| handle.join().unwrap())
.sum::<usize>(),
);
});
}
}
Результат:
% cargo +nightly bench
Finished `bench` profile [optimized] target(s) in 0.03s
Running unittests src/main.rs (target/release/deps/test_thread-1987f76508c67b3e)
running 2 tests
test tests::bench_concurrent_pool ... bench: 57,233.30 ns/iter (+/- 1,607.18)
test tests::bench_standard_box ... bench: 67,687.85 ns/iter (+/- 1,383.13)
test result: ok. 0 passed; 0 failed; 0 ignored; 2 measured; 0 filtered out; finished in 7.99s