Dataflow

Dataflow is a data processing library, primarily for machine learning. It provides efficient pipeline primitives to build a directed acyclic dataflow graph, and a dataloader to run the graph in a separate thread. It also provides common tokenizers and batching tools to work with textual data.

Usage

To build a pipeline, first start with a loader Node:

use dataflow::pipeline::RandomLoader;

fn main() {
  let pipeline = RandomLoader::new(vec!["file1.txt".to_string(), "file2.txt".to_string()]);
}

The RandomLoader by default loads individual lines randomly from files. Next, add a transformation to it with the add_fn() function:

let pipeline = RandomLoader::new(vec!["file1.txt".to_string(), "file2.txt".to_string()])
      .add_fn(|lines| lines.into_iter().map(|line| format!("Hello {}", line)).collect());

This creates a new Stateless Node, which simply runs the closure on the data. The closure takes a whole batch of data, which allows you to do batch processing; in this case, we just process each line sequentially with .into_iter().map().collect().
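Because the closure sees the whole batch at once, you can also do genuine batch-level work. As a small sketch (this filtering step is just an illustration, not something the library requires):

let pipeline = RandomLoader::new(vec!["file1.txt".to_string(), "file2.txt".to_string()])
      .add_fn(|lines: Vec<String>| {
            // The closure receives the whole batch, so we can drop empty
            // lines in a single pass instead of handling them one at a time
            lines.into_iter().filter(|line| !line.is_empty()).collect()
      });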

Now that we've added "Hello " to every line, let's create a Stateful Node to hold a Tokenizer and make it tokenize our data:

use dataflow::pipeline::Stateful;

// Our tokenizer
let tokenizer = dataflow::tokenization::WordpieceTokenizer::load();

// Our pipeline
let pipeline = RandomLoader::new(vec!["file1.txt".to_string(), "file2.txt".to_string()])
      .add_fn(|lines| lines.into_iter().map(|line| format!("Hello {}", line)).collect())
      .add_node(
            Stateful::new(
                  |(lines, tokenizer)| {
                        // batch_tokenize takes in many sentences (Vec<String>)
                        // and tokenizes them all, outputting Vec<Vec<String>>
                        tokenizer.batch_tokenize(lines)
                  },
                  tokenizer // The state we want this Stateful Node to have
            )
      );

Great! Now our data gets efficiently tokenized as a batch.

Loader Nodes

So far it seems we've only used two types of Nodes, Stateless and Stateful (Stateless was generated when we used .add_fn()). Actually we've used three, because RandomLoader is a Node as well! It takes Vec<()> as input, which is what the pipeline starts with, and produces data (Vec<String>) to send through the pipeline.

Custom Nodes

In fact, you can implement your own Nodes as well, by implementing the Node trait! Just implement fn process(Vec<Input>) -> Vec<Output>. Optionally, you can also implement fn reset(&mut self), which gets called at the beginning of each epoch, and fn data_remaining(&self) -> usize, which should return how much data remains available to the Node (the number of lines we haven't loaded yet for RandomLoader, or usize::MAX for a non-loader Node). With that, you have your own Node to integrate into the pipeline!
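As a rough sketch of what that might look like (assuming a trait with associated Input/Output types and the method signatures described above; the actual trait in the crate may differ), here is a toy loader Node that serves lines from memory:

use dataflow::pipeline::Node;

// A toy loader Node that serves pre-loaded lines from memory
struct VecLoader {
      lines: Vec<String>,
      cursor: usize,
}

impl Node for VecLoader {
      type Input = ();
      type Output = String;

      // Ignores its () inputs and emits up to that many stored lines
      fn process(&mut self, input: Vec<()>) -> Vec<String> {
            let end = (self.cursor + input.len()).min(self.lines.len());
            let batch = self.lines[self.cursor..end].to_vec();
            self.cursor = end;
            batch
      }

      // Called at the beginning of each epoch: rewind to the start
      fn reset(&mut self) {
            self.cursor = 0;
      }

      // How many lines we haven't served yet this epoch
      fn data_remaining(&self) -> usize {
            self.lines.len() - self.cursor
      }
}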

Dataloader

Since we've built this cool pipeline, what can we do with it? Well, for starters, we could simply call process() and feed in some data, but let's do something cooler. Let's put it in a Dataloader and use it in an ML training loop:

// Make the dataloader
let mut dataloader = dataflow::dataloader::Dataloader(pipeline, 64); // We use 64 as the batch size

// Training loop
for example in &mut dataloader {
   // Now example is a vector of tokenized strings!
   // Do with them what you may...
}
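If you'd rather drive the pipeline by hand, the process() call mentioned above might look roughly like this (feeding Vec<()> follows from the Loader Nodes section; the exact signature is an assumption, so check the crate docs):

// Ask the pipeline for a batch of 64 examples by feeding in 64 unit inputs
let tokenized = pipeline.process(vec![(); 64]);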

To Do:

  • Make tokenizer loading more efficient
  • Make auto-parallel pipeline Node using rayon