Fastest and safest Rust implementation of parquet. `unsafe` free. Integration-tested against pyarrow

Overview

Parquet2

This is a re-write of the official parquet crate with performance, parallelism and safety in mind.

The main differentiators in comparison with parquet are:

  • does not use unsafe
  • delegates parallelism downstream
  • decouples reading (IO intensive) from computing (CPU intensive)
  • delegates decompressing and decoding batches downstream
  • is faster (10-20x when reading to arrow format)
  • is integration-tested against pyarrow 3 and (py)spark 3

The overall idea is to offer the ability to read compressed parquet pages together with a toolkit to decompress them into the consumer's favourite in-memory format.

This allows this crate's iterators to perform minimal CPU work, thereby maximizing throughput. It is up to the consumers to decide whether they want to take advantage of this through parallelism at the expense of memory usage (e.g. decompress and deserialize pages in threads) or not.

Functionality implemented

  • Read dictionary pages
  • Read and write V1 pages
  • Read and write V2 pages
  • Compression and decompression (all codecs)

Functionality not (yet) implemented

The parquet format has multiple encoding strategies for the different physical types. This crate currently decodes almost all of them and supports encoding to a subset of them.

Supported decoding

Delta-encodings are still experimental, as I have been unable to generate large pages encoded with them from spark, thereby hindering robust integration tests.

Encoding

Organization

  • read: read metadata and pages
  • write: write metadata and pages
  • metadata: parquet file metadata (e.g. FileMetaData)
  • schema: types metadata declaration (e.g. ConvertedType)
  • types: physical type declaration (i.e. how things are represented in memory). So far unused.
  • compression: compression (e.g. Gzip)
  • error: errors declaration
  • serialization: convert from bytes to Rust native typed buffers (Vec<Option<T>>).

Note that serialization is not very robust. It serves as a playground to understand the specification and how to serialize and deserialize pages.

Run unit tests

Some tests run against parquet files generated by pyarrow. To ignore these, use PARQUET2_IGNORE_PYARROW_TESTS= cargo test. To run them, you will need to run

python3 -m venv venv
venv/bin/pip install pip --upgrade
venv/bin/pip install pyarrow==3
venv/bin/python integration/write_pyarrow.py

beforehand. This is only needed once (per change to integration/write_pyarrow.py).

How to use

use std::fs::File;

use parquet2::read::{Page, read_metadata, get_page_iterator};

let mut file = File::open("testing/parquet-testing/data/alltypes_plain.parquet").unwrap();

// Here we read the metadata.
let metadata = read_metadata(&mut file)?;

// Here we get an iterator of pages (each page holds its own data).
// This can be heavily parallelized; not even the same `file` is needed here...
// feel free to wrap `metadata` in an `Arc`.
let row_group = 0;
let column = 0;
let mut iter = get_page_iterator(&metadata, row_group, column, &mut file)?;

// A page. It is just (compressed) bytes at this point.
let page = iter.next().unwrap().unwrap();
println!("{:#?}", page);

// From here, we can do different things. One of them is to convert its buffers to native Rust.
// This consumes the page. `descriptor` is the column's descriptor, obtained from the file metadata.
use parquet2::serialization::native::page_to_array;
let array = page_to_array(page, &descriptor).unwrap();

How to implement page readers

In general, the in-memory format used to consume parquet pages strongly influences how the pages should be deserialized. As such, this crate does not commit to a particular in-memory format. Consumers are responsible for converting pages to their target in-memory format.

There is an implementation that uses the arrow format here.
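
For illustration only, a consumer-side page reader might take the shape sketched below. MyColumn and deserialize_page are hypothetical placeholders for the consumer's in-memory format and decoding logic; the only parquet2 item assumed is the Page type used in the snippet above.

use parquet2::read::Page;

// Hypothetical target container for one column (the consumer's in-memory format).
struct MyColumn {
    values: Vec<Option<i64>>,
}

// Hypothetical decoder stub: a real implementation would decompress the page and
// decode definition levels and values into `target`.
fn deserialize_page(_page: &Page, target: &mut MyColumn) {
    target.values.push(None); // placeholder only
}

// Fold a fallible iterator of pages (such as the one returned by `get_page_iterator`)
// into the target in-memory format.
fn pages_to_column<E, I>(pages: I) -> Result<MyColumn, E>
where
    I: Iterator<Item = Result<Page, E>>,
{
    let mut column = MyColumn { values: vec![] };
    for page in pages {
        deserialize_page(&page?, &mut column);
    }
    Ok(column)
}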

Higher Parallelism

The function above creates an iterator over a row group, iter. In Arrow, a row group corresponds to a RecordBatch, divided into Parquet pages. Typically, converting a page into an in-memory format is expensive, so consider distributing that work across threads. E.g.

let mut handles = vec![];
for column in columns {
    let compressed_pages = get_page_iterator(&metadata, row_group, column, &mut file)?
        .collect::<Result<Vec<_>, _>>()?;
    // each compressed page owns its buffer; cloning would be expensive(!). We move the pages
    // into the thread so that their memory is released at the end of the processing.
    handles.push(thread::spawn(move || {
        page_iter_to_array(compressed_pages.into_iter())
    }));
}
let columns_from_all_groups = handles
    .into_iter()
    .map(|handle| handle.join().unwrap())
    .collect::<Vec<_>>();

This reads the file as quickly as possible in the main thread and sends the CPU-intensive work to other threads, thereby maximizing IO throughput (at the cost of storing multiple compressed pages in memory; buffering is also an option here).
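
To bound that memory usage, one option is a small bounded channel between the IO thread and a worker. The sketch below is generic and not tied to parquet2's types; the Vec<u8> payloads stand in for compressed pages.

use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // A bounded channel: the sender blocks once 8 pages are in flight,
    // capping how many compressed pages sit in memory at any time.
    let (sender, receiver) = sync_channel::<Vec<u8>>(8);

    // IO thread: each Vec<u8> stands in for a compressed page read from the file.
    let reader = thread::spawn(move || {
        for page_number in 0..100u8 {
            let compressed_page = vec![page_number; 1024];
            if sender.send(compressed_page).is_err() {
                break; // the receiver was dropped
            }
        }
    });

    // CPU thread: decompress/decode each page (a byte sum stands in for that work).
    let worker = thread::spawn(move || {
        receiver
            .iter()
            .map(|page| page.iter().map(|&byte| byte as u64).sum::<u64>())
            .sum::<u64>()
    });

    reader.join().unwrap();
    println!("checksum of all pages: {}", worker.join().unwrap());
}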

Decoding flow

Generally, a parquet file is read as follows:

  1. Read metadata
  2. Seek a row group and column
  3. Iterate over (compressed) pages within that (group, column)

This is IO-intensive and requires parsing thrift and seeking within the file.

Once a compressed page is loaded into memory, it can be decompressed, decoded and deserialized into a specific in-memory format. All of these operations are CPU-intensive and are thus left to consumers to perform, as they may want to send this work to threads.

read -> compressed page -> decompressed page -> decoded bytes -> deserialized
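
As a sketch of how the CPU-bound stages compose, the types and functions below are illustrative placeholders, not parquet2's API:

// Placeholder types standing in for the data at each stage of the flow above.
struct CompressedPage(Vec<u8>);
struct DecompressedPage(Vec<u8>);
struct DecodedColumn(Vec<Option<i64>>);

// CPU-intensive: undo the page's compression codec (e.g. snappy, gzip, zstd).
fn decompress(page: CompressedPage) -> DecompressedPage {
    DecompressedPage(page.0) // placeholder: no real codec is applied here
}

// CPU-intensive: decode levels and values (e.g. hybrid RLE / bit-packing) and
// deserialize them into the consumer's in-memory format.
fn decode_and_deserialize(page: DecompressedPage) -> DecodedColumn {
    DecodedColumn(page.0.into_iter().map(|value| Some(value as i64)).collect())
}

fn main() {
    let compressed = CompressedPage(vec![1, 2, 3]);
    let column = decode_and_deserialize(decompress(compressed));
    println!("{} values decoded", column.0.len());
}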

License

Licensed under either of

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

Comments
  • use lz4_flex?

    I like your no-unsafe approach; the LZ4 implementation of lz4_flex also uses forbid(unsafe_code) with default feature flags. If you switch to it, even more parts of the code would be completely safe.

    In the Readme I posted some benchmarks; the performance is a little bit slower than the unsafe version, but still decent.

    feature no-changelog 
    opened by PSeitz 12
  • WIP: Use lz4_flex for `wasm32`

    Disclaimer: I'm new to Rust but motivated to get LZ4 Parquet decompression working in wasm. 🙂

    I'm working on wasm bindings to arrow2/parquet2. So far it appears that all compressions other than LZ4 are working. I tried to switch to lz4_flex on this branch, but I'm not sure I gated the wasm target correctly. Tests seem to pass on this branch, but when I try to load a Parquet file with LZ4 encoding on the web I get an error: Uncaught (in promise) External format error: underlying IO error: WrongMagicNumber. Maybe I'm using the wrong lz4_flex APIs.

    Ref #85

    opened by kylebarron 10
  • Simple parquet-tools bin

    Parquet tool companion for the parquet2 crate. This bin creates an executable that can be used to extract information from a parquet file. It has the following subcommands:

    • rowcount: shows the row count of a parquet file
    • meta: shows metadata from the file
    • dump: samples data from columns
    opened by elferherrera 10
  • Added `serde` support for `RowGroupMetaData`.

    Use thrift for types in parquet_format_safe.

    https://github.com/jorgecarleitao/parquet2/issues/200

    It turns out that there are as many as 16 types that need to impl De/Serialize, so it is very troublesome to work around outside the crate.

    feature 
    opened by youngsofun 7
  • Reading large metadata-only `_metadata` file much slower than PyArrow

    👋

    I'm working with some large partitioned Parquet datasets that have a top-level _metadata file that contains the FileMetaData for every row group in every Parquet file in the directory. This _metadata file can have up to 30,000 row groups. In my experience, parsing these files with parquet2::read::read_metadata can be up to 70x slower than with pyarrow.parquet.read_metadata.

    Python:

    In [1]: import pyarrow.parquet as pq
    
    In [2]: %timeit pq.read_metadata('_metadata')
    20.1 ms ± 762 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
    

    Arrow2:

    use std::{fs::File, time::Instant};
    
    use parquet2::read::read_metadata;
    
    fn main() {
        let mut file = File::open("_metadata").unwrap();
    
        let now = Instant::now();
        let meta = read_metadata(&mut file).unwrap();
        println!("Time to parse metadata: {}", now.elapsed().as_secs_f32());
    }
    
    > cargo run
    Time to parse metadata: 1.465529
    

    Anecdotally, for a _metadata file internally with 30,000 row groups, it was taking ~11s to parse in arrow2 and ~160ms to parse in pyarrow. (Though in the making of this repro example, I learned that pyarrow.parquet.write_metadata is O(n^2) 😬, so I didn't create a full 30,000 setup for this example.)

    I haven't looked at the code for read_metadata yet; do you have any ideas where this might be slower than with pyarrow?

    Repro:

    from io import BytesIO
    
    import pyarrow as pa
    import pyarrow.parquet as pq
    
    
    def create_example_file_meta_data():
        data = {
            "str": pa.array(["a", "b", "c", "d"], type=pa.string()),
            "uint8": pa.array([1, 2, 3, 4], type=pa.uint8()),
            "int32": pa.array([0, -2147483638, 2147483637, 1], type=pa.int32()),
            "bool": pa.array([True, True, False, False], type=pa.bool_()),
        }
        table = pa.table(data)
        metadata_collector = []
        pq.write_table(table, BytesIO(), metadata_collector=metadata_collector)
        return table.schema, metadata_collector[0]
    
    def main():
        schema, meta = create_example_file_meta_data()
        print('created collector')
        metadata_collector = [meta] * 5_000
        print('writing meta')
        pq.write_metadata(schema, '_metadata', metadata_collector=metadata_collector)
    
    if __name__ == '__main__':
        main()
    
    question no-changelog 
    opened by kylebarron 7
  • Compression levels for Parquet Compression

    Would there be a desire to allow compression levels for parquet compression?

    For example, to enable Zstd compression, an option for the interface could be something like:

    pub enum Compression {
        Uncompressed,
        Snappy,
        Gzip,
        Lzo,
        Brotli,
        Lz4,
        Zstd(ZstdLevel),
    }
    
    pub struct ZstdLevel(u8);
    
    // Maintains 'compatibility'
    impl Default for ZstdLevel {
        fn default() -> Self {
             Self(1)
        }
    }
    
    impl ZstdLevel {
        pub fn try_new(level: u8) -> Result<Self, ()>;
    }
    

    However, that would be a breaking change. I'd be happy to set it up and open a PR if this seems worthwhile.

    feature no-changelog 
    opened by TurnOfACard 7
  • Removed `AsyncSeek` requirement from page stream

    When streaming pages over the network it is quite awkward to fulfill the AsyncSeek constraint.

    This PR exposes methods that work nicely with usages that only provide AsyncRead (for example, a Rusoto S3 StreamingBody with TryStreamExt::into_async_read).

    cargo test passes, but I note there are no tests that exercise the page filtering. A naive attempt to add additional pages in test_column_async didn't work. If the lack of tests here is a concern, then I need some guidance on where/how to add them.

    enhancement 
    opened by medwards 6
  • Added optional support for LZ4 via LZ4-flex crate (thus enabling wasm)

    This PR was done together with @kylebarron and adds an optional dependency lz4-flex by @PSeitz as a LZ4 compressor/decompressor.

    This implementation is a bit slower but uses no unsafe and is written in native Rust, therefore supporting compilation to wasm.

    feature 
    opened by jorgecarleitao 6
  • Issue decoding Parquet file with a column ending with a null page

    When trying to decompress a numeric column in a parquet file that ends with a page of nulls, parquet2 panics. Here is a test case that reproduces the conditions of the failure.

    https://github.com/jorgecarleitao/parquet2/commit/85a092f68326da1d555742f59e6aac2caf91b53e

    The backtrace of the error is here:

    ---- stage::tests::retrieve_s3_parquet_test stdout ----
    thread 'tokio-runtime-worker' panicked at 'called `Option::unwrap()` on a `None` value', /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/parquet2-0.9.0/src/encoding/hybrid_rle/mod.rs:41:32
    stack backtrace:
       0: rust_begin_unwind
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panicking.rs:517:5
       1: core::panicking::panic_fmt
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/panicking.rs:100:14
       2: core::panicking::panic
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/panicking.rs:50:5
       3: core::option::Option<T>::unwrap
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/option.rs:746:21
       4: parquet2::encoding::hybrid_rle::read_next
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/parquet2-0.9.0/src/encoding/hybrid_rle/mod.rs:41:17
       5: parquet2::encoding::hybrid_rle::HybridRleDecoder::new
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/parquet2-0.9.0/src/encoding/hybrid_rle/mod.rs:63:21
       6: arrow2::io::parquet::read::primitive::basic::read_dict_buffer_optional
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.9.0/src/io/parquet/read/primitive/basic.rs:38:9
       7: arrow2::io::parquet::read::primitive::basic::extend_from_page
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.9.0/src/io/parquet/read/primitive/basic.rs:184:13
       8: arrow2::io::parquet::read::primitive::iter_to_array
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.9.0/src/io/parquet/read/primitive/mod.rs:86:13
       9: arrow2::io::parquet::read::page_iter_to_array
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.9.0/src/io/parquet/read/mod.rs:359:20
      10: arrow2::io::parquet::read::column_iter_to_array
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.9.0/src/io/parquet/read/mod.rs:438:25
      11: ingest::stage::load_parquet_row_group::{{closure}}::{{closure}}::{{closure}}
                 at ./src/stage.rs:404:17
      12: tokio::runtime::enter::exit
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/enter.rs:90:9
      13: tokio::runtime::thread_pool::worker::block_in_place
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/thread_pool/worker.rs:332:9
      14: tokio::task::blocking::block_in_place
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/task/blocking.rs:77:9
      15: ingest::stage::load_parquet_row_group::{{closure}}::{{closure}}
                 at ./src/stage.rs:396:37
      16: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/future/mod.rs:80:19
      17: ingest::stage::load_parquet_row_group::{{closure}}
                 at ./src/stage.rs:357:1
      18: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/future/mod.rs:80:19
      19: ingest::stage::load_parquet_shard::{{closure}}::{{closure}}
                 at ./src/stage.rs:335:25
      20: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/future/mod.rs:80:19
      21: ingest::stage::load_parquet_shard::{{closure}}
                 at ./src/stage.rs:311:1
      22: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/future/mod.rs:80:19
      23: ingest::stage::load_shard::{{closure}}::{{closure}}
                 at ./src/stage.rs:156:17
      24: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/future/mod.rs:80:19
      25: ingest::stage::load_shard::{{closure}}
                 at ./src/stage.rs:116:1
      26: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/future/mod.rs:80:19
      27: <sentry_core::futures::SentryFuture<F> as core::future::future::Future>::poll::{{closure}}
                 at /home/ubuntu/.cargo/git/checkouts/sentry-rust-523ec8d3fcd6cdc5/2a15988/sentry-core/src/futures.rs:40:30
      28: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/panic/unwind_safe.rs:271:9
      29: std::panicking::try::do_call
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panicking.rs:403:40
      30: __rust_try
      31: std::panicking::try
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panicking.rs:367:19
      32: std::panic::catch_unwind
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panic.rs:133:14
      33: sentry_core::hub::Hub::run
                 at /home/ubuntu/.cargo/git/checkouts/sentry-rust-523ec8d3fcd6cdc5/2a15988/sentry-core/src/hub.rs:229:26
      34: <sentry_core::futures::SentryFuture<F> as core::future::future::Future>::poll
                 at /home/ubuntu/.cargo/git/checkouts/sentry-rust-523ec8d3fcd6cdc5/2a15988/sentry-core/src/futures.rs:40:13
      35: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/task/task_local.rs:280:28
      36: tokio::task::task_local::TaskLocalFuture<T,F>::with_task
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/task/task_local.rs:272:9
      37: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/task/task_local.rs:280:9
      38: service::context::Context::scope::{{closure}}
                 at /home/ubuntu/core/service/src/context.rs:111:9
      39: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/future/mod.rs:80:19
      40: service::context::InContext::in_current_context::{{closure}}
                 at /home/ubuntu/core/service/src/context.rs:364:9
      41: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/future/mod.rs:80:19
      42: <core::pin::Pin<P> as core::future::future::Future>::poll
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/future/future.rs:119:9
      43: <tracing::instrument::Instrumented<T> as core::future::future::Future>::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tracing-0.1.29/src/instrument.rs:272:9
      44: tokio::runtime::task::core::CoreStage<T>::poll::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/core.rs:161:17
      45: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/loom/std/unsafe_cell.rs:14:9
      46: tokio::runtime::task::core::CoreStage<T>::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/core.rs:151:13
      47: tokio::runtime::task::harness::poll_future::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/harness.rs:461:19
      48: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/panic/unwind_safe.rs:271:9
      49: std::panicking::try::do_call
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panicking.rs:403:40
      50: __rust_try
      51: std::panicking::try
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panicking.rs:367:19
      52: std::panic::catch_unwind
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panic.rs:133:14
      53: tokio::runtime::task::harness::poll_future
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/harness.rs:449:18
      54: tokio::runtime::task::harness::Harness<T,S>::poll_inner
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/harness.rs:98:27
      55: tokio::runtime::task::harness::Harness<T,S>::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/harness.rs:53:15
      56: tokio::runtime::task::raw::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/raw.rs:113:5
      57: tokio::runtime::task::raw::RawTask::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/raw.rs:70:18
      58: tokio::runtime::task::LocalNotified<S>::run
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/mod.rs:343:9
      59: tokio::runtime::thread_pool::worker::Context::run_task::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/thread_pool/worker.rs:420:13
      60: tokio::coop::with_budget::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/coop.rs:106:9
      61: std::thread::local::LocalKey<T>::try_with
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/thread/local.rs:399:16
      62: std::thread::local::LocalKey<T>::with
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/thread/local.rs:375:9
      63: tokio::coop::with_budget
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/coop.rs:99:5
      64: tokio::coop::budget
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/coop.rs:76:5
      65: tokio::runtime::thread_pool::worker::Context::run_task
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/thread_pool/worker.rs:419:9
      66: tokio::runtime::thread_pool::worker::Context::run
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/thread_pool/worker.rs:386:24
      67: tokio::runtime::thread_pool::worker::run::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/thread_pool/worker.rs:371:17
      68: tokio::macros::scoped_tls::ScopedKey<T>::set
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/macros/scoped_tls.rs:61:9
      69: tokio::runtime::thread_pool::worker::run
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/thread_pool/worker.rs:368:5
      70: tokio::runtime::thread_pool::worker::block_in_place::{{closure}}::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/thread_pool/worker.rs:324:41
      71: <tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/blocking/task.rs:42:21
      72: tokio::runtime::task::core::CoreStage<T>::poll::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/core.rs:161:17
      73: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/loom/std/unsafe_cell.rs:14:9
      74: tokio::runtime::task::core::CoreStage<T>::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/core.rs:151:13
      75: tokio::runtime::task::harness::poll_future::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/harness.rs:461:19
      76: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/core/src/panic/unwind_safe.rs:271:9
      77: std::panicking::try::do_call
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panicking.rs:403:40
      78: __rust_try
      79: std::panicking::try
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panicking.rs:367:19
      80: std::panic::catch_unwind
                 at /rustc/f1edd0429582dd29cccacaf50fd134b05593bd9c/library/std/src/panic.rs:133:14
      81: tokio::runtime::task::harness::poll_future
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/harness.rs:449:18
      82: tokio::runtime::task::harness::Harness<T,S>::poll_inner
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/harness.rs:98:27
      83: tokio::runtime::task::harness::Harness<T,S>::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/harness.rs:53:15
      84: tokio::runtime::task::raw::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/raw.rs:113:5
      85: tokio::runtime::task::raw::RawTask::poll
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/raw.rs:70:18
      86: tokio::runtime::task::UnownedTask<S>::run
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/task/mod.rs:379:9
      87: tokio::runtime::blocking::pool::Inner::run
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/blocking/pool.rs:264:17
      88: tokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}}
                 at /home/ubuntu/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.14.0/src/runtime/blocking/pool.rs:244:17
    

    The problematic file is sensitive, but it's a mostly-null numeric column. The last page contains ~300 null values.

    bug no-changelog 
    opened by mdrach 6
  • Simplified API to write files

    This simplifies the API to write a file by giving the user more control over when a row group is written.

    Thanks to /u/dexterduck for the feedback on reddit that led to this PR!

    backwards-incompatible 
    opened by jorgecarleitao 5
  • duckdb fails to read generated files

    duckdb won't read files generated by this library that have OPTIONAL/ZSTD columns.

    I think it's attempting to read the validity map as page data; it appears to try to decompress an array of vec![0xffu8; 3000] as if it were compressed page data.

    I haven't had a go at reproducing this with a reasonable size file, nor checked whether it's duckdb or parquet2 which deviates from the specification, as I have absolutely no idea what I'm doing!

    I will attempt to fill this bug report with some vaguely useful information later, but I thought this was better than nothing.


    That is, duckdb prints:

    Error: ZSTD Decompression failure

    ..for any column in my file, all of which are Utf8Array<i32> with (optional) nulls.

    question 
    opened by FauxFaux 5
  • containt ==> contain typo

    containt ==> contain typo:

    https://github.com/jorgecarleitao/parquet2/blob/7a5fc27039b192f255908154a0aba2e75f6ed5a1/src/read/metadata.rs#L40 https://github.com/jorgecarleitao/parquet2/blob/7a5fc27039b192f255908154a0aba2e75f6ed5a1/src/read/stream.rs#L33

    opened by ghuls 0
  • Write bloom filters

    This PR mentions that this requires big changes: https://github.com/jorgecarleitao/parquet2/pull/99. But this seems like an important feature to implement for performance. How doable is it in the current state of the library? I would like to work on it if possible.

    opened by ozgrakkurt 0
  • Error running tests with `--all-features` (cannot find function `lz4_decompress_to_buffer`)

    Here's what happens after just cloning the repo and trying to run the tests (note that the same error occurs if you try to run all tests for arrow2):

       Compiling parquet2 v0.17.0
    error[E0425]: cannot find function `lz4_decompress_to_buffer` in this scope
       --> src/compression.rs:196:13
        |
    196 |             lz4_decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf)
        |             ^^^^^^^^^^^^^^^^^^^^^^^^ not found in this scope
    
    error[E0425]: cannot find function `lz4_decompress_to_buffer` in this scope
       --> src/compression.rs:265:33
        |
    265 |         let decompressed_size = lz4_decompress_to_buffer(
        |                                 ^^^^^^^^^^^^^^^^^^^^^^^^ not found in this scope
    

    I understand this has to do with lz4_flex and inconsistent feature sets, but having --all-features cause invalid feature sets is not very nice if it can be avoided.

    lz4_decompress_to_buffer() is only defined for lz4 && !lz4_flex or for lz4_flex && !lz4; however, that's not what the match checks. Perhaps lz4_flex && lz4 should be treated as lz4_flex for all intents and purposes? (so that --all-features would make sense)

    Here's what the match checks:

            #[cfg(all(feature = "lz4_flex", not(feature = "lz4")))]
            Compression::Lz4Raw => lz4_flex::block::decompress_into(...),
            #[cfg(feature = "lz4")]
            Compression::Lz4Raw => lz4::block::decompress_to_buffer(...),
            #[cfg(all(not(feature = "lz4"), not(feature = "lz4_flex")))]
            Compression::Lz4Raw => Err(Error::FeatureNotActive(...)),
            #[cfg(any(feature = "lz4_flex", feature = "lz4"))]
            Compression::Lz4 => try_decompress_hadoop(...),
            // ^ lz4_decompress_to_buffer() also used here, error!
            #[cfg(all(not(feature = "lz4_flex"), not(feature = "lz4")))]
            Compression::Lz4 => Err(Error::FeatureNotActive(...)),
    

    (I know it's a breaking change but, just saying, another option is converting a feature, like lz4_flex, into a 'negative' feature, so that --all-features wouldn't enable it by default)

    opened by aldanor 0
  • Add support for byte_stream_split encoding

    Hi @jorgecarleitao ,

    It doesn't look like there is support for byte_stream_split encoding in parquet2.

    Is it hard to add support?

    Looks like the C++ lib added an implementation a couple of years ago:

    • https://issues.apache.org/jira/browse/PARQUET-1716
    opened by damelLP 0
  • Reading pages with large statistics with an IndexedPageReader fails

    IndexedPageReader puts a hard limit of 1MB on the size of the page headers it can deserialize https://github.com/jorgecarleitao/parquet2/blob/main/src/read/page/indexed_reader.rs#L63

    If a page contains a value larger than 512KB and is written out with statistics, the page header will be larger than 1MB.

    This is not a problem when using an unfiltered PageReader whose limit scales with the size of the page https://github.com/jorgecarleitao/arrow2/blob/main/src/io/parquet/read/row_group.rs#L240

    This would not be much of an issue to me if I could disable statistics per field, but parquet2 can only write statistics for all pages or for none of them https://github.com/jorgecarleitao/parquet2/blob/main/src/write/indexes/serialize.rs#L47

    Repro

    use arrow2::array::Utf8Array;
    use arrow2::chunk::Chunk;
    use arrow2::datatypes::{DataType, Field, Schema};
    use arrow2::io::parquet::read;
    use arrow2::io::parquet::write::{
        transverse, CompressionOptions, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions,
    };
    use parquet2::indexes;
    use parquet2::indexes::Interval;
    use std::error::Error;
    use std::fs::File;
    
    #[test]
    fn write_large_statistics() -> Result<(), Box<dyn Error>> {
        let array = Utf8Array::<i32>::from_slice(["foo".repeat(1_000_000)]);
        let field = Field::new("strings", DataType::Utf8, false);
        let schema = Schema::from(vec![field.clone()]);
        let chunk = Chunk::new(vec![array.boxed()]);
    
        let options = WriteOptions {
            write_statistics: true,
            compression: CompressionOptions::Uncompressed,
            version: Version::V2,
        };
    
        let iter = vec![Ok(chunk)];
    
        let encodings = schema
            .fields
            .iter()
            .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
            .collect();
    
        let row_groups = RowGroupIterator::try_new(iter.into_iter(), &schema, options, encodings)?;
    
        let path = "large_statistics.parquet";
        let mut file = File::create(path)?;
        let mut writer = FileWriter::try_new(&mut file, schema, options)?;
        for group in row_groups {
            writer.write(group?)?;
        }
        writer.end(None)?;
    
        let mut reader = File::open(path)?;
        let metadata = read::read_metadata(&mut reader)?;
        let target_group = &metadata.row_groups[0];
        let intervals = vec![Interval {
            start: 0,
            length: target_group.num_rows(),
        }];
        let locations = read::read_pages_locations(&mut reader, target_group.columns())?;
        let columns = read::read_columns(&mut reader, target_group.columns(), &field.name)?;
        let field_pages = read::get_field_pages(target_group.columns(), &locations, &field.name);
        let filtered_pages = field_pages
            .into_iter()
            .map(|field_page| indexes::select_pages(&intervals, field_page, target_group.num_rows()))
            .collect::<Result<Vec<_>, _>>()?;
    
        let mut iter = read::to_deserializer(
            columns,
            field,
            target_group.num_rows(),
            None,
            Some(filtered_pages),
        )?;
        let array = iter.next().unwrap()?;
        println!("{:?}", array);
        Ok(())
    }
    
    bug 
    opened by tjwilson90 1