A highly efficient daemon for streaming data from Kafka into Delta Lake

Overview

kafka-delta-ingest

The kafka-delta-ingest project aims to build a highly efficient daemon for streaming data through Apache Kafka into Delta Lake.

This project is currently highly experimental and evolving in tandem with the delta-rs bindings.

Developing

  • Compile: cargo build

  • Launch Kafka - docker-compose up

  • Run kafka-delta-ingest (with a short 10s allowed_latency):

RUST_LOG=debug cargo run ingest example ./tests/data/example --allowed_latency 10 -t 'modified_date: substr(modified,`0`,`10`)' 'kafka_offset: kafka.offset'
  • In separate shell, produce messages to example topic, e.g.:

echo "{\"id\":\"1\",\"value\":1,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
echo "{\"id\":\"2\",\"value\":2,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
echo "{\"id\":\"3\",\"value\":3,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
echo "{\"id\":\"4\",\"value\":4,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
echo "{\"id\":\"5\",\"value\":5,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
echo "{\"id\":\"6\",\"value\":6,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
echo "{\"id\":\"7\",\"value\":7,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
echo "{\"id\":\"8\",\"value\":8,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
echo "{\"id\":\"9\",\"value\":9,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
echo "{\"id\":\"10\",\"value\":10,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
  • Watch the delta table folder for new files

    • watch ls tests/data/example

    • cat tests/data/example/_delta_logs/00000000000000000001.json

  • Check committed offsets

$KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group kafka-delta-ingest:example

Tests

  • For unit tests, run cargo test.

  • Integration tests that depend on Kafka are currently marked with #[ignore]. Run RUST_LOG=debug cargo test — --ignored --nocapture to run these locally after running docker-compose up to create a local Kafka.

