A highly efficient daemon for streaming data from Kafka into Delta Lake

Overview

kafka-delta-ingest

The kafka-delta-ingest project aims to build a highly efficient daemon for streaming data through Apache Kafka into Delta Lake.

This project is currently highly experimental and evolving in tandem with the delta-rs bindings.

Features

  • Multiple worker processes per stream

  • Basic transformations within messages

  • Statsd metric output

See the design doc for more details.

Example

The repository includes an example for trying out the application locally with some fake web request data.

The included docker-compose.yml contains kafka and localstack services you can run kafka-delta-ingest against locally.

Starting Worker Processes

  1. Launch test services - docker-compose up setup

  2. Compile: cargo build

  3. Run kafka-delta-ingest against the web_requests example topic and table (customize arguments as desired):

export AWS_ENDPOINT_URL=http://0.0.0.0:4566
export AWS_ACCESS_KEY_ID=test
export AWS_SECRET_ACCESS_KEY=test

RUST_LOG=debug cargo run ingest web_requests ./tests/data/web_requests \
  -l 60 \
  -a web_requests \
  -K "auto.offset.reset=earliest" \
  -t 'date: substr(meta.producer.timestamp, `0`, `10`)' \
      'meta.kafka.offset: kafka.offset' \
      'meta.kafka.partition: kafka.partition' \
      'meta.kafka.topic: kafka.topic'

To clean data from previous local runs, execute ./bin/clean-example-data.sh. You’ll need to do this if you destroy your Kafka container between runs since your delta log directory will be out of sync with Kafka offsets.

Example Data

A tarball containing 100K line-delimited JSON messages is included in tests/json/web_requests-100K.json.tar.gz. Running ./bin/extract-example-json.sh will unpack this to the expected location.

Pretty-printed example from the file
{
  "meta": {
    "producer": {
      "timestamp": "2021-03-24T15:06:17.321710+00:00"
    }
  },
  "method": "DELETE",
  "session_id": "7c28bcf9-be26-4d0b-931a-3374ab4bb458",
  "status": 204,
  "url": "http://www.youku.com",
  "uuid": "831c6afa-375c-4988-b248-096f9ed101f8"
}

After extracting the example data, you’ll need to play this onto the web_requests topic of the local Kafka container.
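
One way to do this (a sketch, assuming kafkacat is installed and the broker from docker-compose.yml is reachable on localhost:9092; the extracted file path below is a placeholder) is to produce each line of the file as a separate message:

kafkacat -P -b localhost:9092 -t web_requests -l path/to/web_requests-100K.json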

Note

URLs sampled for the test data are sourced from Wikipedia’s list of most popular websites - https://en.wikipedia.org/wiki/List_of_most_popular_websites.

Inspect example output

  • List data files - ls tests/data/web_requests/date=2021-03-24

  • List delta log files - ls tests/data/web_requests/_delta_log

  • Show some parquet data (using parquet-tools)

    • parquet-tools show tests/data/web_requests/date=2021-03-24/

Kafka SSL

In case you have Kafka topics secured by SSL client certificates, you can specify these secrets as environment variables.

For the cert chain include the PEM content as an environment variable named KAFKA_DELTA_INGEST_CERT. For the cert private key include the PEM content as an environment variable named KAFKA_DELTA_INGEST_KEY.

These will be set as the ssl.certificate.pem and ssl.key.pem Kafka settings respectively.

Make sure to provide the additional option:

-K security.protocol=SSL

when invoking the CLI command as well.
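
Putting it together, a minimal sketch (the PEM paths are placeholders for your own client certificate and key):

export KAFKA_DELTA_INGEST_CERT="$(cat path/to/client-cert.pem)"
export KAFKA_DELTA_INGEST_KEY="$(cat path/to/client-key.pem)"

RUST_LOG=debug cargo run ingest web_requests ./tests/data/web_requests \
  -l 60 \
  -a web_requests \
  -K "security.protocol=SSL"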

Developing

Make sure the docker-compose setup has been run, then execute cargo test to run the unit and integration tests.
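
For example (a sketch reusing the setup service from the example above; -d runs the services in the background):

docker-compose up -d setup
cargo test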

Comments
  • Kafka timestamp and timestamp type meta

    This PR fixes handling of the Kafka timestamp well-known meta field and adds support for "timestamp_type" so they can be extracted from the Kafka message and merged into the delta record.
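
    Assuming these well-known properties follow the same naming pattern as the kafka.offset and kafka.partition fields in the README example (assumed names, not spelled out in the PR text), transforms using them might look like:

    -t 'meta.kafka.timestamp: kafka.timestamp' \
       'meta.kafka.timestamp_type: kafka.timestamp_type'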

    opened by xianwill 9
  • No snapshot or version 0 found

    Hello guys, I'm trying to set up kafka-delta-ingest with AWS S3, but I get an error:

    Ingest service exited with error DeltaTable { source: NotATable("No snapshot or version 0 found, perhaps s3://mybucket/rust-deltalake/raw.test.mytopic is an empty dir?") }
    

    I'm running the command in a local Docker container using a local Kafka broker.

    command: ["/usr/local/bin/kafka-delta-ingest", "ingest",
                  "raw.test.mytopic",                                                   
                  "s3://mybucket/rust-deltalake/raw.test.mytopic", 
                  "--allowed_latency", "300",
                  "--kafka", "host.docker.internal:9092",
                  "--consumer_group_id", "my_consumer_group"]
    

    Did I miss a step or a configuration option? Should I create the empty deltalake myself?

    Thanks, T

    opened by timopetmanson 4
  • Kafka-connect sink connector

    Hello, I would like to know whether it's on the roadmap of this project to work on a kafka-connect sink connector rather than a daemon.

    kafka -> deltaLakeSinkConnector -> deltaLake

    Thank you all

    opened by raphaelauv 4
  • Use delta log transactions for offset tracking

    WIP, but opening for early review + CI tests, since there are 3 possible ways this test can execute, depending on how the OS distributes threads.

    opened by mosyp 4
  • Add seek-offsets config to set the starting point for the kafka ingestion

    The current approach of relying on Kafka seeks is unreliable, since the consumer could ingest topics from earliest before the actual seek and thereby corrupt the delta store. Just filtering on consumed offsets/stored offsets, as with txn actions, is not enough, since there are more places where it could crash, e.g. a rebalance event with writer reseek and state clearance.

    As such, we're moving towards the well-tested and bulletproof usage of txn actions, which are guarded by the delta protocol and the optimistic concurrency loop with DynamoDB locking.

    The changes introduced here first write a delta log version with txn actions for each partition, as given by the startingOffsets param. This is done only once, by the very first succeeding writer. Once this is achieved, the delta table is protected from duplicates and Kafka seek inconsistency by the delta protocol.

    opened by mosyp 2
  • no support for gzip?

    Here's a test case with redpanda and vector.

    git clone https://github.com/bbigras/test-kdi-gzip

    docker-compose up -d
    # start kdi
    # add a couple of lines to the `log` file
    # wait for one "Delta write for version x has completed in x millis"
    # stop kdi
    
    # uncomment "compression" in vector.toml
    docker-compose up -d --force-recreate
    
    # start kdi again
    

    run kdi with:

    target/release/kafka-delta-ingest ingest my-topic ~/checkout_folder/delta \
      --checkpoints \
      -l 5 \
      --max_messages_per_batch 2 \
      --kafka 127.0.0.1:9092 \
      -K "auto.offset.reset=earliest" \
      -t 'date: substr(timestamp, `0`, `10`)' \
         'message: message' \
         'timestamp: timestamp' \
         'meta.kafka.offset: kafka.offset' \
         'meta.kafka.partition: kafka.partition' \
         'meta.kafka.topic: kafka.topic'
    

    and you'll get:

    [2021-10-22T20:06:43Z ERROR kafka_delta_ingest] Error getting BorrowedMessage while processing stream KafkaError (Message consumption error: NotImplemented (Local: Not implemented))
    
    opened by bbigras 2
  • How to ingest the key?

    I have looked in the repo but could not find the configuration to include the key while ingesting. Can someone help with this please?

    Sample message in my topic: student_id,{"first_name":"john","last_name":"doe","courses":["math","english"]} I want to write all these fields (including the key student_id) to the Delta table.

    opened by rymail 1
  • Remove async API from stats logger

    I need to introduce some buffering for metric reporting to reduce async overhead, and this is somewhat of a low-priority fix, so this PR might linger as a draft for a while.

    Eventually fixes https://github.com/delta-io/kafka-delta-ingest/issues/24

    opened by xianwill 1
  • Update arrow_schema from metadata after write

    arrow_schema_ref becomes stale once the arrow writer is created. The delta table's schema is expected to change over time, but those changes are currently not tracked.

    AC: Update arrow_schema_ref from table.get_metadata periodically, for example after writes or when a new table version appears.

    bug 
    opened by mosyp 1
  • The buffer_len function in DeltaWriter is inefficient

    Description

    The body of buffer_len is

    self.cursor.data().len()
    

    The .data() call copies all bytes from the cursor and returns a new vector. Since we just need a len(), this is inefficient; moreover, it's called for each new message. I couldn't find an easy workaround, and we don't have access to the internal buffer of the in-memory cursor. However, it seems that introducing a new function in parquet::file::writer::InMemoryWriteableCursor should resolve this, e.g.

        pub fn len(&self) -> usize {
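            // Lock the shared in-memory buffer and return its current length without copying the bytes.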
            let inner = self.buffer.lock().unwrap();
            inner.get_ref().len()
        }
    
    enhancement 
    opened by mosyp 1
  • Safe seek to the next messages

    Whenever the consumer state is reset, it reads the latest offsets from delta, adds 1, and seeks to them. However, this will not work if the resulting offset is missing (offsets are not contiguous in Kafka) or is not yet occupied (no new messages since). In the latter case the consumer will read messages from the beginning.

    bug 
    opened by mosyp 1
  • Can't write to a table with `DECIMAL(38,0)` field.

    I have a column with type DECIMAL(38,0), but when I try to start ingestion I get the following error:

    Ingest service exited with error DeltaWriteFailed { ending_offsets: "{\"0\":4999}", partition_counts: "{\"0\":5000}", source: Arrow { source: JsonError("Decimal(38, 0) type is not supported") } }
    

    I believe this is a compatibility issue with www.github.com/delta-io/delta-rs; any help would be appreciated. I am ready to provide more information.

    Thanks in advance !

    opened by denizsurmeli 1
  • Possibility of adding Python bindings to this project

    Would it be possible to add Python bindings to this project?

    I think a lot of programmers are comfortable writing Python, but aren't comfortable with Rust. I'm assuming the Python bindings could be added like they were for the delta-rs project. Thoughts?

    opened by MrPowers 1
  • Add Dockerfile, update README

    • Created a Dockerfile that uses a multi-stage build to avoid a > 4GB image size
    • Added a note in the README about writing to S3 and doing DynamoDB locking
    opened by caseybrown89 0
  • Kafka client collaboration

    Hi there,

    I see you are using rdkafka for your Kafka interaction. We (= InfluxDB IOx) implemented our own pure-Rust async client called rskafka which you might find useful. There is also an intro blog post: https://www.influxdata.com/blog/building-simple-pure-rust-async-apache-kafka-client/

    I think you might be missing a feature or two, but nothing fundamental (to my knowledge).

    Let me know if you have questions, or want to chat.

    Cheers, Marco

    PS: Also feel free to close this ticket.

    opened by crepererum 1
  • Add in logic for "Create Delta Table if not exist"

    In playing around with KDI I realized it doesn't have logic to "Create Delta Table if not exist". If you point the container at a filesystem that has no Delta table, it complains, whereas if you point it at an existing Delta Table, it works fine.

    I think this should be trivial to add in. I created a "KDI Java" just for the heck of it after chatting with @thovoll, and it's a matter of checking whether the Trx log version satisfies !(log.snapshot().getVersion() > -1): https://github.com/mdrakiburrahman/kafka-delta-ingest-adls/blob/2802eead5174e5fc00da047470572d5fd4c76981/1.KDI-Java/src/main/java/com/microsoft/kdi/KDI.java#L261

    My use case is I'm pointing KDI at a large number of Change Data Capture Topics from Debezium - so it can start pumping out Delta Tables at scale. I can't assume the Delta Tables already exist - so this would be great to have!

    opened by mdrakiburrahman 0