kafka-delta-ingest
The kafka-delta-ingest project aims to build a highly efficient daemon for streaming data through Apache Kafka into Delta Lake.
This project is currently highly experimental and evolving in tandem with the delta-rs bindings.
Features
-
Multiple worker processes per stream
-
Basic transformations within message
-
Statsd metric output
See the design doc for more details.
Example
The repository includes an example for trying out the application locally with some fake web request data.
The included docker-compose.yml contains kafka and localstack services you can run kafka-delta-ingest
against locally.
Starting Worker Processes
-
Launch test services -
docker-compose up setup
-
Compile:
cargo build
-
Run kafka-delta-ingest against the web_requests example topic and table (customize arguments as desired):
export AWS_ENDPOINT_URL=http://0.0.0.0:4566
export AWS_ACCESS_KEY_ID=test
export AWS_SECRET_ACCESS_KEY=test
RUST_LOG=debug cargo run ingest web_requests ./tests/data/web_requests \
-l 60 \
-a web_requests \
-K "auto.offset.reset=earliest" \
-t 'date: substr(meta.producer.timestamp, `0`, `10`)' \
'meta.kafka.offset: kafka.offset' \
'meta.kafka.partition: kafka.partition' \
'meta.kafka.topic: kafka.topic'
To clean data from previous local runs, execute ./bin/clean-example-data.sh
. You’ll need to do this if you destroy your Kafka container between runs since your delta log directory will be out of sync with Kafka offsets.
Example Data
A tarball containing 100K line-delimited JSON messages is included in tests/json/web_requests-100K.json.tar.gz
. Running ./bin/extract-example-json.sh
will unpack this to the expected location.
Pretty-printed example from the file
{
"meta": {
"producer": {
"timestamp": "2021-03-24T15:06:17.321710+00:00"
}
},
"method": "DELETE",
"session_id": "7c28bcf9-be26-4d0b-931a-3374ab4bb458",
"status": 204,
"url": "http://www.youku.com",
"uuid": "831c6afa-375c-4988-b248-096f9ed101f8"
}
After extracting the example data, you’ll need to play this onto the web_requests topic of the local Kafka container.
Note
|
URLs sampled for the test data are sourced from Wikipedia’s list of most popular websites - https://en.wikipedia.org/wiki/List_of_most_popular_websites. |
Inspect example output
-
List data files -
ls tests/data/web_requests/date=2021-03-24
-
List delta log files -
ls tests/data/web_requests/_delta_log
-
Show some parquet data (using parquet-tools)
-
parquet-tools show tests/data/web_requests/date=2021-03-24/
-
Kafka SSL
In case you have Kafka topics secured by SSL client certificates, you can specify these secrets as environment variables.
For the cert chain include the PEM content as an environment variable named KAFKA_DELTA_INGEST_CERT
. For the cert private key include the PEM content as an environment variable named KAFKA_DELTA_INGEST_KEY
.
These will be set as the ssl.certificate.pem
and ssl.key.pem
Kafka settings respectively.
Make sure to provide the additional option:
-K security.protocol=SSL
when invoking the cli command as well.
Developing
Make sure the docker-compose setup has been ran, and execute cargo test
to run unit and integration tests.