A modular implementation of timely dataflow in Rust

Last update: Jun 20, 2022

Timely Dataflow

Timely dataflow is a low-latency cyclic dataflow computational model, introduced in the paper Naiad: a timely dataflow system. This project is an extended and more modular implementation of timely dataflow in Rust.

This project is something akin to a distributed data-parallel compute engine, which scales the same program up from a single thread on your laptop to distributed execution across a cluster of computers. The main goals are expressive power and high performance. It is probably strictly more expressive and faster than whatever you are currently using, assuming you aren't yet using timely dataflow.

Be sure to read the documentation for timely dataflow. It is a work in progress, but mostly improving. There is more long-form text in mdbook format with examples tested against the current builds. There is also a series of blog posts (part 1, part 2, part 3) introducing timely dataflow in a different way, though be warned that the examples there may need tweaks to build against the current code.

An example

To use timely dataflow, add the following to the dependencies section of your project's Cargo.toml file:

[dependencies]
timely="*"

This will bring in the timely crate from crates.io, which should allow you to start writing timely dataflow programs like this one (also available in timely/examples/simple.rs):

extern crate timely;

use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {
        (0..10).to_stream(scope)
               .inspect(|x| println!("seen: {:?}", x));
    });
}

You can run this example from the root directory of the timely-dataflow repository by typing

% cargo run --example simple
Running `target/debug/examples/simple`
seen: 0
seen: 1
seen: 2
seen: 3
seen: 4
seen: 5
seen: 6
seen: 7
seen: 8
seen: 9

This is a very simple example (it's in the name), which only hints at how you might write dataflow programs.

Doing more things

For a more involved example, consider the very similar (but more explicit) examples/hello.rs, which creates and drives the dataflow separately:

extern crate timely;

use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Exchange, Inspect, Probe};

fn main() {
    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {

        let index = worker.index();
        let mut input = InputHandle::new();
        let mut probe = ProbeHandle::new();

        // create a new input, exchange data, and inspect its output
        worker.dataflow(|scope| {
            scope.input_from(&mut input)
                 .exchange(|x| *x)
                 .inspect(move |x| println!("worker {}:\thello {}", index, x))
                 .probe_with(&mut probe);
        });

        // introduce data and watch!
        for round in 0..10 {
            if index == 0 {
                input.send(round);
            }
            input.advance_to(round + 1);
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
    }).unwrap();
}

This example does a fair bit more, to show off more of what timely can do for you.

We first build a dataflow graph creating an input stream (with input_from), whose output we exchange to drive records between workers (using the data itself to indicate which worker to route to). We inspect the data and print the worker index to indicate which worker received which data, and then probe the result so that each worker can see when all of a given round of data has been processed.

We then drive the computation by repeatedly introducing rounds of data, where the round itself is used as the data. In each round, each worker introduces the same data, and then repeatedly takes dataflow steps until the probe reveals that all workers have processed all work for that epoch, at which point the computation proceeds.

With two workers, the output looks like

% cargo run --example hello -- -w2
Running `target/debug/examples/hello -w2`
worker 0:   hello 0
worker 1:   hello 1
worker 0:   hello 2
worker 1:   hello 3
worker 0:   hello 4
worker 1:   hello 5
worker 0:   hello 6
worker 1:   hello 7
worker 0:   hello 8
worker 1:   hello 9

Note that despite worker zero introducing the data (0..10), each element is routed to a specific worker, as we intended.

Execution

The hello.rs program above will by default use a single worker thread. To use multiple threads in a process, use the -w or --workers option followed by the number of threads you would like to use. (Note: the simple.rs program always uses one worker thread; it uses timely::example, which ignores user-supplied arguments.)

To use multiple processes, you will need to use the -h or --hostfile option to specify a text file whose lines are hostname:port entries corresponding to the locations where you plan to spawn the processes. You will need to use the -n or --processes argument to indicate how many processes you will spawn (a prefix of the host file), and each process must use the -p or --process argument to indicate its index out of this number.

Said differently, you want a hostfile that looks like so,

% cat hostfile.txt
host0:port
host1:port
host2:port
host3:port
...

and then to launch the processes like so:

host0% cargo run -- -w 2 -h hostfile.txt -n 4 -p 0
host1% cargo run -- -w 2 -h hostfile.txt -n 4 -p 1
host2% cargo run -- -w 2 -h hostfile.txt -n 4 -p 2
host3% cargo run -- -w 2 -h hostfile.txt -n 4 -p 3

The number of workers should be the same for each process.

The ecosystem

Timely dataflow is intended to support multiple levels of abstraction, from the lowest level manual dataflow assembly, to higher level "declarative" abstractions.

There are currently a few options for writing timely dataflow programs. Ideally this set will expand with time, as interested people write their own layers (or build on those of others).

  • Timely dataflow: Timely dataflow includes several primitive operators, including standard operators like map, filter, and concat. It also includes more exotic operators for tasks like entering and exiting loops (enter and leave), as well as generic operators whose implementations can be supplied using closures (unary and binary); see the sketch just after this list.

  • Differential dataflow: A higher-level language built on timely dataflow, differential dataflow includes operators like group, join, and iterate. Its implementation is fully incrementalized, and the details are pretty cool (if mysterious).
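
As a rough sketch of the primitive operators mentioned in the first bullet, here is a minimal example in the style of simple.rs. It assumes the operator traits ToStream, Filter, Map, Concat, and Inspect from timely::dataflow::operators; exact import paths can vary across timely versions.

extern crate timely;

use timely::dataflow::operators::{ToStream, Filter, Map, Concat, Inspect};

fn main() {
    timely::example(|scope| {
        // Two streams, filtered and transformed independently.
        let evens = (0..10).to_stream(scope).filter(|x| x % 2 == 0);
        let odds  = (0..10).to_stream(scope).filter(|x| x % 2 == 1).map(|x| x * 10);

        // Concatenate them and observe the records that flow past.
        evens.concat(&odds)
             .inspect(|x| println!("observed: {:?}", x));
    });
}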

There are also a few applications built on timely dataflow, including a streaming worst-case optimal join implementation and a PageRank implementation, both of which should provide helpful examples of writing timely dataflow programs.

Contributing

If you are interested in working with or helping out with timely dataflow, great!

There are a few broad categories of work that are helpful for us and may be interesting for you, plus an ever-shifting pile of issues of varying complexity.

  • If you would like to write programs using timely dataflow, this is very interesting for us. Ideally timely dataflow is meant to be an ergonomic approach to a non-trivial class of dataflow computations. As people use it and report back on their experiences, we learn about the classes of bugs they find, the ergonomic pain points, and other things we didn't even imagine ahead of time. Learning about timely dataflow, trying to use it, and reporting back is helpful!

  • If you like writing little example programs or documentation tests, there are many places throughout timely dataflow where the examples are relatively sparse, or do not actually test the demonstrated functionality. These can often be easy to pick up, flesh out, and push without a large up-front obligation. It is probably also a great way to get one of us to explain something in detail to you, if that is what you are looking for.

  • If you like the idea of getting your hands dirty in timely dataflow, the issue tracker has a variety of issues that touch on different levels of the stack. For example:

    • Timely currently does more copies of data than it must, in the interest of appeasing Rust's ownership discipline most directly. Several of these copies could be elided with more care in resource management (for example, by handing out shared regions of one Vec<u8> the way the bytes crate does; see the sketch after this list). Not everything is obvious here, so there is the chance for a bit of design work too.

    • We recently landed a bunch of logging changes, but there is still a list of nice to have features that haven't made it yet. If you are interested in teasing out how timely works in part by poking around at the infrastructure that records what it does, this could be a good fit! It has the added benefit that the logs are timely streams themselves, so you can even do some log processing on timely. Whoa...

    • There is an open issue on integrating Rust ownership idioms into timely dataflow. Right now, timely streams are of cloneable objects, and when a stream is re-used, items will be cloned. We could make that more explicit, and require calling a .cloned() method to get owned objects, in the same way that iterators require it. At the same time, using a reference to a stream should give you the chance to look at the records that go past without taking ownership (and without the clone that currently happens). This is often plenty for exchange channels, which may need to serialize the data and can't take much advantage of ownership anyhow.

    • There is a bunch of interesting work in scheduling timely dataflow operators: when given the chance to schedule many operators, we might think for a moment and realize that several of them have no work to do and can be skipped. Better still, we might maintain the list of operators that have work to do, and skip the rest entirely.
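
To make the bytes-crate idea from the first bullet concrete, here is a small stand-alone sketch (not timely code; it assumes a dependency like bytes = "1") of handing out disjoint, reference-counted views into a single allocation instead of copying into separate Vec<u8>s:

use bytes::{BufMut, Bytes, BytesMut};

fn main() {
    // One large allocation, as a network reader might fill it.
    let mut buffer = BytesMut::with_capacity(1024);
    buffer.put_slice(b"hello");
    buffer.put_slice(b"world");

    // Split off disjoint, reference-counted views without copying the bytes.
    let first: Bytes = buffer.split_to(5).freeze();
    let rest: Bytes = buffer.freeze();

    assert_eq!(&first[..], b"hello");
    assert_eq!(&rest[..], b"world");
}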

There are also some larger themes of work, whose solutions are not immediately obvious and each with the potential to sort out various performance issues:

Rate-controlling output

At the moment, the implementations of unary and binary operators allow their closures to send unbounded amounts of output. This can cause unwelcome resource exhaustion, and generally poor performance if the runtime needs to allocate lots of new memory to buffer data sent in bulk without being given a chance to digest it. It is commonly the case that when large amounts of data are produced, they are eventually reduced, given the opportunity.

With the current interfaces there is not much to be done. One possible change would be to have the input and notificator objects ask for a closure from an input message or timestamp, respectively, to an output iterator. This gives the system the chance to play the iterator at whatever speed it feels is appropriate. As many operators produce data-parallel output (based on independent keys), it may not be much of a burden to construct such iterators.
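
As a stand-alone sketch of that idea (the names here are hypothetical, not a timely interface): the operator's logic yields an iterator, and the caller pulls a bounded batch at a time, digesting each batch before requesting more.

// Hypothetical sketch; `pull_some` is not a timely API.
fn pull_some<O, I: Iterator<Item = O>>(it: &mut I, budget: usize) -> Vec<O> {
    it.take(budget).collect()
}

fn main() {
    // An operator that expands each key into many outputs, expressed lazily.
    let mut outputs = (0..3u64).flat_map(|key| (0..1_000u64).map(move |i| (key, i)));

    // The runtime pulls small batches and can digest (e.g. reduce) each one
    // before requesting the next, instead of buffering all 3,000 records at once.
    loop {
        let batch = pull_some(&mut outputs, 256);
        if batch.is_empty() { break; }
        println!("digesting a batch of {} records", batch.len());
    }
}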

Buffer management

The timely communication layer currently discards most buffers it moves through exchange channels, because it doesn't have a sane way of rate controlling the output, nor a sane way to determine how many buffers should be cached. If either of these problems were fixed, it would make sense to recycle the buffers to avoid random allocations, especially for small batches. These changes have something like a 10%-20% performance impact in the dataflow-join triangle computation workload.

Support for non-serializable types

The communication layer is based on a type Content<T>, which can be backed by typed or binary data. Consequently, it requires that the types it transports be serializable, because it needs logic for the case that the data is binary, even if this case is not used. It seems like the Stream type should be extensible to be parametric in the type of storage used for the data, so that we can express the fact that some types are not serializable and that this is ok.

NOTE: Differential dataflow demonstrates how to do this at the user level in its operators/arrange.rs, if somewhat sketchily (with a wrapper that lies about the properties of the type it transports).

This would allow us to safely pass Rc types around, as long as we use the Pipeline parallelization contract.

Coarse- vs fine-grained timestamps

The progress tracking machinery involves some non-trivial overhead per timestamp. This means that using very fine-grained timestamps, for example the nanosecond at which a record is processed, can swamp the progress tracking logic. By contrast, the logging infrastructure demotes nanoseconds to data, part of the logged payload, and approximates batches of events with the smallest timestamp in the batch. This is less accurate from a progress tracking point of view, but more performant. It may be possible to generalize this so that users can write programs without thinking about granularity of timestamp, and the system automatically coarsens when possible (essentially boxcar-ing times).
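
A minimal sketch of the boxcar-ing idea, independent of timely's actual types (the names below are illustrative): the precise nanosecond travels as data, while a coarsened time drives progress tracking, approximating each batch by the start of its boxcar.

fn coarsen(nanos: u64, granularity: u64) -> u64 {
    // Round down to the containing boxcar; all events in the same boxcar
    // share one timestamp as far as progress tracking is concerned.
    (nanos / granularity) * granularity
}

fn main() {
    let events: Vec<u64> = vec![1_000_123, 1_000_940, 1_002_417, 1_002_555];
    for nanos in events {
        let batch_time = coarsen(nanos, 1_000_000); // one-millisecond boxcars
        // `batch_time` would drive progress tracking; `nanos` rides along as payload.
        println!("record at {} ns flows in batch starting at {} ns", nanos, batch_time);
    }
}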

NOTE: Differential dataflow demonstrates how to do this at the user level in its collection.rs. The lack of system support means that the user ends up indicating the granularity, which isn't horrible but could plausibly be improved. It may also be that leaving the user with control of the granularity leaves them with more control over the latency/throughput trade-off, which could be a good thing for the system to do.

GitHub

https://github.com/timelydataflow/timely-dataflow
Comments
  • 1. Timely and bigdata

    Hi,

    I have a few weeks ahead of me to try and implement some bigdata/rust toolbox, and I'm trying to assess how timely could help me.

    I am toying right now with https://amplab.cs.berkeley.edu/benchmark/ , have single-host implementation for Query 1 and Query 2 and starting to put some timely in the mix (focusing on query2, query1 is too boring).

    I must admit I am struggling a bit. I got it running on small datasets, but so far haven't been able to coerce it into doing things in the right order — not loading everything from disk before starting to reduce the data, so as not to overflow memory. Would you be interested in having a look and giving me some insight as to what I should do? The code is there, https://gitlab.zoy.org/kali/dx16 . The most interesting part would be https://gitlab.zoy.org/kali/dx16/blob/master/src/bin/query2_timely.rs.

    I was hoping to grab you on IRC, but haven't seen you there for a while.

    Thanks.

    Reviewed by kali at 2016-01-12 21:00
  • 2. Notifications out of order

    I was under the assumption that the order in which notifications are delivered to an operator follows the partial order defined on timestamps. At least this seemed to be the case in past versions of timely, since our sessionization code relies on this.

    However, the following code (which sometimes requests notification for future times while handling a notification, as sessionization does) observes notifications in a weird order:

    last: (Root, 0), curr: (Root, 0), frontier: [(Root, 4)]
    last: (Root, 0), curr: (Root, 1), frontier: [(Root, 4)]
    last: (Root, 1), curr: (Root, 2), frontier: [(Root, 4)]
    last: (Root, 2), curr: (Root, 3), frontier: [(Root, 4)]
    last: (Root, 3), curr: (Root, 2), frontier: [(Root, 4)]
    thread 'worker thread 0' panicked at 'assertion failed: last_notification.less_equal(curr.time())', src/bin/bug.rs:22:24
    

    Source code:

    extern crate timely; // 0.3.0
    
    use timely::PartialOrder;
    use timely::dataflow::operators::{Input, Unary, Probe};
    use timely::dataflow::channels::pact::Pipeline;
    use timely::progress::timestamp::RootTimestamp;
    
    fn main() {
        timely::execute_from_args(std::env::args(), move |computation| {
            let (mut input, probe) = computation.dataflow(move |scope| {
                let (input, stream) = scope.new_input::<()>();
    
                let mut last_notification = RootTimestamp::new(0);
                let probe = stream.unary_notify::<(), _, _>(Pipeline, "foo", Vec::new(), 
                    move |input, _, notificator| {
                        input.for_each(|time, _| {
                            notificator.notify_at(time);
                        });
                        notificator.for_each(|curr, _, notif| {
                            println!("last: {:?}, curr: {:?}, frontier: {:?}",
                                last_notification, curr.time(), notif.frontier(0));
                            assert!(last_notification.less_equal(curr.time()));
                            last_notification = curr.time().clone();
                            if *curr == RootTimestamp::new(0) {
                                notif.notify_at(curr.delayed(&RootTimestamp::new(2)));
                            }
                       });
                   }).probe();
                (input, probe)
            });
        
            for epoch in 0..5 {
                input.advance_to(epoch);
                input.send(());
            }
            
            computation.step_while(|| probe.less_than(input.time()))
        }).unwrap();
    }
    
    Reviewed by gandro at 2017-11-22 11:41
  • 3. Minimize copies

    I've been trying to do a bit of review of the copies that go on in the timely and timely communication path. I think several of them can be removed, but first I thought I would try and explain what each of them are.

    Let's go in order from a message received in timely communication, in BinaryReceiver::recv_loop().

    1. There is almost certainly a copy in

      let read = self.reader.read(&mut self.buffer[self.length..]).unwrap_or(0);
      

      where we collect whatever data the kernel has for us. In the absence of some zero-copy interface to the networking, I think this is probably going to stick around. Though, we may have to think a bit harder about where we copy into.

    2. Just a bit below, we have

      target.send(slice[..h_len].to_vec()).unwrap();
      

      This is where we peel out the bytes destined for a specific (worker, dataflow, channel) tuple and send the bytes along to that destination. Because this is a different thread with no lifetime relationship, we invoke .to_vec() to get an owned allocation.

    3. The byte allocation arrives at communication's binary::Puller, where it is not copied. This is a clever moment where we deserialize into the Message type, whose from_bytes method takes ownership of the Vec<u8> buffer and invokes Abomonation's decode method to get references (see the sketch after this list).

    4. This Message gets handed to operator code, and if the operator only needs a &[Record] then no copy needs to happen. However, if the operator needs a &mut Vec<Record> then the DerefMut implementation will invoke a clone() on the &Vec<Record>, which will surely do some allocations. The byte buffer is dropped at this point.

    5. Operators can supply outputs either record-by-record, or as a ready-to-send batch of records. In either case, if they hit a data exchange channel they will need to be moved. This is essentially a copy, but it seems largely unavoidable if we want to put the records destined for remote workers into contiguous memory. This is where the "shuffle" actually needs to happen, and it seems legit.

    6. If serialization is required, then <Message as Serialize>::into_bytes() is invoked, and it will do an allocation of a Vec<u8> and a copy into it. The only way we know how to turn general Vec<Record> types into bytes is using Abomonation's encode, and this copies. In principle, we could "steal" the allocation of the vector itself, and only serialize subsequent owned data.

    7. The header (a fixed-size struct) and bytes are sent to BinarySender::send_loop(), in which we write both to a W: Writer. This happens to be a BufWriter wrapped around a network stream, which means a copy into the buffer, and probably a copy out of the buffer when it eventually gets around to writing to the network stream in bulk (the second of which is intrinsic to the TcpStream API).
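
    As a small stand-alone sketch of the decode-in-place pattern from step 3 (illustrative only; it uses the abomonation crate directly rather than timely's Message wrapper, and assumes a dependency like abomonation = "0.7"):

    extern crate abomonation;

    use abomonation::{decode, encode};

    fn main() {
        let records: Vec<(u64, String)> = vec![(0, "hello".to_string()), (1, "world".to_string())];

        // Serialize into an owned byte buffer (this is the copy discussed in step 6).
        let mut bytes = Vec::new();
        unsafe { encode(&records, &mut bytes).unwrap(); }

        // Deserialize in place: no copy, just references into the byte buffer.
        if let Some((result, remaining)) = unsafe { decode::<Vec<(u64, String)>>(&mut bytes) } {
            assert!(remaining.is_empty());
            assert_eq!(result[1].1, "world");
        }
    }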

    I think three of these are somewhat non-negotiable at the moment: i. the copy from kernel buffers when we read from the network stream (in 1.), ii. the copy as we do the data shuffle (in 5.), and iii. the copy back into kernel buffers (in 7.).

    This leaves us with four potential copies that could be superfluous.

    1. This copy could be avoided using something like the bytes crate, where one hands out multiple references to a common allocation, and the API ensures that the references are to disjoint ranges.

      This could also be avoided by doing smaller reads into independently owned allocations; each read pulls down the next payload and the subsequent header, which tells us how much to read for the next allocation (and could tell us a size and alignment). This has the potential risk that if there are many small messages we do many small reads, possibly doing lots of kernel crossings. In that case, it seems like copies are an unavoidable cost of moving many messages using few kernel crossings.

    2. This wasn't actually a copy, but it has a number so we want to put it here.

    3. This copy is self-inflicted, in that one could write operator code that doesn't even need a mutable reference to the source data. It isn't always natural to do this, but if your code insists on owned data with owned allocations then this is non-negotiable, as we don't control the Rust codegen that needs to work correctly with the data it is handed.

      One candidate bit of cuteness is: if we are handed an owned Vec<u8>, in conflict with the optimization for (2.), we could arrange that the data are laid out so that the same allocation can be used as the spine of the Vec<Record>. This could still mean copying if these types have allocations behind them, and we would need to be careful about how the allocation is eventually freed, because the deallocation logic is allowed to explode if we dealloc with a different size or alignment than the allocation was made with, but it could be possible for something like this to work.

    4. We decided that the shuffle move was non-optional, but I have to put it here to make markdown numbers work out.

    5. When we go from Vec<Record> to Vec<u8> we have relatively few options. We could grab the spine of the vector and only serialize auxiliary data (perhaps to the same allocation, if there is enough space). This would mean no copies here for data without further owned memory, and in the absence of any further information we would have no choice I think (if each Record contains a bunch of String fields and such).

      One alternative is something like the CapnProto builder patterns, where instead of allocating a Rust object for output you directly serialize the result at a byte buffer. This is possible, though I don't know how ergonomic it ends up being (that is, you could write the code by hand, but you probably wouldn't want to).

    6. One of these copies seems inescapable (the kernel buffer copy), but the BufWriter copy seems optional. It does some very good things for us, in terms of minimizing kernel crossings. This could be somewhat avoided if each worker produced a consolidated Vec<u8> to send to each remote process, rather than separate allocations for each channel and each remote worker. This seems possible, though again awkward. The shuffle that happens means to colocate data for each worker, and we don't know how large each of these will be before sending them. We could commit to certain spacing (e.g. 1024 records for each worker) and start writing each worker at an offset of a common buffer for each process, with some inefficiency if there is skew of any sort. In any case, operator code currently produces output records one at a time, and we need to do something with each of these records.

    One meta-point, which I don't really know that I understand, is that we may be able to absolve ourselves of copies that do not leave low level caches. Each copy that remains within the L1 cache could be viewed as pretty cheap, relative to all the other operations going on. So, deserialization code for small buffers might be relatively cheap, as compared to copying each frame into a new allocation (2.). I'm slightly making this up, but understanding "zero copy" and which costs it is really avoiding seems like a good thing to do along the way.

    Reviewed by frankmcsherry at 2017-12-22 00:16
  • 4. Capture with `k` workers, replay with `j`, `k ≠ j`

    .capture()-d streams may be sent over sockets to a separate computation.

    It's currently unclear how to replay those streams with a different number of workers compared to the source computation.

    For example, replaying a 1-worker stream in a j-worker computation might look like this, but will not work.

                    let one_stream: Stream<_, Thing> = if index == 0 {
                        one_stream.lock().unwrap().take().unwrap().replay_into(scope)
                    } else {
                        vec![].to_stream(scope)
                    };
    

    The broken progress behaviour seems due to .to_stream() expecting to exist on every worker, https://github.com/frankmcsherry/timely-dataflow/blob/eae63eb2063a6e1d3da70c781c3e879a5ccd7d93/src/dataflow/operators/generic/operator.rs#L554

    We may just need a source that does not advertise initial capabilities? I'm not sure this checks out with the progress tracking math.

    Reviewed by utaal at 2017-08-15 14:13
  • 5. Exchange, input, tee: Only allocate buffers as needed

    Instead of pre-allocating relatively large buffers for each pusher, only allocate small buffers and grow them as needed. This can lead to more allocations, but by doubling the buffer size we hope to amortize some of the cost.

    Signed-off-by: Moritz Hoffmann [email protected]

    Reviewed by antiguru at 2021-07-12 11:59
  • 6. Further `step_or_park` implementations [WIP]

    This PR is a work-in-progress implementation of await_events methods for communicators, which enable the step_or_park functionality. The first commit adds preliminary support for the intra-process allocator (not the serializing one). It has not been tested beyond the standard test suite (which has a few multi-worker tests, but not many).

    Reviewed by frankmcsherry at 2019-04-26 20:20
  • 7. Reviewing motivation sections in the book

    Finally, the following two statements on "When to use Timely Dataflow" section helped me a lot to understand things. Thank you!

    Timely dataflow is a dataflow system, and this means that at its core it likes to move data around.

    Dataflow systems are also fundamentally about breaking apart the execution of your program into independently operating parts.

    Reviewed by shamrin at 2017-08-17 07:24
  • 8. add step_or_park

    This PR introduces the step_or_park(Duration) method to a worker.

    The design is that the communication fabric allocator now has a method

    await_events(&self, duration: Duration)
    

    which is allowed to park the thread for at most duration, or until self.events() has some events to report (these are events that prompt the scheduling of operators). The implementation of this is a no-op by default (not parking the thread), but the single-threaded communicator has an implementation that parks itself if its event queue is empty. The plan is to add these in for other communicators so that eventually all of them behave well.

    The worker now has a method

    step_or_park(&mut self, duration: Duration) -> bool
    

    which drains events from the communicator and, just before it is about to schedule its activations, checks to see if it has any; if not, it calls in to await_events(duration).

    I haven't really thought through the concurrency design here. It is possible that there is a race condition somewhere, but we can take a look at it and see whether it does the right thing in principle.

    cc @comnik @ryzhyk @benesch

    Reviewed by frankmcsherry at 2019-04-04 19:55
  • 9. Doc issues, or maybe I'm dumb

    I am finally picking up DD/TD because I want to play with Monoids. New to both Rust and DD.

    Going through the mdbook, but at HEAD instead of version 0.7, and am able to explain many differences between my output and the docs.

    Things I can't explain, so far:

    1. Step 1, Write a Program - we put the code in src/main.rs but the suggested command is cargo run --example hello. I get an error (at both HEAD and v0.7) that there is no such example. Am I missing something or is this just a bug? (The same issue happens for all the other example cells I've seen.)

    2. Increase the Scale - Don't we need to change the code? The page seems to just have two run commands (which fail for me because of --example hello), with no code changes. I did gather that we should probably add --release to make it go faster.

      Is there some magic linkage to https://github.com/TimelyDataflow/timely-dataflow/blob/master/examples/hello.rs that I've messed up in my setup? How does it know which version of things to run?

    I'm guessing these are just omissions/bugs, but I am open to being completely wrong given that I am new to a lot here.

    Reviewed by dhalperi at 2019-03-13 05:11
  • 10. Performance degradation observed in timely 0.6.0

    Hi, Frank!

    Recently I've built a timely server in a way similar to bin/server.rs: it sets up a dataflow in advance, an outside client feeds inputs to worker 0, and results are routed back to worker 0 and then to the outside client. Your comments helped a lot and I'm very appreciative of that!

    But after upgrading to timely 0.6.0, about a 30% performance degradation is observed, with only a few lines changed in my code for API compatibility.

    My timely-server can be simplified as the following:

    extern crate timely;
    
    use std::sync::mpsc::{TryRecvError, channel};
    use std::time::Instant;
    
    use timely::dataflow::InputHandle;
    use timely::dataflow::operators::{Input, Exchange, Map};
    use timely::dataflow::operators::Unary;
    use timely::dataflow::channels::pact;
    use timely::dataflow::operators::generic::OutputHandle;
    use timely::dataflow::channels::pushers::Tee;
    use std::sync::Mutex;
    use std::sync::Arc;
    
    fn main() {
        let args = std::env::args().collect::<Vec<_>>();
        println!("Starts with arguments: {:?}", args);
        let input_num = args[1].parse::<u64>().unwrap_or(1);
    
        let (input_send, input_recv) = channel();
        let (output_send, output_recv) = channel();
    
        let input_recv = Arc::new(Mutex::new(Some(input_recv)));
        let output_send = Arc::new(Mutex::new(Some(output_send)));
    
        let worker_guards = timely::execute_from_args(std::env::args(), move |worker| {
            let index = worker.index();
            let peers = worker.peers();
            let input_recv = if index == 0 { input_recv.lock().unwrap().take() } else { None };
            let output_send = if index == 0 { output_send.lock().unwrap().take() } else { None };
    
            let mut input = InputHandle::new();
            let mut timestamp = 0_u64;
            let route = move |vid: &u64| *vid % peers as u64;
    
            // create a dataflow.
            worker.dataflow(|scope| {
                scope.input_from(&mut input)
                    .exchange(route)
                    .flat_map(move |vid| {
                        (1..10).map(move |i| vid * 1000 + i)
                    })
                    .exchange(route)
                    .flat_map(move |vid| {
                        (1..10).map(move |i| vid * 1000 + i)
                    })
                    .unary_notify(
                        pact::Exchange::new(|_| 0),
                        "ResultSink",
                        vec![],
                        move |input, _output: &mut OutputHandle<_, _, Tee<_, u64>>, notificator| {
                            input.for_each(|time, data| {
                                data.take();
                                notificator.notify_at(time.retain());    // for timely 0.6.0
                                //notificator.notify_at(time);             // for timely 0.5.1
                            });
    
                            notificator.for_each(|cap, _, _| {
                                if index == 0 {
                                    output_send.as_ref().unwrap().send(cap.time().inner).unwrap();
                                }
                            });
                        },
                    );
            });
    
            if index == 0 {
                loop {
                    match input_recv.as_ref().unwrap().try_recv() {
                        Result::Ok(data) => {
                            input.send(data);
                            timestamp += 1;
                            input.advance_to(timestamp);
                        }
                        Result::Err(TryRecvError::Empty) => {}
                        Result::Err(TryRecvError::Disconnected) => { break; }
                    }
                    worker.step();
                }
            }
        }).unwrap();
    
        let start = Instant::now();
        for i in 0..input_num {
            input_send.send(i).unwrap();
            output_recv.recv().unwrap();
        }
        let elapsed = start.elapsed();
    
        drop(input_send);
        worker_guards.join();
    
        let elapsed_nanos = elapsed.as_secs() * 1_000_000_000 + elapsed.subsec_nanos() as u64;
        let qps = input_num as f64 * 1_000_000_000.0 / elapsed_nanos as f64;
        let elapsed_ms = elapsed_nanos as f64 / 1_000_000.0;
        let latency_ms = elapsed_ms / input_num as f64;
        println!("Time elapsed (ms): {:.2}, latency(ms): {:.2}, qps: {:.2}", elapsed_ms, latency_ms, qps);
    }
    

    I ran the 0.5.1 and 0.6.0 compiled binaries with 16 workers in a single process on a machine which has:

    • CPU: 32 Core / Intel(R) Xeon(R) CPU E5-2650 v2 @ 2.60GHz
    • Memory: 128 GB
    • OS: Red Hat 7.2
    • Rust: rustc 1.27.0 (3eda71b00 2018-06-19)

    The experiment was repeated several times and the results seemed stable. A typical result looks like:

    [[email protected] /home/xiafei.qiuxf/learn-rust]
    $./test_6  30000 -w 16
    Starts with arguments: ["./test_6", "30000", "-w", "16"]
    Time elapsed (ms): 2342.53, latency(ms): 0.08, qps: 12806.64
    
    [[email protected] /home/xiafei.qiuxf/learn-rust]
    $./test_5  30000 -w 16
    Starts with arguments: ["./test_5", "30000", "-w", "16"]
    Time elapsed (ms): 1469.59, latency(ms): 0.05, qps: 20413.86
    

    The throughput degrades a lot.

    To reproduce this problem, I tried pushing all input data into the InputHandle of worker 0 to measure end-to-end time without waiting for the previous query to finish before issuing a new one. But there seemed to be no difference between 0.5.1 and 0.6.0. The code was like:

    timely::execute_from_args(std::env::args(), move |worker| {
        
        // ...
        worker.dataflow(|scope| { /* ......  */});
    
        if index == 0 {
            for round in 0..input_num {
                input.send(round as u64);
                input.advance_to(round + 1);
            }
        }
    }).unwrap().join();
    
    Reviewed by qiuxiafei at 2018-07-11 04:14
  • 11. Provide stream functions to handle Result streams

    Add a ResultStream trait providing functions modeled after the native Result type, if their stream counterpart is meaningful. This includes functions to extract and map Ok or Err variants, as well as unwrapping, and chaining results with and_then.

    Signed-off-by: Moritz Hoffmann [email protected]

    Reviewed by antiguru at 2020-12-29 11:01
  • 12. Activator: Bound memory utilization by compaction and de-duplication

    The Activator object allows one to force the scheduling of Timely operators even in the absence of progress changes. Care must be taken to avoid scheduling the operators too often, as all scheduled activations are stored in memory until the scheduler picks them up.

    To avoid this, we propose the following changes when activating:

    • Record the activation as before. Once the size of the internal data structures exceeds a threshold, say 2x the last compacted size, organize the activations. This essentially sorts activations by the operator's path.
    • Also, de-duplicate the data by collapsing activations of the same operator into one.

    In Materialize, we often have a pattern where we activate an upstream operator once a downstream dataflow operator is dropped, or a source has new data and Timely needs to schedule the source operator. To avoid the issue of sending too many activations, we use a pattern where the operator only gets activated once, and once it is running, it'll need to acknowledge the activation, which enables future activations. This pattern works well, but comes with additional complexity for a developer writing Timely operators. The above solution would eliminate the extra burden, at the cost of some (amortized) overhead. For specific operators, the activate-acknowledge pattern might still be useful.
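
    A stand-alone sketch of the proposed compaction (illustrative only; the names and thresholds are not the actual Activator implementation): activations accumulate as operator paths, and once the buffer exceeds roughly twice its last compacted size it is sorted and de-duplicated.

    struct Activations {
        pending: Vec<Vec<usize>>, // activated operators, identified by their path
        compacted_len: usize,     // size of `pending` after the last compaction
    }

    impl Activations {
        fn activate(&mut self, path: &[usize]) {
            self.pending.push(path.to_vec());
            // Compact once the buffer exceeds twice the last compacted size (with a floor).
            if self.pending.len() > 2 * self.compacted_len.max(16) {
                self.pending.sort();
                self.pending.dedup();
                self.compacted_len = self.pending.len();
            }
        }
    }

    fn main() {
        let mut acts = Activations { pending: Vec::new(), compacted_len: 0 };
        // Many redundant activations of one operator collapse to a bounded buffer.
        for _ in 0..10_000 {
            acts.activate(&[0, 3]);
        }
        println!("pending after 10,000 activations: {}", acts.pending.len());
    }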

    Reviewed by antiguru at 2022-06-22 12:17
  • 13. frontier: tighter integration with rust traits

    The frontier module implements a common Rust pattern where there is one type representing a mutable version of something and a separate type representing its immutable counterpart. Examples in std are String and str, PathBuf and Path, OsString and OsStr, and others.

    In all the examples above the mutable version holds onto a heap-allocated buffer and the immutable version is just a #[repr(transparent)] wrapper over a raw unsized slice of the data type.

    This patch changes AntichainRef<T> to simply be a transparent wrapper over [T]. This change enables Deref implementations targeting the immutable antichain from both MutableAntichain<T> and Antichain<T>.

    This has a bunch of ergonomic improvements for users (due to auto-deref) and also allows having a single implementation of all methods that only need immutable access to the antichain.

    Specifically, the following methods have been deduplicated and are now implemented in only one place:

    • AntichainRef::less_than
    • AntichainRef::less_equal
    • AntichainRef::dominates
    • AntichainRef::elements
    • <AntichainRef as PartialEq>::eq
    • <AntichainRef as PartialOrd>::partial_cmp
    • <AntichainRef as Hash>::hash

    Finally, this change also enables replacing the inherent Antichain::borrow and Antichain::to_owned methods by implementing the std::borrow::Borrow and std::borrow::ToOwned traits respectively.
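
    The std pattern being adopted here can be sketched in a few lines (the names below are illustrative, not the actual Antichain/AntichainRef code): an owned type that Derefs to a #[repr(transparent)] wrapper over a slice, so methods needing only immutable access live in one place.

    use std::ops::Deref;

    #[repr(transparent)]
    struct FrontierRef<T>([T]);

    impl<T: PartialOrd> FrontierRef<T> {
        // Needs only immutable access, so it is implemented once, on the borrowed form.
        fn less_equal(&self, time: &T) -> bool {
            self.0.iter().any(|t| t <= time)
        }
    }

    struct Frontier<T>(Vec<T>);

    impl<T> Deref for Frontier<T> {
        type Target = FrontierRef<T>;
        fn deref(&self) -> &FrontierRef<T> {
            // Sound because FrontierRef<T> is #[repr(transparent)] over [T].
            unsafe { &*(self.0.as_slice() as *const [T] as *const FrontierRef<T>) }
        }
    }

    fn main() {
        let frontier = Frontier(vec![3u64]);
        // The borrowed form's methods are available through auto-deref.
        assert!(frontier.less_equal(&5));
        assert!(!frontier.less_equal(&2));
    }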

    Signed-off-by: Petros Angelatos [email protected]

    Reviewed by petrosagg at 2022-06-18 09:39
  • 14. store one element antichains inline

    Antichains are made to work with partially ordered times, but that forces them to allocate a heap vector even in the simple case of totally ordered timestamp types, which will never hold more than one element.

    Short of defining a Frontier trait and abstracting generically over multiple independent frontier implementations, this simple PR handles it at runtime by backing Antichains with SmallVec.
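
    A minimal illustration of the SmallVec-backed approach (not the PR's actual code; it assumes a dependency like smallvec = "1"): an inline capacity of one covers totally ordered timestamps without touching the heap.

    use smallvec::SmallVec;

    fn main() {
        // Stays inline (no heap allocation) while it holds at most one element.
        let mut frontier: SmallVec<[u64; 1]> = SmallVec::new();
        frontier.push(7);
        assert!(!frontier.spilled());

        // A second element (possible only for partially ordered times) spills to the heap.
        frontier.push(9);
        assert!(frontier.spilled());
    }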

    Reviewed by petrosagg at 2022-06-08 13:52
  • 15. Specialize the container type for antichains

    Antichains are currently unconditionally heap-allocated. For totally ordered elements, an antichain will contain zero or one elements, but not more. This would allow us to avoid the heap allocation and instead store the element inline.

    To enable this, we add an associated type to Timestamp that defines a container storing an antichain of the type. The container implements the functionality to let the current Antichain type function without change.

    The obvious downside of this change is that we now include a type bound T: Timestamp on the Antichain type, because the elements are based on T's associated type, which infects a bunch of other places.

    Signed-off-by: Moritz Hoffmann [email protected]

    Reviewed by antiguru at 2022-03-31 12:54