Lightweight stream-based WebSocket implementation for Rust.

Overview

Tungstenite

Lightweight stream-based WebSocket implementation for Rust.

use std::net::TcpListener;
use std::thread::spawn;
use tungstenite::server::accept;

/// A WebSocket echo server
fn main () {
    let server = TcpListener::bind("127.0.0.1:9001").unwrap();
    for stream in server.incoming() {
        spawn (move || {
            let mut websocket = accept(stream.unwrap()).unwrap();
            loop {
                let msg = websocket.read_message().unwrap();

                // We do not want to send back ping/pong messages.
                if msg.is_binary() || msg.is_text() {
                    websocket.write_message(msg).unwrap();
                }
            }
        });
    }
}

Take a look at the examples section to see how to write a simple client/server.

NOTE: tungstenite-rs is more like a barebone to build reliable modern networking applications using WebSockets. If you're looking for a modern production-ready "batteries included" WebSocket library that allows you to efficiently use non-blocking sockets and do "full-duplex" communication, take a look at tokio-tungstenite.

MIT licensed Apache-2.0 licensed Crates.io Build Status

Documentation

Introduction

This library provides an implementation of WebSockets, RFC6455. It allows for both synchronous (like TcpStream) and asynchronous usage and is easy to integrate into any third-party event loops including MIO. The API design abstracts away all the internals of the WebSocket protocol but still makes them accessible for those who wants full control over the network.

Why Tungstenite?

It's formerly WS2, the 2nd implementation of WS. WS2 is the chemical formula of tungsten disulfide, the tungstenite mineral.

Features

Tungstenite provides a complete implementation of the WebSocket specification. TLS is supported on all platforms using native-tls or rustls available through the native-tls and rustls-tls feature flags.

There is no support for permessage-deflate at the moment. It's planned.

Testing

Tungstenite is thoroughly tested and passes the Autobahn Test Suite for WebSockets. It is also covered by internal unit tests as well as possible.

Contributing

Please report bugs and make feature requests here.

