When using the examples for the mqtt_client and the mqtt_server. The server panics when the client tries to subscribe.
Code:
Server:
#[tokio::main]
async fn main() {
pretty_env_logger::init();
mquictt_server::server(
&([127, 0, 0, 1], 1883).into(),
mquictt_server::Config::read(&"server.json").unwrap(),
)
.await
.unwrap();
}
Client:
use bytes::Bytes;
use log::error;
#[tokio::main]
async fn main() -> Result<(), mquictt_client::Error> {
pretty_env_logger::init();
// create a client
let mut client = mquictt_client::Client::connect(
&([127, 0, 0, 1], 2000).into(),
&([127, 0, 0, 1], 1883).into(),
"localhost",
"0",
mquictt_client::Config::read(&"client.json").unwrap(),
)
.await?;
// create a publisher for a particular topic
let mut publisher = client
.publisher("hello/world", Bytes::from("hello"))
.await?;
let mut subscriber = client.subscriber("hello/world").await?;
publisher.publish(Bytes::from("hello again!"))?;
publisher.flush().await.unwrap();
tokio::spawn(async move {
for i in 0..100 {
if let Err(e) = publisher.publish(Bytes::from(format!("{}!", i))) {
error!("{}", e);
}
}
if let Err(e) = publisher.flush().await {
error!("{}", e);
}
if let Err(e) = publisher.close().await {
error!("{}", e);
}
});
// Read from subscriber
while let Ok(data) = subscriber.read().await {
println!("{}", std::str::from_utf8(&data).unwrap());
}
subscriber.close().await.unwrap();
client.close().await.unwrap();
Ok(())
}
Terminal Output:
Server:
DEBUG rustls::anchors > add_pem_file processed 133 valid and 0 invalid certs
DEBUG rustls::anchors > add_pem_file processed 1 valid and 0 invalid certs
INFO mquictt_server > QUIC server launched at 127.0.0.1:1883
DEBUG rustls::server::hs > decided upon suite SupportedCipherSuite { suite: TLS13_CHACHA20_POLY1305_SHA256, kx: BulkOnly, bulk: CHACHA20_POLY1305, hash: SHA256, sign: None, enc_key_len: 32, fixed_iv_len: 12, explicit_nonce_len: 0 }
INFO mquictt_server > accepted conn from 127.0.0.1:2000
DEBUG mquictt_server > connection handler spawned for 127.0.0.1:2000
DEBUG mquictt_server > stream accepted for 127.0.0.1:2000
DEBUG mquictt_server > recved CONNECT packet from 127.0.0.1:2000, len = 15
DEBUG mquictt_server > sent CONNACK packet to 127.0.0.1:2000
DEBUG rustls::server::hs > decided upon suite SupportedCipherSuite { suite: TLS13_CHACHA20_POLY1305_SHA256, kx: BulkOnly, bulk: CHACHA20_POLY1305, hash: SHA256, sign: None, enc_key_len: 32, fixed_iv_len: 12, explicit_nonce_len: 0 }
INFO mquictt_server > accepted conn from 127.0.0.1:2000
DEBUG mquictt_server > connection handler spawned for 127.0.0.1:2000
DEBUG mquictt_server > stream accepted for 127.0.0.1:2000
DEBUG mquictt_server > recved CONNECT packet from 127.0.0.1:2000, len = 15
DEBUG mquictt_server > sent CONNACK packet to 127.0.0.1:2000
thread 'tokio-runtime-worker' panicked at 'called `Option::unwrap()` on a `None` value', /.../mquictt-server/src/lib.rs:143:67
Client:
DEBUG rustls::anchors > add_pem_file processed 1 valid and 0 invalid certs
DEBUG rustls::client::hs > No cached session for DNSNameRef("localhost")
DEBUG rustls::client::hs > Not resuming any session
DEBUG rustls::client::hs > Using ciphersuite TLS13_CHACHA20_POLY1305_SHA256
DEBUG rustls::client::tls13 > Not resuming
DEBUG rustls::client::tls13 > TLS1.3 encrypted extensions: [ServerNameAck, TransportParameters([1, 2, 103, 16, 3, 2, 69, 200, 4, 8, 255, 255, 255, 255, 255, 255, 255, 255, 5, 4, 128, 19, 18, 208, 6, 4, 128, 19, 18, 208, 7, 4, 128, 19, 18, 208, 8, 2, 64, 100, 9, 2, 64, 100, 11, 1, 0, 14, 1, 5, 64, 182, 0, 2, 16, 57, 98, 213, 215, 55, 84, 147, 245, 185, 74, 80, 241, 248, 145, 215, 143, 32, 4, 128, 0, 255, 255, 0, 20, 128, 144, 30, 143, 26, 145, 154, 167, 146, 223, 51, 38, 97, 85, 189, 143, 48, 188, 32, 57, 15, 8, 2, 136, 98, 5, 251, 151, 183, 147])]
DEBUG rustls::client::hs > ALPN protocol is None
DEBUG rustls::client::tls13 > Got CertificateRequest CertificateRequestPayloadTLS13 { context: PayloadU8([]), extensions: [SignatureAlgorithms([ECDSA_NISTP384_SHA384, ECDSA_NISTP256_SHA256, ED25519, RSA_PSS_SHA512, RSA_PSS_SHA384, RSA_PSS_SHA256, RSA_PKCS1_SHA512, RSA_PKCS1_SHA384, RSA_PKCS1_SHA256]), AuthorityNames([PayloadU16([48, 129, 128, 49, 14, 48, 12, 6, 3, 85, 4, 6, 19, 5, 73, 110, 100, 105, 97, 49, 18, 48, 16, 6, 3, 85, 4, 8, 19, 9, 75, 97, 114, 110, 97, 116, 97, 107, 97, 49, 18, 48, 16, 6, 3, 85, 4, 7, 19, 9, 66, 97, 110, 103, 97, 108, 111, 114, 101, 49, 23, 48, 21, 6, 3, 85, 4, 9, 19, 14, 83, 117, 98, 98, 105, 97, 104, 32, 71, 97, 114, 100, 101, 110, 49, 15, 48, 13, 6, 3, 85, 4, 17, 19, 6, 53, 54, 48, 48, 49, 49, 49, 28, 48, 26, 6, 3, 85, 4, 10, 19, 19, 73, 79, 84, 32, 69, 120, 112, 114, 101, 115, 115, 32, 80, 118, 116, 32, 76, 116, 100])])] }
DEBUG rustls::client::tls13 > Attempting client auth
INFO mquictt_client > connected to 127.0.0.1:1883
DEBUG mquictt_client > sent CONNECT to 127.0.0.1:1883, len = 15
DEBUG rustls::client::tls13 > Ticket saved
DEBUG mquictt_client > recved CONNACK from 127.0.0.1:1883
created client
DEBUG mquictt_client > flushed 20 bytes
DEBUG mquictt_client > sent SUBSCRIBE to 127.0.0.1:1883
Error: ConnectionBroken```