use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; use crate::{Consumer, Driver, Executor, Folder, ParallelIterator, Reducer, WithSetup}; #[derive(Copy, Clone, Eq, PartialEq)] pub enum FindMatch { Any, First, Last, } /* Find */ pub struct Find { iterator: X, operation: O, find_match: FindMatch, } impl Find { pub fn new(iterator: X, operation: O, find_match: FindMatch) -> Self { Self { iterator, operation, find_match, } } } impl<'a, X, O> Driver<'a, Option> for Find where X: ParallelIterator<'a>, O: Fn(&X::Item) -> bool + Clone + Send + 'a, { fn exec_with(self, executor: E) -> E::Result where E: Executor<'a, Option>, { let consumer = FindConsumer { operation: self.operation, found: Arc::new(AtomicUsize::new(0)), lower_bound: 1, upper_bound: usize::max_value(), find_match: self.find_match, }; self.iterator.drive(executor, consumer) } } /* FindMap */ pub struct FindMap { iterator: X, operation: O, find_match: FindMatch, } impl FindMap { pub fn new(iterator: X, operation: O, find_match: FindMatch) -> Self { Self { iterator, operation, find_match, } } } impl<'a, X, O, T> Driver<'a, Option> for FindMap where X: ParallelIterator<'a>, O: Fn(X::Item) -> Option + Clone + Send + 'a, T: Send + 'a, { fn exec_with(self, executor: E) -> E::Result where E: Executor<'a, Option>, { Find::new( self.iterator.filter_map(self.operation), |_: &T| true, self.find_match, ) .exec_with(executor) } } /* Any */ pub struct Any { iterator: X, operation: O, } impl Any { pub fn new(iterator: X, operation: O) -> Self { Self { iterator, operation, } } } impl<'a, X, O> Driver<'a, bool, Option> for Any where X: ParallelIterator<'a>, O: Fn(X::Item) -> bool + Clone + Send + 'a, { fn exec_with(self, executor: E) -> E::Result where E: Executor<'a, bool, Option>, { let executor = executor.into_inner(); let ret = Find::new( self.iterator.map(self.operation), bool::clone, FindMatch::Any, ) .exec_with(executor); E::map(ret, |x| x.is_some()) } } /* All */ pub struct All { iterator: X, operation: O, } impl All { pub fn new(iterator: X, operation: O) -> Self { Self { iterator, operation, } } } impl<'a, X, O> Driver<'a, bool, Option> for All where X: ParallelIterator<'a>, O: Fn(X::Item) -> bool + Clone + Send + 'a, { fn exec_with(self, executor: E) -> E::Result where E: Executor<'a, bool, Option>, { let executor = executor.into_inner(); let ret = Find::new( self.iterator.map(self.operation), |x: &bool| !x, FindMatch::Any, ) .exec_with(executor); E::map(ret, |x| x.is_none()) } } /* FindConsumer */ struct FindConsumer { operation: O, found: Arc, lower_bound: usize, upper_bound: usize, find_match: FindMatch, } impl WithSetup for FindConsumer {} impl Consumer for FindConsumer where O: Fn(&T) -> bool + Clone + Send, T: Send, { type Folder = FindFolder; type Reducer = FindReducer; type Result = Option; fn split(self) -> (Self, Self, Self::Reducer) { let FindConsumer { operation, found, lower_bound, upper_bound, find_match, } = self; let mid = lower_bound + (upper_bound - lower_bound) / 2; ( Self { operation: operation.clone(), found: found.clone(), lower_bound, upper_bound: mid, find_match, }, Self { operation, found, lower_bound: mid, upper_bound, find_match, }, FindReducer { find_match: self.find_match, }, ) } fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { self.split() } fn into_folder(self) -> Self::Folder { FindFolder { operation: self.operation, found: self.found, item: None, find_match: self.find_match, lower_bound: self.lower_bound, upper_bound: self.upper_bound, } } fn is_full(&self) -> bool { self.found.load(Ordering::Relaxed) > 0 } } /* FindFolder */ struct FindFolder { operation: O, found: Arc, item: Option, lower_bound: usize, upper_bound: usize, find_match: FindMatch, } impl Folder for FindFolder where O: Fn(&T) -> bool + Clone, { type Result = Option; fn consume(mut self, item: T) -> Self { match self.find_match { FindMatch::First if self.item.is_some() => return self, FindMatch::Any if self.item.is_some() => return self, _ => (), } if (self.operation)(&item) { let mut found = self.found.load(Ordering::Relaxed); loop { let boundary = match self.find_match { FindMatch::Any if found > 0 => return self, FindMatch::First if found != 0 && found < self.lower_bound => return self, FindMatch::Last if found != 0 && found > self.upper_bound => return self, FindMatch::Any => self.lower_bound, FindMatch::First => self.lower_bound, FindMatch::Last => self.upper_bound, }; let ret = self.found.compare_exchange_weak( found, boundary, Ordering::Relaxed, Ordering::Relaxed, ); match ret { Ok(_) => { self.item = Some(item); break; } Err(v) => found = v, } } } self } fn complete(self) -> Self::Result { self.item } fn is_full(&self) -> bool { self.found.load(Ordering::Relaxed) > 0 } } /* FindReducer */ struct FindReducer { find_match: FindMatch, } impl Reducer> for FindReducer { fn reduce(self, left: Option, right: Option) -> Option { match self.find_match { FindMatch::First => left.or(right), FindMatch::Any => left.or(right), FindMatch::Last => right.or(left), } } }