MQTT over QUIC

Overview

MQuicTT

🚧 This is a pre-alpha project, tread carefully 🚧

A Rust utility/library for MQTT over QUIC.

QUIC allows us to send data over multiple concurrent streams. We can leverage this ability to multiplex the pub-sub architecture of MQTT. This means that:

  • when an MQTT client is subscribed to multiple topics, it can receive data for these subscriptions concurrently. Even if data for some subscriptions is delayed, the rest of the subscriptions can carry on. When running MQTT over TCP, a delay in a single subscription's data can block the rest of the data as well (see head-of-line blocking). Similarly, the server can send data for multiple subscriptions concurrently.
  • when an MQTT client publishes messages on multiple topics, these can again be sent concurrently, and the server can receive them concurrently.
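
To make the head-of-line blocking point concrete, here is a minimal simulation. It is only a sketch: the `Arrival` type and both function names are hypothetical and not part of MQuicTT, and time is measured in arbitrary units. It compares when each message can be handed to the application if everything shares one ordered stream versus one ordered stream per topic.

```rust
use std::collections::HashMap;

/// A message tagged with its topic and the time it reaches the receiver.
/// (Hypothetical type, for illustration only.)
struct Arrival {
    topic: &'static str,
    arrives_at: u32, // arbitrary time units
}

/// One ordered stream for everything (TCP-like): a message is delivered
/// only after every message sent before it has arrived.
fn deliver_single_stream(sent: &[Arrival]) -> Vec<u32> {
    let mut latest: u32 = 0;
    sent.iter()
        .map(|m| {
            latest = latest.max(m.arrives_at);
            latest
        })
        .collect()
}

/// One ordered stream per topic (QUIC-like): ordering is enforced only
/// among messages of the same topic.
fn deliver_per_topic_stream(sent: &[Arrival]) -> Vec<u32> {
    let mut latest: HashMap<&str, u32> = HashMap::new();
    sent.iter()
        .map(|m| {
            let t = latest.entry(m.topic).or_insert(0);
            *t = (*t).max(m.arrives_at);
            *t
        })
        .collect()
}
```

If the first message sent is on topic "a" and arrives at time 9, followed by two messages on topic "b" arriving at times 2 and 3, the single stream delivers all three at time 9, while the per-topic streams deliver them at times 9, 2 and 3.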

QUIC streams also deliver data reliably and in order, so within a connection each message arrives exactly once, and thus MQTT running on top of them gets QoS 2 semantics by default. The library therefore does not give an option to configure the QoS.

As QUIC is basically a state machine layered on top of UDP, the only change needed is at the application level. We can use the UDP APIs exposed by Rust's standard library and we are good to go!
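
As a rough illustration of the UDP layer mentioned above, the sketch below sends a single datagram over loopback using only `std::net::UdpSocket`. The function name `udp_round_trip` is hypothetical, and nothing QUIC-specific (handshake, streams, loss recovery) is shown here; that part is left to a QUIC implementation.

```rust
use std::net::UdpSocket;
use std::time::Duration;

/// Sends one datagram over loopback and returns what the receiver read.
/// (Hypothetical helper, for illustration only.)
fn udp_round_trip(payload: &[u8]) -> std::io::Result<Vec<u8>> {
    // Port 0 asks the OS to pick any free port.
    let receiver = UdpSocket::bind("127.0.0.1:0")?;
    let sender = UdpSocket::bind("127.0.0.1:0")?;

    // Avoid blocking forever if the datagram is lost.
    receiver.set_read_timeout(Some(Duration::from_secs(2)))?;

    sender.send_to(payload, receiver.local_addr()?)?;

    // Large enough for a typical Ethernet-sized datagram.
    let mut buf = [0u8; 1500];
    let (len, _from) = receiver.recv_from(&mut buf)?;
    Ok(buf[..len].to_vec())
}
```

UDP gives no delivery or ordering guarantees on its own, which is why the read timeout is set; reliability is what the QUIC state machine adds on top.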

Usage

To create an MQTT server:

async fn spawn_server() {
    mquictt::server(
        &([127, 0, 0, 1], 1883).into(),
        mquictt::Config::read(&"server.json").unwrap(),
    )
    .await
    .unwrap();
}

To create an MQTT client:

use bytes::Bytes;

async fn spawn_client() {
    // create a client
    let mut client = mquictt::Client::connect(
        &([127, 0, 0, 1], 2000).into(),
        &([127, 0, 0, 1], 1883).into(),
        "localhost",
        "0",
        mquictt::Config::read(&"client.json").unwrap(),
    )
    .await.unwrap();

    // create a publisher for a particular topic
    let mut publisher = client
        .publisher("hello/world", Bytes::from("hello"))
        .await.unwrap();
    publisher.publish(Bytes::from("hello again!")).await.unwrap();

    // create a subscriber
    let mut subscriber = client.subscriber("hello/world").await.unwrap();
    let read = subscriber.read().await.unwrap();
    println!("{}", std::str::from_utf8(&read).unwrap());
}

Acknowledgements

We use:

Comments
  • Use Broadcast channels

    • [x] Use tokio::sync::broadcast::channel() instead of flume::bounded, each topic gets a channel.
    • [x] Subscribing streams get owned copy of receiver(subscribed once) that can recv().await for new publishes.
    • [x] Publishing streams must ask for write() lock every time to perform send().
    • [x] Topic -> Channel mapping is performed by shared Arc<Mutex<HashMap>>, this is accessed every time a new subscriber is created or when a stream wants to publish.

    NOTE: Subscribers can create an Rx even when no publisher exists; they then wait for a publisher to access the Tx and forward publishes to all subscribed streams. I am not sure if broadcasts are the best solution; this is just my proposal.
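
The topic-to-channel mapping proposed above could look roughly like the sketch below. It is an approximation under stated assumptions: to stay dependency-free it substitutes a list of `std::sync::mpsc` senders per topic for `tokio::sync::broadcast`, and the `Mapper` type and its methods are hypothetical names, not the project's actual code.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Hypothetical mapper: topic name -> senders of every subscribed stream.
/// std mpsc fan-out stands in for tokio::sync::broadcast here.
#[derive(Clone, Default)]
struct Mapper {
    topics: Arc<Mutex<HashMap<String, Vec<Sender<Vec<u8>>>>>>,
}

impl Mapper {
    /// Registers a subscriber. The topic entry is created on demand, so a
    /// subscriber can exist before any publisher does.
    fn subscribe(&self, topic: &str) -> Receiver<Vec<u8>> {
        let (tx, rx) = channel();
        let mut topics = self.topics.lock().unwrap();
        topics.entry(topic.to_string()).or_default().push(tx);
        rx
    }

    /// Sends a copy of the payload to every live subscriber of the topic
    /// and returns how many subscribers received it.
    fn publish(&self, topic: &str, payload: &[u8]) -> usize {
        let mut topics = self.topics.lock().unwrap();
        match topics.get_mut(topic) {
            Some(subscribers) => {
                // Drop subscribers whose receiving end has gone away.
                subscribers.retain(|tx| tx.send(payload.to_vec()).is_ok());
                subscribers.len()
            }
            None => 0,
        }
    }
}
```

The lock is taken on every subscribe and every publish, which matches the access pattern described in the checklist; whether that contention is acceptable is the open question raised in the note.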

    opened by de-sh 1
  • logs

    • [X] sled integration
    • [X] using sled::Db to share data, and use notifications based system for reading data
    • [X] fix examples
      • [X] check for any bugs
    • [X] update readme
    • [X] remove irrelevant debug logging
    opened by abhikjain360 0
  • Handle sub to topic not in mapper

    Closes #14

    Adds a slot in the mapper to hold a SubReqRx. When a subscribe arrives for a topic not in the Mapper, a SubReq channel is created and its SubReqRx is placed in that slot, so that it can be found and used when a publish is later received on the topic.

    opened by de-sh 0
  • mquictt_server panics after subscribe by client

    When using the examples for the mqtt_client and the mqtt_server, the server panics when the client tries to subscribe.

    Code:

    Server:

    #[tokio::main]
    async fn main() {
        pretty_env_logger::init();
    
        mquictt_server::server(
            &([127, 0, 0, 1], 1883).into(),
            mquictt_server::Config::read(&"server.json").unwrap(),
        )
        .await
        .unwrap();
    }
    

    Client:

    use bytes::Bytes;
    use log::error;
    
    #[tokio::main]
    async fn main() -> Result<(), mquictt_client::Error> {
        pretty_env_logger::init();
    
        // create a client
        let mut client = mquictt_client::Client::connect(
            &([127, 0, 0, 1], 2000).into(),
            &([127, 0, 0, 1], 1883).into(),
            "localhost",
            "0",
            mquictt_client::Config::read(&"client.json").unwrap(),
        )
        .await?;
    
        // create a publisher for a particular topic
        let mut publisher = client
            .publisher("hello/world", Bytes::from("hello"))
            .await?;
    
        let mut subscriber = client.subscriber("hello/world").await?;
        publisher.publish(Bytes::from("hello again!"))?;
        publisher.flush().await.unwrap();
        tokio::spawn(async move {
            for i in 0..100 {
                if let Err(e) = publisher.publish(Bytes::from(format!("{}!", i))) {
                    error!("{}", e);
                }
            }
            if let Err(e) = publisher.flush().await {
                error!("{}", e);
            }
            if let Err(e) = publisher.close().await {
                error!("{}", e);
            }
        });
    
        // Read from subscriber
        while let Ok(data) = subscriber.read().await {
            println!("{}", std::str::from_utf8(&data).unwrap());
        }
        subscriber.close().await.unwrap();
        client.close().await.unwrap();
    
        Ok(())
    }
    

    Terminal Output:

    Server:

    DEBUG rustls::anchors > add_pem_file processed 133 valid and 0 invalid certs
     DEBUG rustls::anchors > add_pem_file processed 1 valid and 0 invalid certs
     INFO  mquictt_server  > QUIC server launched at 127.0.0.1:1883
     DEBUG rustls::server::hs > decided upon suite SupportedCipherSuite { suite: TLS13_CHACHA20_POLY1305_SHA256, kx: BulkOnly, bulk: CHACHA20_POLY1305, hash: SHA256, sign: None, enc_key_len: 32, fixed_iv_len: 12, explicit_nonce_len: 0 }
     INFO  mquictt_server     > accepted conn from 127.0.0.1:2000
     DEBUG mquictt_server     > connection handler spawned for 127.0.0.1:2000
     DEBUG mquictt_server     > stream accepted for 127.0.0.1:2000
     DEBUG mquictt_server     > recved CONNECT packet from 127.0.0.1:2000, len = 15
     DEBUG mquictt_server     > sent CONNACK packet to 127.0.0.1:2000
     DEBUG rustls::server::hs > decided upon suite SupportedCipherSuite { suite: TLS13_CHACHA20_POLY1305_SHA256, kx: BulkOnly, bulk: CHACHA20_POLY1305, hash: SHA256, sign: None, enc_key_len: 32, fixed_iv_len: 12, explicit_nonce_len: 0 }
     INFO  mquictt_server     > accepted conn from 127.0.0.1:2000
     DEBUG mquictt_server     > connection handler spawned for 127.0.0.1:2000
     DEBUG mquictt_server     > stream accepted for 127.0.0.1:2000
     DEBUG mquictt_server     > recved CONNECT packet from 127.0.0.1:2000, len = 15
     DEBUG mquictt_server     > sent CONNACK packet to 127.0.0.1:2000
    thread 'tokio-runtime-worker' panicked at 'called `Option::unwrap()` on a `None` value', /.../mquictt-server/src/lib.rs:143:67
    

    Client:

     DEBUG rustls::anchors > add_pem_file processed 1 valid and 0 invalid certs
     DEBUG rustls::client::hs > No cached session for DNSNameRef("localhost")
     DEBUG rustls::client::hs > Not resuming any session
     DEBUG rustls::client::hs > Using ciphersuite TLS13_CHACHA20_POLY1305_SHA256
     DEBUG rustls::client::tls13 > Not resuming
     DEBUG rustls::client::tls13 > TLS1.3 encrypted extensions: [ServerNameAck, TransportParameters([1, 2, 103, 16, 3, 2, 69, 200, 4, 8, 255, 255, 255, 255, 255, 255, 255, 255, 5, 4, 128, 19, 18, 208, 6, 4, 128, 19, 18, 208, 7, 4, 128, 19, 18, 208, 8, 2, 64, 100, 9, 2, 64, 100, 11, 1, 0, 14, 1, 5, 64, 182, 0, 2, 16, 57, 98, 213, 215, 55, 84, 147, 245, 185, 74, 80, 241, 248, 145, 215, 143, 32, 4, 128, 0, 255, 255, 0, 20, 128, 144, 30, 143, 26, 145, 154, 167, 146, 223, 51, 38, 97, 85, 189, 143, 48, 188, 32, 57, 15, 8, 2, 136, 98, 5, 251, 151, 183, 147])]
     DEBUG rustls::client::hs    > ALPN protocol is None
     DEBUG rustls::client::tls13 > Got CertificateRequest CertificateRequestPayloadTLS13 { context: PayloadU8([]), extensions: [SignatureAlgorithms([ECDSA_NISTP384_SHA384, ECDSA_NISTP256_SHA256, ED25519, RSA_PSS_SHA512, RSA_PSS_SHA384, RSA_PSS_SHA256, RSA_PKCS1_SHA512, RSA_PKCS1_SHA384, RSA_PKCS1_SHA256]), AuthorityNames([PayloadU16([48, 129, 128, 49, 14, 48, 12, 6, 3, 85, 4, 6, 19, 5, 73, 110, 100, 105, 97, 49, 18, 48, 16, 6, 3, 85, 4, 8, 19, 9, 75, 97, 114, 110, 97, 116, 97, 107, 97, 49, 18, 48, 16, 6, 3, 85, 4, 7, 19, 9, 66, 97, 110, 103, 97, 108, 111, 114, 101, 49, 23, 48, 21, 6, 3, 85, 4, 9, 19, 14, 83, 117, 98, 98, 105, 97, 104, 32, 71, 97, 114, 100, 101, 110, 49, 15, 48, 13, 6, 3, 85, 4, 17, 19, 6, 53, 54, 48, 48, 49, 49, 49, 28, 48, 26, 6, 3, 85, 4, 10, 19, 19, 73, 79, 84, 32, 69, 120, 112, 114, 101, 115, 115, 32, 80, 118, 116, 32, 76, 116, 100])])] }
     DEBUG rustls::client::tls13 > Attempting client auth
     INFO  mquictt_client        > connected to 127.0.0.1:1883
     DEBUG mquictt_client        > sent CONNECT to 127.0.0.1:1883, len = 15
     DEBUG rustls::client::tls13 > Ticket saved
     DEBUG mquictt_client        > recved CONNACK from 127.0.0.1:1883
    created client
     DEBUG mquictt_client        > flushed 20 bytes
     DEBUG mquictt_client        > sent SUBSCRIBE to 127.0.0.1:1883
    Error: ConnectionBroken

    opened by jannemannX 4
  • review client code

    The code in mquictt_client/src/lib.rs is severely out of date compared to the assumptions made by the server side. This requires a thorough review of the client side code. This will also solve #7.

    opened by abhikjain360 1
  • Connections are dropping without close

    The client is unable to close the connection before dropping off, which causes a timeout-based closure:

     ERROR mquictt_server > Read Error : connection closed: timed out
    

    We need to figure out how a DISCONNECT MQTT packet can trigger the server to gracefully close the connection.
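
One possible starting point, offered only as a sketch: in MQTT the control packet type is the upper nibble of the first byte, and DISCONNECT is type 14. The `Action` enum and `on_packet` function below are hypothetical and not taken from the server code; they only show how a per-connection read loop might decide to close cleanly instead of waiting for a timeout.

```rust
/// MQTT control packet type for DISCONNECT (upper nibble of the first byte).
const DISCONNECT: u8 = 14;

/// What a per-connection loop should do after reading a packet.
/// (Hypothetical type, for illustration only.)
#[derive(Debug, PartialEq)]
enum Action {
    Continue,
    CloseGracefully,
}

/// Inspects the fixed header of a raw MQTT packet.
fn on_packet(packet: &[u8]) -> Action {
    match packet.first() {
        Some(&first) if first >> 4 == DISCONNECT => Action::CloseGracefully,
        _ => Action::Continue,
    }
}
```

On `CloseGracefully` the handler would stop reading and close the QUIC connection explicitly, so the peer sees a clean close rather than the timeout error above.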

    opened by de-sh 1
  • Roadmap / Issue tracker

    • [x] Implement QUIC connections and streams using TLS1.3 and quinn.
    • [x] Implement MQTT broker with QUIC for connections.
    • [ ] Implement MQTT client with QUIC for connections:
      • [ ] #7 (@de-sh).
    • [ ] Add logging using sled to log publishes (@abhikjain360).

    Status: The library is usable but not complete, hence the plan is to release a beta once sufficient features are implemented.

    opened by de-sh 0
Owner
MQTT over QUIC