When multiple Scylla nodes are configured, an insert fails with the error "failed to fill whole buffer":
thread '<unnamed>' panicked at 'Failed to insert data: Io(Custom { kind: UnexpectedEof, error: StringError("failed to fill whole buffer") })', libcore/result.rs:1009:5
stack backtrace:
0: std::sys::unix::backtrace::tracing::imp::unwind_backtrace
at libstd/sys/unix/backtrace/tracing/gcc_s.rs:49
1: std::sys_common::backtrace::print
at libstd/sys_common/backtrace.rs:71
at libstd/sys_common/backtrace.rs:59
2: std::panicking::default_hook::{{closure}}
at libstd/panicking.rs:211
3: std::panicking::default_hook
at libstd/panicking.rs:227
4: std::panicking::rust_panic_with_hook
at libstd/panicking.rs:476
5: std::panicking::continue_panic_fmt
at libstd/panicking.rs:390
6: rust_begin_unwind
at libstd/panicking.rs:325
7: core::panicking::panic_fmt
at libcore/panicking.rs:77
8: core::result::unwrap_failed
at libcore/macros.rs:26
9: <core::result::Result<T, E>>::expect
at libcore/result.rs:835
10: <unknown>
at src/main.rs:62
The issue can be reproduced with the following setup:
- Setup scylla cluster with 3 nodes
docker run --name scylla-0 -p 9042:9042 -d scylladb/scylla
# wait until scylla-0 is up: docker exec -it scylla-0 nodetool status
docker run --name scylla-1 -p 9043:9042 -d scylladb/scylla --seeds="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' scylla-0)"
docker run --name scylla-2 -p 9044:9042 -d scylladb/scylla --seeds="$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' scylla-0)"
- Create column family within cqlsh:
docker exec -it scylla-0 cqlsh
CREATE KEYSPACE test
WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 2};
use test;
CREATE TABLE data (
id int,
epoch_utc timestamp,
value double,
PRIMARY KEY (id, epoch_utc))
WITH CLUSTERING ORDER BY (epoch_utc DESC)
AND COMPACTION = {
'class': 'TimeWindowCompactionStrategy',
'compaction_window_unit': 'DAYS',
'compaction_window_size': '1'
};
- Cargo.toml
[package]
name = "rust_test_scylla"
version = "0.1.0"
authors = ["Pascal Sachs <[email protected]>"]
edition = "2018"
[dependencies]
cdrs = "^2.0.0-beta.6"
cdrs_helpers_derive = "0.1.0"
rand = "^0.6.5"
[profile.dev]
panic = "abort"
[profile.release]
panic = "abort"
- main.rs
#[macro_use]
extern crate cdrs;
#[macro_use]
extern crate cdrs_helpers_derive;
use std::sync::Arc;
use std::thread;
use std::time::SystemTime;
use rand::prelude::*;
use cdrs::authenticators::NoneAuthenticator;
use cdrs::cluster::session::{new_lz4 as new_session, Session};
use cdrs::cluster::{ClusterTcpConfig, NodeTcpConfigBuilder, TcpConnectionPool};
use cdrs::frame::IntoBytes;
use cdrs::load_balancing::RoundRobinSync;
use cdrs::query::*;
use cdrs::types::prelude::*;
/// Session type shared by all worker threads: a cdrs `Session` whose load balancer is
/// `RoundRobinSync` over `TcpConnectionPool`s that use `NoneAuthenticator`.
type ScyllaSession = Session<RoundRobinSync<TcpConnectionPool<NoneAuthenticator>>>;
/// Addresses of the three Scylla nodes, as published on the host by the
/// `docker run -p 9042/9043/9044:9042` commands of the reproduction setup.
///
/// References in a `const` are implicitly `'static`, so the lifetime is elided.
const SCYLLA_NODES: &[&str] = &["localhost:9042", "localhost:9043", "localhost:9044"];

/// CQL statement that inserts one row into `test.data`.
///
/// The three `?` placeholders are bound, in order, to `id`, `epoch_utc` and `value`.
/// Each trailing `\` joins the next line while keeping the space before it, so the
/// statement is a single line of text at runtime.
const INSERT_DATA: &str = "\
    INSERT INTO test.data \
    (id, epoch_utc, value) \
    VALUES (?, ?, ?)";
fn main() {
let nodes = SCYLLA_NODES
.into_iter()
.map(|addr| NodeTcpConfigBuilder::new(addr, NoneAuthenticator {}).build())
.collect();
let cluster_config = ClusterTcpConfig(nodes);
let session: Arc<ScyllaSession> = Arc::new(
new_session(&cluster_config, RoundRobinSync::new())
.expect("Could not connect to scylla cluster"),
);
for i in 0..20 {
let thread_session = session.clone();
thread::spawn(move || {
let mut rng = rand::thread_rng();
let query_insert_data = thread_session
.prepare(INSERT_DATA)
.expect("Failed to prepare insert data query");
let id = i;
let epoch_utc = (1_000
* SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()) as i64;
let value = rng.gen();
let values = DataStruct {
id,
epoch_utc,
value,
}
.into_query_values();
thread_session
.exec_with_values(&query_insert_data, values)
.expect("Failed to insert data");
})
.join()
.expect("thread error");
}
}
/// One row of the `test.data` table, as bound to the `INSERT_DATA` statement.
#[derive(Clone, Debug, IntoCDRSValue, PartialEq)]
pub struct DataStruct {
// Partition key (`id int` in the table definition).
id: i32,
// Clustering column (`epoch_utc timestamp`); `main` fills it with seconds since the Unix epoch multiplied by 1000.
epoch_utc: i64,
// Measured value (`value double`); `main` fills it with a random number.
value: f64,
}
impl DataStruct {
fn into_query_values(self) -> QueryValues {
query_values!(self.id, self.epoch_utc, self.value)
}
}