Fast and easy-to-use event-driven network library.

Overview

message-io is a fast and easy-to-use event-driven network library. The library handles the OS socket internally and offers a simple event message API to the user. It also allows you to make an adapter for your own transport protocol following some rules, delegating the tedious asynchrony and thread management to the library.

If you find a problem using the library or you have an idea to improve it, do not hesitate to open an issue. Any contribution is welcome! And remember: more caffeine, more productive!

Motivation

Managing sockets is hard because you need to fight with threads, concurrency, full duplex, encoding, IO errors that come from the OS (which are really difficult to understand in some situations), etc. If you make use of non-blocking sockets, it adds a new layer of complexity: synchronize the events that come asynchronously from the Operating System.

message-io offers an easy way to deal with all these aforementioned problems, making them transparent for you, the programmer that wants to make an application with its own problems. For that, the library gives you a simple API with two concepts to understand: messages (the data you send and receive), and endpoints (the recipients of that data). This abstraction also offers the possibility to use the same API independently of the transport protocol used. You could change the transport of your application in literally one line.

Features

  • Highly scalable: non-blocking sockets that allow for the management of thousands of active connections.
  • Multiplatform: see mio platform support.
  • Multiple transport protocols (docs):
    • TCP: stream and framed mode (to deal with messages instead of stream)
    • UDP, with multicast option
    • WebSocket: plain and secure#102 option using tungstenite-rs (wasm is not supported but planned).
  • Custom FIFO events with timers and priority.
  • Easy, intuitive and consistent API:
    • Follows KISS principle.
    • Abstraction from transport layer: don't think about sockets, think about messages and endpoints.
    • Only two main entities to use:
      • a NodeHandler to manage all connections (connect, listen, remove, send) and signals (timers, priority).
      • a NodeListener to process all signals and events from the network.
    • Forget concurrency problems: handle all connection and listeners from one thread: "One thread to rule them all".
    • Easy error handling: do not deal with dark internal std::io::Error when sending/receiving from the network.
  • High performance (see the benchmarks):
    • Write/read messages with zero-copy. You write and read directly from the internal OS socket buffer without any copy in the middle by the library.
    • Full duplex: simultaneous reading/writing operations over the same internal OS socket.
  • Customizable: message-io doesn't have the transport you need? Easily add an adapter.

Documentation

Getting started

Add to your Cargo.toml (all transports included by default):

[dependencies]
message-io = "0.14"

If you only want to use a subset of the available transport battery, you can select them by their associated features tcp, udp, and websocket. For example, in order to include only TCP and UDP, add to your Cargo.toml:

[dependencies]
message-io = { version = "0.14", default-features = false, features = ["tcp", "udp"] }

Read before update to 0.14: CHANGELOG.md/0.14

All in one: TCP, UDP and WebSocket echo server

The following example is the simplest server that reads messages from the clients and responds to them with the same message. It is able to offer the "service" for 3 differents protocols at the same time.

use message_io::node::{self};
use message_io::network::{NetEvent, Transport};

fn main() {
    // Create a node, the main message-io entity. It is divided in 2 parts:
    // The 'handler', used to make actions (connect, send messages, signals, stop the node...)
    // The 'listener', used to read events from the network or signals.
    let (handler, listener) = node::split::<()>();

    // Listen for TCP, UDP and WebSocket messages at the same time.
    handler.network().listen(Transport::FramedTcp, "0.0.0.0:3042").unwrap();
    handler.network().listen(Transport::Udp, "0.0.0.0:3043").unwrap();
    handler.network().listen(Transport::Ws, "0.0.0.0:3044").unwrap();

    // Read incoming network events.
    listener.for_each(move |event| match event.network() {
        NetEvent::Connected(_, _) => unreachable!(), // Used for explicit connections.
        NetEvent::Accepted(_endpoint, _listener) => println!("Client connected"), // Tcp or Ws
        NetEvent::Message(endpoint, data) => {
            println!("Received: {}", String::from_utf8_lossy(data));
            handler.network().send(endpoint, data);
        },
        NetEvent::Disconnected(_endpoint) => println!("Client disconnected"), //Tcp or Ws
    });
}