Comments
  • Sharing socket between threads / non-blocking read

    Sharing socket between threads / non-blocking read

    Hello, I'm building a service which demands minimal latency.

    I need to be able to send a message as fast as possible (latency wise), but still engage in potential ping/pong interactions.

    My problem is that read_message seems to block until a message arrives, which won't do.

    I was thinking I could access the underlying stream, split it, then have two threads, one which blocks while waiting for new messages and then handles them, and the other which writes whenever it needs to according to its own independent logic.

    Is this possible? I've heard about using mio to make some of the components async, I saw a set_nonblocking method mentioned in another issue regarding the blocking nature of read_message. I'm overall a bit confused, and can't find an example of how I would achieve an async read (or something equivalent) using mio.

    Thanks so much!

    question 
    opened by wbrickner 20
  • Following an usual non-blocking pattern implementation adds extra time.

    Following an usual non-blocking pattern implementation adds extra time.

    Hi, first of all, thank you for your amazing work in tungstenite-rs!

    I am using your library with non-blocking sockets, and all is working fine. Nevertheless, when I compare times among reading 1-byte in blocking and non-blocking ways I noticed that the non-blocking was around twice as slow: 8us for reading 1-byte in a blocking schema and around 15us reading in a non-blocking schema.

    Investigating about it, this double increment comes from in non-blocking schema I'm "force" to call WebSocket::read_message() twice. Once to receive the byte I sent, and other call to receive the WouldBlock that notify me that there are no more messages:

    loop {
        match websocket.read_message() {
             Ok(message) => (), // 1-byte received.
             Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => break,
             //...
        }
    }
    

    Currently, I fixed this to avoid the second call to read_message() by checking by my self if there is data in the stream or the socket would block:

    loop {
        match websocket.read_message() {
            Ok(message) => {
                // 1-byte received
                if let Err(err) = web_socket.get_ref().peek(&mut [0; 0]) {
                    if err.kind() == ErrorKind::WouldBlock {
                        break;
                    }
                }
            }
            Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => break,
            //...
        }
    }
    

    With the above code, I correctly read a 1-byte message in 8us instead of 15us, but is far to be obvious for a non-blocking user, that is used to perform this kind of reading pattern until getting WouldBlock.

    Taking a look inside read_message() I saw that there is a lot of things done inside. Maybe these things should be done only in the case read_message() has data to read and if not, perform an early exit with WouldBlock.

    question 
    opened by lemunozm 17
  • How to implement ping timeouts with tungstenite::server?

    How to implement ping timeouts with tungstenite::server?

    Do I need to use a Read + Write object with tungstenite::server::accept like in https://github.com/snapview/tungstenite-rs/issues/11#issuecomment-293187432? Anyone has an sample code for that?

    I want to implement ping timeouts but still be able to use websocket.read_message() which blocks.

    question 
    opened by bbigras 13
  • Reading in non-blocking way

    Reading in non-blocking way

    How can I read from a websocket like try_recv() from a std Receiver? So that in my loop I don't have to wait until the client sends a message before I can send a message to the client.

    question 
    opened by Boscop 12
  • Keep processing incoming data even after we have initiated a close handshake.

    Keep processing incoming data even after we have initiated a close handshake.

    This commit will make it so that after initiating a close handshake but before receiving confirmation from the remote, we still pass data messages to client code. An integration test is included verifying that it works correctly.

    This fixes the issue as described here: https://github.com/snapview/tokio-tungstenite/issues/69#issuecomment-530592464

    Note that the websocket RFC is not specific about this. There is however no good reason for tungstenite to impose unnecessary data loss on clients.

    opened by najamelan 11
  • Websocket

    Websocket "BrokenPipe" panic

    Hi,

    Stumbled upon an issue writing websocket data to Grafana where write_message() panics with a IO error [below]. This seems to occur after ~28sec everytime, where prior to this everything works perfectly, writing a test sine wave onto a dashboard. The Python implementation of websockets doesn't have this issue, currently running for 2 hrs without issue. I feel that this maybe keep-alive related but that's about the limit of my knowledge! I've tried to slow/speed up data write rates, wrap write_msg() in can_write(), calling write_pending() all to no avail. Any ideas what could cause this on tungstenite, and how to fix?

    Cheers all.

    Code

    `  let mut request = "ws://localhost:3000/api/live/push/sinewave".into_client_request()?;
        request.headers_mut().append(http::header::AUTHORIZATION, "Bearer SomeAPIKey=".parse().unwrap());
    
        let (mut socket, response) = connect(request).unwrap(); 
    
    
        let mut i: f32 = 0.0;
                
        loop {
    
            let sine = i.to_radians().sin();
            let now = SystemTime::now();
            let since_the_epoch = now
                .duration_since(UNIX_EPOCH)
                .expect("Time went backwards");
    
            let json_out = format!("test sinewave={} {}", sine, since_the_epoch.as_nanos());
    
            println!("{}", json_out);
    
            socket.write_message(Message::Text(json_out)).unwrap();
                  
    
            if i == 360.0 {
                i = 0.0;
            } else {
                i += 1.0;
            }
    
            thread::sleep( Duration::from_millis(50));    
        }`
    

    Env Grafana running localhost in docker container. Working with Python.

    Error

    thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Io(Os { code: 32, kind: BrokenPipe, message: "Broken pipe" })', src/websock.rs:43:59
    note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
    
    question 
    opened by matt-walker0 10
  • Add `permessage-deflate` support

    Add `permessage-deflate` support

    • Adds de/compression logic on top of the existing flow, requiring minimal changes. Shouldn't have any major breaking changes.
    • client_max_window_bits/server_max_window_bits are not supported at the moment because that requires flate2/zlib feature. We can enable them with a feature flag later if there's a demand.
    • Passes autobahn except for 13.3 to 13.6, that requires supporting max window bits
    • Can be used from crates like warp and axum.
      • tokio-tungstenite: https://github.com/snapview/tokio-tungstenite/pull/190
      • warp: https://github.com/seanmonstar/warp/compare/master...kazk:permessage-deflate
      • Example using warp with compression: https://github.com/qualified/lsp-ws-proxy/commit/89fce502cf792cfa89e814865bbe170666499c0d

    This works for my use case, but I'd like some feedback from maintainers and users before polishing this to be merged.

    • Does it make sense to think about supporting more PMCEs (e.g., permessage-bzip2 and permessage-snappy)? These are not standardized as far as I know, and browsers don't support them.
    • ~~I think including flate2 without feature flag is fine (not large, defaults to pure Rust). What do you think? If there's a demand, we can add a flag for flate2/zlib and support max window bits.~~
    • Should the default WebSocketConfig enable compression support?

    • [x] deflate feature
    • [x] Parsing Sec-WebSocket-Extensions more strictly with headers (https://github.com/kazk/tungstenite-rs/tree/permessage-deflate-rebased)
    • [ ] Error on invalid negotiation response
    • [ ] Error handling
    • [ ] deflate-zlib feature for max window bits

    Closes #2

    opened by kazk 10
  • optionally disable verbose logging to improve performance

    optionally disable verbose logging to improve performance

    Currently the logging is too verbose. As seen from the flamegraph, half of the time is spent on logging. This PR adds a feature no-verbose-logging that disables these annoying loggings. image

    opened by qiujiangkun 10
  • Review the implementation of WebSocketContext::read_message

    Review the implementation of WebSocketContext::read_message

    Following somewhat from the discussion here, but I opened a separate issue, because I really wonder whether this does what it's author intended.

    /// Read a message from the provided stream, if possible.
    ///
    /// This function sends pong and close responses automatically.
    /// However, it never blocks on write.
    pub fn read_message<Stream>(&mut self, stream: &mut Stream) -> Result<Message>
    where
        Stream: Read + Write,
    {
        // Do not read from already closed connections.
        self.state.check_active()?;
        loop {
            // Since we may get ping or close, we need to reply to the messages even during read.
            // Thus we call write_pending() but ignore its blocking.
            self.write_pending(stream).no_block()?;
            // If we get here, either write blocks or we have nothing to write.
            // Thus if read blocks, just let it return WouldBlock.
            if let Some(message) = self.read_message_frame(stream)? {
                trace!("Received message {}", message);
                return Ok(message);
            }
        }
    }
    

    The loop seems like an iffy control structure here. After looking through the methods it calls, if I'm not mistaking, it can only loop if there is a fragmented message being read. However the comment suggests this will deal with responding to ping and close.

    So this will not respond to ping and close directly, but only on the next read. That's inconvenient for client code, especially when using through tokio-tungstenite, which means not being able to call read_message directly when a ping or close is returned here.

    It also means that write_pending is called in a loop when a framed message is being read. That doesn't seem very useful. I would think in the least the call to write pending should be before the loop, if we really want to only respond to close/ping from the last read.

    Nicer would be to respond to close/ping directly. It's not so clear to me what the best solution is. Something like this comes to mind:

    // This loops so we continue reading frames when a fragmented message is coming in.
    // There are 4 conditions that break from this loop:
    // - errors
    // - ConnectionClosed
    // - nothing to read anymore (WouldBlock)
    // - a complete message was read
    loop {
        if let Some(message) = self.read_message_frame(stream)? {
            trace!("Received message {}", message);
            match message
            {
                // Since we may get ping or close, we need to reply to the messages even during read.
                // Thus we call write_pending() but ignore its blocking.
                Close(..) | Ping(..) => self.write_pending(stream).no_block()?,
                _ => {}
            };
            return Ok(message);
        }
    }
    

    The problem here is that write_pending might encounter an error. In that case, we have taken the message out the underlying stream, but we will never return it. It just get's lost. We just return the error.

    There are several possible solutions to this problem, like storing the error on self and returning that on the next read. I'm not sure what would be best.

    question 
    opened by najamelan 10
  • Permanently frozen WebSock stream

    Permanently frozen WebSock stream

    Hi! This issue is probably a bit unusual because I'm mid-debugging but please bear with me.

    I'm investigating spontaneous freezes in a WS client app. Sometimes they would happen every day and other times after a week. So far I pinpointed the cause to be Tugstenite's read_message() function.

    Here's a screenshot of part of the call stack from a live debugger during the freeze. It's just happened and I have not debugged any further yet:

    image

    It looks like it's stuck on reading from the TCP stream. The connection is to a 3rd party service and I'm guessing it might have died without closing the socket. I'd expect Tungstenite to report an error in this case instead of blocking forever.

    I thought about setting a read timeout on the underlying stream as a possible workaround. I'm not sure if this might break any Tungstenite invariants but it seems unlikely as timeouts should just be treated as normal errors.

    Please let me know if you need more information. I'll be keeping the debugger open for the time being.

    I will report back when I have more. Cheers! =)

    question 
    opened by wbogocki 9
  • Add support for rustls as TLS backend

    Add support for rustls as TLS backend

    This PR adds rustls as an alternative to the current native-tls. In addition, I changed the default feature set to not include any TLS implementation which is a breaking change as people have to add the TLS feature explicitly now.

    I'm not sure what way you prefer to have the feature set so I will adjust to the way you prefer once you let me know.

    Resolves #89.

    opened by dnaka91 8
  • Add `permessage-deflate` support

    Add `permessage-deflate` support

    Supersedes #235.

    • Adds optional de/compression logic on top of the existing flow without any major breaking changes.
    • permessage-deflate is not enabled by default and requires a deflate feature.
    • client_max_window_bits and server_max_window_bits are not supported yet because these require flate2/zlib which is not pure Rust.
    • Passes Autobahn except for 13.3 to 13.6. These require supporting max window bits.
    • Correct extension negotiation. The server declines invalid offers, and the client errors on invalid response to terminate the connection.
    • Sec-WebSocket-Extensions header is parsed by headers
      • [ ] Waiting for https://github.com/hyperium/headers/pull/88
    • https://github.com/snapview/tokio-tungstenite/pull/251
    • Clean warp integration: https://github.com/seanmonstar/warp/pull/1016

    I might need to adjust some code depending on https://github.com/hyperium/headers/pull/88, but this should be ready for review.


    Closes #2.

    opened by kazk 0
  • Improve adding custom headers

    Improve adding custom headers

    If you try to add custom headers to a connect request you can initialize a request object from the http crate, however this doesn't add the required ws headers like sec-websocket-accept and upgrade . The best way to add additional headers to a ws connect request is unnecessarily complicated.

                let mut request = WS_URL.into_client_request().unwrap();
                request.headers_mut().insert("User-Agent", "Some-custom/UserAgent".parse().unwrap());
    

    This is the only method that allows the socket to actually connect, maybe the request builder can be exposed optionally that adds the necessary headers added by the into_client_request implementation for URI?

    opened by Taha-Firoz 1
  • Error's `Display` and `Error::source` contain duplicate info

    Error's `Display` and `Error::source` contain duplicate info

    Error returns duplicate information since the error source is included in both the Display and Error::source, for example logging errors with tracing produce the following:

    2022-11-04T19:05:01.777878Z  WARN crate: error=websocket errored error.sources=[IO error: unexpected end of file, unexpected end of file]
    

    It would be nice to either include downstream error information in just the Error::source (leaving Display with "IO error") as discussed here: https://github.com/dtolnay/thiserror/issues/38 or to not implement Error::source for the Error::Io variant.

    opened by vilgotf 1
  • Intigration with Oss-Fuzz

    Intigration with Oss-Fuzz

    Hi @agalakhov and @application-developer-DA, I would like to help integrate this project into OSS-Fuzz.

    OSS-Fuzz is a free service run by Google that performs continuous fuzzing of important open-source projects.

    As tungstenite-rs is already cargo-fuzz based fuzzed, this makes it easily compatible with oss-fuzz out of box.

    If you would like to integrate, the only thing I need is a list of email(s), it must be associated with a google account like gmail (why?). by doing that, the provided email(s) will get access to the data produced by OSS-Fuzz, such as bug reports, coverage reports and more stats.

    As an alternative, if you don't have a google/gmail id, but still wish to integrate. I can add my mail id for time being and monitor bug/crashes.

    Notice the email(s) affiliated with the project will be public in the OSS-Fuzz repo, as they will be part of a configuration file.

    I personally have found many bugs with other open-source repositories.

    opened by code-terror 5
  • Half-blocking mode. Is it supported?

    Half-blocking mode. Is it supported?

    Hi

    I'd like to clarify if something that I am trying to implement is actually supported/possible.

    I have a separate proprietary binary which can load dynamic library. It's this dynamic library that I am trying to implement using tungstenite-rs. This library should contain web-socket client.

    However this client must not block on reading socket since it works within host binary thread. Blocking on write is desired & possible. So I decide to implement "half-blocking" tcp stream in Rust like following.

    struct TcpCustomStream {
        s: TcpStream,
    }
    
    impl TcpCustomStream {
        pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
            match TcpStream::connect(addr) {
                Ok(x) => Ok(Self { s: x }),
                Err(x) => Err(x),
            }
        }
    
        pub fn set_nonblocking(&mut self) -> io::Result<()> {
            self.s.set_nonblocking(true).map_err(|_| {
                std::io::Error::new(std::io::ErrorKind::Other, "set_nonblocking call failed")
            })
        }
    }
    
    impl std::io::Read for TcpCustomStream {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            // non-blocking read
            self.s.read(buf)
        }
    }
    
    impl std::io::Write for TcpCustomStream {
        fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
            // blocking write
            let mut l = 0usize;
            loop {
                match self.s.write(buf) {
                    Err(e) if would_block(&e) => {
                        //println!("write err {:?}", e);
                        thread::yield_now();
                    }
                    Err(x) => {
                        println!("write {:?}", x);
                        break Err(x);
                    }
                    Ok(size) => {
                        l += size;
                        debug_assert!(size <= buf.len());
                        buf = &buf[size..];
                        if buf.len() == 0 {
                            break Ok(l);
                        }
                    }
                }
            }
        }
        fn flush(&mut self) -> std::io::Result<()> {
            self.s.flush()
        }
    }
    
    fn would_block(err: &std::io::Error) -> bool {
        match err {
            x if (x.kind() == io::ErrorKind::WouldBlock)
                || (x.kind() == std::io::ErrorKind::Interrupted) =>
            {
                true
            }
            _ => false,
        }
    }
    
    ...
    
    pub fn connect(uri_str: &'a str) -> Result<WebSocket<TcpCustomStream>, std::io::Error> {
            println!("Client started");
    
            let req = Uri::from_maybe_shared(uri_str.to_string()).map_err(|_| {
                std::io::Error::new(std::io::ErrorKind::InvalidInput, "Invalid connect URI")
            })?;
    
            let host_port = req
                .authority()
                .ok_or(std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    "Invalid connect URI",
                ))?
                .as_str();
            let stream = TcpCustomStream::connect(host_port)?;
            println!("Client connected");
    
            let (mut client, _) = client(req, stream).map_err(|_| {
                std::io::Error::new(
                    std::io::ErrorKind::Other,
                    format!("Cannot connect to {}", uri_str),
                )
            })?;
            println!("WSClient connected");
    
            client.get_mut().set_nonblocking()?;
    
            Ok(client)
        }
    
    

    So my question - is it really possible to use stream like that together with tungstenite-rs? Or I violate some invariants?

    BR

    help wanted question 
    opened by MageSlayer 5
  • socket closes without error on hibernate / standby

    socket closes without error on hibernate / standby

    When putting a (windows-)PC into standby (/hibernate) the loop (when using this syntax: loop { read_message() })pauses never to start again. Also no error is thrown so I can't reconnect. This has happend to me accross multiple projects.

    opened by Proxtx 5
