A modular implementation of timely dataflow in Rust

Overview

Timely Dataflow

Timely dataflow is a low-latency cyclic dataflow computational model, introduced in the paper Naiad: a timely dataflow system. This project is an extended and more modular implementation of timely dataflow in Rust.

This project is something akin to a distributed data-parallel compute engine, which scales the same program up from a single thread on your laptop to distributed execution across a cluster of computers. The main goals are expressive power and high performance. It is probably strictly more expressive and faster than whatever you are currently using, assuming you aren't yet using timely dataflow.

Be sure to read the documentation for timely dataflow. It is a work in progress, but mostly improving. There is more long-form text in mdbook format with examples tested against the current builds. There is also a series of blog posts (part 1, part 2, part 3) introducing timely dataflow in a different way, though be warned that the examples there may need tweaks to build against the current code.

An example

To use timely dataflow, add the following to the dependencies section of your project's Cargo.toml file:

[dependencies]
timely="*"

This will bring in the timely crate from crates.io, which should allow you to start writing timely dataflow programs like this one (also available in timely/examples/simple.rs):

extern crate timely;

use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {
        (0..10).to_stream(scope)
               .inspect(|x| println!("seen: {:?}", x));
    });
}

You can run this example from the root directory of the timely-dataflow repository by typing

% cargo run --example simple
Running `target/debug/examples/simple`
seen: 0
seen: 1
seen: 2
seen: 3
seen: 4
seen: 5
seen: 6
seen: 7
seen: 8
seen: 9

This is a very simple example (it's in the name), which only hints at how you might write dataflow programs.

Doing more things

For a more involved example, consider the very similar (but more explicit) examples/hello.rs, which creates and drives the dataflow separately:

extern crate timely;

use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Exchange, Inspect, Probe};

fn main() {
    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {

        let index = worker.index();
        let mut input = InputHandle::new();
        let mut probe = ProbeHandle::new();

        // create a new input, exchange data, and inspect its output
        worker.dataflow(|scope| {
            scope.input_from(&mut input)
                 .exchange(|x| *x)
                 .inspect(move |x| println!("worker {}:\thello {}", index, x))
                 .probe_with(&mut probe);
        });

        // introduce data and watch!
        for round in 0..10 {
            if index == 0 {
                input.send(round);
            }
            input.advance_to(round + 1);
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
    }).unwrap();
}

This example does a fair bit more, to show off more of what timely can do for you.

We first build a dataflow graph creating an input stream (with input_from), whose output we exchange to distribute records between workers (using the data itself to indicate which worker to route it to). We inspect the data and print the worker index to indicate which worker received which data, and then probe the result so that each worker can see when all of a given round of data has been processed.

We then drive the computation by repeatedly introducing rounds of data, where the round itself is used as the data. In each round, each worker introduces the same data, and then repeatedly takes dataflow steps until the probe reveals that all workers have processed all work for that epoch, at which point the computation proceeds.

With two workers, the output looks like

% cargo run --example hello -- -w2
Running `target/debug/examples/hello -w2`
worker 0:   hello 0
worker 1:   hello 1
worker 0:   hello 2
worker 1:   hello 3
worker 0:   hello 4
worker 1:   hello 5
worker 0:   hello 6
worker 1:   hello 7
worker 0:   hello 8
worker 1:   hello 9

Note that despite worker zero introducing the data (0..10), each element is routed to a specific worker, as we intended.

Execution

The hello.rs program above will by default use a single worker thread. To use multiple threads in a process, use the -w or --workers option followed by the number of threads you would like to use. (Note: the simple.rs program always uses one worker thread; it uses timely::example, which ignores user-supplied arguments.)

To use multiple processes, you will need to use the -h or --hostfile option to specify a text file whose lines are hostname:port entries corresponding to the locations where you plan to spawn the processes. You will need to use the -n or --processes argument to indicate how many processes you will spawn (a prefix of the host file), and each process must use the -p or --process argument to indicate its index out of this number.

Said differently, you want a hostfile that looks like so,

% cat hostfile.txt
host0:port
host1:port
host2:port
host3:port
...

and then to launch the processes like so:

host0% cargo run -- -w 2 -h hostfile.txt -n 4 -p 0
host1% cargo run -- -w 2 -h hostfile.txt -n 4 -p 1
host2% cargo run -- -w 2 -h hostfile.txt -n 4 -p 2
host3% cargo run -- -w 2 -h hostfile.txt -n 4 -p 3

The number of workers should be the same for each process.

The ecosystem

Timely dataflow is intended to support multiple levels of abstraction, from the lowest level manual dataflow assembly, to higher level "declarative" abstractions.

There are currently a few options for writing timely dataflow programs. Ideally this set will expand with time, as interested people write their own layers (or build on those of others).

  • Timely dataflow: Timely dataflow includes several primitive operators, including standard operators like map, filter, and concat. It also includes more exotic operators for tasks like entering and exiting loops (enter and leave), as well as generic operators whose implementations can be supplied using closures (unary and binary). A short sketch combining a few of these operators follows this list.

  • Differential dataflow: A higher-level language built on timely dataflow, differential dataflow includes operators like group, join, and iterate. Its implementation is fully incrementalized, and the details are pretty cool (if mysterious).
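
As a taste of the first item above, here is a minimal sketch in the style of simple.rs (assuming the operator traits exported from timely::dataflow::operators) that combines a few of the primitive operators:

extern crate timely;

use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {
        // two filtered streams, concatenated, rescaled, and inspected
        let evens = (0..10).to_stream(scope).filter(|x| *x % 2 == 0);
        let odds = (0..10).to_stream(scope).filter(|x| *x % 2 == 1);
        evens.concat(&odds)
             .map(|x| x * 10)
             .inspect(|x| println!("saw: {:?}", x));
    });
}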

There are also a few applications built on timely dataflow, including a streaming worst-case optimal join implementation and a PageRank implementation, both of which should provide helpful examples of writing timely dataflow programs.

Contributing

If you are interested in working with or helping out with timely dataflow, great!

There are a few classes of work that are helpful for us and may be interesting for you: a few broad categories, and then an ever-shifting pile of issues of varying complexity.

  • If you would like to write programs using timely dataflow, this is very interesting for us. Ideally timely dataflow is meant to be an ergonomic approach to a non-trivial class of dataflow computations. As people use it and report back on their experiences, we learn about the classes of bugs they find, the ergonomic pain points, and other things we didn't even imagine ahead of time. Learning about timely dataflow, trying to use it, and reporting back is helpful!

  • If you like writing little example programs or documentation tests, there are many places throughout timely dataflow where the examples are relatively sparse, or do not actually test the demonstrated functionality. These can often be easy to pick up, flesh out, and push without a large up-front obligation. It is probably also a great way to get one of us to explain something in detail to you, if that is what you are looking for.

  • If you like the idea of getting your hands dirty in timely dataflow, the issue tracker has a variety of issues that touch on different levels of the stack. For example:

    • Timely currently does more copies of data than it must, in the interest of appeasing Rust's ownership discipline most directly. Several of these copies could be elided with some more care in the resource management (for example, using shared regions of one Vec<u8> in the way that the bytes crate does; a short sketch of this idea follows this list). Not everything is obvious here, so there is the chance for a bit of design work too.

    • We recently landed a bunch of logging changes, but there is still a list of nice-to-have features that haven't made it yet. If you are interested in teasing out how timely works in part by poking around at the infrastructure that records what it does, this could be a good fit! It has the added benefit that the logs are timely streams themselves, so you can even do some log processing on timely. Whoa...

    • There is an open issue on integrating Rust ownership idioms into timely dataflow. Right now, timely streams are of cloneable objects, and when a stream is re-used, items will be cloned. We could make that more explicit, and require calling a .cloned() method to get owned objects in the same way that iterators require it. At the same time, using a reference to a stream should give you the chance to look at the records that go past without taking ownership (and without requiring a clone, as is currently done). This is often plenty for exchange channels, which may need to serialize the data and can't take much advantage of ownership anyhow.

    • There is a bunch of interesting work in scheduling timely dataflow operators, where when given the chance to schedule many operators, we might think for a moment and realize that several of them have no work to do and can be skipped. Better, we might maintain the list of operators with anything to do, and do nothing for those without work to do.
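
To make the first bullet above concrete, here is a small sketch, assuming a recent version of the bytes crate as a dependency, of handing out disjoint views into one shared allocation without copying:

extern crate bytes;

use bytes::Bytes;

fn main() {
    // one shared, reference-counted allocation
    let buffer = Bytes::from(vec![0u8; 1024]);
    // disjoint views of the same memory; no bytes are copied
    let first = buffer.slice(0..512);
    let second = buffer.slice(512..1024);
    println!("{} + {} bytes, one allocation", first.len(), second.len());
}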

There are also some larger themes of work, whose solutions are not immediately obvious and each with the potential to sort out various performance issues:

Rate-controlling output

At the moment, the implementations of unary and binary operators allow their closures to send unbounded amounts of output. This can cause unwelcome resource exhaustion, and generally poor performance if the runtime needs to allocate lots of new memory to buffer data sent in bulk, without being given a chance to digest it. It is commonly the case that when large amounts of data are produced, they are eventually reduced, given the opportunity.

With the current interfaces there is not much to be done. One possible change would be to have the input and notificator objects ask for a closure from an input message or timestamp, respectively, to an output iterator. This gives the system the chance to play the iterator at the rate it feels is appropriate. As many operators produce data-parallel output (based on independent keys), it may not be that much of a burden to construct such iterators.
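
As a plain-Rust sketch of that proposed shape (not the current timely API): the operator logic becomes a closure from an input record to an iterator of outputs, and the caller, standing in for the runtime, drains the iterator in bounded batches rather than accepting an unbounded burst:

fn drain_in_batches<T, I, F>(inputs: Vec<T>, batch: usize, mut logic: F)
where
    T: std::fmt::Debug,
    I: Iterator<Item = T>,
    F: FnMut(T) -> I,
{
    for record in inputs {
        let mut outputs = logic(record);
        loop {
            // take at most `batch` items before yielding control; a real
            // runtime could schedule other work (or apply backpressure) here
            let chunk: Vec<T> = outputs.by_ref().take(batch).collect();
            if chunk.is_empty() { break; }
            println!("flushing {} outputs: {:?}", chunk.len(), chunk);
        }
    }
}

fn main() {
    // each input expands to ten outputs; they are consumed four at a time
    drain_in_batches(vec![1u64, 2, 3], 4, |x| (0..10u64).map(move |i| x * 100 + i));
}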

Buffer management

The timely communication layer currently discards most buffers it moves through exchange channels, because it doesn't have a sane way of rate controlling the output, nor a sane way to determine how many buffers should be cached. If either of these problems were fixed, it would make sense to recycle the buffers to avoid random allocations, especially for small batches. These changes have something like a 10%-20% performance impact in the dataflow-join triangle computation workload.

Support for non-serializable types

The communication layer is based on a type Content<T> which can be backed by typed or binary data. Consequently, it requires that the type it supports be serializable, because it needs to have logic for the case that the data is binary, even if this case is not used. It seems like the Stream type should be extendable to be parametric in the type of storage used for the data, so that we can express the fact that some types are not serializable and that this is ok.

NOTE: Differential dataflow demonstrates how to do this at the user level in its operators/arrange.rs, if somewhat sketchily (with a wrapper that lies about the properties of the type it transports).

This would allow us to safely pass Rc types around, as long as we use the Pipeline parallelization contract.

Coarse- vs fine-grained timestamps

The progress tracking machinery involves some non-trivial overhead per timestamp. This means that using very fine-grained timestamps, for example the nanosecond at which a record is processed, can swamp the progress tracking logic. By contrast, the logging infrastructure demotes nanoseconds to data, part of the logged payload, and approximates batches of events with the smallest timestamp in the batch. This is less accurate from a progress tracking point of view, but more performant. It may be possible to generalize this so that users can write programs without thinking about granularity of timestamp, and the system automatically coarsens when possible (essentially boxcar-ing times).

NOTE: Differential dataflow demonstrates how to do this at the user level in its collection.rs. The lack of system support means that the user ends up indicating the granularity, which isn't horrible but could plausibly be improved. It may also be that leaving the user with control of the granularity leaves them with more control over the latency/throughput trade-off, which could be a good thing for the system to do.
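
A small, self-contained illustration of the "demote times to data" idea (plain Rust, not the actual logging infrastructure): events keep their precise nanosecond as part of the payload, while progress tracking only ever sees a coarse batch time:

fn main() {
    let events: Vec<(u64, &str)> = vec![(1_000_123, "a"), (1_000_456, "b"), (2_000_789, "c")];
    for (nanos, payload) in events {
        let batch = nanos / 1_000_000;   // coarse timestamp used for progress tracking
        let record = (nanos, payload);   // the precise time travels along as data
        println!("batch {}: {:?}", batch, record);
    }
}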

Comments
  • Timely and bigdata

    Hi,

    I have a few weeks ahead of me to try and implement some bigdata/rust toolbox, and I'm trying to assess how timely could help me.

    I am toying right now with https://amplab.cs.berkeley.edu/benchmark/ . I have a single-host implementation for Query 1 and Query 2, and am starting to put some timely in the mix (focusing on query2; query1 is too boring).

    I must admit I am struggling a bit. I got it running on small datasets, but so far haven't been able to coerce it into doing things in the right order: not loading everything from disk before starting to reduce the data, in order not to overflow memory. Would you be interested in having a look and giving me some insight as to what I should do? The code is here: https://gitlab.zoy.org/kali/dx16 . The most interesting part would be https://gitlab.zoy.org/kali/dx16/blob/master/src/bin/query2_timely.rs.

    I was hoping to grab you on IRC, but haven't seen you there for a while.

    Thanks.

    opened by kali 19
  • Notifications out of order

    I was under the assumption that the order in which notifications are delivered to an operator follows the partial order defined on timestamps. At least this seemed to be the case in past versions of timely, since our sessionization code relies on this.

    However, the following code (which sometimes requests notification for future times during a notification, as sessionization does) observes notifications in a weird order:

    last: (Root, 0), curr: (Root, 0), frontier: [(Root, 4)]
    last: (Root, 0), curr: (Root, 1), frontier: [(Root, 4)]
    last: (Root, 1), curr: (Root, 2), frontier: [(Root, 4)]
    last: (Root, 2), curr: (Root, 3), frontier: [(Root, 4)]
    last: (Root, 3), curr: (Root, 2), frontier: [(Root, 4)]
    thread 'worker thread 0' panicked at 'assertion failed: last_notification.less_equal(curr.time())', src/bin/bug.rs:22:24
    

    Source code:

    extern crate timely; // 0.3.0
    
    use timely::PartialOrder;
    use timely::dataflow::operators::{Input, Unary, Probe};
    use timely::dataflow::channels::pact::Pipeline;
    use timely::progress::timestamp::RootTimestamp;
    
    fn main() {
        timely::execute_from_args(std::env::args(), move |computation| {
            let (mut input, probe) = computation.dataflow(move |scope| {
                let (input, stream) = scope.new_input::<()>();
    
                let mut last_notification = RootTimestamp::new(0);
                let probe = stream.unary_notify::<(), _, _>(Pipeline, "foo", Vec::new(), 
                    move |input, _, notificator| {
                        input.for_each(|time, _| {
                            notificator.notify_at(time);
                        });
                        notificator.for_each(|curr, _, notif| {
                            println!("last: {:?}, curr: {:?}, frontier: {:?}",
                                last_notification, curr.time(), notif.frontier(0));
                            assert!(last_notification.less_equal(curr.time()));
                            last_notification = curr.time().clone();
                            if *curr == RootTimestamp::new(0) {
                                notif.notify_at(curr.delayed(&RootTimestamp::new(2)));
                            }
                       });
                   }).probe();
                (input, probe)
            });
        
            for epoch in 0..5 {
                input.advance_to(epoch);
                input.send(());
            }
            
            computation.step_while(|| probe.less_than(input.time()))
        }).unwrap();
    }
    
    opened by gandro 18
  • Minimize copies

    I've been trying to do a bit of review of the copies that go on in the timely and timely communication path. I think several of them can be removed, but first I thought I would try and explain what each of them are.

    Let's go in order from a message received in timely communication, in BinaryReceiver::recv_loop().

    1. There is almost certainly a copy in

      let read = self.reader.read(&mut self.buffer[self.length..]).unwrap_or(0);
      

      where we collect whatever data the kernel has for us. In the absence of some zero-copy interface to the networking, I think this is probably going to stick around. Though, we may have to think a bit harder about where we copy into.

    2. Just a bit below, we have

      target.send(slice[..h_len].to_vec()).unwrap();
      

      This is where we peel out the bytes destined for a specific (worker, dataflow, channel) tuple and send the bytes along to that destination. Because this is a different thread with no lifetime relationship, we invoke .to_vec() to get an owned allocation.

    3. The byte allocation arrives at communication's binary::Puller, where it is not copied. This is a clever moment where we deserialize into the Message type, whose from_bytes method takes ownership of the Vec<u8> buffer and invokes Abomonation's decode method to get references.

    4. This Message gets handed to operator code, and if the operator only needs a &[Record] then no copy needs to happen. However, if the operator needs a &mut Vec<Record> then the DerefMut implementation will invoke a clone() on the &Vec<Record>, which will surely do some allocations. The byte buffer is dropped at this point.

    5. Operators can supply outputs either record-by-record, or as a ready-to-send batch of records. In either case, if they hit a data exchange channel they will need to be moved. This is essentially a copy, but it seems largely unavoidable if we want to put the records destined for remote workers into contiguous memory. This is where the "shuffle" actually needs to happen, and it seems legit.

    6. If serialization is required, then <Message as Serialize>::into_bytes() is invoked, and it will do an allocation of a Vec<u8> and a copy into it. The only way we know how to turn general Vec<Record> types into bytes is using Abomonation's encode, and this copies. In principle, we could "steal" the allocation of the vector itself, and only serialize subsequent owned data.

    7. The header (a fixed-size struct) and bytes are sent to BinarySender::send_loop(), in which we write both to a W: Writer. This happens to be a BufWriter wrapped around a network stream, which means a copy into the buffer, and probably a copy out of the buffer when it eventually gets around to writing to the network stream in bulk (the second of which is intrinsic to the TcpStream API).

    I think three of these are somewhat non-negotiable at the moment: i. the copy from kernel buffers when we read from the network stream (in 1.), ii. the copy as we do the data shuffle (in 5.), and the copy back into kernel buffers (in 7.).

    This leaves us with four potential copies that could be superfluous.

    1. This copy could be avoided using something like the bytes crate, where one hands out multiple references to a common allocation, and the API ensures that the references are to disjoint ranges.

      This could also be avoided by doing smaller reads into independently owned allocations; each read pulls down the next payload and the subsequent header, which tells us how much to read for the next allocation (and could tell us a size and alignment). This has the potential risk that if there are many small messages we do many small reads, possibly doing lots of kernel crossings. In that case, it seems like copies are an unavoidable cost of moving many messages using few kernel crossings.

    2. This wasn't actually a copy, but it has a number so we want to put it here.

    3. This copy is self-inflicted, in that one could write operator code that doesn't even need a mutable reference to the source data. It isn't always natural to do this, but if your code insists on owned data with owned allocations then this is non-negotiable, as we don't control the Rust codegen that needs to work correctly with the data it is handed.

      One candidate bit of cuteness is: if we are handed an owned Vec<u8>, in conflict with the optimization for (2.), we could arrange that the data are laid out so that the same allocation can be used as the spine of the Vec<Record>. This could still mean copying if these types have allocations behind them, and it is important that we got the Vec<u8> as a Vec<Record> because the deallocation logic is allowed to explode if we dealloc with a different size or alignment, but it could be possible for something like this to work.

    4. We decided that the shuffle move was non-optional, but I have to put it here to make markdown numbers work out.

    5. When we go from Vec<Record> to Vec<u8> we have relatively few options. We could grab the spine of the vector and only serialize auxiliary data (perhaps to the same allocation, if there is enough space). This would mean no copies here for data without further owned memory, and in the absence of any further information we would have no choice I think (if each Record contains a bunch of String fields and such).

      One alternative is something like the CapnProto builder patterns, where instead of allocating a Rust object for output you directly serialize the result into a byte buffer. This is possible, though I don't know how ergonomic it ends up being (that is, you could write the code by hand, but you probably wouldn't want to).

    6. One of these copies seems inescapable (the kernel buffer copy), but the BufWriter copy seems optional. It does some very good things for us, in terms of minimizing kernel crossings. This could be somewhat avoided if each worker produced a consolidated Vec<u8> to send to each remote process, rather than separate allocations for each channel and each remote worker. This seems possible, though again awkward. The shuffle that happens means to colocate data for each worker, and we don't know how large each of these will be before sending them. We could commit to certain spacing (e.g. 1024 records for each worker) and start writing each worker at an offset of a common buffer for each process, with some inefficiency if there is skew of any sort. In any case, operator code currently produces output records one at a time, and we need to do something with each of these records.

    One meta-point, which I don't really know that I understand, is that we may be able to absolve ourselves of copies that do not leave low level caches. Each copy that remains within the L1 cache could be viewed as pretty cheap, relative to all the other operations going on. So, deserialization code for small buffers might be relatively cheap, as compared to copying each frame into a new allocation (2.). I'm slightly making this up, but understanding "zero copy" and which costs it is really avoiding seems like a good thing to do along the way.
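
    For reference, a minimal sketch, assuming the abomonation crate as a dependency, of the encode/decode round trip discussed in points 3 and 6 above: encoding copies the vector's contents into a byte buffer, and decoding hands back references into that same buffer without a further copy.

    extern crate abomonation;

    use abomonation::{encode, decode};

    fn main() {
        let records: Vec<u64> = (0..10).collect();
        let mut bytes = Vec::new();
        // the copy into the byte buffer (step 6 above)
        unsafe { encode(&records, &mut bytes).unwrap(); }
        // references into the byte buffer; no further copy (step 3 above)
        if let Some((decoded, _rest)) = unsafe { decode::<Vec<u64>>(&mut bytes[..]) } {
            assert_eq!(decoded, &records);
        }
    }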

    opened by frankmcsherry 16
  • Capture with `k` workers, replay with `j`, `k ≠ j`

    .capture()-d streams may be sent over sockets to a separate computation.

    It's currently unclear how to replay those streams with a different number of workers compared to the source computation.

    For example, replaying a 1-worker stream in a j-worker computation might look like this, but it will not work.

                    let one_stream: Stream<_, Thing> = if index == 0 {
                        one_stream.lock().unwrap().take().unwrap().replay_into(scope)
                    } else {
                        vec![].to_stream(scope)
                    };
    

    The broken progress behaviour seems to be due to .to_stream() expecting to exist on every worker: https://github.com/frankmcsherry/timely-dataflow/blob/eae63eb2063a6e1d3da70c781c3e879a5ccd7d93/src/dataflow/operators/generic/operator.rs#L554

    We may just need a source that does not advertise initial capabilities? I'm not sure this checks out with the progress tracking math.

    opened by utaal 14
  • Exchange, input, tee: Only allocate buffers as needed

    Instead of pre-allocating relatively large buffers for each pusher, only allocate small buffers and grow them as needed. This can lead to more allocations, but by doubling the buffer size we hope to amortize some of the cost.

    Signed-off-by: Moritz Hoffmann

    opened by antiguru 13
  • Failover problem in cluster mode

    Hi, @frankmcsherry !

    I've already set up timely as a long-running service to serve graph queries, each process holding a portion of the graph data. But when some processes go down, the workers and I/O threads in other processes seem to simply ignore this event and keep running. To address this problem, I have a simple workaround which shuts down all other processes by force and relaunches them. This works but introduces extensive data-loading work, including disk I/O, network traffic, etc. It would be more graceful if the surviving processes could be notified about the dead process, let their workers and I/O threads exit, and give control back to user code. The user code could then choose to exit too, or wait for the dead process to be relaunched somewhere else. This might look like:

    loop {
        let guards = Timely::execute(/*...*/);
        let results = guards.join();
        if let Err(OtherProcessDown) = results {
            // wait for relaunched
        }
    }
    
    opened by qiuxiafei 13
  • Further `step_or_park` implementations [WIP]

    This PR is a work in progress of await_events methods for communicators which enable the step_or_park functionality. The first commit adds preliminary support for the intra-process allocator (not the serializing one). It has not been tested beyond the standard test suite (which has a few multi-worker tests, but not many).

    opened by frankmcsherry 12
  • Reviewing motivation sections in the book

    Finally, the following two statements on "When to use Timely Dataflow" section helped me a lot to understand things. Thank you!

    Timely dataflow is a dataflow system, and this means that at its core it likes to move data around.

    Dataflow systems are also fundamentally about breaking apart the execution of your program into independently operating parts.

    opened by shamrin 12
  • add step_or_park

    This PR introduces the step_or_park(Duration) method to a worker.

    The design is that the communication fabric allocator now has a method

    await_events(&self, duration: Duration)
    

    which is allowed to park the thread for at most duration, or until self.events() has some events to report (these are events that prompt the scheduling of operators). The implementation of this is a no-op by default (not parking the thread), but the single-threaded communicator has an implementation that parks itself if its event queue is empty. The plan is to add these in for other communicators so that eventually all of them behave well.

    The worker now has a method

    step_or_park(&mut self, duration: Duration) -> bool
    

    which .. drains events from the communicator, and just before it is about to schedule its activations, checks to see if it has any and if not calls in to await_events(duration).

    I haven't really thought through the concurrency design here. It is possible that there is a race condition somewhere, but we can take a look at it and see whether it does the right thing in principle.

    cc @comnik @ryzhyk @benesch

    opened by frankmcsherry 11
  • Doc issues, or maybe I'm dumb

    I am finally picking up DD/TD because I want to play with Monoids. New to both Rust and DD.

    Going through the mdbook, but at HEAD instead of version 0.7, and am able to explain many differences between my output and the docs.

    Things I can't explain, so far:

    1. Step 1, Write a Program - we put the code in src/main.rs but the suggested command is cargo run --example hello. I get an error (at both HEAD and v0.7) that there is no such example. Am I missing something, or is this just a bug? (The same issue happens for all the other examples in cells I've seen.)

    2. Increase the Scale - Don't we need to change the code? The page seems to just have two run commands (that fail for me because of --example hello), with no code changes. I did understand that we probably add --release to get it to go faster.

      Is there some magic linkage to https://github.com/TimelyDataflow/timely-dataflow/blob/master/examples/hello.rs that I've messed up in my setup? How does it know which version of things to run?

    I'm guessing these are just omissions/bugs, but I am open to being completely wrong given that I am new to a lot here.

    opened by dhalperi 10
  • Performance degradation observed in timely 0.6.0

    Hi, Frank!

    Recently I've built a timely-server in a way similar to bin/server.rs, which sets up a dataflow in advance; an outside client feeds inputs to worker 0, and results are routed back to worker 0 and then back to the outside client. Your comments helped a lot and I'm very appreciative of that!

    But after upgrading to timely 0.6.0, I observed about a 30% performance degradation, with only a few lines changed in my code for API compatibility.

    My timely-server can be simplified as the following:

    extern crate timely;
    
    use std::sync::mpsc::{TryRecvError, channel};
    use std::time::Instant;
    
    use timely::dataflow::InputHandle;
    use timely::dataflow::operators::{Input, Exchange, Map};
    use timely::dataflow::operators::Unary;
    use timely::dataflow::channels::pact;
    use timely::dataflow::operators::generic::OutputHandle;
    use timely::dataflow::channels::pushers::Tee;
    use std::sync::Mutex;
    use std::sync::Arc;
    
    fn main() {
        let args = std::env::args().collect::<Vec<_>>();
        println!("Starts with arguments: {:?}", args);
        let input_num = args[1].parse::<u64>().unwrap_or(1);
    
        let (input_send, input_recv) = channel();
        let (output_send, output_recv) = channel();
    
        let input_recv = Arc::new(Mutex::new(Some(input_recv)));
        let output_send = Arc::new(Mutex::new(Some(output_send)));
    
        let worker_guards = timely::execute_from_args(std::env::args(), move |worker| {
            let index = worker.index();
            let peers = worker.peers();
            let input_recv = if index == 0 { input_recv.lock().unwrap().take() } else { None };
            let output_send = if index == 0 { output_send.lock().unwrap().take() } else { None };
    
            let mut input = InputHandle::new();
            let mut timestamp = 0_u64;
            let route = move |vid: &u64| *vid % peers as u64;
    
            // create a dataflow.
            worker.dataflow(|scope| {
                scope.input_from(&mut input)
                    .exchange(route)
                    .flat_map(move |vid| {
                        (1..10).map(move |i| vid * 1000 + i)
                    })
                    .exchange(route)
                    .flat_map(move |vid| {
                        (1..10).map(move |i| vid * 1000 + i)
                    })
                    .unary_notify(
                        pact::Exchange::new(|_| 0),
                        "ResultSink",
                        vec![],
                        move |input, _output: &mut OutputHandle<_, _, Tee<_, u64>>, notificator| {
                            input.for_each(|time, data| {
                                data.take();
                                notificator.notify_at(time.retain());    // for timely 0.6.0
                                //notificator.notify_at(time);             // for timely 0.5.1
                            });
    
                            notificator.for_each(|cap, _, _| {
                                if index == 0 {
                                    output_send.as_ref().unwrap().send(cap.time().inner).unwrap();
                                }
                            });
                        },
                    );
            });
    
            if index == 0 {
                loop {
                    match input_recv.as_ref().unwrap().try_recv() {
                        Result::Ok(data) => {
                            input.send(data);
                            timestamp += 1;
                            input.advance_to(timestamp);
                        }
                        Result::Err(TryRecvError::Empty) => {}
                        Result::Err(TryRecvError::Disconnected) => { break; }
                    }
                    worker.step();
                }
            }
        }).unwrap();
    
        let start = Instant::now();
        for i in 0..input_num {
            input_send.send(i).unwrap();
            output_recv.recv().unwrap();
        }
        let elapsed = start.elapsed();
    
        drop(input_send);
        worker_guards.join();
    
        let elapsed_nanos = elapsed.as_secs() * 1_000_000_000 + elapsed.subsec_nanos() as u64;
        let qps = input_num as f64 * 1_000_000_000.0 / elapsed_nanos as f64;
        let elapsed_ms = elapsed_nanos as f64 / 1_000_000.0;
        let latency_ms = elapsed_ms / input_num as f64;
        println!("Time elapsed (ms): {:.2}, latency(ms): {:.2}, qps: {:.2}", elapsed_ms, latency_ms, qps);
    }
    

    I ran the 0.5.1 and 0.6.0 compiled binaries with 16 workers in a single process on a machine which has:

    • CPU: 32 Core / Intel(R) Xeon(R) CPU E5-2650 v2 @ 2.60GHz
    • Memory: 128 GB
    • OS: Red Hat 7.2
    • Rust: rustc 1.27.0 (3eda71b00 2018-06-19)

    The experiment was repeated several times and the results seemed stable. A typical result looks like:

    $ ./test_6 30000 -w 16
    Starts with arguments: ["./test_6", "30000", "-w", "16"]
    Time elapsed (ms): 2342.53, latency(ms): 0.08, qps: 12806.64
    
    $ ./test_5 30000 -w 16
    Starts with arguments: ["./test_5", "30000", "-w", "16"]
    Time elapsed (ms): 1469.59, latency(ms): 0.05, qps: 20413.86
    

    The throughput degrades a lot.

    To reproduce this problem, I tried pushing all input data into the InputHandle of worker 0 to measure end-to-end time, without waiting for the previous query to finish before issuing a new one. But there seemed to be no difference between 0.5.1 and 0.6.0. The code was like:

    timely::execute_from_args(std::env::args(), move |worker| {
        
        // ...
        worker.dataflow(|scope| { /* ......  */});
    
        if index == 0 {
            for round in 0..input_num {
                input.send(round as u64);
                input.advance_to(round + 1);
            }
        }
    }).unwrap().join();
    
    opened by qiuxiafei 10
  • timely: unconstrained lifetime for `CapabilityRef`

    Since the merge of https://github.com/TimelyDataflow/timely-dataflow/pull/429, CapabilityRefs have been made safe to hold onto across operator invocations because that PR made sure that they only decremented their progress counts on Drop. While this allowed async/await based operators to freely hold on to them, it was still very difficult for synchronous based operators to do the same thing, due to the lifetime attached to the CapabilityRef.

    We can observe that the lifetime no longer provides any benefits, which means it can be removed, turning CapabilityRefs into fully owned values. This allows any style of operator to easily hold on to them. The benefit isn't just performance (by avoiding the retain() dance), but also deferring to a later time the decision of which output port a given input should flow to.

    After making this change, the name CapabilityRef felt wrong, since there is no reference to anything anymore. Instead, the main distinction between CapabilityRefs and Capabilities is that the former is associated with an input port and the latter with an output port.

    As such, I have renamed CapabilityRef to InputCapability to signal to users that holding onto one of them represents holding onto a timestamp at the input for which we have not yet determined the output port that it should flow to. This nicely ties up the semantics of the InputCapability::retain_for_output and InputCapability::delayed_for_output methods, which make it clear by their name and signature that this is what "transfers" the capability from input ports to output ports.

    opened by petrosagg 0
  • dataflow: Account for probe handles being dropped

    The lifetime of a probe handle is not attached to any particular dataflow and users are free to connect them to streams of multiple dataflows.

    This PR handles the situation in which a probe handle is connected to two separate dataflows and one of them drops. Previously the handle would have its frontier stuck because the old handle wouldn't provide any updates anymore.

    The implemented solution is that each handle keeps track of its own changes and removes them from the overall calculation on drop.

    opened by petrosagg 0
  • Activate operators that may want to shut down

    Operator shutdown was previously pretty loose, and only in response to operator activation. However, the conditions for shutdown can change without prompting an activation if e.g. a frontier becomes empty or a final capability is dropped. This meant that operators that should be shut down would instead linger until the dataflow itself is shut down.

    This PR adds that test as progress information is pushed to operators, in order to better clean up operators mid-dataflow.

    NB: Failing to shut down an operator should not have resulted in non-termination, unless operators were relying on dropping their state to signal something of consequence outward. All progress information would still be correct, and all downstream operators would receive correct frontiers.

    opened by frankmcsherry 4
  • Use `Antichain` for `MutableAntichain::frontier`.

    This PR converts MutableAntichain::frontier from a Vec<T> to an Antichain<T>. In addition to a bit more code reuse, this allows MutableAntichain to expose a &Antichain<T> type rather than an AntichainRef<T>, which can be annoying to relate back to antichains (e.g. to join with, say). One can always go from an &Antichain<T> to an AntichainRef<T> with the borrow() method, but not the other way around.

    The downside to scrutinize is the moment where we update the frontier, rebuilding the antichain. To my eyes, we do almost the same linear work per element, except that we additionally consider evicting elements that we know we won't have to evict. I'm OK with losing that performance for the type clarity, or perhaps adding some from_ordered_iter to Antichain.

    cc: @jkosh44

    opened by frankmcsherry 0
Releases (v0.12.0)
  • v0.12.0(Mar 10, 2021)

  • v0.10.0(Jul 9, 2019)

  • v0.9.0(Mar 31, 2019)

  • v0.8.0(Dec 3, 2018)

    This release made several breaking modifications to the types associated with scopes, and in particular the generic parameters for the Child<'a, G: ScopeParent, T: Timestamp> type. Where previously the T parameter would be the new coordinate to add to G's timestamp, it is now the new timestamp including G's timestamp as well. This was done to support a broader class of timestamps to be used, beyond always requiring product combinations with new timestamps.

    Beneficial fallouts include our ability to remove RootTimestamp, as dataflows can now be timestamped by usize or other primitive timestamps. Yay!
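
As a minimal sketch of that post-0.8 style (adapted from the simple example above; assumes timely 0.8 or later), a dataflow can be timestamped directly by usize:

extern crate timely;

use timely::dataflow::operators::*;

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        // the dataflow timestamp is a plain usize; no Product<RootTimestamp, _>
        worker.dataflow::<usize, _, _>(|scope| {
            (0..10).to_stream(scope)
                   .inspect(|x| println!("seen: {:?}", x));
        });
    }).unwrap();
}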

    Added

The communication crate now has a bincode feature flag which should swing serialization over to use serde's Serialize trait. While it seems to work, the ergonomics are likely in flux, as the choice is crate-wide and doesn't allow you to pick and choose à la carte.

    Timestamps may now implement a new Refines trait which allows one to describe one timestamp as a refinement of another. This is mainly used to describe which timestamps may be used for subscopes of an outer scope. The trait describes how to move between the timestamps (informally: "adding a zero" and "removing the inner coordinate") and how to summarize path summaries for the refining timestamp as those of the refined timestamp.

    Changed

    Many logging events have been rationalized. Operators and Channels should all have a worker-unique identifier that can be used to connect their metadata with events involving them. Previously this was a bit of a shambles.

    The Scope method scoped now allows new scopes with non-Product timestamps. Instead, the new timestamp must implement Refines<_> of the parent timestamp. This is the case for Product timestamps, but each timestamp also refines itself (allowing logical regions w/o changing the timestamp), and other timestamp combinators (e.g. Lexicographic) can be used.

    Root dataflow timestamps no longer need to be Product<RootTimestamp,_>. Instead, the _ can be used as the timestamp.

    The loop_variable operator now takes a timestamp summary for the timestamp of its scope, not just the timestamp extending its parent scope. The old behavior can be recovered with Product::new(Default::default(), summary), but the change allows cycles in more general scopes and seemed worth it. The operator also no longer takes a limit, and if you need to impose a limit other than the summary returning None you should use the branch_when operator.

    Removed

    The RootTimestamp and RootSummary types have been excised. Where you previously used Product<RootTimestamp,T> you can now use Product<(),T>, or even better just T. The requirement of a worker's dataflow() method is that the timestamp type implement Refines<()>, which .. ideally would be true for all timestamps but we can't have a blanket implementation until specialization lands (I believe).

    Several race conditions were "removed" from the communication library. These mostly involved rapid construction of dataflows (data received before a channel was constructed would be dropped) and clean shutdown (a timely computation could drop and fail to ack clean shutdown messages).

  • v0.7.0(Sep 16, 2018)

    New changes largely related to the communication subsystem and logging. Consult CHANGELOG.md for more specific information on current and upcoming changes.

  • v0.6.0(Jun 29, 2018)

    Main changes: Capabilities are now provided to operators as CapabilityRef which contains a lifetime, and which should not be stored without calling .retain(). They are probably hard to store without calling .retain(), and there are probably error messages to this effect.

  • v0.3.0(Sep 10, 2017)

    This release snapshots the state of affairs at the point that we needed to do a release and bump the major version number. It should correspond to the bits on crates.io for version 0.3.0.
