kafka-delta-ingest
The kafka-delta-ingest project aims to build a highly efficient daemon for streaming data through Apache Kafka into Delta Lake.
This project is currently highly experimental and evolving in tandem with the delta-rs bindings.
Developing
-
Compile:
cargo build
-
Launch Kafka -
docker-compose up
-
Run kafka-delta-ingest (with a short 10s allowed_latency):
RUST_LOG=debug cargo run ingest example ./tests/data/example --allowed_latency 10 -t 'modified_date: substr(modified,`0`,`10`)' 'kafka_offset: kafka.offset'
-
In separate shell, produce messages to
example
topic, e.g.:
echo "{\"id\":\"1\",\"value\":1,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
echo "{\"id\":\"2\",\"value\":2,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
echo "{\"id\":\"3\",\"value\":3,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
echo "{\"id\":\"4\",\"value\":4,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
echo "{\"id\":\"5\",\"value\":5,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
echo "{\"id\":\"6\",\"value\":6,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
echo "{\"id\":\"7\",\"value\":7,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
echo "{\"id\":\"8\",\"value\":8,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
echo "{\"id\":\"9\",\"value\":9,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
echo "{\"id\":\"10\",\"value\":10,\"modified\":\"2021-03-16T14:38:58Z\"}" | kafkacat -P -b localhost:9092 -t example -p -1;
-
Watch the delta table folder for new files
-
watch ls tests/data/example
-
cat tests/data/example/_delta_logs/00000000000000000001.json
-
-
Check committed offsets
$KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group kafka-delta-ingest:example
Tests
-
For unit tests, run
cargo test
. -
Integration tests that depend on Kafka are currently marked with
#[ignore]
. RunRUST_LOG=debug cargo test — --ignored --nocapture
to run these locally after runningdocker-compose up
to create a local Kafka.