A high level async Redis client for Rust built on Tokio and Futures.

Overview

Fred


Example

use fred::prelude::*;

#[tokio::main]
async fn main() -> Result<(), RedisError> {
  let config = RedisConfig::default();
  let policy = ReconnectPolicy::default();
  let client = RedisClient::new(config);
  
  // connect to the server, returning a handle to the task that drives the connection
  let _ = client.connect(Some(policy));
  let _ = client.wait_for_connect().await?;
  let _ = client.flushall(false).await?;
 
  // convert responses to many common Rust types
  let foo: Option<String> = client.get("foo").await?;
  assert!(foo.is_none());
  
  let _: () = client.set("foo", "bar", None, None, false).await?;
  // or use turbofish to declare response types
  println!("Foo: {:?}", client.get::<String, _>("foo").await?);
  
  // or use a lower level interface for responses to defer parsing, etc
  let foo: RedisValue = client.get("foo").await?;
  assert_eq!(foo.as_str().unwrap(), "bar");
  
  let _ = client.quit().await?;
  Ok(())
}

See the examples for more.

Install

With cargo edit.

cargo add fred

Features

  • Flexible and generic client interfaces.
  • Supports clustered, centralized, and sentinel Redis deployments.
  • Optional built-in reconnection logic with multiple backoff policies.
  • Publish-Subscribe and keyspace events interfaces.
  • Supports transactions.
  • Supports Lua scripts.
  • Supports streaming results from the MONITOR command.
  • Supports custom commands provided by third party modules.
  • Supports TLS connections.
  • Handles cluster rebalancing operations without downtime or errors.
  • Supports streaming interfaces for scanning functions.
  • Options to automatically pipeline requests when possible.
  • Automatically retry requests under bad network conditions.
  • Support for configuring global settings that can affect performance under different network conditions. Callers can configure backpressure settings, when and how the underlying socket is flushed, and how many times requests are attempted.
  • Built-in tracking for network latency and payload size metrics.
  • A client pooling interface to round-robin requests among a pool of clients.
  • Built-in support for tracing.
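
As a rough illustration of what an exponential backoff policy computes between reconnection attempts, the sketch below derives the delay for each attempt. It is not fred's implementation: the function name, its parameters, and the absence of jitter are all assumptions made for the example.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of fred): delay before reconnection attempt
/// number `attempt` (0-based). The delay starts at `min_ms`, is multiplied by
/// `base` on every attempt, and is capped at `max_ms`.
fn exponential_delay(attempt: u32, min_ms: u64, max_ms: u64, base: u64) -> Duration {
    // Saturating arithmetic keeps very large attempt counts from overflowing.
    let factor = base.saturating_pow(attempt);
    let delay = min_ms.saturating_mul(factor).min(max_ms);
    Duration::from_millis(delay)
}

fn main() {
    for attempt in 0..6 {
        let delay = exponential_delay(attempt, 100, 2_000, 2);
        println!("attempt {}: wait {:?}", attempt, delay);
    }
}
```

Capping the delay keeps a long outage from pushing the wait time arbitrarily high, which is usually what callers want from a reconnection policy.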

Tracing

This crate supports tracing via the tracing crate. See the tracing info for more information.

This feature is disabled by default, but callers can opt-in via the full-tracing or partial-tracing features.

Logging

To enable logs use the environment variable RUST_LOG with a value of trace, debug, warn, error, or info. See the documentation for env_logger for more information.

When a client is initialized it will generate a unique client name with a prefix of fred-. This name will appear in all logging statements on the client in order to associate client and server operations if logging is enabled on both.

Compile Time Features

Name | Default | Description
enable-tls | x | Enable TLS support. This requires OpenSSL (or equivalent) dependencies.
vendored-tls | | Enable TLS support, using vendored OpenSSL (or equivalent) dependencies, if possible.
ignore-auth-error | x | Ignore auth errors that occur when a password is supplied but not required.
metrics | x | Enable the metrics interface to track overall latency, network latency, and request/response sizes.
reconnect-on-auth-error | | A NOAUTH error is treated the same as a general connection failure and the client will reconnect based on the reconnection policy.
index-map | | Use IndexMap instead of HashMap as the backing store for Redis Map types. This is useful for testing and may also be useful for callers.
pool-prefer-active | x | Prefer connected clients over clients in a disconnected state when using the RedisPool interface.
full-tracing | | Enable full tracing support. This can emit a lot of data so a partial tracing feature is also provided.
partial-tracing | | Enable partial tracing support, only emitting traces for top level commands and network latency. Note: this has a non-trivial impact on performance.
blocking-encoding | | Use a blocking task for encoding or decoding frames over a certain size. This can be useful for clients that send or receive large payloads, but will only work when used with a multi-thread Tokio runtime.
network-logs | | Enable TRACE level logging statements that will print out all data sent to or received from the server.
custom-reconnect-errors | | Enable an interface for callers to customize the types of errors that should automatically trigger reconnection logic.
monitor | | Enable an interface for running the MONITOR command.
sentinel-client | | Enable an interface for communicating directly with Sentinel nodes. This is not necessary to use normal Redis clients behind a sentinel layer.
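
To make the pool-prefer-active behavior more concrete, here is a minimal sketch of round-robin selection that skips disconnected clients when a connected one is available. The `Client` and `Pool` types are hypothetical stand-ins written for this example and say nothing about how RedisPool is actually implemented.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for a pooled client; only the connection state matters here.
struct Client {
    id: usize,
    connected: bool,
}

/// Hypothetical pool that hands out clients in round-robin order.
/// Assumes the pool is never empty.
struct Pool {
    clients: Vec<Client>,
    next: AtomicUsize,
}

impl Pool {
    /// Pick the next client in rotation, skipping disconnected clients when possible.
    /// Falls back to plain round-robin if no client is connected.
    fn next_client(&self) -> &Client {
        let len = self.clients.len();
        let start = self.next.fetch_add(1, Ordering::Relaxed) % len;
        (0..len)
            .map(|offset| &self.clients[(start + offset) % len])
            .find(|client| client.connected)
            .unwrap_or(&self.clients[start])
    }
}

fn main() {
    let pool = Pool {
        clients: vec![
            Client { id: 0, connected: true },
            Client { id: 1, connected: false },
            Client { id: 2, connected: true },
        ],
        next: AtomicUsize::new(0),
    };
    let picked: Vec<usize> = (0..4).map(|_| pool.next_client().id).collect();
    println!("{:?}", picked);
}
```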

Environment Variables

Name | Default | Description
FRED_DISABLE_CERT_VERIFICATION | false | Disable certificate verification when using TLS features.
FRED_DISABLE_HOST_VERIFICATION | false | Disable host verification when using TLS features.

These are environment variables because they're dangerous in production and callers should be forced to surface them in a loud and obvious way.
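
An application can make these settings even harder to miss by checking for them at startup and printing a warning. The sketch below is one way to do that with the standard library only. The `flag_enabled` helper and the accepted values ("true" or "1") are assumptions for illustration; the crate's own parsing of these variables may differ.

```rust
use std::env;

/// Interpret an environment variable value as a boolean flag.
/// Assumption: "true" or "1" (case-insensitive) enables it; fred's own parsing may differ.
fn flag_enabled(value: Option<&str>) -> bool {
    matches!(
        value.map(|v| v.trim().to_ascii_lowercase()).as_deref(),
        Some("true") | Some("1")
    )
}

fn main() {
    let names = [
        "FRED_DISABLE_CERT_VERIFICATION",
        "FRED_DISABLE_HOST_VERIFICATION",
    ];
    for name in names.iter() {
        let value = env::var(name).ok();
        if flag_enabled(value.as_deref()) {
            // Surface the dangerous setting loudly in the application's own output.
            eprintln!("WARNING: {} is set; TLS verification is weakened.", name);
        }
    }
}
```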

Pipelining

The caller can toggle pipelining via flags on the RedisConfig provided to a client to enable automatic pipelining for commands whenever possible. These settings can drastically affect performance on both the server and client, but further performance tuning may be necessary to avoid issues such as using too much memory on the client or server while buffering commands.

See the global performance tuning functions for more information on how to tune backpressure or other relevant settings related to pipelining.

This module also contains a separate test application that can be used to demonstrate the effects of pipelining. This test application also contains some helpful information on how to use the tracing features.
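
The core idea behind pipelining can be sketched without the client at all: several commands are encoded into one buffer and written to the socket together, rather than writing one command and waiting for its response before sending the next. The helper names below are hypothetical and the sketch covers only the encoding side (RESP arrays of bulk strings); it does not reflect how fred's internals queue or flush commands.

```rust
/// Encode one command as a RESP array of bulk strings and append it to `buf`.
fn encode_command(args: &[&str], buf: &mut Vec<u8>) {
    buf.extend_from_slice(format!("*{}\r\n", args.len()).as_bytes());
    for arg in args {
        buf.extend_from_slice(format!("${}\r\n", arg.len()).as_bytes());
        buf.extend_from_slice(arg.as_bytes());
        buf.extend_from_slice(b"\r\n");
    }
}

/// Pipelining sketch: encode every queued command into one buffer so the
/// socket can be written and flushed once instead of once per command.
fn encode_pipeline(commands: &[Vec<&str>]) -> Vec<u8> {
    let mut buf = Vec::new();
    for command in commands {
        encode_command(command, &mut buf);
    }
    buf
}

fn main() {
    let commands = vec![vec!["SET", "foo", "bar"], vec!["GET", "foo"]];
    let buf = encode_pipeline(&commands);
    println!("{} bytes queued for a single write", buf.len());
}
```

The server replies in the order the commands were received, so a pipelining client also has to keep its pending requests in order to match responses to callers. The buffer growing while commands are queued is the memory cost the paragraph above warns about.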

ACL & Authentication

Prior to the introduction of ACL commands in Redis version 6, clients would authenticate with a single password. If callers are not using the ACL interface, or are using Redis version <=5.x, they should configure the client to automatically authenticate by using the password field on the RedisConfig and leaving the username field as None.

If callers are using ACLs and Redis version >=6.x they can configure the client to automatically authenticate by using the username and password fields on the provided RedisConfig.

It is required that the authentication information provided to the RedisConfig allows the client to run CLIENT SETNAME and CLUSTER NODES. Callers can still change users via the auth command later, but it is recommended to instead use the username and password provided to the RedisConfig so that the client can automatically authenticate after reconnecting.

If this is not possible callers need to ensure that the default user can run the two commands above. Additionally, it is recommended to move any calls to the auth command inside the on_reconnect block.
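
The difference between the two authentication modes comes down to the arguments of the Redis AUTH command. The sketch below builds those arguments from an optional username and a password. `auth_args` is a hypothetical helper written for this example, not a function exposed by the crate, which handles authentication on the caller's behalf from the RedisConfig fields.

```rust
/// Hypothetical helper (not part of fred): build the AUTH command arguments
/// for the two cases described above.
fn auth_args(username: Option<&str>, password: &str) -> Vec<String> {
    match username {
        // Redis >= 6 with ACLs: AUTH <username> <password>
        Some(user) => vec!["AUTH".to_string(), user.to_string(), password.to_string()],
        // No ACLs, or Redis <= 5: AUTH <password>
        None => vec!["AUTH".to_string(), password.to_string()],
    }
}

fn main() {
    println!("{:?}", auth_args(None, "secret"));
    println!("{:?}", auth_args(Some("app-user"), "secret"));
}
```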

Redis Sentinel

To use the Redis Sentinel interface callers should provide a ServerConfig::Sentinel variant when creating the client's RedisConfig. This should contain the host/port tuples for the known sentinel nodes when first creating the client.

The client will automatically update these values in place as sentinel nodes change whenever connections to the primary Redis server close. Callers can inspect these changes with the client_config function on any RedisClient that uses the sentinel interface.

Note: Sentinel connections will use the same authentication and TLS configuration options as the connections to the Redis servers.

Callers can also use the sentinel-client feature to communicate directly with Sentinel nodes.

Customizing Error Handling

The custom-reconnect-errors feature enables an interface on the globals to customize the list of errors that should automatically trigger reconnection logic (if configured).

In many cases applications respond to Redis errors by logging the error, maybe waiting and reconnecting, and then trying again. Whether to do this often depends on the prefix in the error message, and this interface allows callers to specify which errors should be handled this way.

Errors that trigger this can be seen with the on_error function.
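
The prefix-based decision described above can be illustrated with a small standalone check. This is only a sketch of the idea: `should_reconnect` and the prefix list are assumptions chosen for the example and are not the interface that the custom-reconnect-errors feature exposes.

```rust
/// Hypothetical check: does a server error message start with one of the
/// prefixes the caller wants to treat as a trigger for reconnection?
fn should_reconnect(message: &str, prefixes: &[&str]) -> bool {
    prefixes.iter().any(|prefix| message.starts_with(*prefix))
}

fn main() {
    // Example prefixes chosen for illustration; pick the ones relevant to your deployment.
    let prefixes = ["READONLY", "LOADING", "CLUSTERDOWN"];
    let messages = [
        "READONLY You can't write against a read only replica.",
        "WRONGTYPE Operation against a key holding the wrong kind of value",
    ];
    for message in messages.iter() {
        println!("{} -> reconnect: {}", message, should_reconnect(message, &prefixes));
    }
}
```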

Tests

To run the unit and integration tests:

cargo test -- --test-threads=1

OR

# run the tests with default features
./tests/run.sh

OR

# run the tests 3 times, once with default features, once with no features, and once with all features (except chaos monkey)
./tests/run_all.sh

Note: a local Redis server must be running on port 6379, and a clustered deployment must be running on ports 30001 - 30006 for the integration tests to pass.

Scripts are included to download and run centralized and clustered redis servers at any Redis version. These scripts will not make any modifications to your system outside the tests/tmp folder.

export REDIS_VERSION=6.2.2
./tests/scripts/install_redis_centralized.sh
./tests/scripts/install_redis_clustered.sh

Beware: the tests will periodically run flushall.

Contributing

See the contributing documentation for info on adding new commands.

Comments
  • Client not responding

    Description

    Hi! Thanks for the awesome library.

    I'm having a problem with the Redis client getting stuck before sending the command to the Redis server (it happens with any command). This is happening in a Stream, but I didn't try to replicate it outside of one.

    The pattern of getting stuck is random, sometimes it runs for tens of stream values and gets stuck and other times it gets stuck on the first value of the stream.

    What I tried

    I tried to track to the best of my abilities where it was getting stuck and I think it's at wait_for_response(rx) in src/utils.rs, but I'm not sure.

    When setting a default timeout it gets hit, otherwise it waits forever.

    I also ran redis-cli monitor on the Redis server and confirmed that the client gets stuck before sending the command.

    Reproducibility

    The minimal code to reproduce it is (UPDATE: this doesn't reproduce the issue, see https://github.com/aembke/fred.rs/issues/13#issuecomment-974429304):

    use std::time::Duration;
    
    use futures::StreamExt;
    use fred::prelude::*;
    use fred::pool::StaticRedisPool;
    
    #[tokio::main]
    async fn main() {
      let pool = StaticRedisPool::new(RedisConfig::default(), 5);
      pool.connect(Some(ReconnectPolicy::default()));
      pool.wait_for_connect().await.unwrap();
    
      let stream = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_millis(100)))
        .then(move |_| {
          let pool = pool.clone();
    
          async move {
            let value: Option<String> = pool.get("key").await.unwrap(); // This call never responds.
    
            value
          }
        });
    
      futures::pin_mut!(stream);
    
      while let Some(value) = stream.next().await {
        println!("{:?}", value);
      }
    }
    

    Cargo.toml:

    [package]
    edition = "2021"
    
    [dependencies]
    fred = { version = "4.2.1", default-features = false, features = [
        "pool-prefer-active",
        "ignore-auth-error",
    ] }
    futures = { version = "0.3.17", default-features = false }
    tokio = { version = "1.13.0", features = ["rt", "macros", "time"] }
    tokio-stream = "0.1.8"
    

    OS and versions

    Rust: 1.56.0 fred: 4.2.1 Redis: 6.2.6 inside Docker OS: Ubuntu 21.04

    opened by dbstratta 21
  • Waiting for exec to return from server

    Hi, I don't know if it's an intended behavior, but I ran into following situation:

    Suppose you have 10ms latency to Redis. Every 5ms your application needs to run

    MULTI
    HSET key value
    PUBLISH message
    EXEC
    

    I am using the following code to execute this sequence in fred:

    let trx = redis.multi(true).await?;
    trx.hset(key, hashmap! { field => value.into()}).await?;
    trx.publish(channel, message).await?;
    trx.exec().await?;
    

    I'd like to have a writer that executes this code, but waits for the response in a new tokio task every 5ms. This way there will be 2 running requests waiting for responses. I don't know how to achieve this with fred.

    There are 2 issues:

    • Fred only allows calling another MULTI once the response from EXEC has arrived. I think a new MULTI could be allowed as soon as the EXEC has been sent.
    • It's not possible to split sending the commands from receiving the response, so I don't know when it is safe to call the second MULTI.

    Thanks for the help

    enhancement 
    opened by mkurtak 18
  • Connection management issues

    Hi - Thank you for a great library!

    We have been experiencing some connection management issues (including using latest version 5.2.0). Using fred as a redis client in a kubernetes cluster (redis is self-hosted in k8s) - if the redis instance is moved to another node, we end up in a corrupted state with connection management.

    This can simply be reproduced doing kubectl scale deploy/redis --replicas 0.

    We have tested with/without pipelining. With/without connection pool. Config is using exponential reconnect policy + command timeout. This is a trace of the events.

    Initially we get a connection - everything is fine:

     DEBUG fred::multiplexer::commands > fred-JUKIBXKdhV: Initializing connections...
     TRACE fred::protocol::types       > fred-JUKIBXKdhV: Using 10.0.78.183 among 1 possible socket addresses for redis:6379
     TRACE fred::multiplexer::utils    > fred-JUKIBXKdhV: Connecting to 10.0.78.183:6379
     TRACE mio::poll                   > registering event source with poller: token=Token(0), interests=READABLE | WRITABLE
     DEBUG fred::protocol::connection  > fred-JUKIBXKdhV: Skip setting client name.
     TRACE redis_protocol::utils       > allocating more, len: 0, amt: 24
     TRACE fred::protocol::codec       > fred-JUKIBXKdhV: Encoded 24 bytes to 10.0.78.183:6379. Buffer len: 24 (RESP2)
     TRACE tokio_util::codec::framed_impl > flushing framed transport
     TRACE tokio_util::codec::framed_impl > writing; remaining=24
     TRACE tokio_util::codec::framed_impl > framed transport flushed
     TRACE tokio_util::codec::framed_impl > attempting to decode a frame
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 4 bytes from 10.0.78.183:6379 (RESP2).
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Parsed 4 bytes from 10.0.78.183:6379
     TRACE tokio_util::codec::framed_impl > frame decoded from buffer
     DEBUG fred::protocol::connection     > fred-JUKIBXKdhV: Read client ID: Resp2(Integer(7))
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Creating new close tx sender.
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Set centralized connection closed sender.
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Emitting connect message.
    

    Now we scale down the deployment:

    kubectl scale deploy/redis --replicas 0
    

    Fred realizes that the connection is lost:

     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Starting command stream...
     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Set inner connection closed sender.
     TRACE tokio_util::codec::framed_impl > attempting to decode a frame
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 0 bytes from 10.0.78.183:6379 (RESP2).
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 0 bytes from 10.0.78.183:6379 (RESP2).
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Redis frame stream closed with error Redis Error - kind: Canceled, details: Canceled.
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Emit connection closed from error: Redis Error - kind: Canceled, details: Canceled.
     TRACE fred::multiplexer::utils       > fred-JUKIBXKdhV: Emitting connection closed with 0 messages
     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Recv reconnect message with 0 commands. State: Disconnected
     INFO  fred::multiplexer::commands    > fred-JUKIBXKdhV: Sleeping for 132 ms before reconnecting
     TRACE fred::protocol::types          > fred-JUKIBXKdhV: Using 10.0.78.183 among 1 possible socket addresses for redis:6379
     TRACE fred::multiplexer::utils       > fred-JUKIBXKdhV: Connecting to 10.0.78.183:6379
     TRACE mio::poll                      > registering event source with poller: token=Token(1), interests=READABLE | WRITABLE
    

    Notice at this time there are no pods running redis, so the reconnect will just hang. Now we attempt to execute a command against redis, using a client from the connection pool:

     TRACE fred::multiplexer::commands    > fred-JUKIBXKdhV: Recv command on multiplexer SET. Buffer len: 0
     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Will block multiplexer loop waiting on SET to finish.
     TRACE fred::multiplexer              > fred-JUKIBXKdhV: Skip waiting on cluster sync.
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Writing command SET to redis:6379
     TRACE fred::protocol::connection     > fred-JUKIBXKdhV: Sending command and flushing the sink.
     TRACE redis_protocol::utils          > allocating more, len: 0, amt: 40
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Encoded 40 bytes to 10.0.78.183:6379. Buffer len: 40 (RESP2)
     TRACE tokio_util::codec::framed_impl > flushing framed transport
     TRACE tokio_util::codec::framed_impl > writing; remaining=40
     WARN  fred::multiplexer::commands    > fred-JUKIBXKdhV: Error writing command None: Redis Error - kind: IO, details: Os { code: 32, kind: BrokenPipe, message: "Broken pipe" }
     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Reconnecting or stopping due to error: Redis Error - kind: IO, details: Os { code: 32, kind: BrokenPipe, message: "Broken pipe" }
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Emitting close all sockets message: Redis Error - kind: IO, details: Os { code: 32, kind: BrokenPipe, message: "Broken pipe" }
     WARN  fred::multiplexer::utils       > fred-JUKIBXKdhV: Error sending close message to socket streams: SendError(Redis Error - kind: IO, details: Os { code: 32, kind: BrokenPipe, message: "Broken pipe" })
     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Waiting for client to reconnect...
    resp Err(Redis Error - kind: Timeout, details: Request timed out.)
    

    Next we scale up redis:

    kubectl scale deploy/redis --replicas 1
    

    After a while the connection pool will reconnect:

     DEBUG fred::protocol::connection     > fred-JUKIBXKdhV: Skip setting client name.
     TRACE redis_protocol::utils          > allocating more, len: 0, amt: 24
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Encoded 24 bytes to 10.0.78.183:6379. Buffer len: 24 (RESP2)
     TRACE tokio_util::codec::framed_impl > flushing framed transport
     TRACE tokio_util::codec::framed_impl > writing; remaining=24
     TRACE tokio_util::codec::framed_impl > framed transport flushed
     TRACE tokio_util::codec::framed_impl > attempting to decode a frame
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 4 bytes from 10.0.78.183:6379 (RESP2).
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Parsed 4 bytes from 10.0.78.183:6379
     TRACE tokio_util::codec::framed_impl > frame decoded from buffer
     DEBUG fred::protocol::connection     > fred-JUKIBXKdhV: Read client ID: Resp2(Integer(3))
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Set centralized connection closed sender.
     TRACE mio::poll                      > deregistering event source from poller
     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Reconnect task finished reconnecting or syncing with: Ok(())
     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Sending 0 commands after reconnecting.
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Emitting connect message.
     TRACE tokio_util::codec::framed_impl > attempting to decode a frame
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 0 bytes from 10.0.78.183:6379 (RESP2).
    

    And now we have the problem - if we try to execute a command using the client from the connection pool:

     TRACE fred::multiplexer::commands    > fred-JUKIBXKdhV: Recv command on multiplexer SET. Buffer len: 0
     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Will block multiplexer loop waiting on SET to finish.
     TRACE fred::multiplexer              > fred-JUKIBXKdhV: Skip waiting on cluster sync.
     DEBUG fred::multiplexer::utils       > fred-JUKIBXKdhV: Writing command SET to redis:6379
     TRACE fred::protocol::connection     > fred-JUKIBXKdhV: Sending command and flushing the sink.
     TRACE redis_protocol::utils          > allocating more, len: 0, amt: 40
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Encoded 40 bytes to 10.0.78.183:6379. Buffer len: 40 (RESP2)
     TRACE tokio_util::codec::framed_impl > flushing framed transport
     TRACE tokio_util::codec::framed_impl > writing; remaining=40
     TRACE tokio_util::codec::framed_impl > framed transport flushed
     DEBUG fred::multiplexer::commands    > fred-JUKIBXKdhV: Waiting on last request to finish without pipelining.
     TRACE tokio_util::codec::framed_impl > attempting to decode a frame
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 5 bytes from 10.0.78.183:6379 (RESP2).
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Parsed 5 bytes from 10.0.78.183:6379
     TRACE tokio_util::codec::framed_impl > frame decoded from buffer
     TRACE fred::multiplexer::responses   > fred-JUKIBXKdhV: Processing response from redis:6379 to SET with frame kind SimpleString
     TRACE fred::multiplexer::responses   > fred-JUKIBXKdhV: Writing to multiplexer sender to unblock command loop.
     WARN  fred::multiplexer::responses   > fred-JUKIBXKdhV: Error sending cmd loop response: ()
     TRACE fred::multiplexer::responses   > fred-JUKIBXKdhV: Responding to caller for SET
     WARN  fred::multiplexer::responses   > fred-JUKIBXKdhV: Failed to respond to caller.
     TRACE tokio_util::codec::framed_impl > attempting to decode a frame
     TRACE fred::protocol::codec          > fred-JUKIBXKdhV: Recv 0 bytes from 10.0.78.183:6379 (RESP2).
    resp Err(Redis Error - kind: Timeout, details: Request timed out.)
    

    The command is actually executed (we can verify this using redis-cli and checking that the key has been SET in the redis cluster). All following requests using this client will now result in timeouts (although the command is in fact executed!).

    We use tokio v1.20.1 for reference.

    If we use a headless service in k8s, we do not experience this problem (as the DNS will resolve with 0 ip-addresses when the deployment is scaled to 0 replicas, and resolve the new ip-address once the pod has been moved to the other node).

    opened by birkmose 15
  • Mocking layer missing but still referenced in Cargo.toml

    The mocks feature still exists in Cargo.toml but is no longer present in the crate.

    It's mentioned in the prior repository's README.md, and was located under /src/mocks, but has since been removed.

    Two items to address:

    • [ ] Remove the feature flag or mark it as something to be supported in the future
    • [ ] Suggest an alternative method of testing for developers using this library, so people don't reinvent the wheel or write tests with IO dependencies and race conditions on external servers
    enhancement 
    opened by Dessix 13
  • [Bug?] 6.0.0: Reading concurrently from multiple connections

    It's either a bug in my DNA (© old programming joke) or in fred.

    Redis version - 7.0.5 Platform - Mac Using Docker and/or Kubernetes - no Deployment type - centralized

    Describe the bug

    A brief explanation of why the repro is written the way it is: I need to read a lot of STREAMs from a Redis Cluster. XREAD is able to read many streams at once, just not in a cluster environment. In a cluster environment it can only read from streams that hash into the same slot (not the same node!). So to minimize the number of XREADs I group the channel names by the result of redis_keyslot(). For every slot that has streams I open a connection to the cluster node (not a clustered connection, but a result of the clone_new() from one of split_cluster()) and run XREAD on this connection.

    That doesn't work the way I intended. When I run a test case with just a few streams, only one stream works as expected. The others wake up the connection but do not trigger XREADs. If you keep trying to XADD to these "ignored" streams, they suddenly start working (after 1-2 attempts, sometimes more), and the original stream that was working stops working. But the data sent to the "ignored" channels to wake them up is lost, which probably means that these other XREADs are not even started, because otherwise they'd have captured the data due to their "$" wildcard ID. Phew.
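
For readers following along, the grouping step described above can be sketched with the standard library alone. The code below computes the Redis Cluster hash slot (CRC16 modulo 16384, honoring `{hash tag}` syntax) and groups stream names by slot. It is an illustration of the technique, not the crate's redis_keyslot() implementation; `crc16`, `key_slot`, and `group_by_slot` are names invented for this example.

```rust
use std::collections::BTreeMap;

/// CRC16 (XMODEM variant: polynomial 0x1021, initial value 0), as used by Redis Cluster.
fn crc16(data: &[u8]) -> u16 {
    let mut crc: u16 = 0;
    for &byte in data {
        crc ^= (byte as u16) << 8;
        for _ in 0..8 {
            crc = if crc & 0x8000 != 0 { (crc << 1) ^ 0x1021 } else { crc << 1 };
        }
    }
    crc
}

/// Hash slot for a key. If the key contains a non-empty `{...}` section,
/// only the text between the first `{` and the next `}` is hashed.
fn key_slot(key: &str) -> u16 {
    let bytes = key.as_bytes();
    let tagged = bytes.iter().position(|&b| b == b'{').and_then(|open| {
        let rest = &bytes[open + 1..];
        let close = rest.iter().position(|&b| b == b'}')?;
        // An empty tag ("{}") means the whole key is hashed.
        if close == 0 { None } else { Some(&rest[..close]) }
    });
    crc16(tagged.unwrap_or(bytes)) % 16384
}

/// Group stream names by slot so that each group can be read with a single XREAD.
fn group_by_slot<'a>(names: &[&'a str]) -> BTreeMap<u16, Vec<&'a str>> {
    let mut groups: BTreeMap<u16, Vec<&'a str>> = BTreeMap::new();
    for &name in names {
        groups.entry(key_slot(name)).or_default().push(name);
    }
    groups
}

fn main() {
    let names = ["{feed}.a", "{feed}.b", "test0", "test1"];
    for (slot, streams) in group_by_slot(&names) {
        println!("slot {}: {:?}", slot, streams);
    }
}
```

Streams that share a hash tag always land in the same group, which is one way to reduce the number of connections and XREAD calls needed.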

    To Reproduce

    This repro simulates this behavior without the cluster and that's why it looks the way it is. The easiest way to produce the repro is to put it into the examples folder and RUST_LOG=trace cargo run --example bug84. Then from another terminal run redis-cli and issue a few commands:

    xadd test0 * v test0
    xadd test1 * v test1
    xadd test0 * v test2
    etc
    

    I had this problem with 5.2.0 and moved to 6.0.0 hoping it's going to be better.

    Quite possibly it's my insufficient understanding of how async Rust works...

    Repro itself:

    use std::sync::Arc;
    
    use fred::{prelude::*, types::XReadResponse};
    use futures::stream::FuturesUnordered;
    use tokio_stream::StreamExt;
    
    async fn _read(
            client: Arc<RedisClient>,
            stream_name: &String,
        ) -> Result<Vec<(String, String)>, RedisError> {
            let r: XReadResponse<String, String, String, String> =
                client.xread_map(None, Some(0), stream_name, "$".to_string()).await?;
            let mut result = vec![];
            for (stream_name, records) in r.iter() {
                for (last_id, _record) in records {
                    result.push((stream_name.to_string(), last_id.to_string()));
                }
            }
            Ok(result)
        }
    
    #[tokio::main]
    async fn main() -> Result<(), RedisError> {
        pretty_env_logger::init();
    
        let config = RedisConfig::default();
        let mut clients: Vec<Arc<RedisClient>> = vec![];
        let mut stream_names: Vec<String> = vec![];
    
        for i in 0..3 {
            let c = RedisClient::new(config.clone(), None, None);
            c.connect();
            c.wait_for_connect().await?;
            clients.push(Arc::new(c));
            stream_names.push(format!("test{i}"));
        }
    
        loop {
            let mut xreads = FuturesUnordered::new();
            for (i, stream_name) in stream_names.iter().enumerate() {
                xreads.push(_read(clients.get(i).unwrap().clone(), stream_name))
            }
            tokio::select! {
                Some(Ok(events)) = xreads.next() => {
                    for event in events {
                        println!("Stream {} last ID {}", event.0, event.1);
                    }
                }
            }
        }
    }
    
    bug 
    opened by kika 12
  • Different auth for sentinel and redis

    Hi. I saw in the example how to connect through sentinel. However, the example seems to rely on the same auth being used for both sentinel and redis itself. My company sets it up such that the auth (i.e. username/password) for the two is different, for security purposes. Is there a way I can specify different auths for the two? Thanks!

    enhancement 
    opened by tsukit 11
  • Get on non-existent key returns wrong error

    Hello, when I try to get a non-existent key I receive this error: Redis Error - kind: Parse, details: Cannot convert to number., but I believe this is the wrong type of error for the triggered action.

    What I would expect

    Since the Redis CLI returns nil maybe the library should just return an Option::None, or a different kind of RedisError like NotFound.

    Personally I think both solutions are right, and in an ideal world there should be an option for both behaviors, but for my use case a different kind of error is preferred, since I want to take a specific action on specific errors and the error propagation operator (?) would help me achieve this more simply and cleanly.

    enhancement 
    opened by mercxry 10
  • Connection to Redis Cluster in Minikube

    Hello Alec, I have a problem connecting to a local Redis cluster created in Minikube. If the problem isn't on Fred's side let me know :)

    The setup is relatively simple: there's a single LoadBalancer exposed via minikube tunnel that redirects to a cluster node, and the cluster itself isn't accessible from outside (so the nodes can only talk to each other and the load balancer). Here are the commands I used:

    Using redis-cli on the host with load balancer IP 10.102.101.213 like so:

    redis-cli -c -h 10.102.101.213 -a $REDIS_PASSWORD CLUSTER NODES
    Warning: Using a password with '-a' or '-u' option on the command line interface may not be safe.
    aceb5bf17a706440a297c5be764f4e4b6a60eb61 172.17.0.7:6379@16379 slave 16da11b0423cafe75261b5adbc2eb13a90358cc5 0 1640566876716 2 connected
    a67bf5b0d784c9be05fba218bdc456b484f37c86 172.17.0.2:6379@16379 myself,slave 2008db068b8a58a28595b37060d52d3736d04a6d 0 1640566873000 1 connected
    a10e1fe9800e7fb2a376f5ec5e1734658b15c6c9 172.17.0.5:6379@16379 master - 0 1640566875000 3 connected 10923-16383
    2008db068b8a58a28595b37060d52d3736d04a6d 172.17.0.6:6379@16379 master - 0 1640566874000 1 connected 0-5460
    53cf7c4159f5896deb1c42478c82a6404ac5f56e 172.17.0.3:6379@16379 slave a10e1fe9800e7fb2a376f5ec5e1734658b15c6c9 0 1640566876000 3 connected
    16da11b0423cafe75261b5adbc2eb13a90358cc5 :0@0 master,noaddr - 1640566845731 1640566845730 2 disconnected 5461-10922
    

    all works as expected.

    Trying to connect to the same cluster with Fred (env variable REDIS_URI is the LoadBalancer IP):

    let config = RedisConfig {
        server: ServerConfig::new_clustered(vec![(dotenv!("REDIS_URI").to_string(), 6379)]),
        fail_fast: true,
        pipeline: true,
        blocking: Blocking::Block,
        username: None,
        password: Some(dotenv!("REDIS_PASSWD").to_string()),
        tls: None,
        tracing: false
    };
    println!("Creating client...");
    let client = RedisClient::new(config);
    println!("Created client!");

    let policy = ReconnectPolicy::default();

    // log connection errors as they arrive
    tokio::spawn(client
        .on_error()
        .for_each(|e| async move {
            println!("Client received connection error: {:?}", e);
        }));

    println!("Checked errors...");

    tokio::spawn(client
        .on_reconnect()
        .for_each(move |client| async move {
            println!("Client {} reconnected.", client.id());
            // select the database each time we connect or reconnect
            let _ = client.select(REDIS_SESSION_DB).await;
        }));

    println!("Checked reconnections...");

    client.connect(Some(policy));
    println!("Trying to connect Redis...");
    client.wait_for_connect().await.unwrap();
    println!("Redis connected!");
    

    makes it throw this error:

    Client received connection error: Redis Error - kind: IO, details: Os { code: 113, kind: HostUnreachable, message: "No route to host" }
    

    Given that the info about nodes in cluster is gathered with CLUSTER NODES (per https://docs.rs/fred/latest/fred/types/enum.ServerConfig.html#variant.Clustered), I assume passing a single IP is fine. I guess this is some kind of a DNS/redirect resolution problem. I see that you're currently working on supporting custom DNS resolvers on the client, could this be related?

    Keep up the great work, this is probably the best Redis driver for Rust at the moment!

    P.S. When do you plan to release Streams functionality by the way (in terms of time)?
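The DNS/redirect guess in this report can be checked against the CLUSTER NODES output quoted earlier, since a cluster client normally connects to the addresses that the nodes advertise rather than to the single address it was configured with. The following is a minimal standard-library sketch for extracting those addresses. It is not fred's implementation: `ClusterNode`, `parse_cluster_nodes`, and the helper functions are hypothetical names, and only the fields visible in the quoted output are parsed.

```rust
/// One parsed line of `CLUSTER NODES` output (hypothetical type, for illustration only).
#[derive(Debug, PartialEq)]
pub struct ClusterNode {
    pub id: String,
    /// `host:port` as advertised by the node; empty when no address is known (`noaddr`).
    pub addr: String,
    pub is_master: bool,
    pub connected: bool,
    /// Inclusive slot ranges owned by this node.
    pub slots: Vec<(u16, u16)>,
}

/// Parse the text printed by `CLUSTER NODES`, skipping lines that are not node entries
/// (for example the redis-cli password warning).
pub fn parse_cluster_nodes(output: &str) -> Vec<ClusterNode> {
    output.lines().filter_map(parse_line).collect()
}

fn parse_line(line: &str) -> Option<ClusterNode> {
    // Fields: id, addr, flags, master id, ping-sent, pong-recv, config-epoch, link-state, [slots...]
    let fields: Vec<&str> = line.split_whitespace().collect();
    if fields.len() < 8 {
        return None;
    }
    // Node IDs are 40 hexadecimal characters; anything else is not a node line.
    let id = fields[0];
    if id.len() != 40 || !id.chars().all(|c| c.is_ascii_hexdigit()) {
        return None;
    }
    // The address looks like `ip:port@cport`; drop the cluster bus port.
    let addr = fields[1].split('@').next().unwrap_or("");
    // A node without a known address is printed as `:0@0`.
    let addr = if addr == ":0" { "" } else { addr };
    let flags: Vec<&str> = fields[2].split(',').collect();
    let slots = fields[8..]
        .iter()
        .filter_map(|range| parse_slot_range(range))
        .collect();
    Some(ClusterNode {
        id: id.to_string(),
        addr: addr.to_string(),
        is_master: flags.contains(&"master"),
        connected: fields[7] == "connected",
        slots,
    })
}

/// Parse `start-end` or a single slot number; migrating-slot entries are ignored.
fn parse_slot_range(range: &str) -> Option<(u16, u16)> {
    let mut parts = range.splitn(2, '-');
    let start: u16 = parts.next()?.parse().ok()?;
    let end: u16 = match parts.next() {
        Some(end) => end.parse().ok()?,
        None => start,
    };
    Some((start, end))
}
```

In the quoted output every reachable node advertises a 172.17.0.x address. If those addresses are not routable from where the client runs, that would be consistent with the "No route to host" error, although the report alone does not confirm it.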

    good first issue 
    opened by ConstBur 7
  • How can I configure the server parameters?


    Hello! I need to create an instance of RedisClient with my own parameters, but I only see the default function. Where can I set the host and port in RedisConfig?

    Also, where can I set a new client name?

    Thank you!

    question 
    opened by juanki03 6
  • [`6.0.0-beta.1`] panic with `partial-tracing` for `debug` lvl on reconnect


    When I tested reconnection with the current version on the main branch, I hit a panic once I enabled debug-level tracing.

    2022-12-07T09:55:31.463755Z DEBUG fred::multiplexer::clustered: fred-mpjrO24y5w: Finish synchronizing cluster connections.
        at /usr/local/cargo/git/checkouts/fred.rs-3fb6d817444a1870/4f0c3f4/src/multiplexer/clustered.rs:622 on reactor
    thread 'reactor' panicked at 'tried to clone Id(2251799813685263), but no span exists with that ID
    This may be caused by consuming a parent span (`parent: span`) rather than borrowing it (`parent: &span`).', /usr/local/cargo/registry/src/github.com-1ecc6299db9ec823/tracing-subscriber-0.3.16/src/registry/sharded.rs:311:32
    note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
    

    The panic doesn't appear at the info level, and reconnection works totally fine (with this fix). Thanks for all your efforts, @aembke!

    opened by DDtKey 5
  • Questions: Read selection strategy and failing replica node handling


    These are actually questions, not issues. I can't seem to find any information on this.

    1. What is the read selection strategy among replica nodes? Round-robin? Or something different? Can it be configured?

    2. How does the RedisClient handle a failing replica node?

    Thank you. Ed

    question 
    opened by tsukit 5
  • Incorrect Cluster Failover Behavior


    The library behaves incorrectly in a Redis cluster failover scenario.

    Reproduce scenario:

    1. Run a Redis cluster in docker with failover
    2. Connect to the cluster
    3. Pause the master container that holds the data
    4. Wait for the cluster failover mechanism to sync the nodes and rehash slots
    5. The library can no longer read or write data on that node because slots and nodes are never reinitialized. There are no public methods to re-sync the cluster state and slots on the client side.

    Problems:

    • There is no background monitoring of the cluster state on the client side. Cluster reconfiguration occurs only when the connection is explicitly disconnected or when a "MOVED" response is received.
    • Timeout does not trigger reconnect.

    Bottom line: If the connection to the master node of the cluster is lost, the library keeps trying, unsuccessfully, to reconnect to it and never reconfigures the cluster state and slots on the client side.
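For context on the "MOVED" case mentioned in this report: a MOVED redirection carries the slot number and the address of the node that now owns it, so a client can patch its local slot table from the error alone. The sketch below only illustrates that mechanism. `SlotMap`, `parse_moved`, and `apply_moved` are hypothetical names that do not reflect fred's internals, and a real client would more likely refresh the whole table with CLUSTER NODES or CLUSTER SLOTS than patch a single slot.

```rust
/// Redis Cluster always has 16384 hash slots.
pub const SLOT_COUNT: usize = 16384;

/// A client-side table from hash slot to owning node address (hypothetical structure).
pub struct SlotMap {
    owners: Vec<Option<String>>,
}

impl SlotMap {
    /// Create a table in which no slot has a known owner yet.
    pub fn new() -> Self {
        SlotMap { owners: vec![None; SLOT_COUNT] }
    }

    /// Record `addr` as the owner of every slot in the inclusive range.
    pub fn assign_range(&mut self, start: u16, end: u16, addr: &str) {
        for slot in start..=end {
            if let Some(owner) = self.owners.get_mut(slot as usize) {
                *owner = Some(addr.to_string());
            }
        }
    }

    /// Look up the node currently believed to own `slot`.
    pub fn owner(&self, slot: u16) -> Option<&str> {
        self.owners.get(slot as usize)?.as_deref()
    }

    /// Update the table from an error such as `MOVED 3999 127.0.0.1:6381`.
    /// Returns false when the error is not a well-formed MOVED redirection.
    pub fn apply_moved(&mut self, error: &str) -> bool {
        match parse_moved(error) {
            Some((slot, addr)) => {
                self.owners[slot as usize] = Some(addr);
                true
            }
            None => false,
        }
    }
}

/// Split a MOVED error into `(slot, "host:port")`, rejecting out-of-range slots.
pub fn parse_moved(error: &str) -> Option<(u16, String)> {
    let mut parts = error.split_whitespace();
    if parts.next()? != "MOVED" {
        return None;
    }
    let slot: u16 = parts.next()?.parse().ok()?;
    if slot as usize >= SLOT_COUNT {
        return None;
    }
    let addr = parts.next()?.to_string();
    Some((slot, addr))
}
```

Because a paused master never answers, it never sends MOVED either, which is why a redirection-driven update like this one cannot help in the scenario reported here without some additional trigger such as a timeout.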

    opened by neogenie 19
  • Unable to reconnect to Redis Cluster in Kubernetes after cluster rollout


    Thanks for the library! We're trying to use this library in a server running in Kubernetes connecting to a Redis Cluster running in the same Kubernetes cluster. We're connecting to Redis using redis-cluster://my-redis-cluster:6379 as the RedisConfig with a RedisPool.

    We noticed that when we roll out a new version of the Redis deployment, our fred-using server hangs while trying to send commands to the Redis Cluster. The logs we're seeing are of the form:

    Error creating or using backchannel for cluster nodes: Redis Error - kind: IO, details: Os { code: 110, kind: TimedOut, message: "Connection timed out" }
    Failed to reconnect with error Redis Error - kind: Cluster, details: Failed to read cluster nodes on all possible backchannel servers.
    

    After a restart with the same configuration, our fred-using server connects to the Redis Cluster just fine.

    After adding some logging, it seems that what's happening is that the IP addresses on the cluster cycle out to different ones, but IIUC fred doesn't try to re-resolve the DNS address to reconnect to the cluster.
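The general remedy for the behaviour described in this report is to resolve the hostname again before every reconnection attempt instead of reusing the addresses from the first lookup. The following is a minimal sketch of that idea using only the blocking standard library for brevity. It is not fred's reconnection logic: `resolve_candidates` and `connect_fresh` are hypothetical helpers, and in a Tokio application `tokio::net::lookup_host` would play the role of `to_socket_addrs`.

```rust
use std::io;
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
use std::time::Duration;

/// Resolve `host:port` at call time, so every call sees the current DNS records.
pub fn resolve_candidates(host: &str, port: u16) -> io::Result<Vec<SocketAddr>> {
    Ok((host, port).to_socket_addrs()?.collect())
}

/// Try each freshly resolved address in turn and return the first successful connection.
/// Calling this on every reconnection attempt avoids holding on to stale IP addresses.
pub fn connect_fresh(host: &str, port: u16, timeout: Duration) -> io::Result<TcpStream> {
    let mut last_err = io::Error::new(
        io::ErrorKind::NotFound,
        "hostname resolved to no addresses",
    );
    for addr in resolve_candidates(host, port)? {
        match TcpStream::connect_timeout(&addr, timeout) {
            Ok(stream) => return Ok(stream),
            // Remember the failure and move on to the next candidate address.
            Err(e) => last_err = e,
        }
    }
    Err(last_err)
}
```

A reconnect loop would call something like `connect_fresh("my-redis-cluster", 6379, Duration::from_secs(5))` on each attempt, so pods that received new IP addresses during a rollout are picked up on the next try.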

    opened by zombiezen 10
Releases (v5.0.0)
  • v5.0.0 (May 4, 2022)

    • Remove mocks feature (https://github.com/aembke/fred.rs/issues/36)
    • Change bzpopmin/bzpopmax return value types
    • Fix scan_cluster bugs
    • Add URL parsing for RedisConfig
  • v5.0.0-beta.1 (Mar 14, 2022)

    • Rewrite the protocol parser so it can decode frames without moving or copying the underlying bytes
    • Change most command implementations to avoid unnecessary allocations when using static str slices
    • Rewrite the public interface to use different traits for different parts of the redis interface
    • Relax some restrictions on certain commands being used in a transaction
    • Implement the Streams interface (XADD, XREAD, etc)
    • RESP3 support
    • Move most perf configuration options from globals to client-specific config structs
    • Add backpressure configuration options to the client config struct
    • Fix bugs that can occur when using non-UTF8 byte arrays as keys
    • Add the serde-json feature
    • Handle more complicated failure modes with Redis clusters
    • Add a more robust and specialized pubsub subscriber client
    • Ergonomics improvements on the public interfaces
    • Improve docs
    • More tests
  • v4.3.2 (Jan 15, 2022)

  • v4.3.1 (Dec 20, 2021)

  • v4.3.0 (Dec 5, 2021)

  • v4.2.3 (Dec 1, 2021)

  • v4.2.2 (Nov 30, 2021)

    • Remove some unnecessary async locks
    • Fix client pool wait_for_connect implementation
    • Add tests for https://github.com/aembke/fred.rs/issues/13 and https://github.com/aembke/fred.rs/issues/14
  • v4.2.1 (Nov 1, 2021)

  • v4.2.0 (Sep 28, 2021)

  • v4.1.0 (Sep 27, 2021)

  • 4.0.0 (Sep 18, 2021)

  • 3.0.0 (Sep 12, 2021)

    • Rewrite to use async/await and new dependencies
    • Switch github repository
    • Redo testing framework
    • Support ACL, geo, cluster, client, hyperloglog, lua, slowlog, and transaction commands
    • Add pipeline testing application
    • Change RedisKey to use bytes instead of str
    • Redo RedisValue implementation
    • Support automatic pipelining
    • Redo cluster state caching implementation
    • Support manual and automatic client unblocking
    • Custom reconnect errors feature
    • Blocking encoding feature
    • Network logs feature
    • Add Tracing support
    • Support client pooling
    • Support the MONITOR command
    • Refactor RedisConfig interface
    • Add chaos monkey tests
    • Add examples
    • Update docs
Owner
Alec Embke
A type-safe, high performance ORM framework


dark-flames 21 Mar 4, 2022
pgvector support for Rust Supports Rust-Postgres, SQLx, and Diesel

pgvector-rust pgvector support for Rust Supports Rust-Postgres, SQLx, and Diesel Getting Started Follow the instructions for your database library: Ru

Andrew Kane 14 Nov 23, 2022
gobang - A cross-platform TUI database management tool written in Rust


Takayuki Maeda 2.1k Jan 1, 2023
A MongoDB ODM for Rust based on Mongoose

Nongoose MongoDB ODM for Rust based on Mongoose Basic usage use mongodb::{bson::oid::ObjectId, sync::Client}; use nongoose::Schema; use serde::{Deseri

NextChat 9 Nov 8, 2022
📺 Netflix in Rust/ React-TS/ NextJS, Actix-Web, Async Apollo-GraphQl, Cassandra/ ScyllaDB, Async SQLx, Kafka, Redis, Tokio, Actix, Elasticsearch, Influxdb Iox, Tensorflow, AWS

Fullstack Movie Streaming Platform 📺 Netflix in RUST/ NextJS, Actix-Web, Async Apollo-GraphQl, Cassandra/ ScyllaDB, Async SQLx, Spark, Kafka, Redis,

null 34 Apr 17, 2023
Easy c̵̰͠r̵̛̠ö̴̪s̶̩̒s̵̭̀-t̶̲͝h̶̯̚r̵̺͐e̷̖̽ḁ̴̍d̶̖̔ ȓ̵͙ė̶͎ḟ̴͙e̸̖͛r̶̖͗ë̶̱́ṉ̵̒ĉ̷̥e̷͚̍ s̷̹͌h̷̲̉a̵̭͋r̷̫̊ḭ̵̊n̷̬͂g̵̦̃ f̶̻̊ơ̵̜ṟ̸̈́ R̵̞̋ù̵̺s̷̖̅ţ̸͗!̸̼͋

Rust S̵̓i̸̓n̵̉ I̴n̴f̶e̸r̵n̷a̴l mutability! Howdy, friendly Rust developer! Ever had a value get m̵̯̅ð̶͊v̴̮̾ê̴̼͘d away right under your nose just when

null 294 Dec 23, 2022
Utilities for tokio/tokio-uring based async IO

dbs-fuse The dbs-fuse is a utility crate to support fuse-backend-rs. Wrappers for Rust async io It's challenging to support Rust async io, and it's ev

OpenAnolis Community 6 Oct 23, 2022
Incomplete Redis client and server implementation using Tokio - for learning purposes only

mini-redis mini-redis is an incomplete, idiomatic implementation of a Redis client and server built with Tokio. The intent of this project is to provi

Tokio 2.3k Jan 4, 2023
High-level async Cassandra client written in 100% Rust.

CDRS tokio CDRS is production-ready Apache Cassandra driver written in pure Rust. Focuses on providing high level of configurability to suit most use

Kamil Rojewski 73 Dec 26, 2022
Lightweight async Redis client with connection pooling written in pure Rust and 100% memory safe

redi-rs (or redirs) redi-rs is a Lightweight Redis client with connection pooling written in Rust and 100% memory safe redi-rs is a Redis client writt

Oğuz Türkay 4 May 20, 2023
The most fundamental type for async synchronization: an intrusive linked list of futures

wait-list This crate provides WaitList, the most fundamental type for async synchronization. WaitList is implemented as an intrusive linked list of fu

Sabrina Jewson 7 Oct 26, 2022
A Redis module that provides rate limiting in Redis as a single command.

redis-cell A Redis module that provides rate limiting in Redis as a single command. Implements the fairly sophisticated generic cell rate algorithm (G

Brandur Leach 1.1k Jan 6, 2023
Fault-tolerant Async Actors Built on Tokio

Kameo Fault-tolerant Async Actors Built on Tokio Async: Built on tokio, actors run asynchronously in their own isolated spawned tasks. Supervision

Ari Seyhun 135 Jul 25, 2024
A Rust on-site channel benchmarking helper. Inter-Process (async / busy) & Intra-Process (async single threaded / async multi threaded)

On-Site Rust Channel Benchmarking Helper Deploy on server to determine which public crates are the fastest for communicating in different architecture

null 23 Jul 9, 2024
Russh - Async (tokio) SSH2 client and server rimplementation

Russh Async (tokio) SSH2 client and server rimplementation. This is a fork of Thrussh by Pierre-Étienne Meunier which adds: More safety guarantees AES

Warptech Industries 113 Dec 28, 2022
The gRPC library for Rust built on C Core library and futures

gRPC-rs gRPC-rs is a Rust wrapper of gRPC Core. gRPC is a high performance, open source universal RPC framework that puts mobile and HTTP/2 first. Sta

TiKV Project 1.6k Jan 7, 2023
A fully asynchronous, futures-based Kafka client library for Rust based on librdkafka

rust-rdkafka A fully asynchronous, futures-enabled Apache Kafka client library for Rust based on librdkafka. The library rust-rdkafka provides a safe

Federico Giraud 1.1k Jan 8, 2023
Simple crate that wraps a tokio::process into a tokio::stream

tokio-process-stream tokio-process-stream is a simple crate that wraps a tokio::process into a tokio::stream Having a stream interface to processes is

Leandro Lisboa Penz 8 Sep 13, 2022
Cassandra DB native client written in Rust language. Find 1.x versions on https://github.com/AlexPikalov/cdrs/tree/v.1.x Looking for an async version? - Check WIP https://github.com/AlexPikalov/cdrs-async

CDRS CDRS is looking for maintainers CDRS is Apache Cassandra driver written in pure Rust. Looking for an async version? async-std https://github.c

Alex Pikalov 338 Jan 1, 2023
Redis backed session store for async-session using fred.rs.

async-fred-session Redis backed session store for async-session using fred.rs. This work is mostly based on async-redis-session. use async_fred_sessio

void* 4 Feb 28, 2023