A high level async Redis client for Rust built on Tokio and Futures.

Overview

Fred

Example

use fred::prelude::*;

#[tokio::main]
async fn main() -> Result<(), RedisError> {
  let config = RedisConfig::default();
  let policy = ReconnectPolicy::default();
  let client = RedisClient::new(config);
  
  // connect to the server, returning a handle to the task that drives the connection
  let _ = client.connect(Some(policy));
  let _ = client.wait_for_connect().await?;
  let _ = client.flushall(false).await?;
 
  // convert responses to many common Rust types
  let foo: Option<String> = client.get("foo").await?;
  assert!(foo.is_none());
  
  let _: () = client.set("foo", "bar", None, None, false).await?;
  // or use turbofish to declare response types
  println!("Foo: {:?}", client.get::<String, _>("foo").await?);
  
  // or use a lower level interface for responses to defer parsing, etc
  let foo: RedisValue = client.get("foo").await?;
  assert_eq!(foo.as_str().unwrap(), "bar");
  
  let _ = client.quit().await?;
  Ok(())
}

See the examples for more.

Install

With cargo edit.

cargo add fred

Features

  • Flexible and generic client interfaces.
  • Supports clustered, centralized, and sentinel Redis deployments.
  • Optional built-in reconnection logic with multiple backoff policies.
  • Publish-Subscribe and keyspace events interfaces.
  • Supports transactions.
  • Supports Lua scripts.
  • Supports streaming results from the MONITOR command.
  • Supports custom commands provided by third party modules.
  • Supports TLS connections.
  • Handles cluster rebalancing operations without downtime or errors.
  • Supports streaming interfaces for scanning functions.
  • Options to automatically pipeline requests when possible.
  • Automatically retry requests under bad network conditions.
  • Support for configuring global settings that can affect performance under different network conditions. Callers can configure backpressure settings, when and how the underlying socket is flushed, and how many times requests are attempted.
  • Built-in tracking for network latency and payload size metrics.
  • A client pooling interface to round-robin requests among a pool of clients.
  • Built-in support for tracing.
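The reconnection backoff mentioned in the feature list can be pictured with a small sketch. This is not fred's implementation: `backoff_delay` and its parameters are hypothetical names chosen for illustration, and it only shows how an exponential policy with a cap might compute the wait before each attempt.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of fred's API): the delay to wait before
/// reconnect attempt `attempt` (0-based) under an exponential policy.
fn backoff_delay(attempt: u32, min_delay: Duration, max_delay: Duration, base: u32) -> Duration {
    // base^attempt, saturating instead of overflowing on large attempt counts.
    let factor = base.checked_pow(attempt).unwrap_or(u32::MAX);
    // Scale the minimum delay, falling back to the cap if the product overflows.
    let delay = min_delay.checked_mul(factor).unwrap_or(max_delay);
    // Never wait longer than the configured maximum.
    delay.min(max_delay)
}

fn main() {
    let min = Duration::from_millis(100);
    let max = Duration::from_secs(5);
    for attempt in 0..8 {
        println!("attempt {}: wait {:?}", attempt, backoff_delay(attempt, min, max, 2));
    }
}
```

Capping the delay keeps a long outage from pushing the next attempt arbitrarily far into the future.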

Tracing

This crate supports tracing via the tracing crate. See the tracing info for more information.

This feature is disabled by default, but callers can opt-in via the full-tracing or partial-tracing features.

Logging

To enable logs use the environment variable RUST_LOG with a value of trace, debug, warn, error, or info. See the documentation for env_logger for more information.

When a client is initialized it will generate a unique client name with a prefix of fred-. This name will appear in all logging statements on the client in order to associate client and server operations if logging is enabled on both.

Compile Time Features

Name | Default | Description
enable-tls | x | Enable TLS support. This requires OpenSSL (or equivalent) dependencies.
vendored-tls | | Enable TLS support, using vendored OpenSSL (or equivalent) dependencies, if possible.
ignore-auth-error | x | Ignore auth errors that occur when a password is supplied but not required.
metrics | x | Enable the metrics interface to track overall latency, network latency, and request/response sizes.
reconnect-on-auth-error | | A NOAUTH error is treated the same as a general connection failure and the client will reconnect based on the reconnection policy.
index-map | | Use IndexMap instead of HashMap as the backing store for Redis Map types. This is useful for testing and may also be useful for callers.
pool-prefer-active | x | Prefer connected clients over clients in a disconnected state when using the RedisPool interface.
full-tracing | | Enable full tracing support. This can emit a lot of data so a partial tracing feature is also provided.
partial-tracing | | Enable partial tracing support, only emitting traces for top level commands and network latency. Note: this has a non-trivial impact on performance.
blocking-encoding | | Use a blocking task for encoding or decoding frames over a certain size. This can be useful for clients that send or receive large payloads, but will only work when used with a multi-thread Tokio runtime.
network-logs | | Enable TRACE level logging statements that will print out all data sent to or received from the server.
custom-reconnect-errors | | Enable an interface for callers to customize the types of errors that should automatically trigger reconnection logic.
monitor | | Enable an interface for running the MONITOR command.
sentinel-client | | Enable an interface for communicating directly with Sentinel nodes. This is not necessary to use normal Redis clients behind a sentinel layer.
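The pool-prefer-active behavior listed in the table above can be pictured as round-robin selection that skips disconnected clients. The sketch below is an assumption about the general idea, not the crate's pool code: `Client`, `Pool`, and the `connected` flag are hypothetical stand-ins.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for a pooled client: only an id and a connection flag.
struct Client {
    id: usize,
    connected: bool,
}

/// Hypothetical pool that hands out clients in round-robin order.
struct Pool {
    clients: Vec<Client>,
    next: AtomicUsize,
}

impl Pool {
    fn new(clients: Vec<Client>) -> Self {
        Pool { clients, next: AtomicUsize::new(0) }
    }

    /// Round-robin selection that prefers connected clients.
    fn next(&self) -> Option<&Client> {
        let len = self.clients.len();
        if len == 0 {
            return None;
        }
        let start = self.next.fetch_add(1, Ordering::Relaxed) % len;
        // Scan one full lap starting at the round-robin position.
        (0..len)
            .map(|offset| &self.clients[(start + offset) % len])
            .find(|client| client.connected)
            // If every client is disconnected, fall back to plain round-robin.
            .or(Some(&self.clients[start]))
    }
}

fn main() {
    let pool = Pool::new(vec![
        Client { id: 0, connected: true },
        Client { id: 1, connected: false },
        Client { id: 2, connected: true },
    ]);
    for _ in 0..4 {
        println!("picked client {}", pool.next().unwrap().id);
    }
}
```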

Environment Variables

Name | Default | Description
FRED_DISABLE_CERT_VERIFICATION | false | Disable certificate verification when using TLS features.
FRED_DISABLE_HOST_VERIFICATION | false | Disable host verification when using TLS features.

These are environment variables because they're dangerous in production and callers should be forced to surface them in a loud and obvious way.
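One way an application could surface these settings loudly at startup is to read the variables itself and log a warning. This is a minimal sketch: `parse_flag` and `flag_from_env` are hypothetical helpers, and the accepted spellings ("1", "true", "t") are an assumption for illustration, not necessarily what the crate accepts.

```rust
use std::env;

/// Hypothetical helper: interprets a raw variable value as a boolean flag.
/// The accepted spellings are an assumption, not the crate's documented rules.
fn parse_flag(raw: Option<&str>) -> bool {
    match raw.map(|value| value.trim().to_ascii_lowercase()) {
        Some(value) => matches!(value.as_str(), "1" | "true" | "t"),
        None => false, // an unset variable keeps verification enabled
    }
}

/// Reads the named environment variable and interprets it with `parse_flag`.
fn flag_from_env(name: &str) -> bool {
    parse_flag(env::var(name).ok().as_deref())
}

fn main() {
    for name in ["FRED_DISABLE_CERT_VERIFICATION", "FRED_DISABLE_HOST_VERIFICATION"] {
        if flag_from_env(name) {
            eprintln!("WARNING: {} is set; TLS verification is weakened", name);
        }
    }
}
```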

Pipelining

Callers can toggle automatic pipelining via flags on the RedisConfig provided to a client. When enabled, the client pipelines commands whenever possible. These settings can drastically affect performance on both the server and client, but further tuning may be necessary to avoid issues such as excessive memory use on the client or server while commands are buffered.

See the global performance tuning functions for more information on how to tune backpressure or other relevant settings related to pipelining.

This module also contains a separate test application that can be used to demonstrate the effects of pipelining. This test application also contains some helpful information on how to use the tracing features.

ACL & Authentication

Prior to the introduction of ACL commands in Redis version 6, clients would authenticate with a single password. If callers are not using the ACL interface, or are using Redis version <=5.x, they should configure the client to automatically authenticate by using the password field on the RedisConfig and leaving the username field as None.

If callers are using ACLs and Redis version >=6.x they can configure the client to automatically authenticate by using the username and password fields on the provided RedisConfig.

The authentication information provided to the RedisConfig must allow the client to run CLIENT SETNAME and CLUSTER NODES. Callers can still change users via the auth command later, but it is recommended to instead use the username and password provided to the RedisConfig so that the client can automatically authenticate after reconnecting.

If this is not possible callers need to ensure that the default user can run the two commands above. Additionally, it is recommended to move any calls to the auth command inside the on_reconnect block.
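The difference between the two authentication modes comes down to the arguments sent with AUTH: a password alone for the pre-ACL form, or a username followed by a password for the ACL form. The sketch below builds that argument list; `auth_command` is a hypothetical helper for illustration and not a function from this crate.

```rust
/// Hypothetical helper: builds the argument list for a Redis AUTH command.
/// With no username this is the pre-ACL form (`AUTH password`); with a
/// username it is the ACL form available since Redis 6 (`AUTH username password`).
fn auth_command(username: Option<&str>, password: &str) -> Vec<String> {
    let mut args = vec!["AUTH".to_string()];
    if let Some(user) = username {
        args.push(user.to_string());
    }
    args.push(password.to_string());
    args
}

fn main() {
    // Placeholder credentials used only for the demonstration.
    println!("{}", auth_command(None, "secret").join(" "));
    println!("{}", auth_command(Some("app-user"), "secret").join(" "));
}
```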

Redis Sentinel

To use the Redis Sentinel interface callers should provide a ServerConfig::Sentinel variant when creating the client's RedisConfig. This should contain the host/port tuples for the known sentinel nodes when first creating the client.

The client will automatically update these values in place as sentinel nodes change whenever connections to the primary Redis server close. Callers can inspect these changes with the client_config function on any RedisClient that uses the sentinel interface.

Note: Sentinel connections will use the same authentication and TLS configuration options as the connections to the Redis servers.

Callers can also use the sentinel-client feature to communicate directly with Sentinel nodes.

Customizing Error Handling

The custom-reconnect-errors feature enables an interface on the globals to customize the list of errors that should automatically trigger reconnection logic (if configured).

In many cases applications respond to Redis errors by logging the error, maybe waiting and reconnecting, and then trying again. Whether to do this often depends on the prefix in the error message, and this interface allows callers to specify which errors should be handled this way.

Errors that trigger this can be seen with the on_error function.
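The prefix-based decision described above can be sketched in a few lines. This is only an illustration of the idea: `should_reconnect` is a hypothetical function, and the prefix list is an example choice rather than the crate's defaults.

```rust
/// Hypothetical matcher: returns true when a Redis error message starts with
/// one of the prefixes the caller has chosen to treat as a reconnect trigger.
fn should_reconnect(error_message: &str, prefixes: &[&str]) -> bool {
    prefixes.iter().any(|prefix| error_message.starts_with(prefix))
}

fn main() {
    // Example prefixes only; callers would pick the ones relevant to their deployment.
    let prefixes = ["READONLY", "LOADING", "MASTERDOWN"];
    let errors = [
        "READONLY You can't write against a read only replica.",
        "WRONGTYPE Operation against a key holding the wrong kind of value",
    ];
    for error in errors {
        println!("{} -> reconnect: {}", error, should_reconnect(error, &prefixes));
    }
}
```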

Tests

To run the unit and integration tests:

cargo test -- --test-threads=1

OR

# run the tests with default features
./tests/run.sh

OR

# run the tests 3 times, once with default features, once with no features, and once with all features (except chaos monkey)
./tests/run_all.sh

Note: a local Redis server must be running on port 6379, and a clustered deployment must be running on ports 30001 - 30006 for the integration tests to pass.

Scripts are included to download and run centralized and clustered redis servers at any Redis version. These scripts will not make any modifications to your system outside the tests/tmp folder.

export REDIS_VERSION=6.2.2
./tests/scripts/install_redis_centralized.sh
./tests/scripts/install_redis_clustered.sh

Beware: the tests will periodically run flushall.

Contributing

See the contributing documentation for info on adding new commands.

Comments
  • Client not responding

    Client not responding

    Description

    Hi! Thanks for the awesome library.

    I'm having a problem with the Redis client getting stuck before sending the command to the Redis server (it happens with any command). This is happening in a Stream, but I didn't try to reproduce it outside of one.

    The pattern of getting stuck is random, sometimes it runs for tens of stream values and gets stuck and other times it gets stuck on the first value of the stream.

    What I tried

    I tried to track to the best of my abilities where it was getting stuck and I think it's at wait_for_response(rx) in src/utils.rs, but I'm not sure.

    When setting a default timeout it gets hit, otherwise it waits forever.

    I also ran redis-cli monitor on the Redis server and confirmed that the client gets stuck before sending the command.

    Reproducibility

    The minimal code to reproduce it is (UPDATE: this doesn't reproduce the issue, see https://github.com/aembke/fred.rs/issues/13#issuecomment-974429304):

    use std::time::Duration;
    
    use futures::StreamExt;
    use fred::prelude::*;
    use fred::pool::StaticRedisPool;
    
    #[tokio::main]
    async fn main() {
      let pool = StaticRedisPool::new(RedisConfig::default(), 5);
      pool.connect(Some(ReconnectPolicy::default()));
      pool.wait_for_connect().await.unwrap();
    
      let stream = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_millis(100)))
        .then(move |_| {
          let pool = pool.clone();
    
          async move {
            let value: Option<String> = pool.get("key").await.unwrap(); // This call never responds.
    
            value
          }
        });
    
      futures::pin_mut!(stream);
    
      while let Some(value) = stream.next().await {
        println!("{:?}", value);
      }
    }
    

    Cargo.toml:

    [package]
    edition = "2021"
    
    [dependencies]
    fred = { version = "4.2.1", default-features = false, features = [
        "pool-prefer-active",
        "ignore-auth-error",
    ] }
    futures = { version = "0.3.17", default-features = false }
    tokio = { version = "1.13.0", features = ["rt", "macros", "time"] }
    tokio-stream = "0.1.8"
    

    OS and versions

    Rust: 1.56.0
    fred: 4.2.1
    Redis: 6.2.6 inside Docker
    OS: Ubuntu 21.04

    opened by dbstratta 21
  • Waiting for exec to return from server

    Waiting for exec to return from server

    Hi, I don't know if it's an intended behavior, but I ran into following situation:

    Suppose you have 10ms latency to Redis. Every 5ms your application needs to run

    MULTI
    HSET key value
    PUBLISH message
    EXEC
    

    I am using the following code to execute this sequence in fred:

    let trx = redis.multi(true).await?;
    trx.hset(key, hashmap! { field => value.into()}).await?;
    trx.publish(channel, message).await?;
    trx.exec().await?;
    

    I'd like to have a writer that executes this code, but waits for the response in a new tokio task every 5ms. This way there will be 2 running requests waiting for responses. I don't know how to achieve this with fred.

    There are 2 issues:

    • Fred allows another MULTI only once the response from EXEC has arrived. I think a new MULTI could be allowed as soon as the EXEC has been sent.
    • It's not possible to split sending the commands from receiving the response, so I don't know when it is safe to call the second MULTI.

    Thanks for help

    enhancement 
    opened by mkurtak 18
  • Different auth for sentinel and redis

    Different auth for sentinel and redis

    Hi. I saw in the example how to connect through sentinel. However, the example seems to rely on the same auth being used for both sentinel and redis itself. My company sets it up such that the auth (i.e. username/password) for the two is different, for security purposes. Is there a way I can specify different auths for the two? Thanks!

    enhancement 
    opened by tsukit 11
  • Get on non-existent key returns wrong error

    Get on non-existent key returns wrong error

    Hello, when I try to get a non-existent key I receive this error: Redis Error - kind: Parse, details: Cannot convert to number., but I believe this is the wrong type of error for the triggered action.

    What I would expect

    Since the Redis CLI returns nil maybe the library should just return an Option::None, or a different kind of RedisError like NotFound.

    Personally I think both solutions are right, and in an ideal world there would be an option for both behaviors. For my use case a different kind of error is preferred, since I want to take a specific action on specific errors and the error propagation operator (?) would let me do this more simply and cleanly.

    enhancement 
    opened by mercxry 10
  • Connection to Redis Cluster in Minikube

    Connection to Redis Cluster in Minikube

    Hello Alec, I have a problem connecting to a local Redis cluster created in Minikube. If the problem isn't on Fred's side let me know :)

    The setup is relatively simple: there's a single LoadBalancer exposed via minikube tunnel that redirects to a cluster node, and the cluster itself isn't accessible from outside (so the nodes can only talk to each other and the load balancer). Here's the commands I used:

    Using redis-cli on the host with load balancer IP 10.102.101.213 like so:

    redis-cli -c -h 10.102.101.213 -a $REDIS_PASSWORD CLUSTER NODES
    Warning: Using a password with '-a' or '-u' option on the command line interface may not be safe.
    aceb5bf17a706440a297c5be764f4e4b6a60eb61 172.17.0.7:[email protected] slave 16da11b0423cafe75261b5adbc2eb13a90358cc5 0 1640566876716 2 connected
    a67bf5b0d784c9be05fba218bdc456b484f37c86 172.17.0.2:[email protected] myself,slave 2008db068b8a58a28595b37060d52d3736d04a6d 0 1640566873000 1 connected
    a10e1fe9800e7fb2a376f5ec5e1734658b15c6c9 172.17.0.5:[email protected] master - 0 1640566875000 3 connected 10923-16383
    2008db068b8a58a28595b37060d52d3736d04a6d 172.17.0.6:[email protected] master - 0 1640566874000 1 connected 0-5460
    53cf7c4159f5896deb1c42478c82a6404ac5f56e 172.17.0.3:[email protected] slave a10e1fe9800e7fb2a376f5ec5e1734658b15c6c9 0 1640566876000 3 connected
    16da11b0423cafe75261b5adbc2eb13a90358cc5 :[email protected] master,noaddr - 1640566845731 1640566845730 2 disconnected 5461-10922
    

    all works as expected.

    Trying to connect to the same cluster with Fred (env variable REDIS_URI is the LoadBalancer IP):

    let config = RedisConfig {
                server: ServerConfig::new_clustered([(dotenv!("REDIS_URI").to_string(), 6379 as u16)].to_vec()),
                fail_fast: true,
                pipeline: true,
                blocking: Blocking::Block,
                username: None,
                password: Some(dotenv!("REDIS_PASSWD").to_string()),
                tls: None,
                tracing: false
            };
            println!("Creating client...");
            let client = RedisClient::new(config);
            println!("Created client!");
    
            let policy = ReconnectPolicy::default();
    
            tokio::spawn(client
                .on_error()
                .for_each(|e| async move {
                    println!("Client received connection error: {:?}", e);
                }));
    
            println!("Checked errors...");
    
            tokio::spawn(client
                .on_reconnect()
                .for_each(move |client| async move {
                    println!("Client {} reconnected.", client.id());
                    // select the database each time we connect or reconnect
                    let _ = client.select(REDIS_SESSION_DB).await;
                }));
    
    
            println!("Checked reconnections...");
    
            client.connect(Some(policy));
            println!("Trying to connect Redis...");
            client.wait_for_connect().await.unwrap();
            println!("Redis connected!");
    

    makes it throw this error:

    Client received connection error: Redis Error - kind: IO, details: Os { code: 113, kind: HostUnreachable, message: "No route to host" }
    

    Given that the info about nodes in cluster is gathered with CLUSTER NODES (per https://docs.rs/fred/latest/fred/types/enum.ServerConfig.html#variant.Clustered), I assume passing a single IP is fine. I guess this is some kind of a DNS/redirect resolution problem. I see that you're currently working on supporting custom DNS resolvers on the client, could this be related?

    Keep up the great work, this is probably the best Redis driver for Rust at the moment!

    P.S. When do you plan to release Streams functionality by the way (in terms of time)?

    good first issue 
    opened by ConstBur 7
  • How can I configure the server parameters?

    How can I configure the server parameters?

    Hello! I need to create an instance of RedisClient with my own parameters. But I only see the default function. Where I can set up my host and port to RedisConfig?

    Also, where can I set a new client name?

    Thank you!

    question 
    opened by juanki03 6
  • Questions: Read selection strategy and failing replica node handling

    Questions: Read selection strategy and failing replica node handling

    These are actually questions, not issues. I can't seem to find info on this.

    1. For reading, what's the reading strategy among replica nodes? Round-robin? Or something different? Can it be configured?

    2. How does the RedisClient handle a failing replica node?

    Thank you. Ed

    question 
    opened by tsukit 5
  • Version 5

    Version 5

    Hello Alec, it's me again. I'd just like to thank you again for making this driver and for incredible help in my last issue ( #21 ). Fred is awesome, the best among Redis drivers I've encountered. Never thought that someone could make something better pretty much single-handedly than the teams/communities behind other drivers, especially the "official" one (https://github.com/mitsuhiko/redis-rs).

    As a thank you I'd really like to support this library, you've got Patreon or something like that?

    Can't wait for the 5.0.0 release, hope it will be ready soon!

    opened by ConstBur 5
  • `DynamicRedisPool` blocked on `wait_for_connect`

    `DynamicRedisPool` blocked on `wait_for_connect`

    Description

    I am attempting to use a dynamic pool in an actix web app (btw I am using a v4 beta so am on the correct tokio version) and my code hangs on wait_for_connect consistently. I know similar things have happened previously but as those were closed issues I started this one.

    Reproduction

    To reproduce I am able to just run the code from the example here https://github.com/aembke/fred.rs/blob/main/examples/dynamic_pool.rs

    Here is my app main

    use fred::pool::DynamicRedisPool;
    use fred::prelude::*;
    
    #[tokio::main]
    async fn main() -> Result<(), RedisError> {
        let config = RedisConfig {
            // whether to skip reconnect logic when first connecting
            fail_fast: true,
            // server configuration
            server: ServerConfig::new_centralized("127.0.0.1", 6379),
            // whether to automatically pipeline commands
            pipeline: true,
            // how to handle commands sent while a connection is blocked
            blocking: Blocking::Block,
            // an optional username, if using ACL rules
            username: None,
            // an optional authentication key or password
            password: None,
            // optional TLS settings
            tls: None,
            // whether to enable tracing
        };
        // the max size isn't a hard limit - it just determines the size of the client array when the pool is initialized
        let pool = DynamicRedisPool::new(config, None, 5, 10);
    
        println!("I am printed");
        let _ = pool.connect();
        println!("Me too");
        let _ = pool.wait_for_connect().await?;
        println!("Not Me :(");
    
        // modify the size of the pool at runtime
        let (new_client, _) = pool.scale_up().await;
        if let Some(old_client) = pool.scale_down(true).await {
            assert_eq!(new_client.id(), old_client.id());
        }
    
        for client in pool.clients() {
            println!("Client ID {} in pool.", client.id());
        }
    
        // due to the locking required by the resizing operations the Deref trait cannot be used with this pool implementation.
        // if modifications to the pool are not required at runtime the static pool is usually easier to use
        let _ = pool.next().get("foo").await?;
        let _ = pool.next().set("foo", "bar", None, None, false).await?;
        let _ = pool.next().get("foo").await?;
    
        // if the pool can be empty a function exists that will lazily create a new client, if needed.
        // if the pool is not empty this just calls `next` without creating a new client.
        let _ = pool.next_connect(true).await.get("foo").await?;
    
        let _ = pool.quit_pool().await;
        Ok(())
    }
    

    Hope this helps and please ask any questions.

    System

    MacOS, Redis via Docker, Rust 1.57

    opened by JamesPatrickGill 5
  • fred (and redis-protocol) uses wrong assumption that keys are utf8

    fred (and redis-protocol) uses wrong assumption that keys are utf8

    Hello, First of all thanks for the great library!

    I just want to raise an issue about the wrong assumption (and behaviour) that Redis keys are valid utf8 strings. According to the Redis data types doc:

    Redis keys are binary safe, this means that you can use any binary sequence as a key, from a string like "foo" to the content of a JPEG file.

    However, in the redis-protocol library the function redis_keyslot wrongly accepts the key as a &str.

    In combination with String::from_utf8_lossy conversion in https://github.com/aembke/fred.rs/blob/main/src/commands/lua.rs#L26 it produces wrong keys.

    Unfortunately this is not the only issue. In https://github.com/aembke/redis-protocol.rs/blob/main/src/utils.rs#L360 a utf8 string key is indexed using a byte index, which is not always correct because the index may not fall on a UTF-8 code point boundary (which causes a panic).
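For illustration, a key slot calculation that works on raw bytes avoids both problems, since byte slices can be indexed at any offset. The sketch below is not the redis-protocol implementation; `crc16`, `hash_tag`, and `key_slot` are hypothetical names, and the algorithm follows the Redis Cluster specification (CRC16/XMODEM of the key or its hash tag, modulo 16384).

```rust
/// CRC16 (XMODEM variant): polynomial 0x1021, initial value 0.
fn crc16(data: &[u8]) -> u16 {
    let mut crc: u16 = 0;
    for &byte in data {
        crc ^= (byte as u16) << 8;
        for _ in 0..8 {
            crc = if crc & 0x8000 != 0 { (crc << 1) ^ 0x1021 } else { crc << 1 };
        }
    }
    crc
}

/// Returns the bytes that are hashed: the content between the first `{` and
/// the next `}` when that content is non-empty, otherwise the whole key.
fn hash_tag(key: &[u8]) -> &[u8] {
    if let Some(open) = key.iter().position(|&b| b == b'{') {
        if let Some(len) = key[open + 1..].iter().position(|&b| b == b'}') {
            if len > 0 {
                return &key[open + 1..open + 1 + len];
            }
        }
    }
    key
}

/// Maps an arbitrary binary key to one of the 16384 cluster hash slots.
fn key_slot(key: &[u8]) -> u16 {
    crc16(hash_tag(key)) % 16384
}

fn main() {
    println!("slot for foo: {}", key_slot(b"foo"));
    // A key that is not valid UTF-8 is handled like any other byte sequence.
    println!("slot for binary key: {}", key_slot(&[0xff, 0xfe, 0x00, 0x7b]));
}
```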

    bug next major version 
    opened by khvzak 5
  • Cluster Redis should reconnect to hosts other than those configured

    Cluster Redis should reconnect to hosts other than those configured

    Great work btw!

    When connecting to a Redis cluster, if it connects to a master node the connection will not update itself with the other nodes. If the master fails at this point the client will lose all connection as it did not update the RedisInnerClient. It seems as though sync_cluster is only called on handle_connection_closed.

    I feel like this should be a fairly simple fix of adding a call to sync_cluster, and I could open a PR if you want, but I could be overlooking something trivial.

    Also I just want to confirm that this line does not contain a bug:

    https://github.com/aembke/fred.rs/blob/93ddcadd57a3386103ece5aae7a8d66e839b91e5/src/protocol/utils.rs#L150

    The third column of the CLUSTER NODES command can also contain a list of flags such as master,fail. Would it be the right move here to not include nodes that are failed?
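As a sketch of the question above, the flags column can be split on commas and checked before a node is treated as a usable primary. The code is illustrative only: `NodeLine`, `parse_node_line`, and `is_usable_primary` are hypothetical names, the sample lines are made up, and excluding `fail` and `noaddr` nodes is an assumption about the desired behavior rather than what the client does.

```rust
/// Minimal view of one line of CLUSTER NODES output (first three columns only).
#[derive(Debug)]
struct NodeLine<'a> {
    id: &'a str,
    address: &'a str,
    flags: Vec<&'a str>,
}

impl NodeLine<'_> {
    /// Assumed policy: a primary is usable unless it is failed or has no address.
    fn is_usable_primary(&self) -> bool {
        let has = |flag: &str| self.flags.iter().any(|f| *f == flag);
        has("master") && !has("fail") && !has("noaddr")
    }
}

/// Parses the id, address, and comma-separated flags from one line.
fn parse_node_line(line: &str) -> Option<NodeLine<'_>> {
    let mut fields = line.split_whitespace();
    let id = fields.next()?;
    let address = fields.next()?;
    let flags = fields.next()?.split(',').collect();
    Some(NodeLine { id, address, flags })
}

fn main() {
    // Made-up sample lines in the CLUSTER NODES column layout.
    let lines = [
        "node-a 10.0.0.1:6379@16379 master - 0 0 1 connected 0-5460",
        "node-b 10.0.0.2:6379@16379 master,fail - 0 0 2 disconnected 5461-10922",
    ];
    for line in lines {
        if let Some(node) = parse_node_line(line) {
            println!("{:?} usable primary: {}", node, node.is_usable_primary());
        }
    }
}
```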

    opened by davlum 4
  • add support for FUNCTION LOAD / FCALL / FCALL_RO

    add support for FUNCTION LOAD / FCALL / FCALL_RO

    Based on pull request https://github.com/aembke/fred.rs/pull/64; see the diff for this change in https://github.com/aembke/fred.rs/pull/65/commits/e1f580c55268617f8baa4bdb4d928943d8f9ba1a. Adds support for FUNCTION LOAD / FCALL / FCALL_RO, see https://redis.io/commands/function-load/

    opened by usrtax 0
  • Connection management issues

    Connection management issues

    Hi - Thank you for a great library!

    We have been experiencing some connection management issues (including using latest version 5.2.0). Using fred as a redis client in a kubernetes cluster (redis is self-hosted in k8s) - if the redis instance is moved to another node, we end up in a corrupted state with connection management.

    This can simply be reproduced doing kubectl scale deploy/redis --replicas 0.

    We have tested with/without pipelining. With/without connection pool. Config is using exponential reconnect policy + command timeout. This is a trace of the events.

    Initially we get a connection - everything is fine:

     DEBUG fred::multiplexer::commands > fred-JUKIBXKdhV: Initializing connections...
     TRACE fred::protocol::types       > fred-JUKIBXKdhV: Using 10.0.78.183 among 1 possible socket addresses for redis:6379
     TRACE fred::multiplexer::utils    > fred-JUKIBXKdhV: Connecting to 10.0.78.183:6379
     TRACE mio::poll                   > registering event source with poller: token=Token(0), interests=READABLE | WRITABLE
     DEBUG fred::protocol::connection  > fred-JUKIBXKdhV: Skip setting client name.
     TRACE redis_protocol::utils       > allocating more, len: 0, amt: 24
     TRACE fred::protocol::codec       > fred-JUKIBXKdhV: Encoded 24 bytes to 10.0.78.183:6379. Buffer len: 24 (RESP2)
     TRACE tokio_util::codec::framed_impl > flushing framed transport
     TRACE tokio_util::codec::framed_impl > writing; remaining=24
     TRACE tokio_util::codec::framed_impl > framed transport flushed
     TRACE tokio_util::codec::framed_impl > attempting to decode a frame
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 4 bytes from 10.0.78.183:6379 (RESP2).
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Parsed 4 bytes from 10.0.78.183:6379
     TRACE tokio_util::codec::framed_impl > frame decoded from buffer
     DEBUG fred::protocol::connection     > fred-JUKIBXKdhV: Read client ID: Resp2(Integer(7))
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Creating new close tx sender.
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Set centralized connection closed sender.
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Emitting connect message.
    

    Now we scale down the deployment:

    kubectl scale deploy/redis --replicas 0
    

    Fred realizes that the connection is lost:

     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Starting command stream...
     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Set inner connection closed sender.
     TRACE tokio_util::codec::framed_impl > attempting to decode a frame
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 0 bytes from 10.0.78.183:6379 (RESP2).
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 0 bytes from 10.0.78.183:6379 (RESP2).
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Redis frame stream closed with error Redis Error - kind: Canceled, details: Canceled.
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Emit connection closed from error: Redis Error - kind: Canceled, details: Canceled.
     TRACE fred::multiplexer::utils       > fred-JUKIBXKdhV: Emitting connection closed with 0 messages
     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Recv reconnect message with 0 commands. State: Disconnected
     INFO  fred::multiplexer::commands    > fred-JUKIBXKdhV: Sleeping for 132 ms before reconnecting
     TRACE fred::protocol::types          > fred-JUKIBXKdhV: Using 10.0.78.183 among 1 possible socket addresses for redis:6379
     TRACE fred::multiplexer::utils       > fred-JUKIBXKdhV: Connecting to 10.0.78.183:6379
     TRACE mio::poll                      > registering event source with poller: token=Token(1), interests=READABLE | WRITABLE
    

    Notice at this time there are no pods running redis, so the reconnect will just hang. Now we attempt to execute a command against redis, using a client from the connection pool:

     TRACE fred::multiplexer::commands    > fred-JUKIBXKdhV: Recv command on multiplexer SET. Buffer len: 0
     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Will block multiplexer loop waiting on SET to finish.
     TRACE fred::multiplexer              > fred-JUKIBXKdhV: Skip waiting on cluster sync.
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Writing command SET to redis:6379
     TRACE fred::protocol::connection     > fred-JUKIBXKdhV: Sending command and flushing the sink.
     TRACE redis_protocol::utils          > allocating more, len: 0, amt: 40
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Encoded 40 bytes to 10.0.78.183:6379. Buffer len: 40 (RESP2)
     TRACE tokio_util::codec::framed_impl > flushing framed transport
     TRACE tokio_util::codec::framed_impl > writing; remaining=40
     WARN  fred::multiplexer::commands    > fred-JUKIBXKdhV: Error writing command None: Redis Error - kind: IO, details: Os { code: 32, kind: BrokenPipe, message: "Broken pipe" }
     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Reconnecting or stopping due to error: Redis Error - kind: IO, details: Os { code: 32, kind: BrokenPipe, message: "Broken pipe" }
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Emitting close all sockets message: Redis Error - kind: IO, details: Os { code: 32, kind: BrokenPipe, message: "Broken pipe" }
     WARN  fred::multiplexer::utils       > fred-JUKIBXKdhV: Error sending close message to socket streams: SendError(Redis Error - kind: IO, details: Os { code: 32, kind: BrokenPipe, message: "Broken pipe" })
     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Waiting for client to reconnect...
    resp Err(Redis Error - kind: Timeout, details: Request timed out.)
    

    Next we scale up redis:

    kubectl scale deploy/redis --replicas 1
    

    After a while the connection pool will reconnect:

     DEBUG fred::protocol::connection     > fred-JUKIBXKdhV: Skip setting client name.
     TRACE redis_protocol::utils          > allocating more, len: 0, amt: 24
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Encoded 24 bytes to 10.0.78.183:6379. Buffer len: 24 (RESP2)
     TRACE tokio_util::codec::framed_impl > flushing framed transport
     TRACE tokio_util::codec::framed_impl > writing; remaining=24
     TRACE tokio_util::codec::framed_impl > framed transport flushed
     TRACE tokio_util::codec::framed_impl > attempting to decode a frame
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 4 bytes from 10.0.78.183:6379 (RESP2).
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Parsed 4 bytes from 10.0.78.183:6379
     TRACE tokio_util::codec::framed_impl > frame decoded from buffer
     DEBUG fred::protocol::connection     > fred-JUKIBXKdhV: Read client ID: Resp2(Integer(3))
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Set centralized connection closed sender.
     TRACE mio::poll                      > deregistering event source from poller
     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Reconnect task finished reconnecting or syncing with: Ok(())
     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Sending 0 commands after reconnecting.
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Emitting connect message.
     TRACE tokio_util::codec::framed_impl > attempting to decode a frame
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 0 bytes from 10.0.78.183:6379 (RESP2).
    

    And now we have the problem: when we try to execute a command using the client from the connection pool, the request times out:

     TRACE fred::multiplexer::commands    > fred-JUKIBXKdhV: Recv command on multiplexer SET. Buffer len: 0
     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Will block multiplexer loop waiting on SET to finish.
     TRACE fred::multiplexer              > fred-JUKIBXKdhV: Skip waiting on cluster sync.
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Writing command SET to redis:6379
     TRACE fred::protocol::connection     > fred-JUKIBXKdhV: Sending command and flushing the sink.
     TRACE redis_protocol::utils          > allocating more, len: 0, amt: 40
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Encoded 40 bytes to 10.0.78.183:6379. Buffer len: 40 (RESP2)
     TRACE tokio_util::codec::framed_impl > flushing framed transport
     TRACE tokio_util::codec::framed_impl > writing; remaining=40
     TRACE tokio_util::codec::framed_impl > framed transport flushed
     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Waiting on last request to finish without pipelining.
     TRACE tokio_util::codec::framed_impl > attempting to decode a frame
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 5 bytes from 10.0.78.183:6379 (RESP2).
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Parsed 5 bytes from 10.0.78.183:6379
     TRACE tokio_util::codec::framed_impl > frame decoded from buffer
     TRACE fred::multiplexer::responses   > fred-JUKIBXKdhV: Processing response from redis:6379 to SET with frame kind SimpleString
     TRACE fred::multiplexer::responses   > fred-JUKIBXKdhV: Writing to multiplexer sender to unblock command loop.
     WARN  fred::multiplexer::responses   > fred-JUKIBXKdhV: Error sending cmd loop response: ()
     TRACE fred::multiplexer::responses   > fred-JUKIBXKdhV: Responding to caller for SET
     WARN  fred::multiplexer::responses   > fred-JUKIBXKdhV: Failed to respond to caller.
     TRACE tokio_util::codec::framed_impl > attempting to decode a frame
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 0 bytes from 10.0.78.183:6379 (RESP2).
    resp Err(Redis Error - kind: Timeout, details: Request timed out.)
    

    The command is actually executed: we can verify with redis-cli that the key has been SET in Redis. All subsequent requests using this client also time out, even though the commands are in fact executed.

    We use tokio v1.20.1 for reference.

    With a headless service in k8s we do not experience this problem, because DNS resolves to zero IP addresses while the deployment is scaled to 0 replicas, and to the new IP address once the pod has moved to the other node.

    opened by birkmose 6
  • Ping fails with SubscriberClient

    Ping consistently fails with the SubscriberClient, returning the error "Could not convert multiple frames to RedisValue". The stack trace shows the response being parsed as a RESP3 frame, but as far as I know the server only supports RESP2.

    Environment:

    • redis_version: 5.0.7 (fails the same way with the latest 7.0)
    • fred.rs: 5.1.0

    Logs:

     2022-08-28 02:11:57.234867 [00d] [D] [connection]:0287 fred-JeHtXwyITZ: Read client ID: Resp2(Integer(82))
     2022-08-28 02:12:57.236755 [00c] [D] [utils ]:0400 fred-JeHtXwyITZ: Writing command PING to localhost:6379
     2022-08-28 02:12:57.237310 [00a] [D] [server ]:0094 fred-JeHtXwyITZ: Recv ping response.

    Stack:

      * frame #0: 0x00005555562aaa94 test`fred::protocol::utils::frame_to_single_result(frame=redis_protocol::resp3::types::Frame::Array @ 0x00007ffff649ede0) at utils.rs:547:20
        frame #1: 0x000055555582dfb9 test`fred::commands::impls::server::ping::{{closure}}((null)=core::future::ResumeTy @ 0x00007ffff649efc8) at server.rs:95:3
        frame #2: 0x0000555555a7e497 test`<core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll(self=core::pin::Pin<&mut core::future::from_generator::GenFuture<fred::commands::impls::server::ping::{async_fn_env#0}>> @ 0x00007ffff649f460, cx=0x00007ffff64a93d0) at mod.rs:91:19
        frame #3: 0x00005555558197d9 test`fred::interfaces::ClientLike::ping::{{closure}}::{{closure}}((null)=core::future::ResumeTy @ 0x00007ffff64a2948) at interfaces.rs:256:58
    
    

    Data:

    print frame
    (redis_protocol::resp3::types::Frame::Array) frame = {
      data = (2) vec![{...}, {...}] {}
      attributes = None {}
    }
    
    print frame.data.0
    (alloc::raw_vec::RawVec<redis_protocol::resp3::types::Frame, alloc::alloc::Global>) buf = {
      ptr = {
        pointer = {}
        _marker = {}
      }
      cap = 2
      alloc = {}
    }
    
    opened by tugtugtug 4
  • Incorrect Cluster Failover Behavior

    The library behaves incorrectly in a Redis cluster failover scenario.

    Reproduce scenario:

    1. Run a Redis cluster in docker with failover
    2. Connect to the cluster
    3. Pause master container with data
    4. Wait for the cluster failover mechanism to sync the nodes and rehash slots
    5. The library can no longer read or write data on that node, because slots and nodes are never reinitialized. There are no public methods to re-sync the cluster state and slots on the client side.

    Problems:

    • There is no background monitoring of the cluster state on the client side. Cluster reconfiguration occurs only when the connection is explicitly disconnected or when the server replies with a MOVED error.
    • A timeout does not trigger a reconnect.

    Bottom line: if the connection to the master node of the cluster is lost, the library keeps trying to reconnect to it indefinitely without success, and never reconfigures the cluster state and slots on the client side.
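
    One way a caller might work around the second problem is to count consecutive timeouts and request a resync once a threshold is crossed. The sketch below is not part of fred: `TimeoutTracker`, `Outcome`, and `Action` are hypothetical names, and what a resync actually does (for example re-fetching the slot map and reconnecting) is left to the caller.

```rust
/// Hypothetical result of one command, as observed by the caller.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Outcome {
    Ok,
    Timeout,
}

/// Hypothetical instruction telling the caller what to do next.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Action {
    Continue,
    ResyncCluster,
}

/// Counts consecutive timeouts and asks for a resync at a threshold.
pub struct TimeoutTracker {
    threshold: u32,
    consecutive: u32,
}

impl TimeoutTracker {
    pub fn new(threshold: u32) -> Self {
        TimeoutTracker { threshold, consecutive: 0 }
    }

    /// Record one command outcome and decide whether to resync.
    pub fn record(&mut self, outcome: Outcome) -> Action {
        match outcome {
            // Any success means the node is reachable again.
            Outcome::Ok => {
                self.consecutive = 0;
                Action::Continue
            }
            Outcome::Timeout => {
                self.consecutive += 1;
                if self.consecutive >= self.threshold {
                    // Reset so the next resync needs a fresh run of timeouts.
                    self.consecutive = 0;
                    Action::ResyncCluster
                } else {
                    Action::Continue
                }
            }
        }
    }
}
```

    Requiring several timeouts in a row, rather than one, keeps a single slow command from forcing a resync.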

    opened by neogenie 1
  • Add script utility type

    Handles the common pattern of EVALSHA then calling EVAL if EVALSHA returns a NOSCRIPT error.

    I would be happy to write some unit tests, but wasn't sure of the best strategy. I would appreciate any guidance you can provide.
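
    The EVALSHA-then-EVAL pattern described above can be sketched roughly as follows. This is not the code from the pull request: `Script`, `ScriptBackend`, `ScriptError`, and `FakeServer` are hypothetical names, the SHA-1 digest is supplied by the caller rather than computed, and the fake server only models whether the script is cached.

```rust
/// Hypothetical error type: only separates NOSCRIPT from other failures.
#[derive(Debug, PartialEq)]
pub enum ScriptError {
    NoScript,
    Other(String),
}

/// Hypothetical minimal interface; a real client would send EVALSHA / EVAL here.
pub trait ScriptBackend {
    fn evalsha(&mut self, sha1: &str, keys: &[&str], args: &[&str]) -> Result<String, ScriptError>;
    fn eval(&mut self, body: &str, keys: &[&str], args: &[&str]) -> Result<String, ScriptError>;
}

/// A script body paired with its SHA-1 digest (supplied by the caller in this sketch).
pub struct Script {
    body: String,
    sha1: String,
}

impl Script {
    pub fn new(body: &str, sha1: &str) -> Self {
        Script { body: body.to_string(), sha1: sha1.to_string() }
    }

    /// Try EVALSHA first and fall back to EVAL only on a NOSCRIPT error.
    pub fn run<B: ScriptBackend>(
        &self,
        backend: &mut B,
        keys: &[&str],
        args: &[&str],
    ) -> Result<String, ScriptError> {
        match backend.evalsha(&self.sha1, keys, args) {
            Err(ScriptError::NoScript) => backend.eval(&self.body, keys, args),
            other => other,
        }
    }
}

/// In-memory stand-in for a server, used only to exercise the fallback.
#[derive(Default)]
pub struct FakeServer {
    cached: bool,
    pub evalsha_calls: u32,
    pub eval_calls: u32,
}

impl ScriptBackend for FakeServer {
    fn evalsha(&mut self, _sha1: &str, _keys: &[&str], _args: &[&str]) -> Result<String, ScriptError> {
        self.evalsha_calls += 1;
        if self.cached {
            Ok("OK".to_string())
        } else {
            Err(ScriptError::NoScript)
        }
    }

    fn eval(&mut self, _body: &str, _keys: &[&str], _args: &[&str]) -> Result<String, ScriptError> {
        self.eval_calls += 1;
        // Model EVAL leaving the script in the server's cache.
        self.cached = true;
        Ok("OK".to_string())
    }
}
```

    A fake backend like this is also one possible unit-testing strategy: the first `run` should issue one EVALSHA and one EVAL, and later runs should issue EVALSHA only.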

    opened by zombiezen 2
Releases(v5.0.0)
  • v5.0.0(May 4, 2022)

    • Remove mocks feature (https://github.com/aembke/fred.rs/issues/36)
    • Change bzpopmin/bzpopmax return value types
    • Fix scan_cluster bugs
    • Add URL parsing for RedisConfig
  • v5.0.0-beta.1(Mar 14, 2022)

    • Rewrite the protocol parser so it can decode frames without moving or copying the underlying bytes
    • Change most command implementations to avoid unnecessary allocations when using static str slices
    • Rewrite the public interface to use different traits for different parts of the redis interface
    • Relax some restrictions on certain commands being used in a transaction
    • Implement the Streams interface (XADD, XREAD, etc)
    • RESP3 support
    • Move most perf configuration options from globals to client-specific config structs
    • Add backpressure configuration options to the client config struct
    • Fix bugs that can occur when using non-UTF8 byte arrays as keys
    • Add the serde-json feature
    • Handle more complicated failure modes with Redis clusters
    • Add a more robust and specialized pubsub subscriber client
    • Ergonomics improvements on the public interfaces
    • Improve docs
    • More tests
  • v4.3.2(Jan 15, 2022)

  • v4.3.1(Dec 20, 2021)

  • v4.3.0(Dec 5, 2021)

  • v4.2.3(Dec 1, 2021)

  • v4.2.2(Nov 30, 2021)

    • Remove some unnecessary async locks
    • Fix client pool wait_for_connect implementation
    • Add tests for https://github.com/aembke/fred.rs/issues/13 and https://github.com/aembke/fred.rs/issues/14
  • v4.2.1(Nov 1, 2021)

  • v4.2.0(Sep 28, 2021)

  • v4.1.0(Sep 27, 2021)

  • 4.0.0(Sep 18, 2021)

  • 3.0.0(Sep 12, 2021)

    • Rewrite to use async/await and new dependencies
    • Switch github repository
    • Redo testing framework
    • Support ACL, geo, cluster, client, hyperloglog, lua, slowlog, and transaction commands
    • Add pipeline testing application
    • Change RedisKey to use bytes instead of str
    • Redo RedisValue implementation
    • Support automatic pipelining
    • Redo cluster state caching implementation
    • Support manual and automatic client unblocking
    • Custom reconnect errors feature
    • Blocking encoding feature
    • Network logs feature
    • Add Tracing support
    • Support client pooling
    • Support the MONITOR command
    • Refactor RedisConfig interface
    • Add chaos monkey tests
    • Add examples
    • Update docs
Owner
Alec Embke
A type-safe, high performance ORM framework

dark-flames 21 Mar 4, 2022
pgvector support for Rust Supports Rust-Postgres, SQLx, and Diesel

pgvector-rust pgvector support for Rust Supports Rust-Postgres, SQLx, and Diesel Getting Started Follow the instructions for your database library: Ru

Andrew Kane 14 Nov 23, 2022
gobang - A cross-platform TUI database management tool written in Rust

Takayuki Maeda 2.1k Dec 2, 2022
A MongoDB ODM for Rust based on Mongoose

Nongoose MongoDB ODM for Rust based on Mongoose Basic usage use mongodb::{bson::oid::ObjectId, sync::Client}; use nongoose::Schema; use serde::{Deseri

NextChat 9 Nov 8, 2022
Easy c̵̰͠r̵̛̠ö̴̪s̶̩̒s̵̭̀-t̶̲͝h̶̯̚r̵̺͐e̷̖̽ḁ̴̍d̶̖̔ ȓ̵͙ė̶͎ḟ̴͙e̸̖͛r̶̖͗ë̶̱́ṉ̵̒ĉ̷̥e̷͚̍ s̷̹͌h̷̲̉a̵̭͋r̷̫̊ḭ̵̊n̷̬͂g̵̦̃ f̶̻̊ơ̵̜ṟ̸̈́ R̵̞̋ù̵̺s̷̖̅ţ̸͗!̸̼͋

Rust S̵̓i̸̓n̵̉ I̴n̴f̶e̸r̵n̷a̴l mutability! Howdy, friendly Rust developer! Ever had a value get m̵̯̅ð̶͊v̴̮̾ê̴̼͘d away right under your nose just when

null 293 Sep 22, 2022
Utilities for tokio/tokio-uring based async IO

dbs-fuse The dbs-fuse is a utility crate to support fuse-backend-rs. Wrappers for Rust async io It's challenging to support Rust async io, and it's ev

OpenAnolis Community 6 Oct 23, 2022
Incomplete Redis client and server implementation using Tokio - for learning purposes only

mini-redis mini-redis is an incomplete, idiomatic implementation of a Redis client and server built with Tokio. The intent of this project is to provi

Tokio 2.2k Dec 2, 2022
High-level async Cassandra client written in 100% Rust.

CDRS tokio CDRS is production-ready Apache Cassandra driver written in pure Rust. Focuses on providing high level of configurability to suit most use

Kamil Rojewski 69 Nov 26, 2022
A Redis module that provides rate limiting in Redis as a single command.

redis-cell A Redis module that provides rate limiting in Redis as a single command. Implements the fairly sophisticated generic cell rate algorithm (G

Brandur Leach 1.1k Nov 17, 2022
The most fundamental type for async synchronization: an intrusive linked list of futures

wait-list This crate provides WaitList, the most fundamental type for async synchronization. WaitList is implemented as an intrusive linked list of fu

Sabrina Jewson 7 Oct 26, 2022
Russh - Async (tokio) SSH2 client and server rimplementation

Russh Async (tokio) SSH2 client and server rimplementation. This is a fork of Thrussh by Pierre-Étienne Meunier which adds: More safety guarantees AES

Warptech Industries 99 Dec 4, 2022
The gRPC library for Rust built on C Core library and futures

gRPC-rs gRPC-rs is a Rust wrapper of gRPC Core. gRPC is a high performance, open source universal RPC framework that puts mobile and HTTP/2 first. Sta

TiKV Project 1.6k Nov 25, 2022
Cassandra DB native client written in Rust language. Find 1.x versions on https://github.com/AlexPikalov/cdrs/tree/v.1.x Looking for an async version? - Check WIP https://github.com/AlexPikalov/cdrs-async

CDRS CDRS is looking for maintainers CDRS is Apache Cassandra driver written in pure Rust. ?? Looking for an async version? async-std https://github.c

Alex Pikalov 338 Oct 29, 2022
A fully asynchronous, futures-based Kafka client library for Rust based on librdkafka

rust-rdkafka A fully asynchronous, futures-enabled Apache Kafka client library for Rust based on librdkafka. The library rust-rdkafka provides a safe

Federico Giraud 1.1k Dec 2, 2022
Gecko is a high-level, general-purpose programming language built on top of the LLVM project.

Gecko is a high-level, general-purpose programming language built on top of the LLVM project. Gecko Technology & principles Gecko is a general-purpose

Gecko 19 Oct 3, 2022
Simple crate that wraps a tokio::process into a tokio::stream

tokio-process-stream tokio-process-stream is a simple crate that wraps a tokio::process into a tokio::stream Having a stream interface to processes is

Leandro Lisboa Penz 8 Sep 13, 2022
Network-agnostic, high-level game networking library for client-side prediction and server reconciliation.

WARNING: This crate currently depends on nightly rust unstable and incomplete features. crystalorb Network-agnostic, high-level game networking librar

Ernest Wong 170 Nov 12, 2022
Public aircraft & flightroute api Built in Rust for Docker, using PostgreSQL & Redis

api.adsbdb.com public aircraft & flightroute api Built in Rust for Docker, using PostgreSQL & Redis See typescript branch for original typescript vers

Jack Wills 61 Nov 23, 2022
Fork of async-raft, the Tokio-based Rust implementation of the Raft protocol.

Agreed Fork of async-raft, the Tokio-based Rust implementation of the Raft distributed consensus protocol. Agreed is an implementation of the Raft con

NLV8 Technologies 8 Jul 5, 2022
Mix async code with CPU-heavy thread pools using Tokio + Rayon

tokio-rayon Mix async code with CPU-heavy thread pools using Tokio + Rayon Resources Documentation crates.io TL;DR Sometimes, you're doing async stuff

Andy Barron 72 Nov 30, 2022