Dataflow
Dataflow is a data processing library, primarily for machine learning. It provides efficient pipeline primitives to build a directed acyclic dataflow graph, and a dataloader to run the graph in a separate thread. It also provides common tokenizers and batching tools to work with textual data.
Usage
To build a pipeline, first start with a loader Node:
use dataflow::pipeline::RandomLoader;
fn main() {
let pipeline = RandomLoader::new(vec!["file1.txt".to_string(), "file2.txt".to_string()]);
}
By default, the RandomLoader loads individual lines randomly from files. Next, add a transformation to it with the add_fn() function:
let pipeline = RandomLoader::new(vec!["file1.txt".to_string(), "file2.txt".to_string()])
.add_fn(|lines| lines.into_iter().map(|line| format!("Hello {}", line)).collect());
This creates a new Stateless Node, which just runs the closure on the data. The closure takes a whole batch of data rather than a single item, which allows you to do batch processing. In this case, we just process the batch sequentially with .into_iter().map().collect().
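To see what such a batch closure does on its own, the sketch below applies the same transformation to a plain Vec<String> without the library. The function name greet_batch is made up for illustration and is not part of dataflow.

```rust
// Hypothetical helper (not part of dataflow): the same logic as the closure
// passed to add_fn(), written as a free function over one batch.
fn greet_batch(lines: Vec<String>) -> Vec<String> {
    lines
        .into_iter()
        .map(|line| format!("Hello {}", line))
        .collect()
}

fn main() {
    let batch = vec!["world".to_string(), "Rust".to_string()];
    // Prints ["Hello world", "Hello Rust"]
    println!("{:?}", greet_batch(batch));
}
```

Because the closure owns the whole batch, it could just as well sort, filter, or deduplicate the lines before returning them.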
Now that we've added "Hello " to every line, let's create a Stateful Node to hold a Tokenizer and make it tokenize our data:
use dataflow::pipeline::Stateful;

// Our tokenizer
let tokenizer = dataflow::tokenization::WordpieceTokenizer::load();

// Our pipeline
let pipeline = RandomLoader::new(vec!["file1.txt".to_string(), "file2.txt".to_string()])
    .add_fn(|lines| lines.into_iter().map(|line| format!("Hello {}", line)).collect())
    .add_node(
        Stateful::new(
            |(lines, tokenizer)| {
                // batch_tokenize takes in many sentences (Vec<String>) and tokenizes all of them, outputting Vec<Vec<String>>
                tokenizer.batch_tokenize(lines)
            },
            tokenizer // The state we want this Stateful Node to have
        )
    );
Great! Now our data gets efficiently tokenized as a batch.
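The idea behind a Stateful Node, a function paired with some state it can read on every batch, can be sketched without the library. Everything below is an assumption made for illustration: StatefulSketch and SplitTokenizer are hypothetical stand-ins, not dataflow's Stateful or WordpieceTokenizer, and the closure here takes two arguments rather than a tuple.

```rust
// Hypothetical stand-in for a Stateful Node (not dataflow's actual type):
// it owns some state and a function that receives the batch and the state.
struct StatefulSketch<S, F> {
    state: S,
    func: F,
}

impl<S, F> StatefulSketch<S, F>
where
    F: Fn(Vec<String>, &S) -> Vec<Vec<String>>,
{
    fn new(func: F, state: S) -> Self {
        StatefulSketch { state, func }
    }

    // Runs the stored function on one batch, lending it the state.
    fn process(&self, batch: Vec<String>) -> Vec<Vec<String>> {
        (self.func)(batch, &self.state)
    }
}

// Toy tokenizer standing in for a real one: splits on a single delimiter.
struct SplitTokenizer {
    delimiter: char,
}

impl SplitTokenizer {
    fn batch_tokenize(&self, lines: Vec<String>) -> Vec<Vec<String>> {
        lines
            .iter()
            .map(|line| {
                line.split(self.delimiter)
                    .map(|token| token.to_string())
                    .collect::<Vec<String>>()
            })
            .collect()
    }
}

fn main() {
    let node = StatefulSketch::new(
        |lines, tokenizer: &SplitTokenizer| tokenizer.batch_tokenize(lines),
        SplitTokenizer { delimiter: ' ' },
    );
    let tokens = node.process(vec!["Hello big world".to_string()]);
    println!("{:?}", tokens);
}
```

Keeping the tokenizer inside the node means it is loaded once and reused for every batch, instead of being rebuilt inside the closure.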
Loader Nodes
So far it seems we've only used two types of Nodes, Stateless and Stateful (Stateless was generated when we used .add_fn()). Actually, we used three, because RandomLoader is a Node as well! It takes Vec<()> as input, which is what the pipeline starts with, and produces the data (a Vec of lines) to send through the pipeline.
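A rough sketch of a loader that takes Vec<()> as input is shown below. It assumes, as one plausible reading, that each unit value in the input acts as a request for one item. InMemoryLoader is a hypothetical type, not dataflow's RandomLoader, and it hands out lines in order rather than randomly to keep the example short.

```rust
// Hypothetical in-memory loader (not dataflow's RandomLoader): each unit
// value in the input batch is treated as a request for one line.
struct InMemoryLoader {
    lines: Vec<String>,
    next: usize,
}

impl InMemoryLoader {
    fn new(lines: Vec<String>) -> Self {
        InMemoryLoader { lines, next: 0 }
    }

    // Takes Vec<()> and returns up to input.len() lines, in order.
    fn process(&mut self, input: Vec<()>) -> Vec<String> {
        let end = (self.next + input.len()).min(self.lines.len());
        let batch = self.lines[self.next..end].to_vec();
        self.next = end;
        batch
    }

    // Number of lines that have not been handed out yet.
    fn data_remaining(&self) -> usize {
        self.lines.len() - self.next
    }
}

fn main() {
    let mut loader = InMemoryLoader::new(vec![
        "a".to_string(),
        "b".to_string(),
        "c".to_string(),
    ]);
    let first = loader.process(vec![(); 2]);
    // Prints ["a", "b"], 1 remaining
    println!("{:?}, {} remaining", first, loader.data_remaining());
}
```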
Custom Nodes
In fact, you can implement your own Nodes as well, by implementing the Node trait! Just implement fn process(), which takes a Vec of inputs and returns a Vec of outputs, and optionally fn reset(&mut self), which gets called at the beginning of an epoch, and fn data_remaining(&self) -> usize, which should return how much data remains available to the node (the number of lines we haven't loaded yet for RandomLoader, or usize::MAX for a non-loader Node). With that, you have your own Node to integrate into the pipeline!
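As a minimal sketch of what a custom Node might look like, the example below defines a simplified stand-in trait with the three methods described above and implements it for a small filtering node. NodeSketch and DropEmpty are hypothetical names, and the associated types and default method bodies are assumptions; the real Node trait in dataflow may be shaped differently.

```rust
// Simplified stand-in for the Node trait, assumed for illustration; the
// real trait in dataflow may differ in its generics and method signatures.
trait NodeSketch {
    type Input;
    type Output;

    fn process(&mut self, input: Vec<Self::Input>) -> Vec<Self::Output>;

    // Called at the start of an epoch; nodes without state keep the default.
    fn reset(&mut self) {}

    // Non-loader nodes never run out of data.
    fn data_remaining(&self) -> usize {
        usize::MAX
    }
}

// Custom node that drops blank lines and counts how many it dropped.
struct DropEmpty {
    dropped: usize,
}

impl NodeSketch for DropEmpty {
    type Input = String;
    type Output = String;

    fn process(&mut self, input: Vec<String>) -> Vec<String> {
        let before = input.len();
        let kept: Vec<String> = input
            .into_iter()
            .filter(|line| !line.trim().is_empty())
            .collect();
        self.dropped += before - kept.len();
        kept
    }

    // Start each epoch with a fresh counter.
    fn reset(&mut self) {
        self.dropped = 0;
    }
}

fn main() {
    let mut node = DropEmpty { dropped: 0 };
    let out = node.process(vec!["hi".to_string(), "  ".to_string()]);
    // Prints ["hi"], dropped 1
    println!("{:?}, dropped {}", out, node.dropped);
    node.reset();
}
```

Since DropEmpty is not a loader, it keeps the default data_remaining(), which returns usize::MAX.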
Dataloader
Now that we've built this cool pipeline, what can we do with it? Well, for starters, we could simply call process() and feed in some data, but let's do something cooler. Let's put it in a Dataloader and use it in an ML training loop:
// Make the dataloader
let mut dataloader = dataflow::dataloader::Dataloader(pipeline, 64); // We use 64 as the batch size
// Training loop
for example in &mut dataloader {
// Now example is a vector of tokenized strings!
// Do with them what you may...
}
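The general pattern of running a pipeline in a separate thread and consuming its batches in a loop can be sketched with the standard library alone. This is not how dataflow's Dataloader is implemented; spawn_loader is a hypothetical helper, and the bounded channel and its capacity are assumptions chosen for the example.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

// Hypothetical helper (not dataflow's Dataloader): runs `transform` over
// fixed-size batches on a worker thread and streams the results back.
// batch_size must be greater than zero, otherwise chunks() panics.
fn spawn_loader<F>(data: Vec<String>, batch_size: usize, transform: F) -> Receiver<Vec<String>>
where
    F: Fn(Vec<String>) -> Vec<String> + Send + 'static,
{
    // Bounded channel: at most 2 finished batches are buffered at a time.
    let (sender, receiver) = sync_channel(2);
    thread::spawn(move || {
        for chunk in data.chunks(batch_size) {
            // Stop early if the consumer has dropped the receiver.
            if sender.send(transform(chunk.to_vec())).is_err() {
                break;
            }
        }
    });
    receiver
}

fn main() {
    let data: Vec<String> = (0..5).map(|i| format!("line {}", i)).collect();
    let batches = spawn_loader(data, 2, |batch| batch);
    // The loop ends once the worker has sent its last batch.
    for batch in batches {
        println!("{:?}", batch);
    }
}
```

The bounded channel lets the worker prepare the next batches while the training loop is busy with the current one, without letting it run arbitrarily far ahead.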
To Do:
- Make tokenizer loading more efficient
- Make auto-parallel pipeline Node using rayon