High-level async Cassandra client written in 100% Rust.

Overview

CDRS tokio - async Apache Cassandra driver using tokio

CDRS is a production-ready Apache Cassandra driver written in pure Rust. It focuses on providing a high level of configurability to suit most use cases at any scale, like its Java counterpart, while also leveraging the safety and performance of Rust.

Features

  • Asynchronous API;
  • TCP/TLS connection (rustls);
  • Topology-aware dynamic and configurable load balancing;
  • Configurable connection strategies and pools;
  • Configurable speculative execution;
  • LZ4, Snappy compression;
  • Cassandra-to-Rust data serialization/deserialization with custom type support;
  • Pluggable authentication strategies;
  • ScyllaDB support;
  • Server events listening;
  • Multiple CQL version support (3, 4), full spec implementation;
  • Query tracing information;
  • Prepared statements;
  • Query paging;
  • Batch statements;
  • Configurable retry and reconnection policy;
  • Support for interleaved queries;
  • Support for Yugabyte YCQL JSONB;

Performance

Due to the high configurability of CDRS, performance will vary depending on the use case. The following benchmarks were run against the latest (master as of 03-12-2012) versions of the respective libraries (except cassandra-cpp: 2.16.0).

  • cdrs-tokio-large-pool - CDRS with node connection pool equal to double of physical CPU cores
  • cdrs-tokio-small-pool - CDRS with a single connection per node
  • scylladb-rust-large-pool - scylla crate with node connection pool equal to double of physical CPU cores
  • scylladb-rust-small-pool - scylla crate with a single connection per node
  • cassandra-cpp - Rust bindings for Datastax C++ Driver, running on multiple threads using Tokio
  • gocql - a driver written in Go

insert benchmark

select benchmark

mixed benchmark

When the use case is known, CDRS can be optimized for peak performance.

Documentation and examples

Getting started

This example configures a cluster consisting of a single node without authentication, and uses round-robin load balancing. Other options are kept as default.

use cdrs_tokio::cluster::session::{TcpSessionBuilder, SessionBuilder};
use cdrs_tokio::cluster::NodeTcpConfigBuilder;
use cdrs_tokio::load_balancing::RoundRobinLoadBalancingStrategy;
use cdrs_tokio::query::*;

#[tokio::main]
async fn main() {
    let cluster_config = NodeTcpConfigBuilder::new()
        .with_contact_point("127.0.0.1:9042".into())
        .build()
        .await
        .unwrap();
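    // Session builders validate the configuration and return a Result (since 7.0.0-beta.1).
    let session = TcpSessionBuilder::new(RoundRobinLoadBalancingStrategy::new(), cluster_config)
        .build()
        .unwrap();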

    let create_ks = "CREATE KEYSPACE IF NOT EXISTS test_ks WITH REPLICATION = { \
                     'class' : 'SimpleStrategy', 'replication_factor' : 1 };";
    session
        .query(create_ks)
        .await
        .expect("Keyspace create error");
}
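
To illustrate what round-robin load balancing means in the example above, here is a minimal std-only sketch. The `RoundRobin` type and its `next_node` method are hypothetical names made up for this illustration; they are not the cdrs-tokio implementation, which works on cluster metadata and returns query plans rather than single nodes.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical round-robin selector (illustration only, not a cdrs-tokio type).
struct RoundRobin {
    nodes: Vec<String>,
    next: AtomicUsize,
}

impl RoundRobin {
    fn new(nodes: Vec<String>) -> Self {
        RoundRobin { nodes, next: AtomicUsize::new(0) }
    }

    /// Returns the next node in rotation, or None when no nodes are known.
    fn next_node(&self) -> Option<&str> {
        if self.nodes.is_empty() {
            return None;
        }
        // The shared counter lets several callers rotate through the same list.
        let i = self.next.fetch_add(1, Ordering::Relaxed) % self.nodes.len();
        Some(self.nodes[i].as_str())
    }
}

fn main() {
    let lb = RoundRobin::new(vec![
        "10.0.0.1:9042".to_string(),
        "10.0.0.2:9042".to_string(),
    ]);
    // Each call moves on to the following node and wraps around at the end.
    for _ in 0..4 {
        println!("{:?}", lb.next_node());
    }
}
```

With a single contact point, as in the example above, every query would go to the same node.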

License

This project is licensed under either of

at your option.

Comments
  • increase in size for a byte array

    I am trying to insert a compressed string, about 5 MB after compression, as a byte array (Blob). When inserted, the actual size of the row is 23 MB, which exceeds the database hard limit (16 MB).

    I checked the size of the array right before sending it to the driver. Could the size increase while the driver processes the data?

    Thank you.

    bug 
    opened by Matansegal 23
  • schema change event is never received

    Consider the following code:

        let user = "cassandra";
        let password = "cassandra";
        let auth = StaticPasswordAuthenticatorProvider::new(&user, &password);
        let config = NodeTcpConfigBuilder::new()
            .with_contact_point("127.0.0.1:9042".into())
            .with_authenticator_provider(Arc::new(auth))
            .build()
            .await
            .unwrap();
    
        let session = TcpSessionBuilder::new(RoundRobinLoadBalancingStrategy::new(), config)
            .build()
            .unwrap();
    
        let mut event_recv = session.create_event_receiver();
    
        sleep(Duration::from_secs(3)).await; // let the driver finish connecting to the cluster and registering for the events
    
        let create_ks = "CREATE KEYSPACE IF NOT EXISTS test_events_ks WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };";
    
        session.query(create_ks).await.unwrap();
    
        let event = timeout(Duration::from_secs(10), event_recv.recv())
            .await
            .unwrap()
            .unwrap();
    

    Before https://github.com/krojew/cdrs-tokio/commit/bc021f4ee93c87bef922f5bf6cbac7b9780c10f6 event_recv.recv() would complete, with that commit it now times out

    opened by rukai 23
  • Prepared statements bug.

    Hello there!

    A prepared statement is created only in the scope of a single node. With fixed node observability and load balancing enabled, this causes a query error:

    ERROR cdrs_tokio::transport: Transport error! error=Server error: ErrorBody { error_code: 9472, message: "Prepared query with ID dde1cb1db179403026499e16ff7ef1f5 not found (either the query was not prepared on this host (maybe the host has been restarted?) or you have prepared too many queries and it has been evicted from the internal cache)", additional_info:
    
    help wanted 
    opened by npatsakula 20
  • Comparison to scylla-rust-driver

    I recently came across https://github.com/scylladb/scylla-rust-driver And also saw https://www.reddit.com/r/rust/comments/pfuwhf/help_wanted_cdrstokio_road_to_performance_and/ So it seems you have seen scylla-rust-driver and that spurred a lot of the cleanup of cdrs's performance mistakes.

    After evaluating the performance of scylla-rust-driver it still seems to considerably outperform master cdrs-tokio, in both full scale usage and protocol serializing/deserializing.

    But you clearly seem dedicated to cdrs-tokio instead of jumping over to scylla-rust-driver, so what do you think cdrs-tokio does better than scylla-rust-driver in order for you to keep working on it?

    wontfix 
    opened by rukai 19
  • error UnexpectedWriteType("CAS")) with multiple tokio tasks

    I am getting error UnexpectedWriteType("CAS")) when doing batch inserts from multiple tasks in the same Session object.

    It fails intermittently on a three node cluster with Cassandra version [cqlsh 6.0.0 | Cassandra 4.0.5 | CQL spec 3.4.5 | Native protocol v5]. It works consistently with a single task.

    On a local Cassandra instance in Docker (version 4.0.6), it works consistently with multiple tasks.

    enhancement 
    opened by devrandom 10
  • Scope of protocol functionality

    It seems this project's fork predates cassandra-proto.

    I am in need of a cassandra protocol crate that can support response encoding and query decoding for the purposes of implementing a proxy. Would that be out of scope of cdrs-tokio's protocol functionality? Would you be interested in pulling cdrs-tokio's protocol functionality into a cassandra-proto like crate? (I am happy to put in the work to make this happen)

    I am currently looking at getting cassandra-proto cleaned up and improved at https://github.com/shotover/cassandra-proto But this crate is fairly active so it would be a shame to duplicate work. I would be happy to collaborate in any way that makes sense for your project: possibly a subcrate in this repo or possibly another separate repo.

    opened by rukai 7
  • provide Authenticators with authentication ResponseBody

    for aws keyspaces, the authentication challenge includes a nonce key that is required for signing the auth token (when using aws credentials for cassandra/keyspaces access)

    I think the most straightforward way to do this is to pass the auth challenge response body into Authenticator.auth_token(). You can see from the aws sigv4 java plugin that this is how the datastax driver works for its auth plugins.

    help wanted wontfix 
    opened by JoshRagem 7
  • Split cassandra-protocol into separate crate

    closes https://github.com/krojew/cdrs-tokio/issues/40

    I tried to keep this PR as simple as possible. If other cleanups are needed, let's put them in a follow-up PR (unless they make this PR simpler, of course).

    opened by rukai 6
  • Trouble with dropped connections on Session

    Hello. I am not sure if this is a bug or user error. When I am running my test suite against a database, I have noticed that at least a couple of the tests fairly consistently fail with an error: "Connection closed while waiting for response". I have tried to run the tests in single threaded mode, I have tried adding connections to the connection pool, but nothing seems to work. The number of failures vary, the tests which fail vary, etc. Since it also happens when running with "--jobs 1", I suspect it is not a threading issue.

    At the same time, I see cassandra logs that say things like "Unknown exception in client networking" and "Connection reset by peer". I am running against a single node locally running cassandra test cluster.

    I guess my question is whether this seems to be a bug or whether I am supposed to be handling intermittent connection failures like this in my client code.

    opened by braje 5
  • Why is stream_id moved out of Frame?

    Looking at the cassandra protocol spec and our current usage of frame.stream it makes a lot of sense for stream_id to be in Frame. What was the reasoning behind moving it out of Frame?

    opened by rukai 5
  • rustc stack overflow when deriving IntoCdrsValue and TryFromRow if struct contains static string reference

    I have a static string reference in my struct. When I try to compile this piece of code, the Rust compiler fails with a stack overflow. If a reference is not supported, the derive should produce a warning or an error instead of expanding the macro into code that makes rustc crash.

    #[derive(IntoCdrsValue, TryFromRow, Clone, Debug, Serialize, Deserialize)]
    pub struct EventData {
        pub task_id: String,
        pub agent: String,
        pub event_name: &'static str,
        pub event_time: u32,
        pub interaction_id: String,
    }
    
    

    When I convert the above event_name to an owned type such as String, compilation completes.

    Additional information

    rustc 1.68.0-nightly (270c94e48 2022-12-28)

    async-fn-in-trait is enabled. M1 Pro, ARM processor

    opened by coder3101 4
  • Session can hit errors if used immediately

    When a SessionBuilder (e.g. TcpSessionBuilder) build() method is called, a Session is returned. However there are still background tasks that need to complete before the session can actually be used. Two examples I have hit are:

    1. Delayed enabling of event processing - causing the broadcast::receiver returned by create_event_receiver to miss events
    2. Delayed population of ClusterMetadataManager - causing failure to send prepared statement Cannot find node 127.0.0.1:9042 for statement re-preparation!

    I think a reasonable solution to this would look like adding a tokio::sync::oneshot that gets sent when the session tasks have finished initializing and then the build() method can await on it.

    This can be worked around locally by adding sleeps, but that is clearly not ideal.
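
The readiness signal proposed above could look roughly like the following sketch. It uses a std `mpsc` channel and a thread as stand-ins for `tokio::sync::oneshot` and the session's background tasks, so that it runs without any dependencies. `build_session` and its return value are hypothetical and do not reflect the cdrs-tokio API.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical builder: starts background initialization and returns only
/// after that work has signalled that it is done.
fn build_session() -> Result<&'static str, mpsc::RecvError> {
    let (ready_tx, ready_rx) = mpsc::channel::<()>();

    thread::spawn(move || {
        // Stand-in for connecting, registering for events, and loading metadata.
        thread::sleep(Duration::from_millis(50));
        // Signal readiness; the error case (receiver dropped) is ignored here.
        let _ = ready_tx.send(());
    });

    // Wait for the signal instead of sleeping for an arbitrary fixed time.
    ready_rx.recv()?;
    Ok("session ready")
}

fn main() {
    match build_session() {
        Ok(state) => println!("{}", state),
        Err(err) => eprintln!("initialization task ended without signalling: {}", err),
    }
}
```

If the background task exits without sending, `recv` returns an error, so the caller learns about the failure rather than waiting forever.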

    opened by rukai 0
  • Support for tracing informations

    Hello 👋 Is there any way to get tracing information when making Cassandra queries with this crate? I had hoped to get various spans showing the Cassandra connections and query executions, but I can't find any way to get them. Am I missing something, or is this feature not available as of now?

    enhancement 
    opened by LeBoucEtMistere 1
  • Paging

    I was wondering if there is any way of using paging as described here: https://docs.datastax.com/en/developer/java-driver/4.14/manual/core/paging/ ? I have a select query which returns many rows, about 60 MB combined. The performance is slow (~0.8 sec). I think this is a paging issue.

    let response = session
        .query_with_params(
            SELECT_CQL,
            StatementParamsBuilder::new()
                .with_values(query_values!(id, month))
                .build(),
        )
        .await?
        .response_body()?
        .into_rows();
    
    wontfix 
    opened by Matansegal 13
Releases(7.0.4)
  • 7.0.4(Dec 19, 2022)

  • 7.0.3(Dec 19, 2022)

    Fixed

    • Fixed serialization of routing key with known indexes.

    Changed

    • Deprecated query_with_param() in Pager, in favor of query_with_params().
  • 7.0.1(Sep 30, 2022)

  • 7.0.0(Sep 21, 2022)

    New

    • Clone implemented for BodyResReady and BodyReqExecute.

    Changed

    • Control connection errors are now logged as warnings, since they're recoverable.
    • Exposed fields of BodyReqAuthResponse and BodyReqExecute.
    • Replaced CInet type with SocketAddr, since it was nothing more than a wrapper.
  • 7.0.0-beta.1(Aug 8, 2022)

    Fixed

    • ExponentialReconnectionSchedule duration overflow.
    • Forgetting real error type in certain transport error situations.
    • Not sending re-preparation statements to correct nodes.
    • Infinite set keyspace notification loop.
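
As an illustration of the kind of duration overflow an exponential reconnection schedule can run into, the sketch below computes a doubling delay with checked arithmetic and a cap. The function `reconnection_delay` and its parameters are hypothetical; this is not the code of ExponentialReconnectionSchedule.

```rust
use std::time::Duration;

/// Hypothetical helper: delay before reconnection attempt `attempt` (0-based),
/// doubling with every attempt and never exceeding `max`.
fn reconnection_delay(base: Duration, max: Duration, attempt: u32) -> Duration {
    // 2^attempt does not fit in u32 for attempt >= 32; use the largest factor then.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    // checked_mul returns None if the Duration would overflow; fall back to `max`.
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}

fn main() {
    let base = Duration::from_secs(1);
    let max = Duration::from_secs(60);
    for attempt in [0u32, 1, 5, 10, 100] {
        println!("attempt {}: {:?}", attempt, reconnection_delay(base, max, attempt));
    }
}
```

A plain `base * 2u32.pow(attempt)` would panic on overflow for large attempt counts; the checked variants turn that case into the capped delay.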

    New

    • Protocol V5 support. Please look at official changelog for more information: https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v5.spec#L1419.
    • Support for beta protocols - possibility to connect to beta clusters.
    • From<Decimal> for BigInt.
    • check_envelope_size for Envelope.
    • Error is now Clone.
    • FrameEncoder, FrameDecoder and FrameEncodingFactory responsible for encoding/decoding frames on the wire.
    • with_frame_encoder_factory Session build option.
    • Error impl for CheckEnvelopeSizeError and ParseEnvelopeError.
    • New Error variants for more granular error handling.
    • Node address in Error::Server variant.

    Changed

    • Due to naming changes in V5, frame has been renamed to message, Frame to Envelope, and a frame now corresponds to wrapped envelopes, as defined by the protocol.
    • Serialize and FromCursor traits now pass protocol version to implementations.
    • Row::from_frame_body renamed to from_body.
    • ClusterMetadataManager::find_node renamed to find_node_by_rpc_address for consistency.
    • QueryFlags got extended for V5 and now supports Serialize and FromCursor.
    • Session builders now validate given configuration and return a Result.
    • Transport startup now fails gracefully on unexpected server response.
    • CdrsTransport now requires explicit information if messages are a part of initial handshake.
    • ResResultBody::as_rows_metadata and ResponseBody::as_rows_metadata now return a reference to the data.
    • Hash, PartialEq and PartialOrd for PreparedQuery only take id and result_metadata_id into account, since those define equivalence.
    • Updated chrono dependency to work around found CVE.
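
The equivalence rule for PreparedQuery mentioned in the list above (only id and result_metadata_id count) can be pictured with hand-written trait implementations. `PreparedQuerySketch` and its field types are assumptions made for this sketch, not the actual cassandra-protocol definition.

```rust
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for a prepared query (illustration only).
#[derive(Debug, Clone)]
struct PreparedQuerySketch {
    id: Vec<u8>,
    result_metadata_id: Option<Vec<u8>>,
    query: String, // deliberately ignored by the comparisons below
}

impl PartialEq for PreparedQuerySketch {
    fn eq(&self, other: &Self) -> bool {
        // Only the two identifiers define equivalence.
        self.id == other.id && self.result_metadata_id == other.result_metadata_id
    }
}

impl Eq for PreparedQuerySketch {}

impl Hash for PreparedQuerySketch {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Hash exactly the fields used by eq, so equal values hash equally.
        self.id.hash(state);
        self.result_metadata_id.hash(state);
    }
}

fn main() {
    let a = PreparedQuerySketch {
        id: vec![1, 2, 3],
        result_metadata_id: None,
        query: "SELECT * FROM t".to_string(),
    };
    let b = PreparedQuerySketch { query: "select * from t".to_string(), ..a.clone() };

    let mut seen = HashSet::new();
    seen.insert(a.clone());
    seen.insert(b.clone());
    println!("equal: {}, distinct entries: {}", a == b, seen.len());
    println!("query texts differ: {:?} vs {:?}", a.query, b.query);
}
```

Keeping Hash and PartialEq restricted to the same fields matters: if they disagreed, hash-based collections could treat equal queries as different entries.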
  • 6.2.0(Apr 19, 2022)

  • 6.1.0(Mar 10, 2022)

  • 6.0.0(Dec 21, 2021)

    This version is a departure from legacy API design, stemming from the sync version migration. Due to large performance issues and lack of dynamic topology handling in earlier versions, a decision has been made to cut the ties and focus on delivering the best functionality without legacy burden. The API surface changes are quite large, but everyone is encouraged to update: the performance improvements and new features cannot be overstated.

    New

    • Topology-aware load balancing: TopologyAwareNodeDistanceEvaluator and TopologyAwareLoadBalancingStrategy.
    • New ReconnectionPolicy used when trying to re-establish connections to downed nodes.
    • Error now implements standard Error.
    • SessionBuilder introduced as the preferred way to create a session.
    • Added missing traits for BatchType and QueryFlags.
    • ToString implementation for SimpleServerEvent.
    • Standard trait implementations for event frames.
    • contains_column, is_empty_by_name and is_empty functions for Row.
    • Display implementation for public enums.
    • Missing traits for PreparedMetadata, Value, Consistency and ColType.
    • New PreparedMetadataFlags.
    • New ClusterMetadata representing information about a cluster.
    • Extracted protocol functionality to separate cassandra-protocol crate.
    • Passing final auth data from the server to SaslAuthenticator.
    • SpeculativeExecutionPolicy for speculative execution control.

    Changed

    • All with_name fields or args in the query API are now bool instead of Option<bool>
    • flags field removed from QueryParams (flags are now derived from the other fields at serialization time)
    • Rewritten transport layer for massive performance improvements (including removing bb8). This involves changing a large portion of public API related to transport and server events.
    • Rewritten event mechanism - now you can subscribe to server events via create_event_receiver() in Session.
    • Replaced RowsMetadataFlag, QueryFlags and frame::Flags vectors with bitflags.
    • Changed Target and ChangeType enums to SchemaChangeTarget and SchemaChangeType.
    • The varint type now uses num::BigInt representation (this implies Decimal also uses "big" types).
    • Removed unstable-dynamic-cluster feature, since it wasn't working as expected and introduced performance penalty. Dynamic topology handling is now built-in.
    • Removed AsBytes in favor of new Serialize trait due to performance penalty.
    • Removed FromSingleByte and AsByte in favor of From/TryFrom.
    • Removed traits along with async-trait dependency: BatchExecutor, ExecExecutor, PrepareExecutor, QueryExecutor, GetConnection and CdrsSession. Everything is now embedded directly in Session.
    • Load balancing strategy now returns query plans, rather than individual nodes, and operates on cluster metadata.
    • Removed SingleNode load balancing strategy.
    • Removed empty SimpleError.
    • Renamed connect_generic_static to connect_generic.
    • Removed GetRetryPolicy.
    • Renamed ChangeSchemeOptions to SchemaChangeOptions.
    • Protocol version can now be selected at run time.
    • Value now directly contains the value in the Some variant instead of a separate body field.
    • Consistent naming convention in all builders.
    • Split protocol-level parameters from high-level statement parameters (QueryParams vs StatementParams) and simplified API.
    • add_query_prepared for batch queries now takes PreparedQuery by reference.
  • 5.0.0(Jul 30, 2021)

  • 4.0.0(Apr 27, 2021)

    Fixed

    • Build problems with Rustls.
    • TLS connections sometimes not flushing all data.
    • Not setting current namespace when not using an authenticator.

    New

    • New connect_generic_* functions allowing custom connection configurations (see generic_connection.rs for example usage).
    • Possibility to use custom error types which implement FromCdrsError throughout the crate.
    • Consistency now implements FromStr.
    • Pagers can be converted into PagerState.
    • Support for v4 marshaled types.
    • Copy, Clone, Ord, PartialOrd, Eq, Hash for Opcode.
    • Customizable query retry policies with built-in FallthroughRetrySession and DefaultRetryPolicy.

    Changed

    • TCP configuration now owns contained data - no need to keep it alive while the config is alive.
    • ExecPager is now public.
    • Bytes now implements From for supported types, instead of Into.
    • Moved some generic types to associated types, thus removing a lot of type passing.
    • SessionPager no longer needs mutable session.
    • A lot of names have been migrated to idiomatic Rust (mainly upper camel case abbreviations).
  • 3.0.0(Mar 2, 2021)

    Fixed

    • Remembering USEd keyspaces across connections.
    • Race condition on query id overflow.

    Changed

    • Removed deprecated PasswordAuthenticator.
    • Removed unused Compressor trait.
    • Large API cleanup.
    • Renamed IntoBytes to AsBytes.
    • Authenticator can now be created at runtime - removed static type parameter.
    • Removed unneeded memory allocations when parsing data.
  • 2.1.0(Feb 3, 2021)

  • 2.0.0(Jan 5, 2021)

    • Support for NonZero* types.
    • Support for chrono NaiveDateTime and DateTime<Utc>.
    • Update tokio to 1.0.
    • Pager supporting QueryValues and consistency.
Owner
Kamil Rojewski
📺 Netflix in Rust/ React-TS/ NextJS, Actix-Web, Async Apollo-GraphQl, Cassandra/ ScyllaDB, Async SQLx, Kafka, Redis, Tokio, Actix, Elasticsearch, Influxdb Iox, Tensorflow, AWS

Fullstack Movie Streaming Platform 📺 Netflix in RUST/ NextJS, Actix-Web, Async Apollo-GraphQl, Cassandra/ ScyllaDB, Async SQLx, Spark, Kafka, Redis,

null 34 Apr 17, 2023
Lightweight async Redis client with connection pooling written in pure Rust and 100% memory safe

redi-rs (or redirs) redi-rs is a Lightweight Redis client with connection pooling written in Rust and 100% memory safe redi-rs is a Redis client writt

Oğuz Türkay 4 May 20, 2023
Cassandra (CQL) driver for Rust, using the DataStax C/C++ driver under the covers.

cassandra-cpp This is a maintained Rust project that exposes the DataStax cpp driver at https://github.com/datastax/cpp-driver/ in a somewhat-sane cra

null 93 Jan 7, 2023
ORM for ScyllaDb and Cassandra

ScyllaDb/Cassandra Object-Relation Mapper Features This library contains several crates with the following features: Automatic map tables to Rust stru

null 36 Jan 1, 2023
Quick Pool: High Performance Rust Async Resource Pool

Quick Pool High Performance Rust Async Resource Pool Usage DBCP Database Backend Adapter Version PostgreSQL tokio-postgres qp-postgres Example use asy

Seungjae Park 13 Aug 23, 2022
A dotfiles manager, with real time file watching and 100% less sym-links!

Kubo A dotfile manager that watches files in real time. Usage Create a directory called .kubo in $HOME, then create a file called kubo.toml in .kubo.

StandingPad 5 Jul 24, 2023
An async Rust client for SurrealDB's RPC endpoint

An async Rust client for SurrealDB's RPC endpoint This crate serves as a temporary yet complete implementation of an async Rust client to connect to a

Thibault H 12 Jan 21, 2023
An async-ready Phoenix Channels v2 client library in Rust

Phoenix Channels This crate implements a Phoenix Channels (v2) client in Rust. Status NOTE: This client is still a work-in-progress, though it has eno

LiveView Native 22 Jan 7, 2023
Async Lightweight HTTP client using system native library if possible. (Currently under heavy development)

Async Lightweight HTTP Client (aka ALHC) What if we need async but also lightweight http client without using such a large library like reqwest, isahc

SteveXMH 7 Dec 15, 2022
The rust client for CeresDB. CeresDB is a high-performance, distributed, schema-less, cloud native time-series database that can handle both time-series and analytics workloads.

The rust client for CeresDB. CeresDB is a high-performance, distributed, schema-less, cloud native time-series database that can handle both time-series and analytics workloads.

null 12 Nov 18, 2022
A very WIP RISCV64 OS written in Rust to learn about low-level and OS development

river A very WIP Rust-based RISCV64 OS for learning. The name is based off of the name RISCV with some added letters: "riscv" + er Make sure you have

James [Undefined] 5 Dec 18, 2022
Affine-client is a client for AFFINE based on Tauri

Affine Client affine-client is a client for AFFINE based on Tauri Supported Platforms Windows Linux MacOS Download https://github.com/m1911star/affine

Horus 216 Dec 25, 2022
🧰 The Rust SQL Toolkit. An async, pure Rust SQL crate featuring compile-time checked queries without a DSL. Supports PostgreSQL, MySQL, SQLite, and MSSQL.

SQLx 🧰 The Rust SQL Toolkit Install | Usage | Docs Built with ❤️ by The LaunchBadge team SQLx is an async, pure Rust† SQL crate featuring compile-tim

launchbadge 7.6k Dec 31, 2022
TDS 7.2+ (mssql / Microsoft SQL Server) async driver for rust

Tiberius A native Microsoft SQL Server (TDS) client for Rust. Supported SQL Server versions Version Support level Notes 2019 Tested on CI 2017 Tested

Prisma 189 Dec 25, 2022
Simple, async embedded Rust

Cntrlr - Simple, asynchronous embedded Cntrlr is an all-in-one embedded platform for writing simple asynchronous applications on top of common hobbyis

Branan Riley 11 Jun 3, 2021
Rust async runtime based on io-uring.

Monoio A thread-per-core Rust runtime with io_uring. 中文说明 Design Goal As a runtime based on io_uring, Monoio is designed to be the most efficient and

Bytedance Inc. 2.4k Jan 6, 2023
🐚 An async & dynamic ORM for Rust

SeaORM 🐚 An async & dynamic ORM for Rust SeaORM SeaORM is a relational ORM to help you build web services in Rust with the familiarity of dynamic lan

SeaQL 3.5k Jan 6, 2023
Dataloader-rs - Rust implementation of Facebook's DataLoader using async-await.

Dataloader Rust implementation of Facebook's DataLoader using async-await. Documentation Features Batching load requests with caching Batching load re

cksac 229 Nov 27, 2022
Automatically deleted async I/O temporary files in Rust

async-tempfile Provides the TempFile struct, an asynchronous wrapper based on tokio::fs for temporary files that will be automatically deleted when th

Markus Mayer 3 Jan 4, 2023