dataflow-join

A streaming implementation of Ngo et al's GenericJoin in timely dataflow.

Ngo et al presented a very cool join algorithm, some details of which are described in a blog post. This project (a collaboration with Khaled Ammar and Semih Salihoglu) extends Ngo et al's algorithm to the case where the underlying relations change, allowing us to track the results of complex join queries as they change.

A series of posts describes the ideas behind this implementation.

An example: graph motifs

As an example, the examples/motif.rs program takes the description of a directed graph motif (explained below) and a list of graph edges, and reports the change in the number of these motifs as the edges stream in. To look for directed triangles of the form (a,b), (a,c), (b,c), using the livejournal graph edges in a random order (any text file where each line has the form src dst), loading the first 68 million edges and then streaming in the remaining entries in batches of 1,000, we would type:

cargo run --release --example motif -- 3 0 1 0 2 1 2 ./soc-LiveJournal1.random.txt 68000000 1000 inspect

The motif is expressed clumsily at the moment: it has the form num_edges [src dst]^num_edges, where you first indicate how many edges the motif has (3 for triangles) and then list each edge as a pair of attribute identifiers. There are bugs here, including symmetric edges ((a,b) and (b,a)) not being handled correctly, and likely others I don't know about yet.
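
To make the format concrete, here is a minimal sketch of how these arguments decode into a list of attribute pairs; the function parse_motif below is hypothetical, not the parser in examples/motif.rs, but for example 3 0 1 0 2 1 2 would decode to [(0, 1), (0, 2), (1, 2)].

fn parse_motif(args: &[String]) -> Vec<(usize, usize)> {
    // The first argument is the number of edges; each subsequent pair of
    // arguments names the source and destination attributes of one edge.
    let num_edges: usize = args[0].parse().expect("expected an edge count");
    (0..num_edges)
        .map(|i| {
            let src = args[1 + 2 * i].parse().expect("expected a src attribute");
            let dst = args[2 + 2 * i].parse().expect("expected a dst attribute");
            (src, dst)
        })
        .collect()
}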

If you run the command above you should see something like:

Echidnatron% cargo run --release --example motif -- 3 0 1 0 2 1 2 ./soc-LiveJournal1.random.txt 68000000 1000 inspect
    Finished release [optimized + debuginfo] target(s) in 0.0 secs
     Running `target/release/examples/motif 3 0 1 0 2 1 2 ./soc-LiveJournal1.random.txt 68000000 1000 inspect`
motif:	[(0, 1), (0, 2), (1, 2)]
filename:	"./soc-LiveJournal1.random.txt"
index initialized: (4295196, 68000000)
index initialized: (4474513, 68000000)
Duration { secs: 36, nanos: 995859508 }	[worker 0]	data loaded
Duration { secs: 36, nanos: 995924311 }	[worker 0]	indices merged
(Root, 2): [37695]
(Root, 3): [36745]
(Root, 4): [42000]
...

which reports the input reading and index building (forward and reverse), and then starts reporting the number of changes to the directed motif in each batch of 1,000 edges. In this example all changes are edge additions, so all the motif changes are additions as well.

As the program continues to run, it should result in something like

...
(Root, 993): [37462]
(Root, 994): [44505]
(Root, 995): [30189]
elapsed: 67.91147424298106	total motifs at this process: 39772836
Echidnatron%

This reports the total number of motif changes observed at this process and an elapsed running time.

Parallelism

As with other timely dataflow programs, we can increase the number of workers to parallelize the computation (and the printing of things to the screen):

Echidnatron% cargo run --release --example motif -- 3 0 1 0 2 1 2 ./soc-LiveJournal1.random.txt 68000000 1000 inspect -w2
    Finished release [optimized + debuginfo] target(s) in 0.0 secs
     Running `target/release/examples/motif 3 0 1 0 2 1 2 ./soc-LiveJournal1.random.txt 68000000 1000 inspect -w2`
motif:	[(0, 1), (0, 2), (1, 2)]
filename:	"./soc-LiveJournal1.random.txt"
motif:	[(0, 1), (0, 2), (1, 2)]
filename:	"./soc-LiveJournal1.random.txt"
index initialized: (2147644, 34060151)
index initialized: (2147552, 33939849)
index initialized: (2237328, 34057137)
index initialized: (2237185, 33942863)
Duration { secs: 22, nanos: 149237598 }	[worker 1]	data loaded
Duration { secs: 22, nanos: 149245453 }	[worker 0]	data loaded
Duration { secs: 22, nanos: 149295018 }	[worker 0]	indices merged
Duration { secs: 22, nanos: 149296396 }	[worker 1]	indices merged
(Root, 2): [15863]
(Root, 2): [21832]
(Root, 3): [16715]
(Root, 3): [20030]
...
(Root, 994): [22018]
(Root, 994): [22487]
(Root, 995): [16231]
(Root, 995): [13958]
elapsed: 40.51301470899489	total motifs at this process: 39772836
Echidnatron%

Each worker reads the input file and splits it into lines, but only parses those lines whose index, modulo the number of workers, equals its worker id. For now this means that each machine needs access to a copy of the file, which should be improved.
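
As an illustration, the partitioning amounts to something like the following sketch; load_edges is a hypothetical stand-in rather than the repository's actual loader.

use std::fs::File;
use std::io::{BufRead, BufReader};

// Each worker calls this with its own index and the total number of workers;
// only lines with index % peers == worker_index are parsed into edges.
fn load_edges(path: &str, worker_index: usize, peers: usize) -> std::io::Result<Vec<(u32, u32)>> {
    let reader = BufReader::new(File::open(path)?);
    let mut edges = Vec::new();
    for (index, line) in reader.lines().enumerate() {
        if index % peers == worker_index {
            let line = line?;
            let mut fields = line.split_whitespace();
            if let (Some(src), Some(dst)) = (fields.next(), fields.next()) {
                edges.push((src.parse().unwrap(), dst.parse().unwrap()));
            }
        }
    }
    Ok(edges)
}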

Reporting

The inspect argument attaches a dataflow fragment that counts each of the changes; omitting it improves performance somewhat, because much less data needs to be shuttled around.

Echidnatron% cargo run --release --example motif -- 3 0 1 0 2 1 2 ./soc-LiveJournal1.random.txt 68000000 1000 -w2
    Finished release [optimized + debuginfo] target(s) in 0.0 secs
     Running `target/release/examples/motif 3 0 1 0 2 1 2 ./soc-LiveJournal1.random.txt 68000000 1000 -w2`
motif:	[(0, 1), (0, 2), (1, 2)]
motif:	[(0, 1), (0, 2), (1, 2)]
filename:	"./soc-LiveJournal1.random.txt"
filename:	"./soc-LiveJournal1.random.txt"
index initialized: (2147552, 33939849)
index initialized: (2147644, 34060151)
index initialized: (2237185, 33942863)
index initialized: (2237328, 34057137)
Duration { secs: 21, nanos: 904351750 }	[worker 0]	data loaded
Duration { secs: 21, nanos: 904355535 }	[worker 1]	data loaded
Duration { secs: 21, nanos: 904398564 }	[worker 1]	indices merged
Duration { secs: 21, nanos: 904412222 }	[worker 0]	indices merged
elapsed: 35.086652864003554	total motifs at this process: 0
Echidnatron%

The total reported is zero, because we detached the counting infrastructure.
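
For intuition, here is a minimal sketch of that choice, written against timely's general-purpose ToStream and Inspect operators rather than copied from examples/motif.rs; the stream of changes is a stand-in for the motif changes produced by the join dataflow.

use timely::dataflow::operators::{Inspect, ToStream};

fn main() {
    // Whether a reporting fragment should be attached at all.
    let inspect = std::env::args().any(|arg| arg == "inspect");
    timely::example(move |scope| {
        // Stand-in for the stream of motif changes produced by the join dataflow.
        let changes = (0..10u64).to_stream(scope);
        if inspect {
            // Only when requested do we attach an operator that observes each change.
            changes.inspect(|change| println!("motif change: {:?}", change));
        }
    });
}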

Other example motifs

The infrastructure decouples loading the graph data from observing changes in motif counts, which lets us observe how motif counts change without paying the cost of determining the original counts. This makes it possible to track relatively complex motifs whose full computation would otherwise be rather painful.

Here are some examples:

Directed 3-cycles

These are also triangles, but ones that form a cycle, rather than pointing from a to b to c.

Echidnatron% cargo run --release --example motif -- 3 0 1 1 2 2 0 ./soc-LiveJournal1.random.txt 68000000 1000 inspect
    Finished release [optimized + debuginfo] target(s) in 0.0 secs
     Running `target/release/examples/motif 3 0 1 1 2 2 0 ./soc-LiveJournal1.random.txt 68000000 1000 inspect`
motif:	[(0, 1), (1, 2), (2, 0)]
filename:	"./soc-LiveJournal1.random.txt"
...
(Root, 994): [35917]
(Root, 995): [24601]
elapsed: 62.80372881703079	total motifs at this process: 31171871
Echidnatron%

Directed 4-cliques

Generalizing directed triangles to 4-cliques, we have four nodes a, b, c, and d, where each node points to all of the nodes after it. Pre-loading a bit more data so we don't have to wait as long for the result, we would type:

Echidnatron% cargo run --release --example motif -- 6 0 1 0 2 0 3 1 2 1 3 2 3 ./soc-LiveJournal1.random.txt 68900000 1000 inspect
    Finished release [optimized + debuginfo] target(s) in 0.0 secs
     Running `target/release/examples/motif 6 0 1 0 2 0 3 1 2 1 3 2 3 ./soc-LiveJournal1.random.txt 68900000 1000 inspect`
motif:	[(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)]
filename:	"./soc-LiveJournal1.random.txt"
index initialized: (4307203, 68900000)
index initialized: (4487812, 68900000)
Duration { secs: 38, nanos: 751447929 }	[worker 0]	data loaded
Duration { secs: 38, nanos: 751585622 }	[worker 0]	indices merged
(Root, 2): [6239159]
(Root, 3): [5386443]
...
(Root, 95): [3938957]
elapsed: 233.18197546800366	total motifs at this process: 550759817
Echidnatron%

This took longer, but there was also a larger number of motifs to process.
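
For larger cliques the argument list gets long. The following helper is hypothetical (not part of the repository), but it prints the num_edges [src dst]^num_edges arguments for a directed k-clique; with k = 4 it reproduces the 6 0 1 0 2 0 3 1 2 1 3 2 3 arguments used above.

// Builds the motif arguments for a directed k-clique: every pair (a, b) with a < b.
fn clique_arguments(k: usize) -> String {
    let mut pairs = Vec::new();
    for a in 0..k {
        for b in (a + 1)..k {
            pairs.push(format!("{} {}", a, b));
        }
    }
    format!("{} {}", pairs.len(), pairs.join(" "))
}

fn main() {
    // Prints "6 0 1 0 2 0 3 1 2 1 3 2 3".
    println!("{}", clique_arguments(4));
}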

Directed 6-cliques

You can imagine where this is going, but I thought I'd do this example because it seems much too complicated to compute completely and then difference. Here we stream in edge updates one at a time (a batch size of one).

Echidnatron% cargo run --release --example motif -- 15 0 1 0 2 0 3 0 4 0 5 1 2 1 3 1 4 1 5 2 3 2 4 2 5 3 4 3 5 4 5 soc-LiveJournal1.random.txt 68900000 1 inspect
    Finished release [optimized + debuginfo] target(s) in 0.0 secs
     Running `target/release/examples/motif 15 0 1 0 2 0 3 0 4 0 5 1 2 1 3 1 4 1 5 2 3 2 4 2 5 3 4 3 5 4 5 ./soc-LiveJournal1.random.txt 68900000 1 inspect`
motif:	[(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (1, 2), (1, 3), (1, 4), (1, 5), (2, 3), (2, 4), (2, 5), (3, 4), (3, 5), (4, 5)]
filename:	"./soc-LiveJournal1.random.txt"
index initialized: (4307203, 68900000)
index initialized: (4487812, 68900000)
Duration { secs: 38, nanos: 759079572 }	[worker 0]	data loaded
Duration { secs: 38, nanos: 760087081 }	[worker 0]	indices merged
(Root, 2): [473]
(Root, 3): [94]
(Root, 5): [8975]
(Root, 6): [101569]
(Root, 7): [5]
(Root, 8): [1554]
(Root, 9): [128]
(Root, 10): [33446]
(Root, 11): [5339]
Echidnatron%

This program goes to 11.

But just 11. What happened? As it turns out, the 12th update causes so many changed 6-cliques (or candidates along the way) that the process wanders up to 60GB on my laptop and then shuts itself down.

For comparison, here are the output numbers for 5-cliques, where the fatal 12th update produces 48,807,868 updates. Each of these updates is also produced along the way in the 6-clique dataflow (which just extends the 5-clique dataflow), and each apparently leads to some large number of further candidates.

Echidnatron% cargo run --release --example motif -- 10 0 1 0 2 0 3 0 4 1 2 1 3 1 4 2 3 2 4 3 4 ./soc-LiveJournal1.random.txt 68900000 1 inspect
    Finished release [optimized + debuginfo] target(s) in 0.0 secs
     Running `target/release/examples/motif 10 0 1 0 2 0 3 0 4 1 2 1 3 1 4 2 3 2 4 3 4 ./soc-LiveJournal1.random.txt 68900000 1 inspect`
motif:	[(0, 1), (0, 2), (0, 3), (0, 4), (1, 2), (1, 3), (1, 4), (2, 3), (2, 4), (3, 4)]
filename:	"./soc-LiveJournal1.random.txt"
index initialized: (4307203, 68900000)
index initialized: (4487812, 68900000)
Duration { secs: 36, nanos: 387479783 }	[worker 0]	data loaded
Duration { secs: 36, nanos: 387815357 }	[worker 0]	indices merged
(Root, 2): [131]
(Root, 3): [53]
(Root, 5): [1570]
(Root, 6): [11081]
(Root, 7): [4]
(Root, 8): [574]
(Root, 9): [67]
(Root, 10): [6197]
(Root, 11): [952]
(Root, 12): [48807868]
(Root, 13): [3612]
(Root, 14): [470]
...

The problem is that, even though we are streaming in single updates, we try to do all of the count, propose, and intersect work for these 48 million tuples at the same time. What we should be doing, in a better world, is to stream through the 48 million bits of intermediate work as well. We should stage them so that we don't try to do all of the work at once, but rather retire chunks of updates at a time, keeping our resource use in check; a simplified sketch of this idea follows.
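
To make the staging idea concrete, here is a greatly simplified, hypothetical sketch of retiring intermediate tuples in bounded chunks; the real fix would live inside the dataflow operators rather than in a free-standing helper like this.

// Consume an iterator of intermediate tuples in fixed-size chunks, so that the
// downstream count/propose/intersect work never sees all of them at once.
fn process_in_chunks<T>(
    proposals: impl Iterator<Item = T>,
    chunk_size: usize,
    mut retire: impl FnMut(&[T]),
) {
    let mut buffer = Vec::with_capacity(chunk_size);
    for proposal in proposals {
        buffer.push(proposal);
        if buffer.len() == chunk_size {
            retire(&buffer);
            buffer.clear();
        }
    }
    if !buffer.is_empty() {
        retire(&buffer);
    }
}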

This is an exciting open area for us, where the answer lies in recent scheduling work with Andrea Lattuada and Zaheer Chothia that prioritizes operators further down the dataflow graph, aggressively draining the dataflow rather than producing more work. In principle we should be able to try this out and see what happens!