Echo client

The following example shows a client that can connect to the previous server. It sends a message each second to the server and listen its echo response. Changing the Transport::FramedTcp to Udp or Ws will change the underlying transport used.

use message_io::node::{self, NodeEvent};
use message_io::network::{NetEvent, Transport};
use std::time::Duration;

enum Signal {
    Greet,
    // Any other app event here.
}

fn main() {
    let (handler, listener) = node::split();

    // You can change the transport to Udp or Ws (WebSocket).
    let (server, _) = handler.network().connect(Transport::FramedTcp, "127.0.0.1:3042").unwrap();

    listener.for_each(move |event| match event {
        NodeEvent::Network(net_event) => match net_event {
            NetEvent::Connected(_endpoint, _ok) => handler.signals().send(Signal::Greet),
            NetEvent::Accepted(_, _) => unreachable!(), // Only generated by listening
            NetEvent::Message(_endpoint, data) => {
                println!("Received: {}", String::from_utf8_lossy(data));
            },
            NetEvent::Disconnected(_endpoint) => (),
        }
        NodeEvent::Signal(signal) => match signal {
            Signal::Greet => { // computed every second
                handler.network().send(server, "Hello server!".as_bytes());
                handler.signals().send_with_timer(Signal::Greet, Duration::from_secs(1));
            }
        }
    });
}

Test it yourself!

Clone the repository and test the Ping Pong example (similar to the README example but more vitaminized).

Run the server:

cargo run --example ping-pong server tcp 3456

Run the client:

cargo run --example ping-pong client tcp 127.0.0.1:3456

You can play with it by changing the transport, running several clients, disconnecting them, etc. See more here.

Do you need a transport protocol that message-io doesn't have? Add an adapter!

message-io offers two kinds of API. The user API that talks to message-io itself as a user of the library, and the internal adapter API for those who want to add their protocol adapters into the library.

If a transport protocol can be built in top of mio (most of the existing protocol libraries can), then you can add it to message-io really easily:

  1. Add your adapter file in src/adapters/<my-transport-protocol>.rs that implements the traits that you find here. It contains only 8 mandatory functions to implement (see the template), and it takes arround 150 lines to implement an adapter.

  2. Add a new field in the Transport enum found in src/network/transport.rs to register your new adapter.

That's all. You can use your new transport with the message-io API like any other.

Oops! one more step: make a Pull Request so everyone can use it :)

Open source projects using message-io

  • Termchat Terminal chat through the LAN with video streaming and file transfer.
  • Egregoria Contemplative society simulation.
  • Project-Midas Distributed network based parallel computing system.
  • AsciiArena Terminal multiplayer death match game (alpha).
  • LanChat LanChat flutter + rust demo.

Does your awesome project use message-io? Make a Pull Request and add it to the list!

