Apache Arrow DataFusion and Ballista query engines

Last update: Jun 18, 2022

DataFusion

DataFusion is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.

DataFusion supports both an SQL and a DataFrame API for building logical query plans, as well as a query optimizer and an execution engine capable of parallel execution against partitioned data sources (CSV and Parquet) using threads.

Use Cases

DataFusion is used to create modern, fast, and efficient data pipelines, ETL processes, and database systems that need the performance of Rust and Apache Arrow and want to provide their users the convenience of an SQL interface or a DataFrame API.

Why DataFusion?

  • High Performance: Leveraging Rust and Arrow's memory model, DataFusion achieves very high performance
  • Easy to Connect: Being part of the Apache Arrow ecosystem (Arrow, Parquet and Flight), DataFusion works well with the rest of the big data ecosystem
  • Easy to Embed: Allowing extension at almost any point in its design, DataFusion can be tailored for your specific use case
  • High Quality: Extensively tested, both by itself and with the rest of the Arrow ecosystem, DataFusion can be used as the foundation for production systems.

Known Uses

Here are some of the projects known to use DataFusion:

(if you know of another project, please submit a PR to add a link!)

Example Usage

Run a SQL query against data stored in a CSV:

use datafusion::prelude::*;
use arrow::util::pretty::print_batches;
use arrow::record_batch::RecordBatch;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
  // register the table
  let mut ctx = ExecutionContext::new();
  ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;

  // create a plan to run a SQL query
  let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")?;

  // execute and print results
  let results: Vec<RecordBatch> = df.collect().await?;
  print_batches(&results)?;
  Ok(())
}

Use the DataFrame API to process data stored in a CSV:

use datafusion::prelude::*;
use arrow::util::pretty::print_batches;
use arrow::record_batch::RecordBatch;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
  // create the dataframe
  let mut ctx = ExecutionContext::new();
  let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;

  let df = df.filter(col("a").lt_eq(col("b")))?
          .aggregate(vec![col("a")], vec![min(col("b"))])?
          .limit(100)?;

  // execute and print results
  let results: Vec<RecordBatch> = df.collect().await?;
  print_batches(&results)?;
  Ok(())
}

Both of these examples will produce

+---+--------+
| a | MIN(b) |
+---+--------+
| 1 | 2      |
+---+--------+

Using DataFusion as a library

DataFusion is published on crates.io, and is well documented on docs.rs.

To get started, add the following to your Cargo.toml file:

[dependencies]
datafusion = "4.0.0-SNAPSHOT"

Using DataFusion as a binary

DataFusion also includes a simple command-line interactive SQL utility. See the CLI reference for more information.

Status

General

  • SQL Parser
  • SQL Query Planner
  • Query Optimizer
  • Constant folding
  • Join Reordering
  • Limit Pushdown
  • Projection push down
  • Predicate push down
  • Type coercion
  • Parallel query execution

SQL Support

  • Projection
  • Filter (WHERE)
  • Filter post-aggregate (HAVING)
  • Limit
  • Aggregate
  • Common math functions
  • cast
  • try_cast
  • Postgres compatible String functions
    • ascii
    • bit_length
    • btrim
    • char_length
    • character_length
    • chr
    • concat
    • concat_ws
    • initcap
    • left
    • length
    • lpad
    • ltrim
    • octet_length
    • regexp_replace
    • repeat
    • replace
    • reverse
    • right
    • rpad
    • rtrim
    • split_part
    • starts_with
    • strpos
    • substr
    • to_hex
    • translate
    • trim
  • Miscellaneous/Boolean functions
    • nullif
  • Common date/time functions
    • Basic date functions
    • Basic time functions
    • Basic timestamp functions
  • nested functions
    • Array of columns
  • Schema Queries
    • SHOW TABLES
    • SHOW COLUMNS
    • information_schema.{tables, columns}
    • information_schema other views
  • Sorting
  • Nested types
  • Lists
  • Subqueries
  • Common table expressions
  • Set Operations
    • UNION ALL
    • UNION
    • INTERSECT
    • MINUS
  • Joins
    • INNER JOIN
    • LEFT JOIN
    • RIGHT JOIN
    • CROSS JOIN
    • OUTER JOIN
  • Window

Data Sources

  • CSV
  • Parquet primitive types
  • Parquet nested types

Extensibility

DataFusion is designed to be extensible at all points. To that end, you can provide your own custom implementations of the following (a minimal scalar UDF sketch follows the list):

  • User Defined Functions (UDFs)
  • User Defined Aggregate Functions (UDAFs)
  • User Defined Table Source (TableProvider) for tables
  • User Defined Optimizer passes (plan rewrites)
  • User Defined LogicalPlan nodes
  • User Defined ExecutionPlan nodes
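
For a flavor of what this looks like, the body of a scalar UDF is just a function over Arrow arrays. The sketch below is illustrative only; the registration step (create_udf / ExecutionContext::register_udf) is omitted because its exact signature varies between DataFusion versions.

use std::sync::Arc;
use arrow::array::{ArrayRef, Float64Array};

// Hypothetical scalar kernel: double every value in a Float64 column.
// A function of roughly this shape is what a DataFusion scalar UDF wraps.
fn double(input: &ArrayRef) -> ArrayRef {
  let input = input
    .as_any()
    .downcast_ref::<Float64Array>()
    .expect("expected a Float64 column");
  // Preserve nulls while doubling the non-null values.
  let doubled: Float64Array = input.iter().map(|v| v.map(|x| x * 2.0)).collect();
  Arc::new(doubled)
}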

Supported SQL

This library currently supports many SQL constructs, including the following (a usage sketch follows the list):

  • CREATE EXTERNAL TABLE X STORED AS PARQUET LOCATION '...'; to register a table's location
  • SELECT ... FROM ... together with any expression
  • ALIAS to name an expression
  • CAST to change types, including e.g. Timestamp(Nanosecond, None)
  • most mathematical unary and binary expressions such as +, /, sqrt, tan, >=.
  • WHERE to filter
  • GROUP BY together with one of the following aggregations: MIN, MAX, COUNT, SUM, AVG
  • ORDER BY together with an expression and optional ASC or DESC and also optional NULLS FIRST or NULLS LAST
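
For example, the following sketch strings several of these constructs together, reusing the same API as the examples above and the CSV table registered there (the column names are assumptions):

use datafusion::prelude::*;
use arrow::util::pretty::print_batches;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
  let mut ctx = ExecutionContext::new();
  ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;

  // WHERE, GROUP BY with an aggregate, CAST, an alias, and ORDER BY ... DESC
  let df = ctx.sql(
    "SELECT a, SUM(CAST(b AS DOUBLE)) AS total \
     FROM example \
     WHERE b >= 0 \
     GROUP BY a \
     ORDER BY a DESC",
  )?;

  print_batches(&df.collect().await?)?;
  Ok(())
}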

Supported Functions

DataFusion strives to implement a subset of the PostgreSQL SQL dialect where possible. We explicitly choose a single dialect to maximize interoperability with other tools and allow reuse of the PostgreSQL documents and tutorials as much as possible.

Currently, only a subset of the PostgreSQL dialect is implemented, and we will document any deviations.

Schema Metadata / Information Schema Support

DataFusion supports showing metadata about the tables that are available. This information can be accessed using the views of the ISO SQL information_schema schema or the DataFusion-specific SHOW TABLES and SHOW COLUMNS commands.

More information can be found in the Postgres docs.

To show tables available for use in DataFusion, use the SHOW TABLES command or the information_schema.tables view:

> show tables;
+---------------+--------------------+------------+------------+
| table_catalog | table_schema       | table_name | table_type |
+---------------+--------------------+------------+------------+
| datafusion    | public             | t          | BASE TABLE |
| datafusion    | information_schema | tables     | VIEW       |
+---------------+--------------------+------------+------------+

> select * from information_schema.tables;

+---------------+--------------------+------------+--------------+
| table_catalog | table_schema       | table_name | table_type   |
+---------------+--------------------+------------+--------------+
| datafusion    | public             | t          | BASE TABLE   |
| datafusion    | information_schema | TABLES     | SYSTEM TABLE |
+---------------+--------------------+------------+--------------+

To show the schema of a table in DataFusion, use the SHOW COLUMNS command or the information_schema.columns view:

> show columns from t;
+---------------+--------------+------------+-------------+-----------+-------------+
| table_catalog | table_schema | table_name | column_name | data_type | is_nullable |
+---------------+--------------+------------+-------------+-----------+-------------+
| datafusion    | public       | t          | a           | Int32     | NO          |
| datafusion    | public       | t          | b           | Utf8      | NO          |
| datafusion    | public       | t          | c           | Float32   | NO          |
+---------------+--------------+------------+-------------+-----------+-------------+

>   select table_name, column_name, ordinal_position, is_nullable, data_type from information_schema.columns;
+------------+-------------+------------------+-------------+-----------+
| table_name | column_name | ordinal_position | is_nullable | data_type |
+------------+-------------+------------------+-------------+-----------+
| t          | a           | 0                | NO          | Int32     |
| t          | b           | 1                | NO          | Utf8      |
| t          | c           | 2                | NO          | Float32   |
+------------+-------------+------------------+-------------+-----------+

Supported Data Types

DataFusion uses Arrow, and thus the Arrow type system, for query execution. The SQL types from sqlparser-rs are mapped to Arrow types according to the following table (a sketch of this mapping in code follows the table).

SQL Data Type   Arrow DataType
CHAR            Utf8
VARCHAR         Utf8
UUID            Not yet supported
CLOB            Not yet supported
BINARY          Not yet supported
VARBINARY       Not yet supported
DECIMAL         Float64
FLOAT           Float32
SMALLINT        Int16
INT             Int32
BIGINT          Int64
REAL            Float64
DOUBLE          Float64
BOOLEAN         Boolean
DATE            Date32
TIME            Time64(TimeUnit::Millisecond)
TIMESTAMP       Date64
INTERVAL        Not yet supported
REGCLASS        Not yet supported
TEXT            Not yet supported
BYTEA           Not yet supported
CUSTOM          Not yet supported
ARRAY           Not yet supported
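
As a rough illustration only (this helper is hypothetical; the real mapping lives in DataFusion's SQL planner), the table corresponds to something like:

use arrow::datatypes::{DataType, TimeUnit};

// Map a SQL type name to the Arrow DataType used for it (None = not yet supported).
fn sql_type_to_arrow(sql_type: &str) -> Option<DataType> {
  match sql_type {
    "CHAR" | "VARCHAR" => Some(DataType::Utf8),
    "DECIMAL" | "REAL" | "DOUBLE" => Some(DataType::Float64),
    "FLOAT" => Some(DataType::Float32),
    "SMALLINT" => Some(DataType::Int16),
    "INT" => Some(DataType::Int32),
    "BIGINT" => Some(DataType::Int64),
    "BOOLEAN" => Some(DataType::Boolean),
    "DATE" => Some(DataType::Date32),
    "TIME" => Some(DataType::Time64(TimeUnit::Millisecond)),
    "TIMESTAMP" => Some(DataType::Date64),
    _ => None, // UUID, CLOB, BINARY, VARBINARY, INTERVAL, REGCLASS, TEXT, BYTEA, CUSTOM, ARRAY
  }
}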

Architecture Overview

There is no formal document describing DataFusion's architecture yet, but the following presentations offer a good overview of its different components and how they interact.

  • (March 2021): The DataFusion architecture is described in Query Engine Design and the Rust-Based DataFusion in Apache Arrow: recording (DataFusion content starts ~ 15 minutes in) and slides
  • (February 2021): How DataFusion is used within the Ballista project is described in Ballista: Distributed Compute with Rust and Apache Arrow: recording

Developer's guide

Please see Developers Guide for information about developing DataFusion.

GitHub

https://github.com/apache/arrow-datafusion
Comments
  • 1. Experimenting with arrow2

    I have been experimenting using arrow2 & parquet2 as backends for DataFusion. This is WIP and does not compile, but I would like to give some visibility to this work.

    So far, I was able to keep all functionality, with a guaranteed increase in security and potentially some performance gains.

    Goals:

    • compile
    • tests pass
    • re-write readers and writers to leverage parallelism of both parquet2 and the CSV reader in arrow2
    • ???
    • profit

    Some notes:

    • I removed CastOptions because casting does not need to be fallible; we can make any non-castable value null and recover the set of failed casts from the differences in validity between the original array and the casted array, if we so wish.

    • most kernels in arrow2 return a Box<dyn Array>; we use result.into() to convert it to an Arc. This is very cheap and the best that arrow has to offer without the unstable channel.

    • There are some changes in min and max for floats. Essentially, arrow2 guarantees that the comparison operator used in sort for floats is the same as the one used in min/max, which required this small change due to Ord for floats still being part of unstable Rust.

    Reviewed by jorgecarleitao at 2021-04-25 14:23
  • 2. add udf/udaf plugin

    closes https://github.com/apache/arrow-datafusion/issues/1882

    In this PR, I have implemented a plugin mechanism for UDFs. In the next PR, I will complete serialization and deserialization of UDFs/UDAFs in Ballista, relying on the UDF plugin.

    Reviewed by EricJoy2048 at 2022-02-25 06:57
  • 3. Rework GroupByHash for faster performance and support grouping by nulls

    Rationale

    1. The current GroupByHash operator does not take NULL into account and thus produces incorrect answers when grouping on columns that contain NULL, as described in #782 and #781.
    2. Without additional changes, adding support for NULL in grouping will likely both slow down group by hashing as well as increase the memory overhead per group (e.g. see #786)

    This ticket therefore proposes rearranging the GroupByHash code to be more efficient in both space and time, providing a performance budget to add NULL support without an overall regression.

    Overview of Current GroupByHash

    This section explains the current state of GroupByHash on master at https://github.com/apache/arrow-datafusion/commit/54163410da05e8e6c68af55d699bf6a89e229bb6

    At a high level, the group by hash does the following for each input row, in a vectorized fashion (a rough sketch in code follows the diagrams below):

    1. Compute the group by key values (the expressions that appear in the GROUP BY clause)
    2. Form a key out of the group by values
    3. Find/Create an entry in hash map of (key values) --> (accumulators)
    4. Update the accumulators (one for each aggregate, such as COUNT that appears in the query) with the arguments

    When all the input has been processed, then the hash table is drained, producing one row for each entry in the hash table, in the following manner:

    (group key1, group key2, ...) (aggregate1, aggregate2, ...)
    

    So for example, given a query such as

    SELECT SUM(c1)
    FROM t
    GROUP BY k1, abs(k2)
    

    This looks something like

     ┌────┐ ┌────┐               ┌────┐ ┌────┐         ┌─────────────┐
     │    │ │    │               │    │ │    │────────▶│  key 1234   │
     │    │ │    │               │    │ │    │         └─────────────┘
     │    │ │    │               │    │ │    │
     │    │ │    │               │    │ │    │         ┌─────────────┐
     │    │ │    │               │    │ │    │────────▶│   key 23    │
     │    │ │    │               │    │ │abs │         └─────────────┘
     │ k1 │ │ k2 │──────────────▶│ k1 │ │(k2)│
     │    │ │    │               │    │ │    │
     │    │ │    │               │    │ │    │        ...
     │    │ │    │               │    │ │    │
     │    │ │    │               │    │ │    │
     │    │ │    │               │    │ │    │         ┌─────────────┐
     │    │ │    │               │    │ │    │────────▶│   key 321   │
     └────┘ └────┘               └────┘ └────┘         └─────────────┘
                                  group by key
      input data                     values
                      step 1:    (group_values)        step 2:
                     evaluate               create a variable sized hash
                     gby exprs                 table key for each row
    

    The hash table that is formed looks something like this:

    ┌───────────────┐         ┌──────────────────┬────────────┬─────────────────┐
    │┌─────────────┐│         │    Group Keys    │Accumulator │   row_indices   │
    ││  key 1234   ├┼────────▶│Box[GroupByScalar]│  for SUM   │    Vec<u32>     │
    │└─────────────┘│         │ formed key 1234  │            │ (scratch space) │
    │               │         └──────────────────┴────────────┴─────────────────┘
    │               │
    │     ...       │
    │               │                  ...
    │               │
    │               │
    │               │
    │               │         ┌──────────────────┬────────────┬─────────────────┐
    │┌─────────────┐│         │    Group Keys    │Accumulator │   row_indices   │
    ││   key 321   │├────────▶│Box[GroupByScalar]│  for SUM   │    Vec<u32>     │
    │└─────────────┘│         │  formed key 321  │            │ (scratch space) │
    └───────────────┘         └──────────────────┴────────────┴─────────────────┘
      hash table
    "accumulators"
    
    
               Step 3:                   NOTE:  Each entry in the hash table has
     The keys are used to find an        1. The original group keys
       entry in the hash table           2. The accumulators
        which then are mapped            3. row_indexes scratch space
    

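    A toy model of the four steps above (hypothetical code, not the actual DataFusion implementation):

    use std::collections::HashMap;

    // GROUP BY k1, abs(k2) with a SUM(c1) accumulator per group
    fn group_by_sum(k1: &[String], k2: &[i64], c1: &[i64]) -> HashMap<(String, i64), i64> {
        let mut table: HashMap<(String, i64), i64> = HashMap::new();
        for i in 0..c1.len() {
            // Step 1: evaluate the group by expressions (k1, abs(k2))
            // Step 2: form a key from them (the real code packs them into a
            //         variable sized Vec, see "Key Formation" below)
            let key = (k1[i].clone(), k2[i].abs());
            // Steps 3 and 4: find/create the hash table entry and update the accumulator
            *table.entry(key).or_insert(0) += c1[i];
        }
        // draining `table` yields one output row per group:
        // (group key 1, group key 2, ...) (aggregate 1, aggregate 2, ...)
        table
    }
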
    Key Formation

    The current state of the art, introduced in 93de66ae67a33764ac4029b0f825c415b9b2e92d / https://github.com/apache/arrow/pull/8863 by @Dandandan, is quite clever. The code in create_key packs data from the group keys together into a single mut Vec, which is then used as the key for the hash table (a sketch in code follows the diagram below).

    For example, if the input row was:

    {
      k1: "foo"
      k2: 0x1234 as u16
    }
    

    The resulting key is a 13-byte Vec: 11 bytes for "foo" (8 bytes for the length + 3 bytes for "foo") and 2 bytes for 0x1234, a 16-bit integer:

                            │        │
         string len                      0x1234
        (as usize le)       │  "foo" │    as le
    ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
    │00│00│00│00│00│00│00│03│"f│"o│"o│34│12│
    └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘   byte
     0  1  2  3  4  5  6  7 │8  9  10│11 12   offset
    
                            │        │
    
    

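    A minimal sketch of this packing (hypothetical helper, simplified from the real create_key):

    // k1 = "foo" (string) and k2 = 0x1234 (u16) become a 13 byte key
    fn pack_key() -> Vec<u8> {
        let k1 = "foo";
        let k2: u16 = 0x1234;
        let mut key = Vec::with_capacity(8 + k1.len() + 2);
        key.extend_from_slice(&(k1.len() as u64).to_le_bytes()); // 8 bytes: string length as le
        key.extend_from_slice(k1.as_bytes());                    // 3 bytes: "foo"
        key.extend_from_slice(&k2.to_le_bytes());                // 2 bytes: 0x1234 as le
        assert_eq!(key.len(), 13);
        key
    }
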
    However, there are at least a few downsides of this approach:

    1. There is no way to represent NULL as mentioned by @Dandandan on https://github.com/apache/arrow-datafusion/issues/782#issuecomment-887698068
    2. The data for each group key value is currently stored twice -- once in the Vec key and once in the values as a GroupByScalar used to produce the output -- resulting in memory overhead, especially for variable length (e.g. string) values

    Proposal

    Modeled after what I think @Dandandan is suggesting in https://github.com/apache/arrow-datafusion/pull/786#issuecomment-888229063:

    The HashTable would not store the keys or aggregate accumulators directly, but instead would map "signatures" computed from the group by keys to lists of offsets into a mutable storage area that contains the values and aggregates.

    The "signature" is simply a hash of the group key values (and their validity bits). The term "signature" is used to avoid confusion with the hash used in the hash table. It would be computed as a u64 or similar directly from the group by key values (sketched in code after the diagram below).

    ┌────┐ ┌────┐         ┌─────────────┐
    │    │ │    │────────▶│   0x1133    │
    │    │ │    │         └─────────────┘
    │    │ │    │         ┌─────────────┐
    │    │ │    │────────▶│   0x432A    │
    │    │ │    │         └─────────────┘
    │    │ │abs │
    │ k1 │ │(k2)│
    │    │ │    │         ...
    │    │ │    │
    │    │ │    │
    │    │ │    │
    │    │ │    │         ┌─────────────┐
    │    │ │    │────────▶│   0x432A    │
    └────┘ └────┘         └─────────────┘
     group by key
        values
    (group_values)
    
    Step 2: Create a FIXED LENGTH signature
      (e.g. u64) by hashing the values in
                the group by key
    
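    A sketch of this step (hypothetical helper; the real implementation would hash the Arrow arrays directly):

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    // Hash the group key values *and* their validity into a fixed length u64 "signature".
    fn signature(k1: &Option<String>, k2: &Option<i64>) -> u64 {
        let mut hasher = DefaultHasher::new();
        k1.hash(&mut hasher); // hashing the Option covers the validity bit as well as the value
        k2.hash(&mut hasher);
        hasher.finish()
    }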

    The hash table composition would be different: each entry would be a SmallVec (a non-allocating Vec) containing a list of indices into a "mutable storage" area.

    ┌───────────────┐
    │┌─────────────┐│    ┌───────┐                       ┌──────┐┌──────┐ ┌────────────┐
    ││   0x1133    ├┼───▶│  [1]  │─ ─ ─ ─ ─ ─            │      ││      │ │            │
    │└─────────────┘│    └───────┘           └ ─ ─ ─ ─ ─ │      ││      │ │            │
    │               │                                    │  k1  ││ abs  │ │            │
    │               │                         ─ ─ ─ ─ ─ ▶│values││ (k2) │ │            │
    │     ...       │                        │           │      ││values│ │Accumulators│
    │               │       ...                          │ (and ││      │ │  for SUM   │
    │               │                        │           │valid ││ (and │ │            │
    │               │                         ─ ─ ─ ─ ─ ▶│mask) ││valid │ │            │
    │               │                        │           │      ││mask) │ │            │
    │               │                                    │      ││      │ │            │
    │┌─────────────┐│    ┌───────┐           │           │      ││      │ │            │
    ││   0x432A    │├───▶│ [2,4] │─ ─ ─ ─ ─ ─            └──────┘└──────┘ └────────────┘
    │└─────────────┘│    └───────┘
    └───────────────┘               values are lists
     keys are gby key               (SmallVec) of         mutable storage
     signatures                     offsets into
                                    storage tables
    hashtable
    

    Edit: Collisions are handled by the fact that the entry in the hash table is a list of indices into the mutable storage area -- if there are multiple values in that list, each entry in the mutable area needs to be checked for equality to find the correct one.

    The mutable storage area contains:

    1. A Vec of ScalarValues for each group key column
    2. The Vec of accumulators for each grouping

    For example, this is how (logically) the mutable storage would work:

            valid          valid
             bit            bit
            mask           mask
     ┌────┐┌────┐   ┌────┐┌────┐   ┌────┐
     │"D" ││ t  │   │ 1  ││ t  │   │ 11 │
     ├────┤├────┤   ├────┤├────┤   ├────┤
     │"C" ││ t  │   │ 3  ││ t  │   │ 3  │
     ├────┤├────┤   ├────┤├────┤   ├────┤
     │"A" ││ t  │   │ 1  ││ t  │   │ 27 │
     ├────┤├────┤   ├────┤├────┤   ├────┤
     │"D" ││ t  │   │ 2  ││ t  │   │ 2  │
     ├────┤├────┤   ├────┤├────┤   ├────┤
     │ "" ││ t  │   │ 0  ││ f  │   │ 4  │
     └────┘└────┘   └────┘└────┘   └────┘
    
       group by key storage      Accumulator
            (5 groups)             for SUM
                                (aggregates)
    
    
       Example showing how groups
      (D,1), (C,3), (A,1), (D,2),
          (NULL, 0) are stored
    

    I propose using Vec<ScalarValue> to store the group key values in the mutable area, as there is no equivalent of a mutable Array in arrow-rs yet (though I think there is MutablePrimitiveArray in arrow2). If/when we get access to a mutable array in DataFusion, we can potentially switch to using that representation for the mutable storage area, which would likely both take less memory for some data types and allow faster output generation.
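
    Putting the pieces together, a rough sketch of the proposed layout (hypothetical names and simplified types; std collections stand in for SmallVec and the per-column Vec<ScalarValue>):

    use std::collections::HashMap;

    struct GroupState {
        // signature (u64 over group key values + validity) -> offsets into the
        // mutable storage below; a list because signatures can collide
        map: HashMap<u64, Vec<usize>>,
        // mutable storage: one entry per distinct group
        group_keys: Vec<(Option<String>, Option<i64>)>, // k1, abs(k2), with NULLs
        accumulators: Vec<i64>,                         // SUM accumulator per group
    }

    impl GroupState {
        fn update(&mut self, signature: u64, key: (Option<String>, Option<i64>), value: i64) {
            // collision handling: check every candidate offset for true key equality
            if let Some(offsets) = self.map.get(&signature) {
                for &off in offsets {
                    if self.group_keys[off] == key {
                        self.accumulators[off] += value;
                        return;
                    }
                }
            }
            // new group: append to the mutable storage and record its offset
            let off = self.group_keys.len();
            self.group_keys.push(key);
            self.accumulators.push(value);
            self.map.entry(signature).or_default().push(off);
        }
    }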

    Alternatives considered

    One alternative that would require fewer changes, but would be slightly slower, would be to append a validity bitmap to the end of both the keys and the values in the hash table. For example:

          Alternate Design
     ┌───────────────┐         ┌──────────────────┬────────────┬─────────────────╦ ═ ═ ═ ═ ═ ═ ═ ═ ╗
     │┌─────────────┐│         │    Group Keys    │Accumulator │   row_indices   │  Null Bitmask
     ││  key 1234   ├┼────────▶│Box[GroupByScalar]│  for SUM   │    Vec<u32>     ║      (NEW)      ║
     │└─────────────┘│         │ formed key 1234  │            │ (scratch space) │
     │               │         └──────────────────┴────────────┴─────────────────╩ ═ ═ ═ ═ ═ ═ ═ ═ ╝
     │               │
     │     ...       │
     │               │                  ...
     │               │
     │               │
     │               │
     │               │         ┌──────────────────┬────────────┬─────────────────╦ ═ ═ ═ ═ ═ ═ ═ ═ ╗
     │┌─────────────┐│         │    Group Keys    │Accumulator │   row_indices   │  Null Bitmask
     ││  key 3211   │├────────▶│Box[GroupByScalar]│  for SUM   │    Vec<u32>     ║      (NEW)      ║
     │└─────────────┘│         │ formed key 3211  │            │ (scratch space) │
     └───────────────┘         └──────────────────┴────────────┴─────────────────╩ ═ ═ ═ ═ ═ ═ ═ ═ ╝
    

    And the keys would have a null bitmask appended on the end:

                                               │        │
                            string len                      0x1234
                           (as usize le)       │  "foo" │    as le
    {                  ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──╦ ═
      k1: "foo"        │00│00│00│00│00│00│00│03│"f│"o│"0│34│12│00║◀ ─ ─
      k2: 0x1234u16    └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──╩ ═      │
    }                   0  1  2  3  4  5  6  7 │8  9  10│11 12 13
                                                                       │
                                               │        │
                                                                       │
    
                                                                New bitmask at
                                                                 end of each
                                                                     key
    
    
    Reviewed by alamb at 2021-07-28 17:49
  • 4. S3 Support

    Now that there is a PR in flight for remote storage systems (#811), I wanted to ask what the plan was to implement S3 support. Is this something that will live within the datafusion project or be maintained outside? Also, does anyone plan on working on this, or is this something that you are looking for a contributor for?

    Also, I'm new to the project, so what is the best way to ask these kinds of questions: GitHub issues or the email list? Thanks!

    Reviewed by ehenry2 at 2021-08-19 14:30
  • 5. Support qualified columns in queries

    This turned out to be a much larger and more destructive PR than I initially expected. I would like to get some early feedback on the approach before I spend more time working on the cleanup. So far I have been able to get all unit tests to pass.

    TODO:

    • [x] Address FIXMEs and TODOs
    • [x] Check integration tests
    • [x] Rebase to latest master

    Which issue does this PR close?

    closes #56 closes #57 closes #311

    What changes are included in this PR?

    Here is the main design change introduced in this PR:

    • Physical column expression now references columns by unique indices instead of names
    • Logical column expression now wraps around a newly defined Column struct that can represent both qualified and unqualified columns (a rough sketch of the idea follows this list)
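
    A rough sketch of the Column idea (illustrative only, not the exact struct in the PR):

    // a column reference with an optional relation qualifier
    struct Column {
        relation: Option<String>, // Some("t") for t.a, None for an unqualified a
        name: String,
    }

    impl Column {
        fn flat_name(&self) -> String {
            match &self.relation {
                Some(r) => format!("{}.{}", r, self.name),
                None => self.name.clone(),
            }
        }
    }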

    Query execution flow change:

    1. When a TableScan plan has table_name set to Some(name), all of the fields in its schema will be created as fully qualified fields.
    2. Logical plan builder is responsible for normalizing all logical column expressions by adding qualifier based on schema wherever applicable.
    3. Logical plan optimizer operates on normalized column expressions.
    4. During physical planning, logical column expressions are resolved to physical column expressions with corresponding indices based on logical plan schemas. Notice a side effect of this is we don't look up column index during execution anymore. It is now done at planning time.
    5. During physical planning, physical schema (arrow schema) has all column qualifiers stripped.

    Some other changes introduced in this PR to help make all tests pass:

    • avoid coalesce for hash repartition plan
    • added partitioned hash join tests to hash_join module
    • added support for join with alias (for self join) https://github.com/apache/arrow-datafusion/pull/547
    • added join_using method to logical plan builder
    • fixed cross join handling in projection push down in optimizer (schema fields not trimmed based on pushed down table scan projections)
    • fixed join handling in projection push down in optimizer (schema fields not trimmed based on pushed down table scan projections)
    • produce unique join columns when the USING join constraint is used
    • ser/de physical plans in ballista without going through physical planner
    • fixed a couple of other bugs here and there along the way, but I can't remember them all :(

    Output field names are not yet 100% conforming to https://github.com/apache/arrow-datafusion/blob/master/docs/specification/output-field-name-semantic.md. The remaining differences are all minor formatting issues, like converting function names to lower case, so I decided to leave that to follow-up PRs to reduce the diff.

    Are there any user-facing changes?

    breaking api changes:

    • Column expression now wraps Column struct instead of String
    • TableScan plan now takes table_name as Option instead of String
    • Various DataFrame scan methods now take the table name as Option instead of &str
    • Physical Column expression now requires index field
    • logical plan builder join method now takes left and right keys as Vec<impl Into<Column>> instead of &[&str]
    Reviewed by houqp at 2021-04-25 09:15
  • 6. Release a new version of DataFusion to crates.io

    Is your feature request related to a problem or challenge? Please describe what you are trying to do. The current version of datafusion on crates.io is 4.0.0 (which among other things doesn't work with the newly released arrow 5.0.0)

    Describe the solution you'd like It would be great if we released a DataFusion 5.0.0 (or some other number if we want to diverge from arrow) -- doing so would likely involve porting the arrow-rs release scripts, from https://github.com/apache/arrow-rs/tree/master/dev/release and then sending it to the dev mailing list for a formal vote

    Starting this ticket to gather some feedback

    Reviewed by alamb at 2021-07-23 19:51
  • 7. Avro Table Provider

    Which issue does this PR close?

    Closes #903.

    Rationale for this change

    Enables loading Avro data files through DataFusion.

    What changes are included in this PR?

    Avro is added as a table provider and a supported file format. Avro schemas can be translated into arrow schemas.

    Are there any user-facing changes?

    Yes, as one can now call register_avro on df and use 'STORED AS AVRO' in SQL.

    N.B.:

    • Need to add tests in avro_to_arrow/arrow_array_reader.rs

    Missing :

    • Writing back to avro

    I find there is duplication between modules with these additions; I should probably do some refactoring.

    Reviewed by Igosuki at 2021-08-20 17:23
  • 8. Release of DataFusion: 7.0.0

    I wonder if it is time to release a new version of datafusion to crates.io?

    It would be great to crowdsource:

    1. Update readme / changelog
    2. Update version
    3. (maybe) a blog post?

    I am happy to handle creating a release candidate / doing the official voting process.

    Reviewed by alamb at 2022-01-16 12:14
  • 9. Release Datafusion 6.0.0

    Is your feature request related to a problem or challenge? Please describe what you are trying to do.

    We had some oversights in the 5.0.0 release (https://github.com/apache/arrow-datafusion/issues/771) that left us unable to release the Python binding and datafusion-cli.

    Describe the solution you'd like

    Release DataFusion 5.1.0 with an improved process to support Python binding and CLI releases.

    Describe alternatives you've considered A clear and concise description of any alternative solutions or features you've considered.

    Additional context see https://github.com/apache/arrow-datafusion/issues/887, https://github.com/apache/arrow-datafusion/issues/883 and https://github.com/apache/arrow-datafusion/issues/837

    Reviewed by houqp at 2021-08-15 19:10
  • 10. Add support for reading remote storage systems

    Which issue does this PR close?

    Closes #616

    Rationale for this change

    Currently, we can only read files from LocalFS, since we use std::fs in ParquetExec. It would be nice to add support for reading files that reside on storage sources such as HDFS, Amazon S3, etc.

    What changes are included in this PR?

    Introduce an ObjectStore API as an abstraction over the underlying storage systems, such as the local filesystem, HDFS, S3, etc., and make the ObjectStore implementation pluggable through the ObjectStoreRegistry in DataFusion's ExecutionContext.

    Are there any user-facing changes?

    Users can provide implementations of the ObjectStore trait, register them in the ExecutionContext's registry, and run queries against data that resides in remote storage systems as well as the local filesystem (the only default implementation of ObjectStore).

    Reviewed by yjshen at 2021-08-02 05:44
  • 11. Implement basic common subexpression eliminate optimization

    Which issue does this PR close?

    Resolve #566.

    Rationale for this change

    This pull request tries to implement the common subexpression elimination optimization. The current implementation only supports detecting & eliminating common subexpressions within one logical plan, not across plans.

    This optimizer consists of two parts: detecting and replacing. It first scans all expressions we are going to rewrite (all expressions under one plan here) and generates an identifier for each Expr node for later comparison. Then, for the same group of expressions, we extract the common subexpressions, put them into a Projection plan, and replace the original expressions with column Exprs.
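
    A toy sketch of the detect step, with a hypothetical Expr type (not DataFusion's):

    use std::collections::HashMap;

    enum Expr {
        Column(String),
        Add(Box<Expr>, Box<Expr>),
        Abs(Box<Expr>),
    }

    // identifier for an expression node, used to compare sub-trees
    fn identifier(expr: &Expr) -> String {
        match expr {
            Expr::Column(name) => format!("col({})", name),
            Expr::Add(l, r) => format!("add({},{})", identifier(l), identifier(r)),
            Expr::Abs(inner) => format!("abs({})", identifier(inner)),
        }
    }

    // count how often each identifier occurs across a plan's expressions;
    // sub-trees whose count ends up > 1 are candidates to be extracted into a
    // Projection and replaced by a column Expr
    fn count_subexprs(expr: &Expr, counts: &mut HashMap<String, usize>) {
        *counts.entry(identifier(expr)).or_insert(0) += 1;
        match expr {
            Expr::Column(_) => {}
            Expr::Add(l, r) => {
                count_subexprs(l, counts);
                count_subexprs(r, counts);
            }
            Expr::Abs(inner) => count_subexprs(inner, counts),
        }
    }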

    This implementation doesn't cover eliminating common subexpressions across the whole plan. That requires considering not only the expression itself, but also how data is shared between different plans, whether some plan modifies the data (e.g. a filter), and other aspects I haven't realized yet. I originally planned to make it work across plans, but it seems better to do that in another PR.

    What changes are included in this PR?

    Add the optimizer rule CommonSubexprEliminate.

    Are there any user-facing changes?

    I think there is, as the optimization rule will rewrite input SQL.

    Status

    This is still a work in progress. I have only covered some of the plan types and am trying to make it work with the entire execution procedure.

    Reviewed by waynexia at 2021-07-29 16:50
  • 12. InList: Don't treat Null as UTF8(None)

    Is it worth filing a follow-on ticket to address this? I am not 100% sure I understand what the end-user-facing bug would be (is it that null is the wrong type?)

    Originally posted by @alamb in https://github.com/apache/arrow-datafusion/pull/2764#discussion_r905386562

    Reviewed by liukun4515 at 2022-06-24 01:41
  • 13. Update to arrow 17.0.0

    Draft until arrow 17.0.0 is released (see https://github.com/apache/arrow-rs/issues/1925)

    Major sources of challenge:

    1. https://github.com/apache/arrow-rs/pull/1871 (change to Decimal128 cc @viirya @liukun4515 )
    2. https://github.com/apache/arrow-rs/issues/1888 (enforce RecordBatch matches schema definition -- cc @andygrove )

    TODO:

    • [ ] Complete getting the tests to pass
    • [ ] File / fix issue for join output schema nullability
    Reviewed by alamb at 2022-06-23 18:55
  • 14. Draft: Automated generation of configs.md

    Which issue does this PR close?

    Closes #2770.

    Rationale for this change

    What changes are included in this PR?

    • Added a binary - print_config_docs to datafusion core which generates the config markdown table and prints it.
    • Added a script to dev (should this be in docs instead?) which runs this binary to update the configs.md file.

    Posting this as a draft because I'm not sure yet what the best plan is for fully automating updates to the config documentation. I looked into build.rs but I'm not sure it's the right fit for this since it runs before compilation and can't import from the module it is part of.

    One idea I had was to run the script in a GitHub action alongside the tests. If the action runs and there are changes to configs.md, it could suggest the diff on the PR and then fail. It would then be a single click to update the PR to include the changes and restart the tests. There are a few actions on the marketplace for doing this; this one looks promising: https://github.com/marketplace/actions/action-git-diff-suggestions. Open to thoughts and alternatives 🙂 Edit: it doesn't look like this will work as I had hoped, since GitHub doesn't allow suggestions on files that haven't been touched by the PR. Maybe the best we can do is to fail if there are changes that need making.

    Are there any user-facing changes?

    Reviewed by mrob95 at 2022-06-23 15:58
  • 15. Allow configuration settings to be specified with environment variables

    Is your feature request related to a problem or challenge? Please describe what you are trying to do. Per https://12factor.net/config we should allow any configuration settings to be overridden by environment variables.

    For example, it should be possible to set the configuration option datafusion.execution.batch_size with the environment variable DATAFUSION_EXECUTION_BATCH_SIZE.
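
    A minimal sketch of that key-to-environment-variable mapping (hypothetical helper, not DataFusion code):

    // "datafusion.execution.batch_size" -> looks up DATAFUSION_EXECUTION_BATCH_SIZE
    fn config_from_env(key: &str) -> Option<String> {
        let env_name = key.replace('.', "_").to_uppercase();
        std::env::var(env_name).ok()
    }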

    Describe the solution you'd like As described above.

    Describe alternatives you've considered None

    Additional context None

    Reviewed by andygrove at 2022-06-23 15:33
  • 16. Support multiple paths for ListingTableScanNode

    Which issue does this PR close?

    Closes #2768.

    Rationale for this change

    Sometimes we need to pass multiple paths to ListingTableScanNode, as in Spark: https://github.com/apache/spark/blob/a8a765b3f302c078cb9519c4a17912cd38b9680c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala#L36-L43

    trait FileIndex {
    
    
      /**
       * Returns the list of root input paths from which the catalog will get files. There may be a
       * single root path from which partitions are discovered, or individual partitions may be
       * specified by each path.
       */
      def rootPaths: Seq[Path]
    

    Are there any user-facing changes?

    pub fn table_paths(&self) -> &Vec<ListingTableUrl> {
        &self.table_paths
    }
    

    will return a Vec.

    Reviewed by Ted-Jiang at 2022-06-23 04:12
  • 17. InList: don't need to treat Null as UTF8 data type

    From this test case, we can see that we should not treat Null as the UTF8 data type.

    Originally posted by @liukun4515 in https://github.com/apache/arrow-datafusion/pull/2764#discussion_r904468132

    From the implementation:

    1. https://github.com/apache/arrow-datafusion/blob/3da59e8877620a095efae102b0177e7e6a76f65b/datafusion/physical-expr/src/expressions/in_list.rs#L163

    2. https://github.com/apache/arrow-datafusion/blob/3da59e8877620a095efae102b0177e7e6a76f65b/datafusion/physical-expr/src/expressions/in_list.rs#L111

    Reviewed by liukun4515 at 2022-06-23 02:46