Rayon: A data parallelism library for Rust

Overview

Rayon is a data-parallelism library for Rust. It is extremely lightweight and makes it easy to convert a sequential computation into a parallel one. It also guarantees data-race freedom. (You may also enjoy this blog post about Rayon, which gives more background and details about how it works, or this video, from the Rust Belt Rust conference.) Rayon is available on crates.io, and API Documentation is available on docs.rs.

Parallel iterators and more

Rayon makes it drop-dead simple to convert sequential iterators into parallel ones: usually, you just change your foo.iter() call into foo.par_iter(), and Rayon does the rest:

use rayon::prelude::*;
fn sum_of_squares(input: &[i32]) -> i32 {
    input.par_iter() // <-- just change that!
         .map(|&i| i * i)
         .sum()
}

Parallel iterators take care of deciding how to divide your data into tasks; they dynamically adapt for maximum performance. If you need more flexibility than that, Rayon also offers the join and scope functions, which let you create parallel tasks on your own. For even more control, you can create custom thread pools rather than using Rayon's default, global thread pool.
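
For a quick illustration of those escape hatches, here is a minimal sketch (the closures, thread count, and printed values are only illustrative) of join, scope, and a custom thread pool:

use rayon::ThreadPoolBuilder;

fn main() {
    // join runs two closures, potentially in parallel, and returns both results.
    let (sum, label) = rayon::join(|| (0..1_000u64).sum::<u64>(), || "done");
    println!("{sum} {label}");

    // A custom pool instead of Rayon's default, global thread pool.
    let pool = ThreadPoolBuilder::new()
        .num_threads(2)
        .build()
        .expect("failed to build thread pool");

    pool.install(|| {
        // scope lets you spawn an arbitrary number of tasks that may borrow
        // from the enclosing stack frame; it returns once they all finish.
        rayon::scope(|s| {
            s.spawn(|_| println!("task 1"));
            s.spawn(|_| println!("task 2"));
        });
    });
}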

No data races

You may have heard that parallel execution can produce all kinds of crazy bugs. Well, rest easy. Rayon's APIs all guarantee data-race freedom, which generally rules out most parallel bugs (though not all). In other words, if your code compiles, it typically does the same thing it did before.

For the most part, parallel iterators in particular are guaranteed to produce the same results as their sequential counterparts. One caveat: if your iterator has side effects (for example, sending messages to other threads through a Rust channel or writing to disk), those side effects may occur in a different order. Note also that, in some cases, parallel iterators offer alternative versions of the sequential iterator methods that can have higher performance.
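
For instance, a small sketch of that caveat: the printed lines below may interleave in any order across threads, while the collected result still matches what the sequential iterator would produce.

use rayon::prelude::*;

fn main() {
    let input = vec![1, 2, 3, 4];

    // Side effects (printing) may happen in any order...
    input.par_iter().for_each(|i| println!("saw {i}"));

    // ...but collected results keep the sequential order and contents.
    let squares: Vec<i32> = input.par_iter().map(|&i| i * i).collect();
    assert_eq!(squares, vec![1, 4, 9, 16]);
}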

Using Rayon

Rayon is available on crates.io. The recommended way to use it is to add a line into your Cargo.toml such as:

[dependencies]
rayon = "1.5"

To use the Parallel Iterator APIs, a number of traits have to be in scope. The easiest way to bring those things into scope is to use the Rayon prelude. In each module where you would like to use the parallel iterator APIs, just add:

use rayon::prelude::*;

Rayon currently requires rustc 1.36.0 or greater.

Contribution

Rayon is an open source project! If you'd like to contribute to Rayon, check out the list of "help wanted" issues. These are all (or should be) issues that are suitable for getting started, and they generally include a detailed set of instructions for what to do. Please ask questions if anything is unclear! Also, check out the Guide to Development page on the wiki. Note that all code submitted in PRs to Rayon is assumed to be licensed under Rayon's dual MIT/Apache2 licensing.

Quick demo

To see Rayon in action, check out the rayon-demo directory, which includes a number of demos of code using Rayon. For example, run this command to get a visualization of an nbody simulation. To see the effect of using Rayon, press s to run sequentially and p to run in parallel.

> cd rayon-demo
> cargo run --release -- nbody visualize

For more information on demos, try:

> cd rayon-demo
> cargo run --release -- --help

Other questions?

See the Rayon FAQ.

License

Rayon is distributed under the terms of both the MIT license and the Apache License (Version 2.0). See LICENSE-APACHE and LICENSE-MIT for details. Opening a pull request is assumed to signal agreement with these licensing terms.

Comments
  • Add the ability to customize thread spawning

    Add the ability to customize thread spawning

    As an alternative to ThreadPoolBuilder::build() and build_global(), the new spawn() and spawn_global() methods take a closure which will be responsible for spawning the actual threads. This is called with a ThreadBuilder argument that provides the thread index, name, and stack size, with the expectation to call its run() method in the new thread.

    The motivating use cases for this are:

    • experimental WASM threading, to be externally implemented.
    • scoped threads, like the new test using scoped_tls.
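
    A minimal sketch of that customization as it exists in released rayon-core, where the method landed as ThreadPoolBuilder::spawn_handler (the thread count and printed message here are only illustrative):

    fn main() -> Result<(), rayon::ThreadPoolBuildError> {
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(2)
            // The closure receives a ThreadBuilder carrying the index, name,
            // and stack size; it spawns the thread and calls run() inside it.
            .spawn_handler(|thread| {
                let mut b = std::thread::Builder::new();
                if let Some(name) = thread.name() {
                    b = b.name(name.to_owned());
                }
                if let Some(stack_size) = thread.stack_size() {
                    b = b.stack_size(stack_size);
                }
                b.spawn(|| thread.run())?;
                Ok(())
            })
            .build()?;

        pool.install(|| println!("running on custom-spawned threads"));
        Ok(())
    }
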
    opened by cuviper 39
  • Reduce use of the pub-in-private hack

    Reduce use of the pub-in-private hack

    A recent thread on internals got me thinking whether we could avoid the private_impl!{} hack after all. With a few extra constraints, this does work!

    With ParallelString: Borrow<str>, we can provide default implementations of all its methods, and should be free to add new methods the same way. Then a blanket impl only has to meet the constraints, so there's also less repetition defining these methods.

    Similarly ParallelSlice<T: Sync>: Borrow<[T]> works for slices, and a new ParallelSliceMut<T: Send>: BorrowMut<[T]> for mutable slices. The latter is also added to the prelude.

    I also considered AsRef and AsMut, which are very similar to borrows, but I decided against it for the single fact that strings implement both AsRef<str> and AsRef<[u8]>. This would make it ambiguous if ParallelString and ParallelSlice shared any method names, and to drive that point home I went ahead and added par_split and par_split_mut on slices.

    The only remaining private_impl!{} is in rayon::str::Pattern. I don't think we can use a similar trick on that one, but the more I think about it, the more I think we should just hide Pattern altogether as pub-in-private itself. Its API is not great, and not something we really want folks to call directly. All users really need to know are which types implement it, and we can document that on the public splitter functions that use it.
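
    To illustrate the pattern with made-up names (this is not rayon's actual trait, just a sketch of the Borrow-bound approach): a trait bounded on Borrow<str> can provide default method bodies, and a single blanket impl then covers every borrowing type with no repetition.

    use std::borrow::Borrow;

    trait ParStrExt: Borrow<str> {
        fn par_len_hint(&self) -> usize {
            // Default bodies reach the underlying str through the supertrait bound.
            self.borrow().len()
        }
    }

    // One blanket impl covers str, String, &str, Cow<str>, and so on.
    impl<T: Borrow<str> + ?Sized> ParStrExt for T {}

    fn main() {
        assert_eq!("hello".par_len_hint(), 5);
        assert_eq!(String::from("hello").par_len_hint(), 5);
    }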

    opened by cuviper 29
  • rayon thread pool coredump

    rayon thread pool coredump

    Hey, I have a problem with the rayon thread pool: a probabilistic coredump. The code looks like this:

    let thread_pool = rayon::ThreadPoolBuilder::new()
        .stack_size(8 * 1024 * 1024)
        .num_threads((num_cpus::get() * 6 / 8).min(32))
        .panic_handler(rayon_panic_handler)
        .build()
        .expect("Failed to initialize a thread pool for worker");

    thread_pool.install(move || {
        loop {
            //debug!("mine_one_unchecked");
            let block_header =
                BlockHeader::mine_once_unchecked(&block_template, &terminator_clone, &mut thread_rng())?;
            //debug!("mine_one_unchecked end");
            // Ensure the share difficulty target is met.
            if N::posw().verify(
                block_header.height(),
                share_difficulty,
                &[*block_header.to_header_root().unwrap(), *block_header.nonce()],
                block_header.proof(),
            ) {
                return Ok::<(N::PoSWNonce, PoSWProof<N>, u64), anyhow::Error>((
                    block_header.nonce(),
                    block_header.proof().clone(),
                    block_header.proof().to_proof_difficulty()?,
                ));
            }
        }
    })
    

    The backtrace (gdb):

    #0  <alloc::vec::Vec<T,A> as core::ops::deref::Deref>::deref (self=0xf58017d4cd80e144) at alloc/src/vec/mod.rs:2402
    #1  <alloc::vec::Vec<T,A> as core::ops::index::Index<I>>::index (self=0xf58017d4cd80e144, index=1) at alloc/src/vec/mod.rs:2496
    #2  rayon_core::sleep::Sleep::wake_specific_thread (self=0xf58017d4cd80e134, index=1) at rayon-core-1.9.1/src/sleep/mod.rs:355
    #3  rayon_core::sleep::Sleep::notify_worker_latch_is_set (self=0xf58017d4cd80e134, target_worker_index=1) at rayon-core-1.9.1/src/sleep/mod.rs:245
    #4  rayon_core::registry::Registry::notify_worker_latch_is_set (target_worker_index=1, self=<optimized out>) at rayon-core-1.9.1/src/registry.rs:544
    #5  <rayon_core::latch::SpinLatch as rayon_core::latch::Latch>::set (self=0x7facd9bec448) at rayon-core-1.9.1/src/latch.rs:214
    #6  <rayon_core::job::StackJob<L,F,R> as rayon_core::job::Job>::execute (this=0x7facd9bec448) at rayon-core-1.9.1/src/job.rs:123
    #7  rayon_core::job::JobRef::execute (self=<optimized out>) at rayon-core-1.9.1/src/job.rs:59
    #8  rayon_core::registry::WorkerThread::execute (self=<optimized out>, job=...) at rayon-core-1.9.1/src/registry.rs:749
    #9  rayon_core::registry::WorkerThread::wait_until_cold (self=<optimized out>, latch=<optimized out>) at rayon-core-1.9.1/src/registry.rs:726
    #10 rayon_core::registry::WorkerThread::wait_until (self=0x7facd8be4800, latch=<optimized out>) at rayon-core-1.9.1/src/registry.rs:700
    #11 rayon_core::registry::main_loop (registry=..., index=9, worker=...) at rayon-core-1.9.1/src/registry.rs:833
    #12 rayon_core::registry::ThreadBuilder::run (self=...) at rayon-core-1.9.1/src/registry.rs:55
    #13 <rayon_core::registry::DefaultSpawn as rayon_core::registry::ThreadSpawn>::spawn::{{closure}} () at rayon-core-1.9.1/src/registry.rs:100
    #14 std::sys_common::backtrace::__rust_begin_short_backtrace (f=...) at library/std/src/sys_common/backtrace.rs:123
    #15 std::thread::Builder::spawn_unchecked::{{closure}}::{{closure}} () at library/std/src/thread/mod.rs:483
    #16 <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once (self=..., _args=<optimized out>) at library/core/src/panic/unwind_safe.rs:271
    #17 std::panicking::try::do_call (data=<optimized out>) at library/std/src/panicking.rs:403
    #18 std::panicking::try (f=...) at library/std/src/panicking.rs:367
    #19 std::panic::catch_unwind (f=...) at library/std/src/panic.rs:133
    #20 std::thread::Builder::spawn_unchecked::{{closure}} () at library/std/src/thread/mod.rs:482
    #21 core::ops::function::FnOnce::call_once{{vtable-shim}} () at library/core/src/ops/function.rs:227
    #22 <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:1691
    #23 <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:1691
    #24 std::sys::unix::thread::Thread::new::thread_start () at library/std/src/sys/unix/thread.rs:106
    #25 start_thread (arg=0x7facd8be5700) at pthread_create.c:463
    #26 clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

    opened by scuwan 27
  • RFC: adaptive "thief splitting" algorithm for par_iter

    RFC: adaptive "thief splitting" algorithm for par_iter

    This introduces an adaptive algorithm for splitting par_iter jobs, used only as a default case when no weights are specified. Initially, it will split into enough jobs to fill every thread. Then whenever a job is stolen, that job will again be split enough for every thread.

    This is roughly based on @Amanieu's description in the users forum of the algorithm used by Async++ and TBB.
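
    A rough sketch of the idea (illustrative only, not the actual rayon-core code): each job carries a split budget that starts at the number of threads, halves on every split, and is replenished whenever the job is stolen.

    #[derive(Clone, Copy)]
    struct Splitter {
        // How many more pieces this job is still willing to split into.
        splits: usize,
    }

    impl Splitter {
        fn new() -> Self {
            // Start with enough pieces to occupy every thread.
            Splitter { splits: num_threads() }
        }

        fn try_split(&mut self, stolen: bool) -> bool {
            if stolen {
                // The job migrated to an idle thread, so give it a fresh
                // budget: enough splits to fill every thread again.
                self.splits = num_threads();
            }
            if self.splits > 1 {
                self.splits /= 2;
                true
            } else {
                false
            }
        }
    }

    fn num_threads() -> usize {
        std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
    }

    fn main() {
        let mut splitter = Splitter::new();
        while splitter.try_split(false) { /* split the job in half here */ }
    }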

    opened by cuviper 27
  • Build without +nightly

    Build without +nightly

    First, thanks for a great library everybody! 😄

    I happened to see this, and I tried the examples. They seem to run just fine on stable Rust nowadays. Any particular reason why we'd want to keep running these on nightly, or is it just a remnant from the past?

    opened by perlun 25
  • Limit number of threads Rayon is allowed to use

    Limit number of threads Rayon is allowed to use

    • This allows the user to specify a maximum number of threads that Rayon is allowed to use.
    • The user can call the rayon::initialize(max_num) function to specify the limit. This breaks the API since this function didn't take any arguments before. The function was meant for benchmarking only, so the number of affected users should be small.
    • Alternatively, a new initialize function could be added, for example rayon::initialize_limit(max_num), and no API would be broken.
    • If the user does not call rayon::initialize(max_num), the old behavior of using all available cores is kept.
    • If the user specifies 0 as the limit, Rayon ends up in an infinite loop, so an assert! checks the user input. Alternatively, instead of panicking, Rayon could just use all the available cores, but then the user would not be informed about the erroneous input. (A sketch of the equivalent with today's builder API follows this list.)
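
    For reference, a sketch of how this limit is expressed with the builder API that current rayon versions ship (rayon::initialize has since been superseded by ThreadPoolBuilder):

    use rayon::prelude::*;

    fn main() {
        // Cap the global pool at 4 worker threads instead of one per core.
        rayon::ThreadPoolBuilder::new()
            .num_threads(4)
            .build_global()
            .expect("the global thread pool was already initialized");

        let sum: u64 = (0..1_000u64).into_par_iter().sum();
        println!("{sum}");
    }
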
    opened by willi-kappler 24
  • Question: Concatenate list of indexed parallel iterators?

    Question: Concatenate list of indexed parallel iterators?

    This is admittedly more of a question than a pull request but I thought that it might be easiest to answer this with the code at hand.

    I have a situation in which I basically want to concatenate a

    Vec<I> where I: IndexedParallelIterator
    

    into a single indexed parallel iterator with the same semantics as if I had called .into_iter().flatten() on the vector.

    I tried to implement this by looking at the existing Chain adapter, which however handles a fixed number of inner iterators with possibly heterogeneous types instead of an unknown number of iterators with homogeneous types.

    I am currently stuck at trying to collect the producers created by the inner iterators into a vector. This does not seem to work as it seems possible that each invocation of

    <I as IndexedParallelIterator>::with_producer
    

    will call the given callback with a different producer type.

    Leaving aside the overhead, boxing does not seem possible due to Producer not being object safe. I did consider defining a simpler DynProducer trait that would be, but I think this would end with

    trait IntoIter<T>: Iterator<Item = T> + DoubleEndedIterator + ExactSizeIterator;
    type IntoIter = Box<dyn IntoIter<Self::Item>>;
    

    which feels prohibitively expensive.

    I also considered building a producer out of a list of iterators, also recording a list of split positions for each one, and only turning these into producers and splitting them when e.g. Producer::into_iter is actually called. But that does seem to imply at least sharing (and hence synchronizing) the original list between tasks if a split happens to straddle the boundary between two of the original iterators. Also, I am not sure if this would avoid the issue of different producer types eventually.

    Does anybody know whether this is possible at all and if so how?

    opened by adamreichold 23
  • Fork into `rayon-core` and a stable `rayon` facade

    Fork into `rayon-core` and a stable `rayon` facade

    The root crate is now rayon-core, with all functionality, and under rayon-stable/ is the new rayon crate that only publicizes select really-stable interfaces -- those we feel are nearly ready for 1.0. For the most part, this means the APIs for using rayon, while we can continue tweaking the APIs for implementing rayon traits in the core with proper pre-1.0 semver bumps.

    (The directories are structured this way just to minimize the code churn.)

    Most of the traits are now split, e.g. ParallelIterator and a new ParallelIteratorImpl, so only the former is publicized in rayon. Methods like drive and with_producer, anything that implementors have to provide, are isolated in *Impl.

    opened by cuviper 23
  • new scheduler from RFC 5

    new scheduler from RFC 5

    Implementation of the scheduler described in https://github.com/rayon-rs/rfcs/pull/5 -- modulo the fact that the RFC is mildly out of date. There is a walkthrough video available.

    To Do List:

    • [x] Fix the cargo lock
    • [x] Address use of AtomicU64
    • [x] Document the handling of rollover and wakeups and convince ourselves it's sound
    • [ ] Adopt and document the proposed scheme for the job event counter
    • [ ] Review RFC and list out the places where it differs from the branch
    opened by nikomatsakis 22
  • Introduce IndexedParallelIterator::mapfold_collect_into_vec

    Introduce IndexedParallelIterator::mapfold_collect_into_vec

    Nit: do bikeshed the name, I hate it anyway.

    This method introduces a neat way to collect some parallel iterator in a vec in an allocation-efficient way, while still being able to do some operations on the data being collected. It uses a new trait, MapFolder<Item>, which is a generalised closure for the classic mapfold operation.

    Given the very raison d'être of parallel iterators is to be able to do work in batches, the results of the various MapFolder::complete calls are passed to a Reducer and returned from mapfold_collect_into_vec.

    Why

    Because, as usual, I need this as part of some Servo-related stuff. :) In Victor, we build in parallel a vector of fragments from a vector of boxes:

    https://github.com/SimonSapin/victor/blob/6ddce7345030ae4d25f846ca757d6b50f3f8aeac/victor/src/layout/mod.rs#L56-L59

    Some CSS features (among which absolute positioning, in case you were curious) require us to make some of those fragments go up the fragment tree to where they should actually belong. To do this, we would like to be able map the boxes into fragments as usual, but also collect the absolutely positioned ones into a separate vec that we can then append to the parent's own list of absolutely positioned fragments, until we reach the final one. There are way fewer absolutely positioned boxes than normal ones so it's not a problem to concat them as we traverse the tree. This method allows us to achieve that this way:

    #[derive(Clone)]
    struct BoxMapFolder(Vec<AbsposFragment>);
    
    impl<'a> MapFolder<&'a BlockLevelBox> for BoxMapFolder {
        type Output = Fragment;
        type Result = Vec<AbsposFragment>;
    
        fn consume(mut self, block_level_box: &'a BlockLevelBox) -> (Self, Self::Output) {
            let (fragment, mut absolutely_positioned_fragments) = block_level_box.layout();
            self.0.append(&mut absolutely_positioned_fragments);
            (self, fragment)
        }
    
        fn complete(self) -> Self::Result {
            self.0
        }
    }
    
    #[derive(Clone)]
    struct BoxReducer;
    
    impl Reducer<Vec<AbsposFragment>> for BoxReducer {
        fn reduce(
            self,
            mut left: Vec<AbsposFragment>,
            mut right: Vec<AbsposFragment>,
        ) -> Vec<AbsposFragment> {
            left.append(&mut right);
            left
        }
    }
    
    let mut child_fragments = vec![];
    let absolutely_positioned_fragments = child_boxes
        .par_iter()
        .mapfold_collect_into_vec(
            &mut child_fragments,
            BoxMapFolder(vec![]),
            BoxReducer,
        );
    

    Unresolved questions

    Should this method really be relying on plumbing?

    Probably not, I guess we should have a variety of different methods like map_init, map_with, fold_with etc, but as a proof of concept I didn't want to lose too much time on how the functionality should be exposed before I make the PR. Those methods will also require names, and I can't find any which doesn't annoy me.

    Should it be named that way?

    Probably not.

    Is there a way to avoid the need for a new trait MapFolder?

    I don't think so but I am certainly not sure of that.

    opened by nox 22
  • Rayon uses a lot of CPU when there's not a lot of work to do

    Rayon uses a lot of CPU when there's not a lot of work to do

    The following program uses about 30% of the CPU on 4 core (8 HT) Linux and Mac machines

    fn main() {
        loop {
            std::thread::sleep(std::time::Duration::from_millis(10));
            rayon::spawn(move || {});
        }
    }
    

    Reducing the sleep duration to 1ms pushes the CPU usage up to 200%.

    opened by jrmuizel 22
  • Latency optimized scope API

    Latency optimized scope API

    I ran into an issue where a scope, after reaching the end of execution having spawned all of its tasks, turns to stealing tasks originating elsewhere instead of returning execution to the caller. This behavior is useful because the calling thread can usually participate in remaining tasks, but in my case it delays kicking off even more work coming up after that scope, leading to less parallelism in aggregate.

    I haven't tested all of the API variants, but I saw this happen most recently with in_place_scope_fifo, calling it from a rayon thread.

    I've created a super janky workaround, and although it's definitely unsound, it performs better in my tests which are sensitive to the tail latency of scope() calls.
    use std::sync::Arc;
    // Assumes the parking_lot crate: its Mutex::lock() returns the guard directly
    // and its Condvar::wait takes the guard by &mut, matching the calls below.
    use parking_lot::{Condvar, Mutex};

    pub(crate) struct LatencyOptimizedScopeFifo {
        mutex: Arc<Mutex<usize>>,
        condvar: Arc<Condvar>,
    }
    
    pub(crate) fn latency_optimized_in_place_scope_fifo<'scope, OP, R>(op: OP) -> R
    where
        OP: FnOnce(&LatencyOptimizedScopeFifo) -> R,
    {
        let scope = LatencyOptimizedScopeFifo {
            mutex: Arc::new(Mutex::new(0)),
            condvar: Arc::new(Condvar::new()),
        };
        let result = op(&scope);
        scope.finish();
        result
    }
    
    struct ScopePtr<T>(*const T);
    unsafe impl<T> Send for ScopePtr<T> {}
    unsafe impl<T> Sync for ScopePtr<T> {}
    
    // Needed to extend 'scope -> 'static as rayon doesn't provide a function like std::thread::Builder::spawn_unchecked
    unsafe fn extend<'a, OP>(func: OP) -> Box<dyn FnOnce(&LatencyOptimizedScopeFifo) + Send + 'static>
    where
        OP: FnOnce(&LatencyOptimizedScopeFifo) + Send + 'a,
    {
        let boxed: Box<dyn FnOnce(&LatencyOptimizedScopeFifo) + Send + 'a> = Box::new(func);
        std::mem::transmute(boxed)
    }
    
    impl LatencyOptimizedScopeFifo {
        pub(crate) fn spawn_fifo<'scope, OP>(&self, op: OP)
        where
            OP: FnOnce(&LatencyOptimizedScopeFifo) + Send + 'scope,
        {
            *self.mutex.lock() += 1;
            let op = unsafe { extend(op) };
            let unsafe_ref = ScopePtr(self as *const LatencyOptimizedScopeFifo);
    
            rayon::spawn(move || {
                let unsafe_ref = unsafe_ref;
                let scope = unsafe {
                    std::mem::transmute::<*const LatencyOptimizedScopeFifo, &LatencyOptimizedScopeFifo>(unsafe_ref.0)
                };
    
                op(scope);
    
                let mut lock = scope.mutex.lock();
                *lock -= 1;
                if *lock == 0 {
                    scope.condvar.notify_one();
                }
            });
        }
    
        fn finish(self) {
            let mut lock = self.mutex.lock();
            // Loop so a spurious wakeup can't end the wait early.
            while *lock > 0 {
                self.condvar.wait(&mut lock);
            }
        }
    }
    
    
    opened by farnoy 0
  • Clone+Send closures instead of Send+Sync closures?

    Clone+Send closures instead of Send+Sync closures?

    Say I have a container foo, and for each element in foo I want to lookup another container associated with that element, and process them all one by one. I might write something like:

    fn yield_inner(foo: &Foo, table: &ContainerTable) -> Iterator<Item=&Bar> {
        foo.iter().flat_map(|x| { table.lookup(x).iter() } )
    }
    

    But if the lookup uses a thread local container table this breaks:

    thread_local!(static TABLE: ContainerTable = ContainerTable::new(); )
    fn yield_inner2(foo: &Foo) -> Iterator<Item=&Bar> {
        foo.iter().flat_map(|x| { 
            TABLE.with(|table| {
                table.lookup(x).iter() 
            })
        })
    }
    

    yield_inner2 will fail to compile because table has a lifetime that is limited to the body of the closure. This is annoying because most of the time you're not doing any of the things that lead thread local to need an "internal iteration" style design -- you're not accessing it from drop, causing things to come back to life, the thread is definitely going to outlive your use of the data, etc. I could work around the issue by using Rc and Option:

    thread_local!(static TABLE: Rc<ContainerTable> = Rc::new(ContainerTable::new()); )
    fn yield_inner3(foo: &Foo) -> Iterator<Item=&Bar> {
        let table = None;
        foo.iter().flat_map(move |x| { // `move` stores `table` on closure
            table = Some(TABLE.with(|table_rc| { table_rc.clone() }));
            table.as_ref().unwrap().lookup(x).iter()
        })
    }
    

    Now I think everything is fine as long as this iteration is single threaded. Since the move causes the closure to keep a clone of the Rc, the table will stay alive as long as the closure does, and the closure will be stored on the FlatMap returned by flat_map so the closure will stay alive as long as the overall iterator does. But since we're using thread local obviously our goal is to use multiple threads with rayon and par_iter:

    thread_local!(static TABLE: Rc<ContainerTable> = Rc::new(ContainerTable::new()); )
    fn yield_inner4(foo: &Foo) -> Iterator<Item=&Bar> {
        let table = None;
        foo.par_iter().flat_map_iter(move |x| { // `move` stores `table` on closure
            table = Some(TABLE.with(|table_rc| { table_rc.clone() }));
            table.as_ref().unwrap().lookup(x).iter()
        })
    }
    

    This doesn't compile -- I was expecting that each worker thread would get its own copy of the closure. However, ParallelIterator::flat_map_iter instead enforces that the closure is Send+Sync. What I think I want is a flat_map where instead the closure is Clone+Send, and every worker thread gets its own clone that it then reuses as it processes elements. I could try writing a ParallelIteratorExt::flat_map_iter_clone but is that necessary, or is there a better way? I haven't dug into implementing my own ParallelIterator enough to know if it's feasible with the current API or if internally it assumes Sync closures.

    Edit: fixed some s/map/flat_map
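
    For comparison, a sketch using rayon's existing map_init, which creates one piece of per-split state (here a clone of the thread-local Rc) so the per-item closure only needs to be Fn + Send + Sync and the state never crosses threads. The types below are stand-ins for the Foo/ContainerTable/Bar above, and lookup returns owned values to sidestep the borrowing problem:

    use rayon::prelude::*;
    use std::rc::Rc;

    // Hypothetical stand-in for the container table from the question.
    struct ContainerTable;

    impl ContainerTable {
        fn new() -> Self {
            ContainerTable
        }
        fn lookup(&self, x: &u32) -> Vec<u32> {
            vec![*x, *x * 10]
        }
    }

    thread_local! {
        static TABLE: Rc<ContainerTable> = Rc::new(ContainerTable::new());
    }

    fn lookup_all(foo: &[u32]) -> Vec<u32> {
        foo.par_iter()
            .map_init(
                || TABLE.with(|t| t.clone()), // runs per split, on that worker
                |table, x| table.lookup(x),   // reuses the clone for each item
            )
            .flatten_iter() // flatten the per-item Vec<u32> results
            .collect()
    }

    fn main() {
        println!("{:?}", lookup_all(&[1, 2, 3]));
    }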

    opened by jgarvin 2
  • rayon and itertools

    rayon and itertools

    Hello! On the link at the end of this post I have some methods that I would like to speed up with itertools and rayon, and I would like to ask if there is something I can do: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=6ddc6873616f259bc999fae1ed1aaba2

    opened by bobi6666 3
  • The possibility of dynamic add/remove threads as need in threadpool?

    The possibility of dynamic add/remove threads as need in threadpool?

    Assuming I have built a thread pool with n threads, I will spawn n tasks, each of which is a loop processing computation-heavy work. These tasks keep processing until they receive the signal that all the work is finished.

    Because creating the working environment is expensive, I don't want to create a task for each piece of work. Importantly, some tasks may block on their work, and I think the CPU cycles of a blocked thread are wasted.

    So I want to dynamically add a thread when one is blocked, and remove the redundant thread when the blocked thread resumes, just as described in the docs of ThreadPoolBuilder::num_threads.

    Is there room to implement this behaviour, or are there any suggestions?

    opened by Fomalhauthmj 3
  • Rayon citation

    Rayon citation

    Sorry to clog up issues with this question, but I am unsure of where else to ask this.

    I am using rayon for a research project and would like to cite rayon for its great benefit in seamlessly parallelizing my research code. Is there a desired way to cite rayon in my paper(s)?

    Thanks!

    opened by 2pt0 1
  • par_chunks might use the main thread

    par_chunks might use the main thread

    I noticed that par_chunks() does not spawn new threads when chunk_size is the same as or smaller than the number of elements; Rayon will just use the main thread.

    This is probably a good performance optimization; however, it can lead to surprises when expectations are made about the threads, such as the stack size.

    I need a lot of stack size for my application so I set this:

    rayon::ThreadPoolBuilder::new()
        .stack_size(16 * 1024 * 1024)
        .build_global()
        .unwrap();
    

    However, the use of the main thread led to a stack overflow because the stack of the main thread is too small.

    Maybe this behavior should at least be documented in the API to avoid surprises?
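
    One possible workaround, sketched below: build a pool with the larger stack size and run the whole computation inside install(), so even the "calling" side of par_chunks() executes on a pool thread rather than on the main thread's smaller stack.

    use rayon::prelude::*;

    fn main() {
        let pool = rayon::ThreadPoolBuilder::new()
            .stack_size(16 * 1024 * 1024)
            .build()
            .unwrap();

        let data = vec![1u32; 1024];
        // install() runs the closure on one of the pool's worker threads.
        let sums: Vec<u32> = pool.install(|| {
            data.par_chunks(256).map(|chunk| chunk.iter().sum()).collect()
        });
        println!("{} partial sums", sums.len());
    }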

    opened by Fee0 4