Fastest and safest Rust implementation of parquet. `unsafe` free. Integration-tested against pyarrow

Overview

Parquet2

This is a re-write of the official parquet crate with performance, parallelism and safety in mind.

The main differentiators in comparison with parquet are:

  • does not use `unsafe`
  • delegates parallelism downstream
  • decouples reading (IO-intensive) from computing (CPU-intensive)
  • delegates decompressing and decoding batches downstream
  • is faster (10-20x when reading to the arrow format)
  • is integration-tested against pyarrow 3 and (py)spark 3

The overall idea is to offer the ability to read compressed parquet pages, together with a toolkit to decompress them into the consumer's favourite in-memory format.

This allows this crate's iterators to perform minimal CPU work, thereby maximizing throughput. It is up to the consumers to decide whether they want to take advantage of this through parallelism at the expense of memory usage (e.g. decompress and deserialize pages in threads) or not.

Functionality implemented

  • Read dictionary pages
  • Read and write V1 pages
  • Read and write V2 pages
  • Compression and de-compression (all)

Functionality not (yet) implemented

The parquet format has multiple encoding strategies for the different physical types. This crate currently reads almost all of them and supports encoding to a subset of them, listed below.

Supported decoding

Delta-encodings are still experimental, as I have been unable to generate large pages encoded with them from Spark, which hinders robust integration tests.

Encoding

Organization

  • read: read metadata and pages
  • write: write metadata and pages
  • metadata: parquet files metadata (e.g. FileMetaData)
  • schema: types metadata declaration (e.g. ConvertedType)
  • types: physical type declaration (i.e. how things are represented in memory). So far unused.
  • compression: compression (e.g. Gzip)
  • error: errors declaration
  • serialization: convert from bytes to Rust-native typed buffers (Vec<Option<T>>).

Note that serialization is not very robust. It serves as a playground to understand the specification and how to serialize and deserialize pages.

Run unit tests

There are tests run against parquet files generated by pyarrow. To ignore these, use `PARQUET2_IGNORE_PYARROW_TESTS= cargo test`. To run them, you first need to run the following:

python3 -m venv venv
venv/bin/pip install pip --upgrade
venv/bin/pip install pyarrow==3
venv/bin/python integration/write_pyarrow.py

This is only needed once (and again whenever integration/write_pyarrow.py changes).

How to use

use std::fs::File;

use parquet2::read::{Page, read_metadata, get_page_iterator};

let mut file = File::open("testing/parquet-testing/data/alltypes_plain.parquet").unwrap();

// Here we read the metadata.
let metadata = read_metadata(&mut file)?;

// Here we get an iterator of pages (each page has its own data).
// This can be heavily parallelized; not even the same `file` is needed here...
// feel free to wrap `metadata` in an `Arc`.
let row_group = 0;
let column = 0;
let mut iter = get_page_iterator(&metadata, row_group, column, &mut file)?;

// A page. It is just (compressed) bytes at this point.
let page = iter.next().unwrap().unwrap();
println!("{:#?}", page);

// From here, we can do different things. One of them is to convert its buffers to native Rust.
// This consumes the page. `descriptor` is the column's descriptor, obtained from the metadata.
use parquet2::serialization::native::page_to_array;
let array = page_to_array(page, &descriptor).unwrap();

How to implement page readers

In general, the in-memory format used to consume parquet pages strongly influences how the pages should be deserialized. As such, this crate does not commit to a particular in-memory format. Consumers are responsible for converting pages to their target in-memory format.

There is an implementation that uses the arrow format here.
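
As a rough illustration, here is the shape such a page reader can take: a minimal sketch assuming a decompression helper along the lines of `read::decompress` (referenced in the comments below) and a caller-provided `deserialize_page` closure for the target in-memory format. The exact module paths and signatures are illustrative and may differ between versions.

use parquet2::error::Result;
use parquet2::read::{decompress, CompressedPage, Page};

// Consume an iterator of compressed pages into the caller's in-memory representation.
// `deserialize_page` is whatever the target format needs (e.g. building an arrow array).
fn consume_pages<I, T>(
    pages: I,
    deserialize_page: impl Fn(&Page) -> Result<T>,
) -> Result<Vec<T>>
where
    I: Iterator<Item = Result<CompressedPage>>,
{
    // a scratch buffer re-used across pages to avoid repeated allocations
    let mut buffer = vec![];
    pages
        .map(|compressed| {
            // CPU-intensive: decompress, then decode/deserialize
            let page = decompress(compressed?, &mut buffer)?;
            deserialize_page(&page)
        })
        .collect()
}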

Higher Parallelism

The function above creates an iterator over a row group, `iter`. In Arrow, a row group corresponds to a RecordBatch, divided into Parquet pages. Converting a page into an in-memory format is typically expensive, so consider how to distribute that work across threads. E.g.

let mut handles = vec![];
for column in columns {
    // IO-intensive: read the column's compressed pages in the main thread
    let compressed_pages = get_page_iterator(&metadata, row_group, column, &mut file)?
        .collect::<Result<Vec<_>, _>>()?;
    // each compressed page owns a buffer; cloning is expensive(!). We move the pages into the
    // thread so that the memory is released at the end of the processing.
    handles.push(thread::spawn(move || {
        // CPU-intensive: decompress, decode and deserialize
        page_iter_to_array(compressed_pages.into_iter())
    }));
}
let columns_from_all_groups: Vec<_> = handles
    .into_iter()
    .map(|handle| handle.join().unwrap())
    .collect();

this will read the file as quickly as possible in the main thread and send the CPU-intensive work to other threads, thereby maximizing IO throughput (at the cost of holding multiple compressed pages in memory; buffering is also an option here, sketched below).
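
For reference, a minimal sketch of that buffering option, using a bounded channel from the standard library so that only a fixed number of compressed pages is held in memory at a time; `page_iter_to_array` again stands in for whatever deserialization the consumer uses:

use std::sync::mpsc::sync_channel;
use std::thread;

// at most 16 compressed pages are buffered between the IO side and the CPU side
let (sender, receiver) = sync_channel(16);

let worker = thread::spawn(move || {
    // CPU-intensive side: decompress + decode + deserialize as pages arrive
    page_iter_to_array(receiver.into_iter())
});

// IO-intensive side: read compressed pages and hand them off; `send` blocks when the
// channel is full, which caps memory usage
for maybe_page in get_page_iterator(&metadata, row_group, column, &mut file)? {
    sender.send(maybe_page?).unwrap();
}
drop(sender); // close the channel so the worker's iterator terminates

let array = worker.join().unwrap();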

Decoding flow

Generally, a parquet file is read as follows:

  1. Read metadata
  2. Seek a row group and column
  3. Iterate over (compressed) pages within that (group, column)

This is IO-intensive and requires parsing thrift and seeking within a file.

Once a compressed page is loaded into memory, it can be decompressed, decoded and deserialized into a specific in-memory format. All of these operations are CPU-intensive and are thus left to consumers to perform, as they may want to send this work to threads.

read -> compressed page -> decompressed page -> decoded bytes -> deserialized

License

Licensed under either of

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

Comments
  • use lz4_flex?

    I like your no-unsafe approach; the LZ4 implementation of lz4_flex also uses forbid(unsafe_code) with the default feature flags. If you switch to it, even more of the code would be completely safe.

    In the Readme I posted some benchmarks; the performance is a little slower than the unsafe version, but still decent.

    feature no-changelog 
    opened by PSeitz 12
  • WIP: Use lz4_flex for `wasm32`

    Disclaimer: I'm new to Rust but motivated to get LZ4 Parquet decompression working in wasm. 🙂

    I'm working on wasm bindings to arrow2/parquet2. So far it appears that all compressions other than LZ4 are working. I tried to switch to lz4_flex on this branch, but I'm not sure I gated the wasm target correctly. Tests seem to pass on this branch, but when I try to load a Parquet file with LZ4 encoding on the web I get an error: Uncaught (in promise) External format error: underlying IO error: WrongMagicNumber. Maybe I'm using the wrong lz4_flex APIs.

    Ref #85

    opened by kylebarron 10
  • Simple parquet-tools bin

    Parquet tool companion for the parquet2 crate. This bin creates an executable that can be used to extract information from a parquet file. It has the following subcommands:

    • rowcount: show row count from parquet file
    • meta: shows metadata from file
    • dump: sample data from columns
    opened by elferherrera 10
  • Added `serde` support for `RowGroupMetaData`.

    Uses thrift for the types in parquet_format_safe.

    https://github.com/jorgecarleitao/parquet2/issues/200

    It turns out that there are as many as 16 types that need to implement De/Serialize, so it is very troublesome to work around this outside the crate.

    feature 
    opened by youngsofun 7
  • Reading large metadata-only `_metadata` file much slower than PyArrow

    👋

    I'm working with some large partitioned Parquet datasets that have a top-level _metadata file that contains the FileMetaData for every row group in every Parquet file in the directory. This _metadata file can have up to 30,000 row groups. In my experience, parsing these files with parquet2::read::read_metadata can be up to 70x slower than with pyarrow.parquet.read_metadata.

    Python:

    In [1]: import pyarrow.parquet as pq
    
    In [2]: %timeit pq.read_metadata('_metadata')
    20.1 ms ± 762 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
    

    Arrow2:

    use std::{fs::File, time::Instant};
    
    use parquet2::read::read_metadata;
    
    fn main() {
        let mut file = File::open("_metadata").unwrap();
    
        let now = Instant::now();
        let meta = read_metadata(&mut file).unwrap();
        println!("Time to parse metadata: {}", now.elapsed().as_secs_f32());
    }
    
    > cargo run
    Time to parse metadata: 1.465529
    

    Anecdotally, for an internal _metadata file with 30,000 row groups, it was taking ~11s to parse in arrow2 and ~160ms to parse in pyarrow. (Though in the making of this repro example, I learned that pyarrow.parquet.write_metadata is O(n^2) 😬, so I didn't create a full 30,000 setup for this example.)

    I haven't looked at the code for read_metadata yet; do you have any ideas where this might be slower than with pyarrow?

    Repro:

    from io import BytesIO
    
    import pyarrow as pa
    import pyarrow.parquet as pq
    
    
    def create_example_file_meta_data():
        data = {
            "str": pa.array(["a", "b", "c", "d"], type=pa.string()),
            "uint8": pa.array([1, 2, 3, 4], type=pa.uint8()),
            "int32": pa.array([0, -2147483638, 2147483637, 1], type=pa.int32()),
            "bool": pa.array([True, True, False, False], type=pa.bool_()),
        }
        table = pa.table(data)
        metadata_collector = []
        pq.write_table(table, BytesIO(), metadata_collector=metadata_collector)
        return table.schema, metadata_collector[0]
    
    def main():
        schema, meta = create_example_file_meta_data()
        print('created collector')
        metadata_collector = [meta] * 5_000
        print('writing meta')
        pq.write_metadata(schema, '_metadata', metadata_collector=metadata_collector)
    
    if __name__ == '__main__':
        main()
    
    question no-changelog 
    opened by kylebarron 7
  • Compression levels for Parquet Compression

    Would there be a desire to allow compression levels for parquet compression?

    For example, to enable Zstd compression, an option for the interface could be something like:

    pub enum Compression {
        Uncompressed,
        Snappy,
        Gzip,
        Lzo,
        Brotli,
        Lz4,
        Zstd(ZstdLevel),
    }
    
    pub struct ZstdLevel(u8);
    
    // Maintains 'compatibility'
    impl Default for ZstdLevel {
        fn default() -> Self {
             Self(1)
        }
    }
    
    impl ZstdLevel {
        pub fn try_new(level: u8) -> Result<Self, ()>;
    }
    

    However, that would be a breaking change. I'd be happy to set it up and open a PR if this seems worthwhile.

    feature no-changelog 
    opened by TurnOfACard 7
  • Removed `AsyncSeek` requirement from page stream

    When streaming pages over the network it is quite awkward to fulfill the AsyncSeek constraint.

    This PR exposes methods that work nicely with usages that only provide AsyncRead (for example, a Rusoto S3 StreamingBody /w TryStreamExt::into_async_read).

    cargo test passes, but I note there are no tests that exercise the page filtering. A naive attempt to add additional pages in test_column_async didn't work. If the lack of tests here is of concern, I need some guidance on where/how to add them.

    enhancement 
    opened by medwards 6
  • Added optional support for LZ4 via LZ4-flex crate (thus enabling wasm)

    This PR was done together with @kylebarron and adds an optional dependency, lz4-flex by @PSeitz, as an LZ4 compressor/decompressor.

    This implementation is a bit slower but uses no unsafe and is written in native Rust, therefore supporting compilation to wasm.

    feature 
    opened by jorgecarleitao 6
  • Issue decoding Parquet file with a column ending with a null page

    When trying to decompress a numeric column in a parquet file that ends with a page of nulls, parquet2 panics. Here is a test case that reproduces the conditions of the failure.

    https://github.com/jorgecarleitao/parquet2/commit/85a092f68326da1d555742f59e6aac2caf91b53e

    The backtrace of the error is here:

    ---- stage::tests::retrieve_s3_parquet_test stdout ----
    thread 'tokio-runtime-worker' panicked at 'called `Option::unwrap()` on a `None` value', /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/parquet2-0.9.0/src/encoding/hybrid_rle/mod.rs:41:32
    stack backtrace:
       0: rust_begin_unwind
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panicking.rs:517:5
       1: core::panicking::panic_fmt
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/panicking.rs:100:14
       2: core::panicking::panic
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/panicking.rs:50:5
       3: core::option::Option<T>::unwrap
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/option.rs:746:21
       4: parquet2::encoding::hybrid_rle::read_next
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/parquet2-0.9.0/src/encoding/hybrid_rle/mod.rs:41:17
       5: parquet2::encoding::hybrid_rle::HybridRleDecoder::new
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/parquet2-0.9.0/src/encoding/hybrid_rle/mod.rs:63:21
       6: arrow2::io::parquet::read::primitive::basic::read_dict_buffer_optional
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.9.0/src/io/parquet/read/primitive/basic.rs:38:9
       7: arrow2::io::parquet::read::primitive::basic::extend_from_page
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.9.0/src/io/parquet/read/primitive/basic.rs:184:13
       8: arrow2::io::parquet::read::primitive::iter_to_array
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.9.0/src/io/parquet/read/primitive/mod.rs:86:13
       9: arrow2::io::parquet::read::page_iter_to_array
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.9.0/src/io/parquet/read/mod.rs:359:20
      10: arrow2::io::parquet::read::column_iter_to_array
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.9.0/src/io/parquet/read/mod.rs:438:25
      11: ingest::stage::load_parquet_row_group::{{closure}}::{{closure}}::{{closure}}
                 at ./src/stage.rs:404:17
      12: tokio::runtime::enter::exit
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/enter.rs:90:9
      13: tokio::runtime::thread_pool::worker::block_in_place
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/thread_pool/worker.rs:332:9
      14: tokio::task::blocking::block_in_place
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/task/blocking.rs:77:9
      15: ingest::stage::load_parquet_row_group::{{closure}}::{{closure}}
                 at ./src/stage.rs:396:37
      16: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/future/mod.rs:80:19
      17: ingest::stage::load_parquet_row_group::{{closure}}
                 at ./src/stage.rs:357:1
      18: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/future/mod.rs:80:19
      19: ingest::stage::load_parquet_shard::{{closure}}::{{closure}}
                 at ./src/stage.rs:335:25
      20: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/future/mod.rs:80:19
      21: ingest::stage::load_parquet_shard::{{closure}}
                 at ./src/stage.rs:311:1
      22: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/future/mod.rs:80:19
      23: ingest::stage::load_shard::{{closure}}::{{closure}}
                 at ./src/stage.rs:156:17
      24: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/future/mod.rs:80:19
      25: ingest::stage::load_shard::{{closure}}
                 at ./src/stage.rs:116:1
      26: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/future/mod.rs:80:19
      27: <sentry_core::futures::SentryFuture<F> as core::future::future::Future>::poll::{{closure}}
                 at /home/ubuntu/.cargo/git/checkouts/sentry-rust-523ec8d3fcd6cdc5/2a15988/sentry-core/src/futures.rs:40:30
      28: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/panic/unwind_safe.rs:271:9
      29: std::panicking::try::do_call
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panicking.rs:403:40
      30: __rust_try
      31: std::panicking::try
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panicking.rs:367:19
      32: std::panic::catch_unwind
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panic.rs:133:14
      33: sentry_core::hub::Hub::run
                 at /home/ubuntu/.cargo/git/checkouts/sentry-rust-523ec8d3fcd6cdc5/2a15988/sentry-core/src/hub.rs:229:26
      34: <sentry_core::futures::SentryFuture<F> as core::future::future::Future>::poll
                 at /home/ubuntu/.cargo/git/checkouts/sentry-rust-523ec8d3fcd6cdc5/2a15988/sentry-core/src/futures.rs:40:13
      35: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/task/task_local.rs:280:28
      36: tokio::task::task_local::TaskLocalFuture<T,F>::with_task
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/task/task_local.rs:272:9
      37: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/task/task_local.rs:280:9
      38: service::context::Context::scope::{{closure}}
                 at /home/ubuntu/core/service/src/context.rs:111:9
      39: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/future/mod.rs:80:19
      40: service::context::InContext::in_current_context::{{closure}}
                 at /home/ubuntu/core/service/src/context.rs:364:9
      41: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/future/mod.rs:80:19
      42: <core::pin::Pin<P> as core::future::future::Future>::poll
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/future/future.rs:119:9
      43: <tracing::instrument::Instrumented<T> as core::future::future::Future>::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tracing-0.1.29/src/instrument.rs:272:9
      44: tokio::runtime::task::core::CoreStage<T>::poll::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/core.rs:161:17
      45: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/loom/std/unsafe_cell.rs:14:9
      46: tokio::runtime::task::core::CoreStage<T>::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/core.rs:151:13
      47: tokio::runtime::task::harness::poll_future::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/harness.rs:461:19
      48: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/panic/unwind_safe.rs:271:9
      49: std::panicking::try::do_call
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panicking.rs:403:40
      50: __rust_try
      51: std::panicking::try
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panicking.rs:367:19
      52: std::panic::catch_unwind
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panic.rs:133:14
      53: tokio::runtime::task::harness::poll_future
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/harness.rs:449:18
      54: tokio::runtime::task::harness::Harness<T,S>::poll_inner
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/harness.rs:98:27
      55: tokio::runtime::task::harness::Harness<T,S>::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/harness.rs:53:15
      56: tokio::runtime::task::raw::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/raw.rs:113:5
      57: tokio::runtime::task::raw::RawTask::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/raw.rs:70:18
      58: tokio::runtime::task::LocalNotified<S>::run
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/mod.rs:343:9
      59: tokio::runtime::thread_pool::worker::Context::run_task::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/thread_pool/worker.rs:420:13
      60: tokio::coop::with_budget::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/coop.rs:106:9
      61: std::thread::local::LocalKey<T>::try_with
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/thread/local.rs:399:16
      62: std::thread::local::LocalKey<T>::with
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/thread/local.rs:375:9
      63: tokio::coop::with_budget
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/coop.rs:99:5
      64: tokio::coop::budget
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/coop.rs:76:5
      65: tokio::runtime::thread_pool::worker::Context::run_task
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/thread_pool/worker.rs:419:9
      66: tokio::runtime::thread_pool::worker::Context::run
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/thread_pool/worker.rs:386:24
      67: tokio::runtime::thread_pool::worker::run::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/thread_pool/worker.rs:371:17
      68: tokio::macros::scoped_tls::ScopedKey<T>::set
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/macros/scoped_tls.rs:61:9
      69: tokio::runtime::thread_pool::worker::run
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/thread_pool/worker.rs:368:5
      70: tokio::runtime::thread_pool::worker::block_in_place::{{closure}}::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/thread_pool/worker.rs:324:41
      71: <tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/blocking/task.rs:42:21
      72: tokio::runtime::task::core::CoreStage<T>::poll::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/core.rs:161:17
      73: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/loom/std/unsafe_cell.rs:14:9
      74: tokio::runtime::task::core::CoreStage<T>::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/core.rs:151:13
      75: tokio::runtime::task::harness::poll_future::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/harness.rs:461:19
      76: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/panic/unwind_safe.rs:271:9
      77: std::panicking::try::do_call
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panicking.rs:403:40
      78: __rust_try
      79: std::panicking::try
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panicking.rs:367:19
      80: std::panic::catch_unwind
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panic.rs:133:14
      81: tokio::runtime::task::harness::poll_future
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/harness.rs:449:18
      82: tokio::runtime::task::harness::Harness<T,S>::poll_inner
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/harness.rs:98:27
      83: tokio::runtime::task::harness::Harness<T,S>::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/harness.rs:53:15
      84: tokio::runtime::task::raw::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/raw.rs:113:5
      85: tokio::runtime::task::raw::RawTask::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/raw.rs:70:18
      86: tokio::runtime::task::UnownedTask<S>::run
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/mod.rs:379:9
      87: tokio::runtime::blocking::pool::Inner::run
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/blocking/pool.rs:264:17
      88: tokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/blocking/pool.rs:244:17
    

    The problematic file is sensitive, but it's a mostly-null numeric column. The last page contains ~300 null values.

    bug no-changelog 
    opened by mdrach 6
  • Simplified API to write files

    This simplifies the API to write a file by giving the user more control over when a row group is written.

    Thanks to /u/dexterduck for the feedback on reddit that led to this PR!

    backwards-incompatible 
    opened by jorgecarleitao 5
  • duckdb fails to read generated files

    duckdb won't read files generated by this library that have OPTIONAL/ZSTD columns.

    I think it's attempting to read the validity map as page data; it appears to try to decompress an array of vec![0xffu8; 3000] as if it was compressed page data.

    I haven't had a go at reproducing this with a reasonable size file, nor checked whether it's duckdb or parquet2 which deviates from the specification, as I have absolutely no idea what I'm doing!

    I will attempt to fill this bug report with some vaguely useful information later, but I thought this was better than nothing.


    That is, duckdb prints:

    Error: ZSTD Decompression failure

    ..for any column in my file, all of which are Utf8Array<i32> with (optional) nulls.

    question 
    opened by FauxFaux 5
  • Enable setting `selected_rows` at runtime.

    It's useful to let the user change the selected_rows during iteration.

    For example:

    // Prefetch one column and apply predicates to it to get a bitmap.
    let bitmap = pre_fetch_and_filter(pre);
    let bitmap = Arc::new(Mutex::new(bitmap));
    // Use this bitmap to iterate the remaining column(s) and select rows;
    let pages = PageReader::new_with_page_meta(
            reader,
            reader_meta,
            pages_filter,
            scratch,
            max_header_size,
        )
        .map(move |page| {
            page.map(|page| {
                page.select_rows(use_bitmap(bitmap));
            })
        });
    
    let array_iter = column_iter_to_arrays(pages, ...);
    // ...
    
    
    opened by RinChanNOWWW 1
  • Unnecessarily slow writes due to repeated flushes when writing to a BufWriter

    I have an implementation using this library that writes parquet to an S3 bucket. I hadn't particularly noticed the slowness of writing parquet earlier, I'll admit, but when writing batches over requests it became quite noticeable.

    Since I'm using a BufWriter to wrap my output Write implementation, I would expect the buffer to be used as much as possible so I can define the size of each request sent to S3. However, this part in write/column_chunk.rs:

        let mut protocol = TCompactOutputProtocol::new(writer);
        bytes_written += column_chunk
            .meta_data
            .as_ref()
            .unwrap()
            .write_to_out_protocol(&mut protocol)? as u64;
        protocol.flush()?;
    

    ...ensures that the BufWriter is flushed for each column chunk written, which is detrimental to performance. I haven't looked too closely at the implementation of TCompactOutputProtocol, but I wouldn't think flushing the underlying writer is necessary here?
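
    A possible consumer-side workaround, pending a fix in the crate: wrap the writer in an adapter whose `flush` is a no-op, so intermediate flushes do not force small writes upstream. A minimal sketch; `NoFlush` is a hypothetical name, not part of parquet2:

    use std::io::{self, Write};

    struct NoFlush<W: Write>(W);

    impl<W: Write> Write for NoFlush<W> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.0.write(buf)
        }
        fn flush(&mut self) -> io::Result<()> {
            // swallow intermediate flushes; call `self.0.flush()` explicitly once writing is done
            Ok(())
        }
    }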

    enhancement 
    opened by cyr 2
  • Reading pages with large statistics with an IndexedPageReader fails

    IndexedPageReader puts a hard limit of 1MB on the size of the page headers it can deserialize https://github.com/jorgecarleitao/parquet2/blob/main/src/read/page/indexed_reader.rs#L63

    If a page contains a value larger than 512KB and is written out with statistics, the page header will be larger than 1MB.

    This is not a problem when using an unfiltered PageReader whose limit scales with the size of the page https://github.com/jorgecarleitao/arrow2/blob/main/src/io/parquet/read/row_group.rs#L240

    This would not be much of an issue to me if I could disable statistics per field, but parquet2 can only either write all pages with statistics or none of them https://github.com/jorgecarleitao/parquet2/blob/main/src/write/indexes/serialize.rs#L47

    Repro

    use arrow2::array::Utf8Array;
    use arrow2::chunk::Chunk;
    use arrow2::datatypes::{DataType, Field, Schema};
    use arrow2::io::parquet::read;
    use arrow2::io::parquet::write::{
        transverse, CompressionOptions, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions,
    };
    use parquet2::indexes;
    use parquet2::indexes::Interval;
    use std::error::Error;
    use std::fs::File;
    
    #[test]
    fn write_large_statistics() -> Result<(), Box<dyn Error>> {
        let array = Utf8Array::<i32>::from_slice(["foo".repeat(1_000_000)]);
        let field = Field::new("strings", DataType::Utf8, false);
        let schema = Schema::from(vec![field.clone()]);
        let chunk = Chunk::new(vec![array.boxed()]);
    
        let options = WriteOptions {
            write_statistics: true,
            compression: CompressionOptions::Uncompressed,
            version: Version::V2,
        };
    
        let iter = vec![Ok(chunk)];
    
        let encodings = schema
            .fields
            .iter()
            .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
            .collect();
    
        let row_groups = RowGroupIterator::try_new(iter.into_iter(), &schema, options, encodings)?;
    
        let path = "large_statistics.parquet";
        let mut file = File::create(path)?;
        let mut writer = FileWriter::try_new(&mut file, schema, options)?;
        for group in row_groups {
            writer.write(group?)?;
        }
        writer.end(None)?;
    
        let mut reader = File::open(path)?;
        let metadata = read::read_metadata(&mut reader)?;
        let target_group = &metadata.row_groups[0];
        let intervals = vec![Interval {
            start: 0,
            length: target_group.num_rows(),
        }];
        let locations = read::read_pages_locations(&mut reader, target_group.columns())?;
        let columns = read::read_columns(&mut reader, target_group.columns(), &field.name)?;
        let field_pages = read::get_field_pages(target_group.columns(), &locations, &field.name);
        let filtered_pages = field_pages
            .into_iter()
            .map(|field_page| indexes::select_pages(&intervals, field_page, target_group.num_rows()))
            .collect::<Result<Vec<_>, _>>()?;
    
        let mut iter = read::to_deserializer(
            columns,
            field,
            target_group.num_rows(),
            None,
            Some(filtered_pages),
        )?;
        let array = iter.next().unwrap()?;
        println!("{:?}", array);
        Ok(())
    }
    
    bug 
    opened by tjwilson90 1
  • Updating parquet-tools

    I'm trying to update the parquet-tools with the changes after the Delayed dictionary (#160) PR.

    I'm using the read::decompress function to extract the page, and then I'm using this function to decode the buffer:

    pub fn read<T: NativeType>(
        buf: &[u8],
        num_values: usize,
        _is_sorted: bool,
    ) -> Result<PrimitivePageDict<T>> {
        let size_of = std::mem::size_of::<T>();
    
        let typed_size = num_values.wrapping_mul(size_of);
    
        let values = buf.get(..typed_size).ok_or_else(|| {
            Error::OutOfSpec(
                "The number of values declared in the dict page does not match the length of the page"
                    .to_string(),
            )
        })?;
    
        let values = values.chunks_exact(size_of).map(decode::<T>).collect();
    
        Ok(PrimitivePageDict::new(values))
    }
    

    Which is the same function used to decode the page previously.

    However, the values read from a sample file are wrong and do not represent the values saved in the file.

    Am I missing something during the page decompression stage?

    question 
    opened by elferherrera 1
  • Support async reading without file's content length

    Currently, any async reading using parquet2 requires knowing the content length of the remote resource, such as: https://github.com/jorgecarleitao/parquet2/blob/7be3cd6e14a8b7c444cb608ee664c9798c194d82/examples/s3/src/main.rs#L21-L22

    However, for any API that follows the Range HTTP request header spec, knowing the content length of the file in advance is unnecessary because:

    1. negative ranges (counting from the end) are supported by the spec, i.e. Range=-4096 will fetch the last 4096 bytes of the file
    2. The last bytes of the Parquet footer contains the total number of bytes in the footer. Therefore after that initial metadata request, you know if you need another request before parsing the metadata. You should never need to know the total number of bytes in the file because every ColumnChunkMetaData contains the absolute start and end ranges of each column buffer.

    In particular, the downside of needing to know the file's content length is an extra HEAD request, which in environments like a client-side browser could have significant latency.

    Do you have opinions on an API where the content length is not needed (or at least is optional)? I see that the AsyncSeek trait that is required on the reader depends on the SeekFrom enum, which includes End(i64). Therefore it would seem to me that the existing AsyncSeek trait would be enough to use only negative ranges...? Or am I missing something?
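
    For reference, a rough sketch of the two-request pattern described above. The `fetch_suffix` helper is hypothetical and stands in for any HTTP client issuing a suffix Range request; the last 8 bytes of a parquet file are a 4-byte little-endian footer length followed by the "PAR1" magic:

    // prefetch the tail of the file with a suffix range request, e.g. Range: bytes=-65536
    let prefetch = 64 * 1024;
    let tail = fetch_suffix(url, prefetch).await?;
    let n = tail.len();

    // the file ends with [4-byte little-endian footer length][b"PAR1"]
    assert_eq!(&tail[n - 4..], b"PAR1");
    let footer_len = u32::from_le_bytes(tail[n - 8..n - 4].try_into().unwrap()) as usize;

    let metadata_bytes: Vec<u8> = if footer_len + 8 <= n {
        // the whole footer is already inside the prefetched tail
        tail[n - 8 - footer_len..n - 8].to_vec()
    } else {
        // one more suffix request for the exact footer size -- still no content length needed
        let tail = fetch_suffix(url, footer_len + 8).await?;
        tail[..footer_len].to_vec()
    };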

    enhancement 
    opened by kylebarron 8