Owner
Snapview GmbH
Open source projects of Snapview GmbH
Snapview GmbH
Jamsocket is a lightweight framework for building WebSocket-based application backends.

Jamsocket is a lightweight framework for building services that are accessed through WebSocket connections.

null 94 Dec 30, 2022
A lightweight framework for building WebSocket-based application backends.

Jamsocket Jamsocket is a lightweight framework for building services that are accessed through WebSocket connections. Services can either be native Ru

drifting in space 94 Dec 30, 2022
A WebSocket (RFC6455) library written in Rust

Rust-WebSocket Note: Maintainership of this project is slugglish. You may want to use tungstenite or tokio-tungstenite instead. Rust-WebSocket is a We

Rust Websockets 1.3k Jan 6, 2023
A WebSocket (RFC6455) library written in Rust

Rust-WebSocket Rust-WebSocket is a WebSocket (RFC6455) library written in Rust. Rust-WebSocket provides a framework for dealing with WebSocket connect

Jason N 19 Aug 22, 2022
An aria2 websocket jsonrpc in Rust.

aria2-ws An aria2 websocket jsonrpc in Rust. Built with tokio. Docs.rs aria2 RPC docs Features Almost all methods and structed responses Auto reconnec

null 8 Sep 7, 2022
A simple toy websocket client to connect to Bitstamp.net and print the live order book written in Rust.

