Distributed compute platform implemented in Rust, powered by Apache Arrow.

Ballista: Distributed Compute Platform

Overview

Ballista is a distributed compute platform primarily implemented in Rust, powered by Apache Arrow. It is built on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as first-class citizens without paying a penalty for serialization costs.

Technologies

The foundational technologies in Ballista are:

  • Apache Arrow, which provides the columnar memory model and compute kernels.
  • Apache Arrow Flight, the protocol used to exchange data efficiently between processes.
  • Google Protocol Buffers, used to serialize query plans.

Ballista can be deployed as a standalone cluster and also supports Kubernetes. In either case, the scheduler can be configured to use etcd as a backing store to (eventually) provide redundancy in the event of a scheduler failure.

Architecture

The following diagram highlights some of the integrations that will be possible with this unique architecture. Note that not all components shown here are available yet.

Ballista Architecture Diagram

How does this compare to Apache Spark?

Although Ballista is largely inspired by Apache Spark, there are some key differences.

  • The choice of Rust as the main execution language means that memory usage is deterministic and avoids the overhead of GC pauses.
  • Ballista is designed from the ground up to use columnar data, enabling a number of efficiencies such as vectorized processing (SIMD and GPU) and efficient compression. Although Spark does have some columnar support, it is still largely row-based today.
  • The combination of Rust and Arrow provides excellent memory efficiency; in some cases memory usage can be 5x-10x lower than Apache Spark's, which means that more processing can fit on a single node, reducing the overhead of distributed compute.
  • The use of Apache Arrow as the memory model and network protocol means that data can be exchanged between executors in any programming language with minimal serialization overhead.
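
To make the columnar model concrete, here is a minimal sketch using the Rust arrow crate (plain Arrow, not Ballista-specific): each column of a record batch is a contiguous, typed buffer, which is what makes vectorized processing and cheap compression possible.

    use std::sync::Arc;
    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    fn main() -> arrow::error::Result<()> {
        // The schema describes the columns; the data lives in contiguous typed arrays.
        let schema = Arc::new(Schema::new(vec![
            Field::new("trip_id", DataType::Int32, false),
            Field::new("city", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["NYC", "SF", "LA"])),
            ],
        )?;

        // Operators process whole columns at a time instead of row by row.
        println!("{} rows x {} columns", batch.num_rows(), batch.num_columns());
        Ok(())
    }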

Examples

The following examples should help illustrate the current capabilities of Ballista:
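
For instance, a distributed query submitted from a Rust client might look roughly like this (a hypothetical sketch modeled on the 0.4-era DataFrame API; the crate paths, method signatures, and file path are assumptions and may differ between releases):

    use ballista::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        // Connect to a Ballista cluster via the scheduler's endpoint.
        let ctx = BallistaContext::remote("localhost", 50051);

        // Build a logical plan against a Parquet file (placeholder path).
        let df = ctx
            .read_parquet("/path/to/yellow_tripdata_2020-01.parquet")?
            .filter(col("passenger_count").gt(lit(5)))?;

        // The plan is sent to the scheduler, executed across the cluster, and
        // the results stream back to the client as Arrow record batches.
        let results = df.collect().await?;
        println!("received {} batches", results.len());
        Ok(())
    }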

Project Status

To follow the progress of this project, please refer to the "This Week in Ballista" series of blog posts. Follow @BallistaCompute on Twitter to receive notifications when the blog is updated.

Releases

Ballista releases (v0.4.1 at the time of writing) are available on crates.io, Maven Central, and Docker Hub. Please refer to the user guide for instructions on using a released version of Ballista.

Documentation

The user guide is hosted at https://ballistacompute.org, along with the blog where news and release notes are posted.

Developer documentation can be found in the docs directory.

Contributing

See CONTRIBUTING.md for information on contributing to this project.

Comments
  • Donate Ballista to Apache Arrow project

    The Ballista project has recently reached a point where I believe that the basic architecture has been proven to work. The project has also suddenly become very popular and generated a lot of interest (more than 2k stars on GitHub).

    For these reasons, I think the project has grown too large for me to continue maintaining as a personal project, and it is now time to move the code to a foundation to ensure its continued success.

    Given the deep dependencies on Apache Arrow (the core Arrow, DataFusion, and Parquet crates) and the fact that there is already some overlap between Arrow and Ballista committers, I believe that the obvious choice would be to donate the project to Apache Arrow.

    Some of the benefits of donating the project to Arrow are:

    • Ballista unit tests will be part of Arrow CI, which means that any changes to Arrow or DataFusion APIs that Ballista depends on will also require the corresponding Ballista code to be updated as part of the same PR.
    • DataFusion also needs a scheduler and it would probably make sense to push some parts of the Ballista scheduler down a level in the stack so that the same approach is used to scale across cores in DataFusion and to scale across nodes in Ballista.
    • There is a team of committers who understand Arrow and DataFusion and can help with PR reviews, so I will no longer be a bottleneck.
    • Companies are more likely to commit resources to contributing to an Apache project compared to a personal project.

    I am going to start a discussion on the Arrow mailing list to propose this idea as well.

    help wanted 
    opened by andygrove 31
  • Decouple planning from execution

    Big PR... I got carried away, sorry!

    Here's what this PR does: it takes a step in the direction of https://github.com/ballista-compute/ballista/issues/463 and decouples the planning phase from the execution phase.

    When a client submits a query/plan, the only thing the scheduler does is calculate the stages and save them to etcd. That enables submitting queries even when there are no executors available: once an executor comes online, the job will transition to the Running state. It also enables adding more executors to the cluster, and the scheduler will be able to dynamically send them whichever tasks are ready to be run.

    How are tasks run? The executor registration endpoint in the scheduler has been renamed to "poll_work", and it now does several things (see the sketch after this list):

    1. It works as a heartbeat for executor liveness
    2. It allows executors to report the status of the tasks that have finished since the last call to poll_work
    3. It allows executors to request more work from the scheduler
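
    A rough, self-contained sketch of the resulting executor loop (the types here are illustrative stand-ins, not Ballista's actual generated gRPC messages):

        // Illustrative stand-ins for the generated gRPC message types.
        struct TaskStatus { task_id: u64, succeeded: bool }
        struct Task { task_id: u64 }
        struct PollWorkResponse { task: Option<Task> }

        // Stand-in for the unary poll_work RPC: a single call acts as
        // heartbeat, status report, and work request.
        fn poll_work(_finished: Vec<TaskStatus>, _can_accept_task: bool) -> PollWorkResponse {
            PollWorkResponse { task: None }
        }

        fn main() {
            let mut finished: Vec<TaskStatus> = Vec::new();
            loop {
                // Report everything completed since the last poll and ask for more work.
                let response = poll_work(std::mem::take(&mut finished), true);
                if let Some(task) = response.task {
                    // Run the task and record its status for the next poll.
                    finished.push(TaskStatus { task_id: task.task_id, succeeded: true });
                }
                std::thread::sleep(std::time::Duration::from_millis(100));
            }
        }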

    The changes in the scheduler have been made keeping in mind that we should support a scheduler cluster. I haven't tested it, but right now I'm basically doing a big fat distributed lock through etcd, which is very inefficient but also very easy to get right 😂

    The bad parts of this PR:

    1. ~No unit tests.~ (added!)
    2. ~There seems to be a big perf hit when using etcd. Query 1 takes 5 seconds on standalone and 10 seconds with etcd. Not sure why. The scheduler code can still be heavily optimized (I've preferred correctness and simplicity as a first approach), but that huge impact is somewhat surprising.~ (see next comment)
    opened by edrevo 17
  • Implement web UI for scheduler

    We should implement a web UI in the scheduler that can be used to monitor the state of the cluster and view information on queries that are pending/running/completed/failed.

    This issue is for creating the basic mechanism and then we can file additional issues for functionality, such as:

    • view list of executors
    • view query plan
    • view query metrics
    • cancel query
    help wanted rust 
    opened by andygrove 14
  • Research supporting WASM for custom code as part of a physical plan

    As a user of Ballista, I would like the ability to execute arbitrary code as part of my distributed job. I want the ability to use multiple languages depending on my requirements (perhaps there are third party libraries I want to use).

    WASM seems like a good potential choice for this?
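
    For example, embedding a WASM runtime in an executor might look roughly like this (a sketch using the wasmtime crate; Ballista does not currently depend on any WASM runtime, so this is purely illustrative):

        use wasmtime::{Engine, Instance, Module, Store};

        fn main() -> anyhow::Result<()> {
            let engine = Engine::default();

            // A tiny module exporting an `add` function, written in the WAT text format.
            let wat = r#"(module
                (func (export "add") (param i32 i32) (result i32)
                    local.get 0
                    local.get 1
                    i32.add))"#;
            let module = Module::new(&engine, wat)?;

            let mut store = Store::new(&engine, ());
            let instance = Instance::new(&mut store, &module, &[])?;

            // A physical operator could call user code like this per batch or per row.
            let add = instance.get_typed_func::<(i32, i32), i32>(&mut store, "add")?;
            println!("2 + 3 = {}", add.call(&mut store, (2, 3))?);
            Ok(())
        }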

    design 
    opened by andygrove 10
  • Add helm chart to the project

    Currently we have 4 Kubernetes configurations, each a small variation of the others. I think it would be beneficial to provide a Helm chart that templates those configurations, allowing users to configure the cluster and manage releases.

    IMO this will also allow us to more easily benchmark with a variable number of executors, and to compare Spark vs. Rust.

    opened by jorgecarleitao 8
  • Implement minimal REST API in the scheduler

    We should start building a REST API in the scheduler using Rocket. The web UI can use this REST API, and it would also allow users to use other tools (such as curl) to write scripts that get metadata from the scheduler.

    For this issue, I would suggest just implementing one method to return a list of executors. We can then write up issues for additional functionality.
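
    A minimal sketch of what that single endpoint could look like in Rocket 0.4 style (the `ExecutorMeta` fields and the route path are illustrative, not the actual scheduler types):

        #![feature(proc_macro_hygiene, decl_macro)]

        #[macro_use]
        extern crate rocket;

        use rocket_contrib::json::Json;
        use serde::Serialize;

        // Illustrative executor metadata; the real fields would come from cluster state.
        #[derive(Serialize)]
        struct ExecutorMeta {
            id: String,
            host: String,
            port: u16,
        }

        // GET /api/executors returns the executors currently registered with the scheduler.
        #[get("/api/executors")]
        fn list_executors() -> Json<Vec<ExecutorMeta>> {
            Json(vec![ExecutorMeta {
                id: "executor-1".into(),
                host: "10.0.0.1".into(),
                port: 50051,
            }])
        }

        fn main() {
            rocket::ignite().mount("/", routes![list_executors]).launch();
        }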

    rust 
    opened by andygrove 7
  • Implement physical plan serde for these expressions

    See ballista/src/serde/physical_plan/to_proto.rs

            } else if let Some(_expr) = expr.downcast_ref::<Literal>() {
                unimplemented!()
            } else if let Some(_expr) = expr.downcast_ref::<CaseExpr>() {
                unimplemented!()
            } else if let Some(_expr) = expr.downcast_ref::<CastExpr>() {
                unimplemented!()
            } else if let Some(_expr) = expr.downcast_ref::<NotExpr>() {
                unimplemented!()
            } else if let Some(_expr) = expr.downcast_ref::<IsNullExpr>() {
                unimplemented!()
            } else if let Some(_expr) = expr.downcast_ref::<IsNotNullExpr>() {
                unimplemented!()
        } else if let Some(_expr) = expr.downcast_ref::<InListExpr>() {
            unimplemented!()
            } else if let Some(_expr) = expr.downcast_ref::<NegativeExpr>() {
                unimplemented!()
            } else if let Some(_expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
                unimplemented!()
    
    help wanted rust 
    opened by andygrove 7
  • Review use of HashMap in ColumnarBatch in branch-0.4

    I tried merging the latest from the main branch into branch-0.4 this morning, and there was an issue with the changes in ColumnarBatch that use a HashMap from field name to column rather than just a Vec. I'd like to understand the motivation for this change and whether we can revert to using a Vec.

    The output from any operator should be deterministic and have a fixed schema, IMO, but maybe I am missing something?
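
    For reference, the two representations under discussion look roughly like this (illustrative stand-ins, not the actual branch-0.4 definitions):

        use std::collections::HashMap;
        use std::sync::Arc;

        // Stand-in for an Arrow column.
        type ArrayRef = Arc<Vec<i32>>;

        // Vec-based: column order is fixed by the schema, lookup is positional.
        struct ColumnarBatchVec {
            columns: Vec<ArrayRef>,
        }

        // HashMap-based: lookup is by field name, but column order is no
        // longer guaranteed and each access pays a hash lookup.
        struct ColumnarBatchMap {
            columns: HashMap<String, ArrayRef>,
        }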

    rust 
    opened by andygrove 7
  • [design] generic system testing infrastructure

    Going forward, I think Ballista will need some generic system testing infrastructure. I am proposing we write this in Python and make use of the PyUnit framework. While this library is intended for unit testing, I have found it works quite well for generic system testing. I think Python is a good choice because it is system agnostic and will allow us to write system tests for all of the Ballista components in a single place.

    I have three high level goals:

    • test correctness
    • make debugging for developers easy
    • portability: we don't want to tightly integrate the testing infrastructure with any CI system, and tests should be able to run locally

    The core component is the Harness class, which derives from unittest.TestCase. Harnesses for specific tests can derive from our base Harness class, e.g. for testing Python bindings, Rust bindings, and JDBC. This provides a nice testing life cycle. For example, if we have a harness with test methods test_x() and test_y(), the life cycle looks like:

    • setUpClass() # Set up for the entire Harness - create resources on a kubernetes cluster.
    • setUp() # set up before each test
    • test_x()
    • tearDown() # tear down after each test
    • setUp()
    • test_y()
    • tearDown()
    • tearDownClass() # tear down after all tests are complete - remove created resources

    In setUpClass() we can spin up a local kubernetes cluster using minikube/microk8s. I plan to write a nice wrapper script for cluster utilities. (cluster.py in the diagram)

    We need an entry point to run our tests (runner.py in the diagram). This script should be usable in CI and locally for debugging. I think we could use something like xml runner to get xml reports which can be consumed by a CI server.

    I am not sure how to integrate this with buildkite yet, but my goal is to write infrastructure that should work anywhere and not integrate too tightly with any one CI system.

    I am going to start with the cluster.py utility.

    [diagram of the proposed testing infrastructure, including cluster.py and runner.py]

    design 
    opened by Shamazo 6
  • Can't compile because of `packed_simd` version 0.3.3

    Hi guys, the packed_simd v0.3.3 dependency seems to be broken on crates.io [0].

    When I follow the user guide, it can't compile because of packed_simd v0.3.3:

    error[E0432]: unresolved import `crate::arch::x86_64::_mm_shuffle_pi8`
       --> /Users/yxie/.cargo/registry/src/github.com-1ecc6299db9ec823/packed_simd-0.3.3/src/codegen/shuffle1_dyn.rs:40:29
        |
    40  |                         use crate::arch::x86_64::_mm_shuffle_pi8;
        |                             ^^^^^^^^^^^^^^^^^^^^^---------------
        |                             |                    |
        |                             |                    help: a similar name exists in the module: `_mm_shuffle_epi8`
        |                             no `_mm_shuffle_pi8` in `arch::x86_64`
    

    ...

    error: aborting due to 7 previous errors
    
    For more information about this error, try `rustc --explain E0432`.
    error: could not compile `packed_simd`
    

    And it seems that the original packed_simd package will no longer be updated; development has moved to a new crate name, packed_simd_2 [1], instead.

    So do we have plans to update this dependency?

    [0] https://github.com/rust-lang/packed_simd#the-cratesio-version-can-no-longer-be-updated
    [1] https://crates.io/crates/packed_simd_2

    bug rust 
    opened by pengye91 6
  • [Design] High level API proposal for ballista

    I've been thinking lately about what the best API is for the different parts of ballista. For now, I'm still tabling the discussion around resource managers (K8s, Mesos, YARN, etc.) and I'll focus on scheduler, executors and clients.

    I see that right now the executors implement the Flight protocol, which makes perfect sense for Arrow data transmission. I think this is a great fit for the "data plane" in Ballista: executor <-> executor communication and executor <-> client communication (for .collect).

    When it comes to the "control plane", though, I think stream-based mechanisms like the Flight protocol (or even the bidirectional streams in gRPC) aren't great: they pin the communication of the client to a specific server (the scheduler, in this case), which makes it harder to dynamically increase the number of instances in a scheduler cluster (you could increase the instances, but all existing clients would still be talking to the same initial scheduler).

    I also think that unary RPCs are easier to implement and have a higher degree of self-documentation through the protobuf definition.

    So, here's my proposal for the control plane:

    [control plane diagram]

    The arrows go from client to server in the image above.

    The scheduler would have an API that would look something like this:

    service SchedulerGrpc {
      // This is the only call that executors need. It works similarly to how Kafka consumers and producers poll the brokers.
      // The executor needs to poll the scheduler through this method every X seconds to be part of the cluster.
      // As a reply, it gets information about the cluster state and configuration, along with a potential work item
      // that needs to be added to the executor's work queue.
      rpc PollWork (ExecutorState) returns (GetExecutorMetadataResult) {}
    
    
      // These methods are for scheduler-client interaction. These are the methods that would need to be called from
      // any client wrappers we want to provide, so they should be kept as simple as possible.
    
      rpc Execute (Action) returns (JobId) {}
    
      // JobStatus includes metadata about the job's progress along with the necessary Flight tickets to be able to
      // retrieve results through the data plane
      rpc GetJobStatus (JobId) returns (JobStatus) {}
    }
    

    I can drill down into the message definitions if you want, but I'm still not 100% sure of the complete amount of info that needs to go there (I know part of it).

    The proposal for data plane is much simpler:

    [data plane diagram]

    All of these would be based on the Flight protocol, but we wouldn't use any of the control messages that Flight offers (i.e. no DoAction).
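
    For example, fetching a result partition over the data plane could look roughly like this (a sketch using the Rust arrow-flight crate; the endpoint and ticket contents are placeholders, and field/type names vary a little between releases):

        use arrow_flight::flight_service_client::FlightServiceClient;
        use arrow_flight::Ticket;

        #[tokio::main]
        async fn main() -> Result<(), Box<dyn std::error::Error>> {
            // Connect directly to the executor that holds the partition.
            let mut client = FlightServiceClient::connect("http://executor-1:50051").await?;

            // In this design the ticket bytes would come from the scheduler's
            // GetJobStatus reply; the value here is a placeholder.
            let ticket = Ticket { ticket: "job-1/stage-2/partition-0".into() };

            // DoGet streams the partition back as Arrow record batches.
            let mut stream = client.do_get(ticket).await?.into_inner();
            while let Some(flight_data) = stream.message().await? {
                println!("received {} body bytes", flight_data.data_body.len());
            }
            Ok(())
        }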

    Thoughts?

    design 
    opened by edrevo 6