Comments
  • Improve error handling

    Improve error handling

    This is especially important since this crate is meant to be used as a library.

    For a example an nmap scan while termchat is active will crash here https://github.com/lemunozm/message-io/blob/master/src/network_adapter.rs#L348

    bug enhancement 
    opened by sigmaSd 16
  • 32-bit OS

    32-bit OS

    message-io doesn't seem to work on 32-bit systems. Some of the constants in resource_id.rs (namely RESOURCE_TYPE_BIT and ADAPTER_ID_MASK_OVER_ID) overflow in 32-bit systems.

    Perhaps the constants can be updated to make use of std::mem::size_of, or the type changed from usize to u64?

    opened by ray33ee 14
  • Pending task after CTRL+C

    Pending task after CTRL+C

    In main I start two tasks via tokio. One is a graphql server and the other is a service running node_listener.for_each(... to handle messages

        // Create and start discovery service
        // thread::spawn(move || {
        let t1 = tokio::task::spawn_blocking(move || {
            debug!("Creating Discovery Service");
            let dsserver = DiscoveryServer::new();
            if let Ok(discovery) = dsserver {
                debug!("Starting discovery service");
                let _ = discovery.run();
            }
        });
    
        tokio::join!(async move {
            let _ = graphql_server.run().await;
        });
    

    When I want to terminate, the discovery service stays and can only be killed brutally sudo kill -9 PID Is there a clean way to structure the code such that it can terminate gently?

    opened by fgadaleta 12
  • Sendin big data, or multiple small data is not working correctly

    Sendin big data, or multiple small data is not working correctly

    original discussion https://github.com/lemunozm/termchat/pull/11

    Sending multiple small data:

    • using this branch https://github.com/sigmaSd/termchat/tree/send_in_chunks (sends file in small chunks)
    • send a 10mb file with ?send
    • try to send any message after -> no message is received
    • debugging this lead me to encoding here https://github.com/lemunozm/message-io/blob/master/src/encoding.rs#L100 decoded_data at some point becomes always None

    Sending big data:

    • using this branch https://github.com/sigmaSd/termchat/tree/send_files (send the whole file in one send)
    • send 10mb file
    • receiver will crash here https://github.com/lemunozm/message-io/blob/master/src/network.rs#L112 ``Err value: Io(Kind(UnexpectedEof))
    • the issue might be here https://github.com/lemunozm/message-io/blob/master/src/encoding.rs#L11 the PADDING can be too small for the data length
    opened by sigmaSd 11
  • Close connection

    Close connection

    Let's say a client sends a bogus packet, or some error occurs, or the server wants to ban or rate-limit a client, would it be possible to add a way to close an endpoint? That is, cut the connection. The API could look like Network::close(Endpoint). The NetEvent::Disconnected event would be sent to the server, and if a new connection attempt is done, the NetEvent::Connected would be sent again.

    opened by Uriopass 8
  • About the new node API: How do I keep control ?

    About the new node API: How do I keep control ?

    I'm using message-io as part of my game's networking. Using the previous API I was doing something like this (pseudo code)

    let mut game_state = GameState::new();
    let mut networking = Networking::new(); // connects to the server
    
    loop {
        let new_inputs = networking.handle_packets();
        game_state.update(new_inputs);
        draw(&game_state);
    }
    
    
    // handle_packets being something like this
    fn handle_packets(&mut self) -> Option<Inputs> {
        while let Ok(packet) = self.network.try_receive() {
             // handle packet
        }
        self.new_inputs
    }
    

    However, it looks like using the new API I cannot do this as the for_each loop completely takes control of the thread used.

    My guess is that I now have to communicate between my game state and the networking code using channels.. I liked that it was abstracted away.

    opened by Uriopass 7
  • How to use endpoint

    How to use endpoint

    Hi, I have a question , most likely is a simple/silly one but I'm new in Rust so I hope you can answer me:

    I run the ping-pong example and it work. But in order to use it in my project I need to access to client/server endpoint from outside listener loop because I want to keep just one connection all the time between client and server.

    Outside the ping-pong example I have my project (I use a game engine which use winit as crate) where I have my UI with a button that I made in order to send a message when I click on it. I was going to use handler.network().send(server_id, &output_data); But I have no access to the server_id / endpoint because the point where my button trigger is inside at winit crate loop (aka UI).

    Because the UI loop consume(move) the data - it use a closure - it give an error when I try to save endpoint in a external struct because Endpoint doesn't have Clone trait.

    But I'm not confident to make changes yet in your code :).

    So my question is: which solution/option do you suggest? I tried my best to explain my problem but tell me if you need more info.

    opened by git2013vb 6
  • Game server

    Game server

    Hi,

    I am looking into creating a game server for my classical RPG project at Eldiron.com.

    Question: From your perspective what would be a good design implementation to have one thread on the server communicate with the clients and another thread performing the game ticks on the characters ? Is there some kind of best practice for this ?

    Thanks!

    opened by markusmoenig 6
  • Blocking on for_each_async

    Blocking on for_each_async

    I'm trying to implement message-io as a tcp client within the actix framework. This is my current implementation:

    use actix::Message;
    use actix::{
        Actor, ActorContext, AsyncContext, Context, Handler, Running, StreamHandler, WrapFuture,
    };
    use log::info;
    use message_io::network::{NetEvent, Transport};
    use message_io::node::{
        self, NodeEvent, NodeHandler, NodeListener, NodeTask, StoredNetEvent, StoredNodeEvent,
    };
    
    pub struct TcpClientActor {
        handler: NodeHandler<String>,
        listener: Option<NodeListener<String>>,
    }
    
    impl TcpClientActor {
        pub fn new() -> Self {
            let (handler, listener) = node::split::<String>();
            TcpClientActor {
                handler,
                listener: Some(listener),
            }
        }
    }
    
    impl Actor for TcpClientActor {
        type Context = Context<Self>;
    
        fn started(&mut self, ctx: &mut Self::Context) {
            let message = serde_json::json!({
                "op": "subscribe",
                "topic": "/client_count"
            });
            // Listen for TCP, UDP and WebSocket messages at the same time.
            let (server, socket_address) = self
                .handler
                .network()
                .connect(Transport::Tcp, "192.168.43.217:9092")
                .unwrap();
        }
    
        fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
            ctx.stop();
            Running::Stop
        }
    }
    
    #[derive(Message)]
    #[rtype(result = "()")]
    pub struct Listen {}
    
    impl Handler<Listen> for TcpClientActor {
        type Result = ();
    
        fn handle(&mut self, msg: Listen, ctx: &mut Self::Context) -> Self::Result {
            let listener = self.listener.take().unwrap();
            listener
                .for_each_async(move |event| {
                    match event {
                        NodeEvent::Network(net_event) => match net_event {
                            NetEvent::Connected(_endpoint, _ok) => {
                                info!("Connected");
                                // handler.signals().send(message.to_string());
                            }
                            NetEvent::Accepted(_, _) => unreachable!(), // Only generated by listening
                            NetEvent::Message(_endpoint, data) => {
                                println!("Received: {}", String::from_utf8_lossy(data));
                            }
                            NetEvent::Disconnected(_endpoint) => (),
                        },
                        NodeEvent::Signal(signal) => match signal {
                            _ => {
                                // computed every second
                                info!("Signal Received: {}", signal);
                                // handler.signals().send_with_timer(signal, Duration::from_secs(1));
                            }
                        },
                    }
                })
                .wait();
        }
    }
    

    The problem with the current implementation is that whenever the listener.for_each_async() method runs, the thread gets blocked and the entire program freezes. I though for_each_async was non-blocking. Am I misinterpreting something here? Thank you

    Ps: If you need any information regarding the actix framework or anything else, just let me know and I'll write it down. I didn't want to overwhelm the issue with information that might not be needed.

    opened by fabracht 6
  • Compiling message-io with just `websocket` feature breaks.

    Compiling message-io with just `websocket` feature breaks.

    Error message:

    error[E0433]: failed to resolve: could not find `tcp` in `super`
       --> /home/redact/.cargo/registry/src/github.com-1ecc6299db9ec823/message-io-0.14.1/src/adapters/ws.rs:183:45
        |
    183 |                     let tcp_status = super::tcp::check_stream_ready(&stream.0);
        |                                             ^^^ could not find `tcp` in `super`
    

    This might be intended, but in that case there should be websocket = ["tcp"]. It also doesn't make sense for the web either. Maybe have two different websocket features? websocket and websocket-web.

    opened by iMplode-nZ 5
  • multicast example panic (Mac)

    multicast example panic (Mac)

    hi @lemunozm ! excellent work on this crate, very exciting!

    just wanted to share that while i was reviewing and running some of the examples, i found this issue where the second client to attempt running the multicast example panics after connecting, with an Address already in use error.

    image

    bug good first issue 
    opened by dabat 5
  • Packages contain code that will be rejected by a future version of Rust

    Packages contain code that will be rejected by a future version of Rust

    Hello, after adding message-io to new project and compiling, i got the following message:


    warning: the following packages contain code that will be rejected by a future version of Rust: ntapi v0.3.7 note: To solve this problem, you can try the following approaches:

    • Some affected dependencies have newer versions available. You may want to consider updating them to a newer version to see if the issue has been fixed.

    ntapi v0.3.7 has the following newer versions available: 0.4.0

    • If the issue is not solved by updating the dependencies, a fix has to be implemented by those dependencies. You can help with that by notifying the maintainers of this problem (e.g. by creating a bug report) or by proposing a fix to the maintainers (e.g. by creating a pull request):

    • If waiting for an upstream fix is not an option, you can use the [patch] section in Cargo.toml to use your own version of the dependency. For more information, see: https://doc.rust-lang.org/cargo/reference/overriding-dependencies.html#the-patch-section

    note: this report can be shown with cargo report future-incompatibilities --id 4


    It seems that just updating the dependency would fix this issue, in case it does not i would make a issue in their repo :)

    opened by lexika979 9
  • Modify thread name

    Modify thread name

    I would like the ability to provide an alternate thread name for the threads in for_each() and for_each_async(). If this is a change you are open to incorporating, I can make the change and put up a PR. Any interest?

    enhancement 
    opened by jbabyhacker 3
  • Unable to connect with wss

    Unable to connect with wss

    I can't manage to establish wss connections using message-io version 0.14.2 on macOS Big Sur. Example:

    use message_io::network::{NetEvent, RemoteAddr, Transport};
    use message_io::node::{self, NodeEvent};
    
    fn main() {
        connect("ws://echo.websocket.org".to_string());
        connect("wss://echo.websocket.org".to_string());
    }
    
    fn connect(url: String) {
        let (handler, listener) = node::split::<()>();
        handler
            .network()
            .connect(Transport::Ws, RemoteAddr::Str(url.clone()))
            .unwrap();
        listener.for_each(move |event| match event {
            NodeEvent::Network(NetEvent::Connected(e, success)) => {
                println!("{} success={} {:?}", url, success, e);
                handler.stop();
            }
            _ => panic!(),
        });
    }
    

    Output:

    ws://echo.websocket.org success=true Endpoint { resource_id: [3.R.0], addr: 174.129.224.73:80 }
    wss://echo.websocket.org success=false Endpoint { resource_id: [3.R.0], addr: 174.129.224.73:443 }
    

    However, I can connect with wss using websocat from the same machine:

    $ echo foo | websocat wss://echo.websocket.org
    foo
    

    It looks like it's hitting this code path in the ws adapter:

    WS client handshake error: WebSocket protocol error: Handshake not finished
    

    Would appreciate guidance if I'm doing something incorrectly. Thanks!

    enhancement 
    opened by temclaugh 4
  • Websocket web-sys implementation for wasm

    Websocket web-sys implementation for wasm

    Currently, WebSocket is working for client/server side. However, the browsers do not allow to create tcp connections direclty (which is the current implementation based). Instead, the web-sys must be used.

    • Use a different WebSocket implementation if the target is wasm.
    • wasm example of a client.
    enhancement good first issue 
    opened by lemunozm 10
  • FramedTcp handling the nagle algorithm to improve the performance

    FramedTcp handling the nagle algorithm to improve the performance

    This PR handles the Nagle algorithm in FramedTcp allowing to send a message immediately without sending partially (whatever possible). In other words: it disabled the algorithm until the message is fully in the OS sending buffer, and then enables it to send the full message without waiting for another possible message.

    This protocol is message-oriented. This means that receiving less than a message has no meaning for the receiver who needs to wait for the remaining data to produce a meaningful message (which implies allocating in a Decoder). For that reason, sending partial data before having the whole message does not improve anything. What's more, it saturates the network.

    Although this feature implies a latency reduction since once the complete message is OS output buffer it is sent, could also imply an impact on the throughput. Anyway, the user of a FramedTcp transport would want latency versus throughput. If throughput is the target, they will probably send big messages that will be less impacted by this change, or even more probably they prefer to choose TCP transport that fits better for throughput.

    NOTE: Conceptually it should work, but how to evaluate this improvement?

    opened by lemunozm 0