A simple toy websocket client to connect to Bitstamp.net and print the live order book written in Rust.

Nate Houk 1 Feb 14, 2022
Websocket generic library for Bitwyre WS-API

Websocket Core (Rust) Websocket generic server library for: Periodic message broadcast Eventual (Pubsub) message broadcast Async request reply Authors

Bitwyre 13 Oct 28, 2022
A CLI development tool for WebSocket APIs

A CLI development tool for WebSocket APIs

Espen Henriksen 622 Dec 26, 2022
Spawn process IO to websocket with full PTY support.

Cliws Spawn process IO to websocket with full PTY support. Features Any process IO through Websocket Full pty support: VIM, SSH, readline, Ctrl+X Auto

B23r0 91 Jan 5, 2023
The ever fast websocket tunnel built on top of lightws

Kaminari The ever fast websocket tunnel built on top of lightws. Intro Client side receives tcp then sends [tcp/ws/tls/wss]. Server side receives [tcp

zephyr 261 Dec 27, 2022
websocket client

#websocket client async fn test_websocket()->anyhow::Result<()> { wasm_logger::init(wasm_logger::Config::default()); let (tx, rx) = futures_c

null 1 Feb 11, 2022
A webserver and websocket pair to stop your viewers from spamming !np and "what's the song?" all the time.

spotify-np ?? spotify-np is a Rust-based local webserver inspired by l3lackShark's gosumemory application, but the catch is that it's for Spotify! ??

Noire 2 Aug 27, 2022
WebSocket-to-HTTP reverse proxy

websocket-bridge This is a simple reverse proxy server which accepts WebSocket connections and forwards any incoming frames to backend HTTP server(s)

Fermyon 5 Dec 21, 2022
A secure, real-time, low-latency binary WebSocket RPC subprotocol.

HardLight A secure, real-time, low-latency binary WebSocket RPC subprotocol. HardLight has two data models: RPC: a client connects to a server, and ca

valera 5 Apr 2, 2023
Lightweight, event-driven WebSockets for Rust.

WS-RS Lightweight, event-driven WebSockets for Rust. /// A WebSocket echo server listen("127.0.0.1:3012", |out| { move |msg| { out.send(ms

Jason Housley 1.3k Jan 8, 2023
SockJS server for rust language

SockJS server SockJS server for Actix framework. API Documentation Cargo package: sockjs SockJS is built with Actix web Minimum supported Rust version

Actix 63 Oct 7, 2022
A very-very simple url shortener for Rust

urlshortener-rs A very simple urlshortener for Rust. This library aims to implement as much URL shortener services as possible and to provide an inter

Victor Polevoy 39 Nov 20, 2022
Synchronized state machines for Rust over WebSockets.

Aper is a framework for real-time sharing of application state over WebSockets.

null 191 Dec 20, 2022
Rust + wasm + websockets

This is a template repo for eframe, a framework for writing apps using egui.

Emil Ernerfeldt 12 Oct 3, 2022