Apache Arrow DataFusion and Ballista query engines

Overview

DataFusion

DataFusion is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.

DataFusion supports both an SQL and a DataFrame API for building logical query plans as well as a query optimizer and execution engine capable of parallel execution against partitioned data sources (CSV and Parquet) using threads.

Use Cases

DataFusion is used to build modern, fast, and efficient data pipelines, ETL processes, and database systems that need the performance of Rust and Apache Arrow and want to offer their users the convenience of an SQL interface or a DataFrame API.

Why DataFusion?

  • High Performance: Leveraging Rust and Arrow's memory model, DataFusion achieves very high performance
  • Easy to Connect: Being part of the Apache Arrow ecosystem (Arrow, Parquet and Flight), DataFusion works well with the rest of the big data ecosystem
  • Easy to Embed: Allowing extension at almost any point in its design, DataFusion can be tailored for your specific use case
  • High Quality: Extensively tested, both by itself and with the rest of the Arrow ecosystem, DataFusion can be used as the foundation for production systems.

Known Uses

Here are some of the projects known to use DataFusion:

(if you know of another project, please submit a PR to add a link!)

Example Usage

Run a SQL query against data stored in a CSV:

use datafusion::prelude::*;
use arrow::util::pretty::print_batches;
use arrow::record_batch::RecordBatch;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
  // register the table
  let mut ctx = ExecutionContext::new();
  ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;

  // create a plan to run a SQL query
  let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")?;

  // execute and print results
  let results: Vec<RecordBatch> = df.collect().await?;
  print_batches(&results)?;
  Ok(())
}

Use the DataFrame API to process data stored in a CSV:

use datafusion::prelude::*;
use arrow::util::pretty::print_batches;
use arrow::record_batch::RecordBatch;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
  // create the dataframe
  let mut ctx = ExecutionContext::new();
  let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;

  let df = df.filter(col("a").lt_eq(col("b")))?
          .aggregate(vec![col("a")], vec![min(col("b"))])?
          .limit(100)?;

  // execute and print results
  let results: Vec<RecordBatch> = df.collect().await?;
  print_batches(&results)?;
  Ok(())
}

Both of these examples will produce

+---+--------+
| a | MIN(b) |
+---+--------+
| 1 | 2      |
+---+--------+

Using DataFusion as a library

DataFusion is published on crates.io, and is well documented on docs.rs.

To get started, add the following to your Cargo.toml file:

[dependencies]
datafusion = "4.0.0-SNAPSHOT"

Using DataFusion as a binary

DataFusion also includes a simple command-line interactive SQL utility. See the CLI reference for more information.

Status

General

  • SQL Parser
  • SQL Query Planner
  • Query Optimizer
  • Constant folding
  • Join Reordering
  • Limit Pushdown
  • Projection push down
  • Predicate push down
  • Type coercion
  • Parallel query execution

SQL Support

  • Projection
  • Filter (WHERE)
  • Filter post-aggregate (HAVING)
  • Limit
  • Aggregate
  • Common math functions
  • cast
  • try_cast
  • Postgres compatible String functions
    • ascii
    • bit_length
    • btrim
    • char_length
    • character_length
    • chr
    • concat
    • concat_ws
    • initcap
    • left
    • length
    • lpad
    • ltrim
    • octet_length
    • regexp_replace
    • repeat
    • replace
    • reverse
    • right
    • rpad
    • rtrim
    • split_part
    • starts_with
    • strpos
    • substr
    • to_hex
    • translate
    • trim
  • Miscellaneous/Boolean functions
    • nullif
  • Common date/time functions
    • Basic date functions
    • Basic time functions
    • Basic timestamp functions
  • nested functions
    • Array of columns
  • Schema Queries
    • SHOW TABLES
    • SHOW COLUMNS
    • information_schema.{tables, columns}
    • information_schema other views
  • Sorting
  • Nested types
  • Lists
  • Subqueries
  • Common table expressions
  • Set Operations
    • UNION ALL
    • UNION
    • INTERSECT
    • MINUS
  • Joins
    • INNER JOIN
    • LEFT JOIN
    • RIGHT JOIN
    • CROSS JOIN
    • OUTER JOIN
  • Window

Data Sources

  • CSV
  • Parquet primitive types
  • Parquet nested types

Extensibility

DataFusion is designed to be extensible at all points. To that end, you can provide your own custom:

  • User Defined Functions (UDFs)
  • User Defined Aggregate Functions (UDAFs)
  • User Defined Table Source (TableProvider) for tables
  • User Defined Optimizer passes (plan rewrites)
  • User Defined LogicalPlan nodes
  • User Defined ExecutionPlan nodes
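
For example, a scalar UDF can be registered on the ExecutionContext and then called from SQL or the DataFrame API. The following is a minimal sketch assuming the 4.x-era create_udf and make_scalar_function helpers (later releases change the signature, e.g. by adding a Volatility argument) and a hypothetical my_double function; check docs.rs for the exact API of your version:

use std::sync::Arc;

use arrow::array::{ArrayRef, Float64Array};
use arrow::datatypes::DataType;
use datafusion::physical_plan::functions::make_scalar_function;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
  // a scalar function that doubles its (Float64) argument
  let double = |args: &[ArrayRef]| {
    let input = args[0]
      .as_any()
      .downcast_ref::<Float64Array>()
      .expect("expected Float64 input");
    let doubled: Float64Array = input.iter().map(|v| v.map(|x| x * 2.0)).collect();
    Ok(Arc::new(doubled) as ArrayRef)
  };
  let double = make_scalar_function(double);

  // create_udf(name, input types, return type, implementation)
  let udf = create_udf(
    "my_double",
    vec![DataType::Float64],
    Arc::new(DataType::Float64),
    double,
  );

  let mut ctx = ExecutionContext::new();
  ctx.register_udf(udf);

  // the function is now callable from SQL, e.g. SELECT my_double(c) FROM example
  Ok(())
}

A UDAF can be registered analogously (see create_udaf and register_udaf in the API docs).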

Supported SQL

This library currently supports many SQL constructs, including

  • CREATE EXTERNAL TABLE X STORED AS PARQUET LOCATION '...'; to register a table's locations
  • SELECT ... FROM ... together with any expression
  • ALIAS to name an expression
  • CAST to change types, including e.g. Timestamp(Nanosecond, None)
  • most mathematical unary and binary expressions such as +, /, sqrt, tan, >=.
  • WHERE to filter
  • GROUP BY together with one of the following aggregations: MIN, MAX, COUNT, SUM, AVG
  • ORDER BY together with an expression and optional ASC or DESC and also optional NULLS FIRST or NULLS LAST
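
As a rough illustration, the following sketch combines several of the constructs listed above (WHERE, CAST, GROUP BY with an aggregation, ORDER BY ... NULLS LAST, and LIMIT), assuming the same tests/example.csv table used in the earlier examples:

use datafusion::prelude::*;
use arrow::util::pretty::print_batches;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
  let mut ctx = ExecutionContext::new();
  ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;

  let df = ctx.sql(
    "SELECT a, MIN(CAST(c AS DOUBLE)) \
     FROM example \
     WHERE a >= 1 \
     GROUP BY a \
     ORDER BY a ASC NULLS LAST \
     LIMIT 10",
  )?;

  let results = df.collect().await?;
  print_batches(&results)?;
  Ok(())
}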

Supported Functions

DataFusion strives to implement a subset of the PostgreSQL SQL dialect where possible. We explicitly choose a single dialect to maximize interoperability with other tools and allow reuse of the PostgreSQL documents and tutorials as much as possible.

Currently, only a subset of the PostgreSQL dialect is implemented, and we will document any deviations.

Schema Metadata / Information Schema Support

DataFusion supports showing metadata about the available tables. This information can be accessed using the views of the ISO SQL information_schema schema or the DataFusion-specific SHOW TABLES and SHOW COLUMNS commands.

More information can be found in the Postgres docs.
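
In a Rust program the information_schema views are opt-in. The following is a minimal sketch assuming the 4.x-era ExecutionConfig::with_information_schema option (check docs.rs for the exact method name in your version):

use arrow::util::pretty::print_batches;
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
use datafusion::prelude::CsvReadOptions;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
  // enable the information_schema views (and SHOW TABLES / SHOW COLUMNS)
  let config = ExecutionConfig::new().with_information_schema(true);
  let mut ctx = ExecutionContext::with_config(config);
  ctx.register_csv("t", "tests/example.csv", CsvReadOptions::new())?;

  let df = ctx.sql("SELECT * FROM information_schema.tables")?;
  let results = df.collect().await?;
  print_batches(&results)?;
  Ok(())
}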

To show tables available for use in DataFusion, use the SHOW TABLES command or the information_schema.tables view:

> show tables;
+---------------+--------------------+------------+------------+
| table_catalog | table_schema       | table_name | table_type |
+---------------+--------------------+------------+------------+
| datafusion    | public             | t          | BASE TABLE |
| datafusion    | information_schema | tables     | VIEW       |
+---------------+--------------------+------------+------------+

> select * from information_schema.tables;

+---------------+--------------------+------------+--------------+
| table_catalog | table_schema       | table_name | table_type   |
+---------------+--------------------+------------+--------------+
| datafusion    | public             | t          | BASE TABLE   |
| datafusion    | information_schema | TABLES     | SYSTEM TABLE |
+---------------+--------------------+------------+--------------+

To show the schema of a table in DataFusion, use the SHOW COLUMNS command or the information_schema.columns view:

> show columns from t;
+---------------+--------------+------------+-------------+-----------+-------------+
| table_catalog | table_schema | table_name | column_name | data_type | is_nullable |
+---------------+--------------+------------+-------------+-----------+-------------+
| datafusion    | public       | t          | a           | Int32     | NO          |
| datafusion    | public       | t          | b           | Utf8      | NO          |
| datafusion    | public       | t          | c           | Float32   | NO          |
+---------------+--------------+------------+-------------+-----------+-------------+

>   select table_name, column_name, ordinal_position, is_nullable, data_type from information_schema.columns;
+------------+-------------+------------------+-------------+-----------+
| table_name | column_name | ordinal_position | is_nullable | data_type |
+------------+-------------+------------------+-------------+-----------+
| t          | a           | 0                | NO          | Int32     |
| t          | b           | 1                | NO          | Utf8      |
| t          | c           | 2                | NO          | Float32   |
+------------+-------------+------------------+-------------+-----------+

Supported Data Types

DataFusion uses Arrow, and thus the Arrow type system, for query execution. The SQL types from sqlparser-rs are mapped to Arrow types according to the following table

SQL Data Type    Arrow DataType
-------------    ------------------------------
CHAR             Utf8
VARCHAR          Utf8
UUID             Not yet supported
CLOB             Not yet supported
BINARY           Not yet supported
VARBINARY        Not yet supported
DECIMAL          Float64
FLOAT            Float32
SMALLINT         Int16
INT              Int32
BIGINT           Int64
REAL             Float64
DOUBLE           Float64
BOOLEAN          Boolean
DATE             Date32
TIME             Time64(TimeUnit::Millisecond)
TIMESTAMP        Date64
INTERVAL         Not yet supported
REGCLASS         Not yet supported
TEXT             Not yet supported
BYTEA            Not yet supported
CUSTOM           Not yet supported
ARRAY            Not yet supported
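
When a data source's schema is declared explicitly, these Arrow types are used directly. The following is a minimal sketch assuming the CsvReadOptions::schema option and a CSV whose columns correspond to the SQL types INT, VARCHAR, and FLOAT in the table above:

use arrow::datatypes::{DataType, Field, Schema};
use arrow::util::pretty::print_batches;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
  // equivalent to SQL columns a INT, b VARCHAR, c FLOAT
  let schema = Schema::new(vec![
    Field::new("a", DataType::Int32, false),
    Field::new("b", DataType::Utf8, false),
    Field::new("c", DataType::Float32, false),
  ]);

  let mut ctx = ExecutionContext::new();
  ctx.register_csv(
    "example",
    "tests/example.csv",
    CsvReadOptions::new().schema(&schema),
  )?;

  // CAST(a AS BIGINT) produces an Arrow Int64 column
  let df = ctx.sql("SELECT CAST(a AS BIGINT) FROM example")?;
  let results = df.collect().await?;
  print_batches(&results)?;
  Ok(())
}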

Architecture Overview

There is no formal document describing DataFusion's architecture yet, but the following presentations offer a good overview of its different components and how they interact.

  • (March 2021): The DataFusion architecture is described in Query Engine Design and the Rust-Based DataFusion in Apache Arrow: recording (DataFusion content starts ~ 15 minutes in) and slides
  • (February 2021): How DataFusion is used within the Ballista project is described in Ballista: Distributed Compute with Rust and Apache Arrow: recording

Developer's guide

Please see Developers Guide for information about developing DataFusion.

Comments
  • Experimenting with arrow2

    I have been experimenting with arrow2 & parquet2 as backends for DataFusion. This is WIP and does not compile, but I would like to give some visibility to this work.

    So far, I was able to keep all functionality, with a guaranteed increase in security and potentially some performance.

    Goals:

    • compile
    • tests pass
    • re-write readers and writers to leverage parallelism of both parquet2 and the CSV reader in arrow2
    • ???
    • profit

    Some notes:

    • I removed CastOptions because casting does not need to be fallible; we can make any non-castable value null and recover the set of failed casts from the differences in validity between the original array and the casted array, if we so wish.

    • most kernels in arrow2 return a Box<dyn Array> for various reasons; we use result.into() to convert to Arc. This is very cheap and the best that arrow has to offer without the unstable channel.

    • There are some changes in min and max for floats. Essentially, arrow2 guarantees that the comparison operator used in sort for floats is the same as the one used in min/max, which required this small change because Ord for floats is still part of unstable Rust.

    documentation ballista datafusion sql python 
    opened by jorgecarleitao 70
  • add udf/udaf plugin

    closes https://github.com/apache/arrow-datafusion/issues/1882

    In this PR, I have implemented the plugin mechanism for UDFs. In the next PR, I will complete the serialization and deserialization of UDFs/UDAFs in Ballista, relying on the UDF plugin.

    ballista datafusion 
    opened by EricJoy2048 50
  • Rework GroupByHash for faster performance and support grouping by nulls

    Rationale

    1. The current GroupByHash operator does not take NULL into account and thus produces incorrect answers when grouping on columns that contain NULL, as described in #782 and #781.
    2. Without additional changes, adding support for NULL in grouping will likely both slow down group by hashing as well as increase the memory overhead per group (e.g. see #786)

    This ticket therefore proposes rearranging the GroupByHash code to be more efficient in both space and time, providing a performance budget to add NULL support without an overall regression.

    Overview of Current GroupByHash

    This section explains the current state of GroupByHash on master at https://github.com/apache/arrow-datafusion/commit/54163410da05e8e6c68af55d699bf6a89e229bb6

    At a high level, the group by hash does the following for each input row, in a vectorized fashion:

    1. Compute the group by key values (the expressions that appear in the GROUP BY clause)
    2. Form a key out of the group by values
    3. Find/Create an entry in hash map of (key values) --> (accumulators)
    4. Update the accumulators (one for each aggregate, such as COUNT that appears in the query) with the arguments

    When all the input has been processed, then the hash table is drained, producing one row for each entry in the hash table, in the following manner:

    (group key1, group key2, ...) (aggregate1, aggregate2, ...)
    

    So for example, given a query such as

    SELECT SUM(c1)
    FROM t
    GROUP BY k1, abs(k2)
    

    This looks something like

     ┌────┐ ┌────┐               ┌────┐ ┌────┐         ┌─────────────┐
     │    │ │    │               │    │ │    │────────▶│  key 1234   │
     │    │ │    │               │    │ │    │         └─────────────┘
     │    │ │    │               │    │ │    │
     │    │ │    │               │    │ │    │         ┌─────────────┐
     │    │ │    │               │    │ │    │────────▶│   key 23    │
     │    │ │    │               │    │ │abs │         └─────────────┘
     │ k1 │ │ k2 │──────────────▶│ k1 │ │(k2)│
     │    │ │    │               │    │ │    │
     │    │ │    │               │    │ │    │        ...
     │    │ │    │               │    │ │    │
     │    │ │    │               │    │ │    │
     │    │ │    │               │    │ │    │         ┌─────────────┐
     │    │ │    │               │    │ │    │────────▶│   key 321   │
     └────┘ └────┘               └────┘ └────┘         └─────────────┘
                                  group by key
      input data                     values
                      step 1:    (group_values)        step 2:
                     evaluate               create a variable sized hash
                     gby exprs                 table key for each row
    

    The hash table that is formed looks something like this:

    ┌───────────────┐         ┌──────────────────┬────────────┬─────────────────┐
    │┌─────────────┐│         │    Group Keys    │Accumulator │   row_indices   │
    ││  key 1234   ├┼────────▶│Box[GroupByScalar]│  for SUM   │    Vec<u32>     │
    │└─────────────┘│         │ formed key 1234  │            │ (scratch space) │
    │               │         └──────────────────┴────────────┴─────────────────┘
    │               │
    │     ...       │
    │               │                  ...
    │               │
    │               │
    │               │
    │               │         ┌──────────────────┬────────────┬─────────────────┐
    │┌─────────────┐│         │    Group Keys    │Accumulator │   row_indices   │
    ││   key 321   │├────────▶│Box[GroupByScalar]│  for SUM   │    Vec<u32>     │
    │└─────────────┘│         │  formed key 321  │            │ (scratch space) │
    └───────────────┘         └──────────────────┴────────────┴─────────────────┘
      hash table
    "accumulators"
    
    
               Step 3:                   NOTE:  Each entry in the hash table has
     The keys are used to find an        1. The original group keys
       entry in the hash table           2. The accumulators
        which then are mapped            3. row_indexes scratch space
    

    Key Formation

    The current state of the art, introduced in 93de66ae67a33764ac4029b0f825c415b9b2e92d / https://github.com/apache/arrow/pull/8863 by @Dandandan, is quite clever. The code in create_key packs data from the group keys together into a single mut Vec, which is then used as the key for the hash table.

    For example, if the input row was:

    {
      k1: "foo"
      k2: 0x1234 as u16
    }
    

    The resulting key is a 13-byte Vec: 11 bytes for "foo" (8 bytes for the length + 3 bytes for "foo") and 2 bytes for 0x1234, a 16-bit integer:

                            │        │
         string len                      0x1234
        (as usize le)       │  "foo" │    as le
    ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
    │00│00│00│00│00│00│00│03│"f│"o│"o│34│12│
    └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘   byte
     0  1  2  3  4  5  6  7 │8  9  10│11 12   offset
    
                            │        │
    
    

    However, there are at least a few downsides of this approach:

    1. There is no way to represent NULL as mentioned by @Dandandan on https://github.com/apache/arrow-datafusion/issues/782#issuecomment-887698068
    2. The data for each group key value is currently stored twice -- once in the Vec key and once in the values as a GroupByScalar used to produce the output -- resulting in memory overhead, especially for variable length (e.g. string) values

    Proposal

    Modeled after what I think @Dandandan is suggesting in https://github.com/apache/arrow-datafusion/pull/786#issuecomment-888229063:

    The HashTable would not store the key or aggregate accumulators directly, but instead would map "signatures" computed from the group by keys to list offsets in a mutable storage area that contained the values and aggregates.

    The "signature" is simply a hash of the values (and validitiy bit) of the group key values. The term "signature" is used to avoid confusion with the hash used in the hash table. It would be computed as a u64 or similar directly from the group by key values

    ┌────┐ ┌────┐         ┌─────────────┐
    │    │ │    │────────▶│   0x1133    │
    │    │ │    │         └─────────────┘
    │    │ │    │         ┌─────────────┐
    │    │ │    │────────▶│   0x432A    │
    │    │ │    │         └─────────────┘
    │    │ │abs │
    │ k1 │ │(k2)│
    │    │ │    │         ...
    │    │ │    │
    │    │ │    │
    │    │ │    │
    │    │ │    │         ┌─────────────┐
    │    │ │    │────────▶│   0x432A    │
    └────┘ └────┘         └─────────────┘
     group by key
        values
    (group_values)
    
    Step 2: Create a FIXED LENGTH signature
      (e.g. u64) by hashing the values in
                the group by key
    

    The hash table composition would be different. Each entry is a SmallVec (a non-allocating Vec) containing a list of indices into a "mutable storage" area.

    ┌───────────────┐
    │┌─────────────┐│    ┌───────┐                       ┌──────┐┌──────┐ ┌────────────┐
    ││   0x1133    ├┼───▶│  [1]  │─ ─ ─ ─ ─ ─            │      ││      │ │            │
    │└─────────────┘│    └───────┘           └ ─ ─ ─ ─ ─ │      ││      │ │            │
    │               │                                    │  k1  ││ abs  │ │            │
    │               │                         ─ ─ ─ ─ ─ ▶│values││ (k2) │ │            │
    │     ...       │                        │           │      ││values│ │Accumulators│
    │               │       ...                          │ (and ││      │ │  for SUM   │
    │               │                        │           │valid ││ (and │ │            │
    │               │                         ─ ─ ─ ─ ─ ▶│mask) ││valid │ │            │
    │               │                        │           │      ││mask) │ │            │
    │               │                                    │      ││      │ │            │
    │┌─────────────┐│    ┌───────┐           │           │      ││      │ │            │
    ││   0x432A    │├───▶│ [2,4] │─ ─ ─ ─ ─ ─            └──────┘└──────┘ └────────────┘
    │└─────────────┘│    └───────┘
    └───────────────┘               values are lists
     keys are gby key               (SmallVec) of         mutable storage
     signatures                     offsets into
                                    storage tables
    hashtable
    

    Edit: Collisions are handled by the fact that the entry in the hash table is a list of indices into the mutable storage area -- if there are multiple values in that list, each entry in the mutable area needs to be checked for equality to find the correct one.

    The mutable storage area contains:

    1. A Vec of ScalarValues for each group key column
    2. The Vec of accumulators for each grouping

    For example, this is one example of how (logically) this mutable storage would work

            valid          valid
             bit            bit
            mask           mask
     ┌────┐┌────┐   ┌────┐┌────┐   ┌────┐
     │"D" ││ t  │   │ 1  ││ t  │   │ 11 │
     ├────┤├────┤   ├────┤├────┤   ├────┤
     │"C" ││ t  │   │ 3  ││ t  │   │ 3  │
     ├────┤├────┤   ├────┤├────┤   ├────┤
     │"A" ││ t  │   │ 1  ││ t  │   │ 27 │
     ├────┤├────┤   ├────┤├────┤   ├────┤
     │"D" ││ t  │   │ 2  ││ t  │   │ 2  │
     ├────┤├────┤   ├────┤├────┤   ├────┤
     │ "" ││ t  │   │ 0  ││ f  │   │ 4  │
     └────┘└────┘   └────┘└────┘   └────┘
    
       group by key storage      Accumulator
            (5 groups)             for SUM
                                (aggregates)
    
    
       Example showing how groups
      (D,1), (C,3), (A,1), (D,2),
          (NULL, 0) are stored
    

    I propose using Vec<ScalarValue> to store the group key values in the mutable area, as there is no equivalent of a mutable Array in arrow-rs yet (though I think there is MutablePrimitiveArray in arrow2). If/when we get access to a mutable array in datafusion, we can potentially switch to using that representation for the mutable storage area, which would likely take less memory for some data types and also allow for faster output generation.
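
    To make the proposed layout concrete, here is a small, self-contained Rust sketch of the signature -> offsets -> mutable storage flow. The type names and the simplified stand-in for ScalarValue are purely illustrative, not DataFusion code; the real implementation would use SmallVec, DataFusion's ScalarValue, and the actual accumulator machinery:

    use std::collections::HashMap;

    /// Simplified stand-in for DataFusion's ScalarValue; None represents NULL.
    #[derive(PartialEq)]
    enum KeyValue {
        Utf8(Option<String>),
        Int64(Option<i64>),
    }

    /// Mutable storage area: one entry per group.
    struct MutableStorage {
        group_keys: Vec<Vec<KeyValue>>, // group key values, with NULLs representable
        sums: Vec<i64>,                 // accumulator state, here a single SUM
    }

    /// Maps a u64 "signature" of the group key values to candidate offsets in the
    /// mutable storage; collisions are resolved by comparing the stored key values.
    struct GroupByHashState {
        table: HashMap<u64, Vec<usize>>, // the proposal uses SmallVec for the offsets
        storage: MutableStorage,
    }

    impl GroupByHashState {
        fn upsert(&mut self, signature: u64, keys: Vec<KeyValue>, value: i64) {
            // candidate groups that share this signature (usually zero or one)
            let candidates = self.table.get(&signature).cloned().unwrap_or_default();
            for off in candidates {
                if self.storage.group_keys[off] == keys {
                    // existing group: update its accumulator
                    self.storage.sums[off] += value;
                    return;
                }
            }
            // new group: append to the mutable storage and record its offset
            let new_off = self.storage.group_keys.len();
            self.storage.group_keys.push(keys);
            self.storage.sums.push(value);
            self.table.entry(signature).or_default().push(new_off);
        }
    }

    fn main() {
        let mut state = GroupByHashState {
            table: HashMap::new(),
            storage: MutableStorage { group_keys: Vec::new(), sums: Vec::new() },
        };
        // two example groups, e.g. (D, 1) and (NULL, 0); the signatures are arbitrary here
        state.upsert(0x1133, vec![KeyValue::Utf8(Some("D".into())), KeyValue::Int64(Some(1))], 11);
        state.upsert(0x432A, vec![KeyValue::Utf8(None), KeyValue::Int64(Some(0))], 4);
        assert_eq!(state.storage.sums, vec![11, 4]);
    }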

    Alternatives considered

    One alternative that would require fewer changes but be slightly slower would be to append a validity bitmap to the end of both the keys and values in the hash table. For example:

          Alternate Design
     ┌───────────────┐         ┌──────────────────┬────────────┬─────────────────╦ ═ ═ ═ ═ ═ ═ ═ ═ ╗
     │┌─────────────┐│         │    Group Keys    │Accumulator │   row_indices   │  Null Bitmask
     ││  key 1234   ├┼────────▶│Box[GroupByScalar]│  for SUM   │    Vec<u32>     ║      (NEW)      ║
     │└─────────────┘│         │ formed key 1234  │            │ (scratch space) │
     │               │         └──────────────────┴────────────┴─────────────────╩ ═ ═ ═ ═ ═ ═ ═ ═ ╝
     │               │
     │     ...       │
     │               │                  ...
     │               │
     │               │
     │               │
     │               │         ┌──────────────────┬────────────┬─────────────────╦ ═ ═ ═ ═ ═ ═ ═ ═ ╗
     │┌─────────────┐│         │    Group Keys    │Accumulator │   row_indices   │  Null Bitmask
     ││  key 3211   │├────────▶│Box[GroupByScalar]│  for SUM   │    Vec<u32>     ║      (NEW)      ║
     │└─────────────┘│         │ formed key 3211  │            │ (scratch space) │
     └───────────────┘         └──────────────────┴────────────┴─────────────────╩ ═ ═ ═ ═ ═ ═ ═ ═ ╝
    

    And the keys would have a null bitmask appended on the end:

                                               │        │
                            string len                      0x1234
                           (as usize le)       │  "foo" │    as le
    {                  ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──╦ ═
      k1: "foo"        │00│00│00│00│00│00│00│03│"f│"o│"0│34│12│00║◀ ─ ─
      k2: 0x1234u16    └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──╩ ═      │
    }                   0  1  2  3  4  5  6  7 │8  9  10│11 12 13
                                                                       │
                                               │        │
                                                                       │
    
                                                                New bitmask at
                                                                 end of each
                                                                     key
    
    
    enhancement 
    opened by alamb 48
  • S3 Support

    Now that there is a PR in-flight for remote storage systems (#811), I wanted to ask what the plan is to implement S3 support. Is this something that will live within the datafusion project or be maintained outside? Also, does anyone plan on working on this, or is this something that you are looking for a contributor for?

    Also, I'm new to the project, so what is the best way to ask these kinds of questions... GitHub issues or the mailing list? Thanks!

    enhancement 
    opened by ehenry2 39
  • Support qualified columns in queries

    This turned out to be a much larger/more destructive PR than I initially expected. I would like to get some early feedback on the approach before I spend more time working on the cleanup. So far I have been able to get all unit tests to pass.

    TODO:

    • [x] Address FIXMEs and TODOs
    • [x] Check integration tests
    • [x] Rebase to latest master

    Which issue does this PR close?

    closes #56 closes #57 closes #311

    What changes are included in this PR?

    Here is the main design change introduced in this PR:

    • Physical column expression now references columns by unique indices instead of names
    • Logical column expression now wraps around a newly defined Column struct that can represent both qualified and unqualified columns

    Query execution flow change:

    1. When a TableScan plan has table_name set to Some(name), all of the fields in its schema will be created as fully qualified fields.
    2. Logical plan builder is responsible for normalizing all logical column expressions by adding qualifier based on schema wherever applicable.
    3. Logical plan optimizer operates on normalized column expressions.
    4. During physical planning, logical column expressions are resolved to physical column expressions with corresponding indices based on logical plan schemas. Notice a side effect of this is we don't look up column index during execution anymore. It is now done at planning time.
    5. During physical planning, physical schema (arrow schema) has all column qualifiers stripped.

    Some other changes introduced in this PR to help make all tests pass:

    • avoid coalesce for hash repartition plan
    • added partitioned hash join tests to hash_join module
    • added support for join with alias (for self join) https://github.com/apache/arrow-datafusion/pull/547
    • added join_using method to logical plan builder
    • fixed cross join handling in projection push down in optimizer (schema fields not trimmed based on pushed down table scan projections)
    • fixed join handling in projection push down in optimizer (schema fields not trimmed based on pushed down table scan projections)
    • produce unique join columns when the USING join constraint is used
    • ser/de physical plans in ballista without going through physical planner
    • fixed a couple of other bugs here and there along the way, but I can't remember them all :(

    Output field names are not 100% conforming to https://github.com/apache/arrow-datafusion/blob/master/docs/specification/output-field-name-semantic.md yet. The remaining differences are all minor formatting issues, like converting function names to lower case, so I decided to leave that to follow-up PRs to reduce the diff.

    Are there any user-facing changes?

    breaking api changes:

    • Column expression now wraps Column struct instead of String
    • TableScan plan now takes table_name as Option instead of String
    • Various DataFrame scan methods now take the table name as Option instead of &str
    • Physical Column expression now requires index field
    • logical plan builder join method now takes left and right keys as Vec<impl Into<Column>> instead of &[&str]
    enhancement ballista datafusion api change 
    opened by houqp 39
  • Release a new version of DataFusion to crates.io

    Is your feature request related to a problem or challenge? Please describe what you are trying to do. The current version of datafusion on crates.io is 4.0.0 (which among other things doesn't work with the newly released arrow 5.0.0)

    Describe the solution you'd like It would be great if we released a DataFusion 5.0.0 (or some other number if we want to diverge from arrow) -- doing so would likely involve porting the arrow-rs release scripts, from https://github.com/apache/arrow-rs/tree/master/dev/release and then sending it to the dev mailing list for a formal vote

    Starting this ticket to gather some feedback

    enhancement 
    opened by alamb 36
  • Switch to object_store crate (#2489)

    Which issue does this PR close?

    Closes #2489

    Rationale for this change

    See ticket

    What changes are included in this PR?

    Switches DataFusion to using object_store crate in place

    Are there any user-facing changes?

    Yes this moves to using the object_store crate.

    Does this PR break compatibility with Ballista?

    Possibly

    api change core 
    opened by tustvold 35
  • Avro Table Provider

    Which issue does this PR close?

    Closes #903.

    Rationale for this change

    Enables loading avro data files through datafusion.

    What changes are included in this PR?

    Avro is added as a table provider and a supported file format. Avro schemas can be translated into arrow schemas.

    Are there any user-facing changes?

    Yes, as one can now call register_avro on df and use 'STORED AS AVRO' in SQL.

    N.B.:

    • Need to add tests in avro_to_arrow/arrow_array_reader.rs

    Missing :

    • Writing back to avro

    I find there is some duplication between modules with these additions; I should probably do some refactoring.

    enhancement ballista datafusion sql 
    opened by Igosuki 35
  • Release of DataFusion: 7.0.0

    I wonder if it is time to release a new version of datafusion to crates.io?

    It would be great to crowdsource:

    1. Update readme / changelog
    2. Update version
    3. (maybe) a blog post?

    I am happy to handle creating a release candidate / doing the official voting process.

    enhancement 
    opened by alamb 34
  • Release Datafusion 6.0.0

    Is your feature request related to a problem or challenge? Please describe what you are trying to do.

    We had some oversights in the 5.0.0 release (https://github.com/apache/arrow-datafusion/issues/771) which prevented us from releasing the Python binding and datafusion-cli.

    Describe the solution you'd like

    Release DataFusion 5.1.0 with an improved process to support the Python binding and CLI release.

    Describe alternatives you've considered A clear and concise description of any alternative solutions or features you've considered.

    Additional context see https://github.com/apache/arrow-datafusion/issues/887, https://github.com/apache/arrow-datafusion/issues/883 and https://github.com/apache/arrow-datafusion/issues/837

    enhancement 
    opened by houqp 31
  • Add support for reading remote storage systems

    Which issue does this PR close?

    Closes #616

    Rationale for this change

    Currently, we can only read files from LocalFS since we use std::fs in ParquetExec. It would be nice to add support to read files that reside on storage sources such as HDFS, Amazon S3, etc.

    What changes are included in this PR?

    Introduce an ObjectStore API as an abstraction over the underlying storage systems, such as the local filesystem, HDFS, S3, etc., and make the ObjectStore implementation pluggable through the ObjectStoreRegistry in DataFusion's ExecutionContext.

    Are there any user-facing changes?

    Users can provide implementations of the ObjectStore trait, register them in the ExecutionContext's registry, and run queries against data that resides in remote storage systems as well as the local filesystem (the only default implementation of ObjectStore).

    enhancement ballista datafusion 
    opened by yjshen 31
  • test cases in the UT level for page filter

        No, because there is no UT for the `page filter`, but I can add some test cases in a follow-up PR for the page filter.
    

    Originally posted by @liukun4515 in https://github.com/apache/arrow-datafusion/pull/4742#discussion_r1059917975

    opened by liukun4515 0
  • fix: ListingSchemaProvider directory paths (related: #4204)

    • Append trailing slash to table paths if they are directories

    Which issue does this PR close?

    It is similar to #4204 - the inability to use an object store listing for a table schema.

    However this PR addresses tables generated from ListingSchemaProvider.

    Rationale for this change

    I would like to set up an object store (s3) where each directory maps to a single table/schema, with the contents being made up of all files (parquet) inside the directory. By registering the schema provider like:

    Arc::new(ListingSchemaProvider::new(
            String::from(format!("s3://{bucket_name}")),
            "".into(),
            Arc::new(ListingTableFactory::default()),
            s3.clone(),
            String::from("PARQUET"),
            false,
        ));
    

    Then if there is a folder in the bucket, such as userdata, attempting to query the userdata table causes the S3 client to 404, because the provider creates ListingTables with paths set to the raw directory names, e.g. userdata, and in datafusion/core/src/datasource/listing/url.rs:149, we have:

            // If the prefix is a file, use a head request, otherwise list
            let is_dir = self.url.as_str().ends_with('/');
    

    Since the paths don't end with /, it treats the directories as files and attempts to perform head on them instead of listing them.

    This PR remedies this scenario, allowing the query to succeed.

    What changes are included in this PR?

    ListingSchemaProvider is altered to track whether the table paths it has listed are directories or files. If they are directories, it creates the ListingTables with a '/' appended to the stringified table path, allowing the ListingTable to successfully list its contents.

    Are these changes tested?

    Some light unit tests added.

    Are there any user-facing changes?

    No

    core 
    opened by cfraz89 0
  • Minimize stack space required to plan deeply nested binary expressions

    Which issue does this PR close?

    Re https://github.com/apache/arrow-datafusion/issues/4065 from @andygrove

    Rationale for this change

    This is a follow on to https://github.com/apache/arrow-datafusion/pull/4779 which reduced the stack space needed to plan in debug builds. This PR minimizes the stack required as much as I know how.

    What changes are included in this PR?

    Special-case BinaryOp (see comments for explanation)

    Are these changes tested?

    Yes (existing + new tests)

    Are there any user-facing changes?

    No

    sql 
    opened by alamb 0
  • Stack overflows when planning tpcds 22 in debug mode

    Describe the bug While we fixed some stack overflows in https://github.com/apache/arrow-datafusion/issues/4065

    When planning some complex queries in debug mode, DataFusion will overflow its stack

    This happens on the CI builders

    To Reproduce Unignore tests in tpcds_planning suite:

    diff --git a/datafusion/core/tests/tpcds_planning.rs b/datafusion/core/tests/tpcds_planning.rs
    index 7359f3906..1e3cea8be 100644
    --- a/datafusion/core/tests/tpcds_planning.rs
    +++ b/datafusion/core/tests/tpcds_planning.rs
    @@ -343,7 +343,6 @@ async fn tpcds_logical_q63() -> Result<()> {
         create_logical_plan(63).await
     }
     
    -#[ignore] // thread 'q64' has overflowed its stack]
     #[tokio::test]
     async fn tpcds_logical_q64() -> Result<()> {
         create_logical_plan(64).await
    @@ -851,7 +850,6 @@ async fn tpcds_physical_q63() -> Result<()> {
         create_physical_plan(63).await
     }
     
    -#[ignore] // thread 'q64' has overflowed its stack
     #[tokio::test]
     async fn tpcds_physical_q64() -> Result<()> {
         create_physical_plan(64).await
    

    Run on my machine (MacOS) like:

    RUST_MIN_STACK=1000000 cargo  test --test tpcds_planning 
    ...
    running 198 tests
    test tpcds_logical_q22 ... ok
    
    thread 'tpcds_logical_q1' has overflowed its stack
    fatal runtime error: stack overflow
    error: test failed, to rerun pass `-p datafusion --test tpcds_planning`
    

    Expected behavior No stack overflow

    Additional context

    bug 
    opened by alamb 2
  • Minor: Refactor some sql planning code into functions

    ~Draft as it builds on https://github.com/apache/arrow-datafusion/pull/4779~

    Which issue does this PR close?

    re https://github.com/apache/arrow-datafusion/issues/4065

    Rationale for this change

    I am trying to make the code in sql_expr_to_logical_expr as uniform as possible to avoid reintroducing issues like https://github.com/apache/arrow-datafusion/issues/4065

    What changes are included in this PR?

    Extract a few more functions

    Are these changes tested?

    Existing tests

    Are there any user-facing changes?

    No

    sql 
    opened by alamb 0
  •  Make datafusion-physical-expr compatible with blake3/traits-preview feature.

    Which issue does this PR close?

    Closes #4781.

    What changes are included in this PR?

    Makes datafusion-physical-expr compatible with blake3/traits-preview feature.

    Are there any user-facing changes?

    No

    physical-expr 
    opened by BoredPerson 1