Comments
  • Kafka timestamp and timestamp type meta

    Kafka timestamp and timestamp type meta

    This PR fixes handling of the Kafka timestamp well-known meta field and adds support for "timestamp_type" so they can be extracted from the Kafka message and merged into the delta record.

    opened by xianwill 9
  • No snapshot or version 0 found

    No snapshot or version 0 found

    Hello guys, I'm trying to set up kafka-delta-ingest with AWS S3, but get an error

    Ingest service exited with error DeltaTable { source: NotATable("No snapshot or version 0 found, perhaps s3://mybucket/rust-deltalake/raw.test.mytopic is an empty dir?") }
    

    I'm running the command in a local Docker container using a local Kafka broker.

    command: ["/usr/local/bin/kafka-delta-ingest", "ingest",
                  "raw.test.mytopic",                                                   
                  "s3://mybucket/rust-deltalake/raw.test.mytopic", 
                  "--allowed_latency", "300",
                  "--kafka", "host.docker.internal:9092",
                  "--consumer_group_id", "my_consumer_group"]
    

    Did I miss a step or a configuration option? Should I create the empty deltalake myself?

    Thanks, T

    opened by timopetmanson 4
  • Kafka-connect sink connector

    Kafka-connect sink connector

    Hello, I would like to know if it's in the roadmap of this project to work on a kafka-connect sink connector more than a daemon.

    kafka -> deltaLakeSinkConnector -> deltaLake

    Thank you all

    opened by raphaelauv 4
  • Use delta log transactions for offset tracking

    Use delta log transactions for offset tracking

    WIP, but opening for early review + CI tests, since there's 3 possible way of this test to be executed, depending on how OS will treat threads distribution

    opened by mosyp 4
  • Add seek-offsets config to set the starting point for the kafka ingestion

    Add seek-offsets config to set the starting point for the kafka ingestion

    The current approach with relying on kafka seeks is unreliable since consumer could ingest topics from earliest before the actual seek and as such corrupt the delta store. Just adding the filtering on the consumed offsets/stored offsets as with txn actions is not enough since there's more places where it could crash, e.g. the rebalance event with writers reseek and state clearance.

    As such, we're moving towards the well tested and bullet proof usage of txn actions which are guarded by delta protocol and optimistic concurrency loop with dynamodb locking.

    The introducing changes will write first the delta log version with txn actions for each partition that are given by startingOffsets param. This is done only once by the very first succeeding writer. Once this is achieved, then the delta table is protected from duplicates and kafka seek inconsistency by delta protocol.

    opened by mosyp 2
  • no support for gzip?

    no support for gzip?

    Here's a test case with redpanda and vector.

    git clone https://github.com/bbigras/test-kdi-gzip

    docker-compose up -d
    # start kdi
    # add a couple of lines to the `log` file
    # wait for one "Delta write for version x has completed in x millis"
    # stop kdi
    
    # uncomment "compression" in vector.toml
    docker-compose up -d --force-recreate
    
    # start kdi again
    

    run kdi with:

    target/release/kafka-delta-ingest ingest my-topic ~/checkout_folder/delta \
      --checkpoints \
      -l 5 \
      --max_messages_per_batch 2 \
      --kafka 127.0.0.1:9092 \
      -K "auto.offset.reset=earliest" \
      -t \
      	  'date: substr(timestamp, `0`, `10`)' \
      	  'message: message' \
    	  'timestamp: timestamp' \
          'meta.kafka.offset: kafka.offset' \
          'meta.kafka.partition: kafka.partition' \
          'meta.kafka.topic: kafka.topic'
    

    and you'll get:

    [2021-10-22T20:06:43Z ERROR kafka_delta_ingest] Error getting BorrowedMessage while processing stream KafkaError (Message consumption error: NotImplemented (Local: Not implemented))
    
    opened by bbigras 2
  • How to ingest the key?

    How to ingest the key?

    I have looked in the repo but could not find the configuration to include the key while ingesting. Can someone help with this please?

    Sample message in my topic: student_id,{"first_name":"john","last_name":"doe","courses":["math","english"]} I want to write all these fields (including the key student_id) to the Delta table.

    opened by rymail 1
  • Remove async API from stats logger

    Remove async API from stats logger

    I need to introduce some buffering for metric reporting to reduce async overhead and this is somewhat of a lo-pro fix so this PR might linger as a draft for a while.

    Eventually fixes https://github.com/delta-io/kafka-delta-ingest/issues/24

    opened by xianwill 1
  • Update arrow_schema from metadata after write

    Update arrow_schema from metadata after write

    arrow_schema_ref is stale once the arrow writer is created. However it's expected that delta table's schema could change, but it's not going to be tracked.

    AC: Update arrow_schema_ref from table.get_metadata once in a while, after writers or new version for example.

    bug 
    opened by mosyp 1
  • The buffer_len function in DeltaWriter is inefficient

    The buffer_len function in DeltaWriter is inefficient

    Description

    The body of buffer_len is

    self.cursor.data().len()
    

    The .data() call is copying all bytes from cursor and returns new vector. Since we just need a len() this is inefficient, moreover it's being called on each new message. I couldn't find a possible easy workaround for it, also we don't have an access to internal buffer in in-memory cursor. However it seems that introducing a new function in parquet::file::writer::InMemoryWriteableCursor should resolve that, e.g

        pub fn len(&self) -> usize {
            let inner = self.buffer.lock().unwrap();
            inner.get_ref().len()
        }
    
    enhancement 
    opened by mosyp 1
  • Safe seek to the next messages

    Safe seek to the next messages

    Whenever there's reset state in consumer state, it reads latest offsets from delta, adds 1 and seeks to them. However this will not work if the resulting offset is missing (offsets are no contiguous in kafka) or not yet occupied (no new messages since). In latter case the consumer will read messages from the beginning

    bug 
    opened by mosyp 1
  • Can't write to a table with `DECIMAL(38,0)` field.

    Can't write to a table with `DECIMAL(38,0)` field.

    I have a column with type DECIMAL(38,0, but when I try to start ingestion I get the following error:

    Ingest service exited with error DeltaWriteFailed { ending_offsets: "{\"0\":4999}", partition_counts: "{\"0\":5000}", source: Arrow { source: JsonError("Decimal(38, 0) type is not supported") } }
    

    I believe this is a compatibility issue with the www.github.com/delta-io/delta-rs, a help would be appreciated. I am ready to provide more information.

    Thanks in advance !

    opened by denizsurmeli 1
  • Possibility of adding Python bindings to this project

    Possibility of adding Python bindings to this project

    Would it be possible to add Python bindings to this project?

    I think a lot of programmers are comfortable writing Python, but aren't comfortable with Rust. I'm assuming the Python bindings could be added like they were for the delta-rs project. Thoughts?

    opened by MrPowers 1
  • Add Dockerfile, update README

    Add Dockerfile, update README

    • Created a Dockerfile that uses a multi-stage build to avoid a > 4GB image size
    • Added a note in the README about writing to S3 and doing DynamoDB locking
    opened by caseybrown89 0
  • Kafka client collaboration

    Kafka client collaboration

    Hi there,

    I see you are using rdkafka for your Kafka interaction. We (= InfluxDB IOx) implemented our own pure-Rust async client called rskafka which you might find useful. There is also an intro blog post: https://www.influxdata.com/blog/building-simple-pure-rust-async-apache-kafka-client/

    I think you might be missing a feature or two, but nothing fundamental (to my knowledge).

    Let me know if you have questions, or want to chat.

    Cheers, Marco

    PS: Also feel free to close this ticket.

    opened by crepererum 1
  • Add in logic for

    Add in logic for "Create Delta Table if not exist"

    In playing around with KDI I realize it doesn't have logic to "Create Delta Table if not exist". If you point the container to a Filesystem that has no Delta table, it complains: image

    Whereas if you point it to an existing Delta Table, works fine: image

    I think this should be trivial to add in, I created a "KDI Java" just for the heck it of it after chatting with @thovoll and it's a matter of checking if the Trx log version is !(log.snapshot().getVersion() > -1): https://github.com/mdrakiburrahman/kafka-delta-ingest-adls/blob/2802eead5174e5fc00da047470572d5fd4c76981/1.KDI-Java/src/main/java/com/microsoft/kdi/KDI.java#L261

    My use case is I'm pointing KDI at a large number of Change Data Capture Topics from Debezium - so it can start pumping out Delta Tables at scale. I can't assume the Delta Tables already exist - so this would be great to have!

    opened by mdrakiburrahman 0
Owner
Delta Lake
An open-source storage layer that brings ACID transactions to Apache Spark™
Delta Lake
Rust client for Apache Kafka

Kafka Rust Client Project Status This project is starting to be maintained by John Ward, the current status is that I am bringing the project up to da

Yousuf Fauzan 902 Jan 2, 2023
Easy Hadoop Streaming and MapReduce interfaces in Rust

Efflux Efflux is a set of Rust interfaces for MapReduce and Hadoop Streaming. It enables Rust developers to run batch jobs on Hadoop infrastructure wh

Isaac Whitfield 31 Nov 22, 2022
Fluvio is a high-performance distributed streaming platform that's written in Rust

Fluvio is a high-performance distributed streaming platform that's written in Rust, built to make it easy to develop real-time applications.

InfinyOn 1.6k Dec 30, 2022
Twitch data consumer and broadcaster

NeoTwitch Arch Network broadcaster Chat (message buffer) If the message buffer is full then shut down Channel point events If the message buffer is fu

Togglebit 3 Dec 3, 2021
A highly efficient daemon for streaming data from Kafka into Delta Lake

A highly efficient daemon for streaming data from Kafka into Delta Lake

Delta Lake 172 Dec 23, 2022
Easy c̵̰͠r̵̛̠ö̴̪s̶̩̒s̵̭̀-t̶̲͝h̶̯̚r̵̺͐e̷̖̽ḁ̴̍d̶̖̔ ȓ̵͙ė̶͎ḟ̴͙e̸̖͛r̶̖͗ë̶̱́ṉ̵̒ĉ̷̥e̷͚̍ s̷̹͌h̷̲̉a̵̭͋r̷̫̊ḭ̵̊n̷̬͂g̵̦̃ f̶̻̊ơ̵̜ṟ̸̈́ R̵̞̋ù̵̺s̷̖̅ţ̸͗!̸̼͋

Rust S̵̓i̸̓n̵̉ I̴n̴f̶e̸r̵n̷a̴l mutability! Howdy, friendly Rust developer! Ever had a value get m̵̯̅ð̶͊v̴̮̾ê̴̼͘d away right under your nose just when

null 294 Dec 23, 2022
An AWS Lambda for automatically loading JSON files as they're created into Delta tables

Delta S3 Loader This AWS Lambda serves a singular purpose: bring JSON files from an S3 bucket into Delta Lake. This can be highly useful for legacy or

R. Tyler Croy 4 Jan 12, 2022
⚡️Highly efficient data and string formatting library for Rust.

⚡️Highly efficient data and string formatting library for Rust. ?? Overview Pad and format string slices and generic vectors efficiently with minimal

Firelink Data 3 Dec 21, 2023
tectonicdb is a fast, highly compressed standalone database and streaming protocol for order book ticks.

tectonicdb crate docs.rs crate.io tectonicdb tdb-core tdb-server-core tdb-cli tectonicdb is a fast, highly compressed standalone database and streamin

Ricky Han 525 Dec 23, 2022
New generation decentralized data warehouse and streaming data pipeline

World's first decentralized real-time data warehouse, on your laptop Docs | Demo | Tutorials | Examples | FAQ | Chat Get Started Watch this introducto

kamu 184 Dec 22, 2022
defmt is a highly efficient logging framework that targets resource-constrained devices, like microcontrollers

defmt defmt ("de format", short for "deferred formatting") is a highly efficient logging framework that targets resource-constrained devices, like mic

Knurling 476 Jan 2, 2023
A trading bot written in Rust based on the orderbook delta volume.

The strategy based on the concept of mean reversion. We look for large deviations in the volume delta of BTC-PERP on FTX at a depth of 1. These deviations could be caused by over-enthusiastic and over-leveraged market participants.

Dinesh Pinto 45 Dec 28, 2022
Quick poc of the rsync wire protocol in Rust. Supports delta transfer.

Rsync wire protocol in Rust This is a quick poc of the rsync wire protocol in Rust. It supports delta transfer. The code is really a mess right now, a

LightQuantum 4 Feb 18, 2023
A dead-simple tool for working with data in Kafka

ktool - a tool for Kafka ktool is a dead-simple tool for working with data in Kafka: Copy partitions / topics to disk Replay messages Inspect message

Dom 5 Nov 4, 2022
Perhaps the fastest and most memory efficient way to pull data from PostgreSQL into pandas and numpy. 🚀

flaco Perhaps the fastest and most memory efficient way to pull data from PostgreSQL into pandas and numpy. ?? Have a gander at the initial benchmarks

Miles Granger 14 Oct 31, 2022
Materialize simplifies application development with streaming data. Incrementally-updated materialized views - in PostgreSQL and in real time. Materialize is powered by Timely Dataflow.

Materialize is a streaming database for real-time applications. Get started Check out our getting started guide. About Materialize lets you ask questi

Materialize, Inc. 4.7k Jan 8, 2023
An example repository on how to start building graph applications on streaming data. Just clone and start building 💻 💪

An example repository on how to start building graph applications on streaming data. Just clone and start building ?? ??

Memgraph 40 Dec 20, 2022
Streaming data over unix sockets, in Rust

Unix-socket based client/server In order to dig into Sōzu channels, I had to dig into the workings of unix sockets. What this repo contains a small so

Emmanuel Bosquet 3 Nov 28, 2022
Library provides a simple API for Google Firestore for create/update/query/streaming/listening data

Firestore for Rust Library provides a simple API for Google Firestore: Create or update documents using Rust structures and Serde; Support for queryin

Abdulla Abdurakhmanov 30 Dec 25, 2022
A high-performance WebSocket integration library for streaming public market data. Used as a key dependency of the `barter-rs` project.

Barter-Data A high-performance WebSocket integration library for streaming public market data from leading cryptocurrency exchanges - batteries includ

Barter 23 Feb 3, 2023