Owner
Luis Enrique Muñoz Martín
Passionate about programming languages. In love with rust.
Luis Enrique Muñoz Martín
Easy-to-use wrapper for WebRTC DataChannels peer-to-peer connections written in Rust and compiling to WASM.

Easy-to-use wrapper for WebRTC DataChannels peer-to-peer connections written in Rust and compiling to WASM.

null 58 Dec 11, 2022
An easy-to-use tunnel to localhost built in Rust. An alternative to ngrok and frp.

rslocal English | 中文 What is rslocal? Rslocal is like ngrok built in Rust, it builds a tunnel to localhost. Project status support http support tcp su

saltbo 220 Jan 7, 2023
easy to use version controll

Verzcon To start off you need to know are you the server or the client Server If you are the server then you should run verzcon --host then it will ma

Arthur Melton 2 Nov 8, 2021
Drpc-Correct, high performance, robust, easy use Remote invocation framework

Drpc - Correct, high performance, robust, easy use Remote invocation framework

darkrpc 30 Dec 17, 2022
A Rust library for parsing the SOME/IP network protocol (without payload interpretation).

someip_parse A Rust library for parsing the SOME/IP network protocol (without payload interpretation). Usage Add the following to your Cargo.toml: [de

Julian Schmid 18 Oct 31, 2022
Lightweight p2p library. Support build robust stable connection on p2p/distributed network.

Chamomile Build a robust stable connection on p2p network features Support build a robust stable connection between two peers on the p2p network. Supp

CympleTech 94 Jan 6, 2023
Core library for Lightning Network peer-to-peer nostr platform

Mostro server This document explains how Mostro works. Overview Due to the growing need to be able to operate with Bitcoin without giving up personal

Mostro 16 Jan 4, 2023
The Safe Network Core. API message definitions, routing and nodes, client core api.

safe_network The Safe Network Core. API message definitions, routing and nodes, client core api. License This Safe Network repository is licensed unde

MaidSafe 101 Dec 19, 2022
Network Block Storage server, written in Rust. Supports pluggable and chainable underlying storage

nbd-rs Disclaimer DO NEVER USE THIS FOR PRODUCTION Do not use this for any data that you cannot afford to lose any moment. Expect data loss, corruptio

Rainlab Inc 10 Sep 30, 2022
Astar Network is an interoperable blockchain based the Substrate framework and the hub for dApps within the Polkadot Ecosystem

Astar Network is an interoperable blockchain based the Substrate framework and the hub for dApps within the Polkadot Ecosystem. With Astar Network and

Astar Network (Plasm) 43 Dec 14, 2022
A decentralized, censorship-resistant, and incentive-compatible packet-routing overlay network

About Earendil is a decentralized, censorship-resistant packet-routing overlay network designed for performance and censorship resistance. It enables

Eric Tung (Yuhao Dong) 8 Jun 24, 2023
Network simulation in Rust

netsim - A Rust library for network simulation and testing (currently linux-only). netsim is a crate for simulating networks for the sake of testing n

Andrew Cann 115 Dec 15, 2022
A private network system that uses WireGuard under the hood.

innernet A private network system that uses WireGuard under the hood. See the announcement blog post for a longer-winded explanation. innernet is simi

Tonari, Inc 4.1k Dec 29, 2022
A Curve-like AMM for Secret Network

A Curve-like AMM for Secret Network. Supports a varibale number of tokens with the same underlying value.

Enigma 16 Dec 11, 2022
A multi-protocol network relay

A multi-protocol network relay

zephyr 43 Dec 13, 2022
Computational Component of Polkadot Network

Gear is a new Polkadot/Kusama parachain and most advanced L2 smart-contract engine allowing anyone to launch any dApp for networks with untrusted code.

null 145 Dec 19, 2022
Fullstack development framework for UTXO-based dapps on Nervos Network

Trampoline-rs The framework for building powerful dApps on the number one UTXO chain, Nervos Network CKB. This is an early-stage, currently very incom

TannrA 2 Mar 25, 2022
Official Implementation of Findora Network.

Findora Platform Wiki Contribution Guide Licensing The primary license for Platform is the Business Source License 1.1 (BUSL-1.1), see LICENSE. Except

Findora Foundation 61 Dec 9, 2022
Simple in-network file transfer with barely any overhead.

fftp fftp is the "Fast File Transport Protocol". It transfers files quickly between computers on a network with low overhead. Motivation FTP uses two

leo 4 May 12, 2022