Multithreaded, non-blocking Linux server framework in Rust

Overview

hydrogen

Documentation

hydrogen is a non-blocking socket server framework built atop epoll. It takes care of the tedious connection and I/O marshaling across threads, and leaves the specifics of I/O reading and writing up the consumer, through trait implementations.


Streams

hydrogen works with Stream trait Objects so any custom type can be used. simple-stream was built in conjunction and offers several stream abstractions and types including Plain and Secured streams with basic and WebSocket framing.

Multithreaded

hydrogen is multithreaded. It uses one thread for accepting incoming connections, one for updating epoll reported event, and one used for marshalling I/O into a threadpool of a user specified size.

Slab allocation

The connection pool is managed as a slab, which means traversal times are similar to traversing a Vector, with an insertion and removal time of O(1).

Examples

simple-stream

extern crate hydrogen;
extern crate simple_stream as ss;

use hydrogen;
use hydrogen::{Stream as HydrogenStream, HydrogenSocket};
use ss::frame::Frame;
use ss::frame::simple::{SimpleFrame, SimpleFrameBuilder};
use ss::{Socket, Plain, NonBlocking, SocketOptions};



// Hydrogen requires a type that implements `hydrogen::Stream`.
// We'll implement it atop the `simple-stream` crate.
#[derive(Clone)]
pub struct Stream {
    inner: Plain<Socket, SimpleFrameBuilder>
}

impl HydrogenStream for Stream {
    // This method is called when epoll reports data is available for reading.
    fn recv(&mut self) -> Result<Vec<Vec<u8>>, Error> {
        match self.inner.nb_recv() {
            Ok(frame_vec) => {
                let mut ret_buf = Vec::<Vec<u8>>::with_capacity(frame_vec.len());
                for frame in frame_vec.iter() {
                    ret_buf.push(frame.payload());
                }
                Ok(ret_buf)
            }
            Err(e) => Err(e)
        }
    }

    // This method is called when a previous attempt to write has returned `ErrorKind::WouldBlock`
    // and epoll has reported that the socket is now writable.
    fn send(&mut self, buf: &[u8]) -> Result<(), Error> {
        let frame = SimpleFrame::new(buf);
        self.inner.nb_send(&frame)
    }

    // This method is called when connection has been reported as reset by epoll, or when any
    // `std::io::Error` has been returned.
    fn shutdown(&mut self) -> Result<(), Error> {
        self.inner.shutdown()
    }
}
impl AsRawFd for Stream {
    fn as_raw_fd(&self) -> RawFd { self.inner.as_raw_fd() }
}


// The following will be our server that handles all reported events
struct Server;
impl hydrogen::Handler for Server {
    fn on_server_created(&mut self, fd: RawFd) {
        // Do any secific flag/option setting on the underlying listening fd.
        // This will be the fd that accepts all incoming connections.
    }

    fn on_new_connection(&mut self, fd: RawFd) -> Arc<UnsafeCell<HydrogenStream>> {
        // With the passed fd, create your type that implements `hydrogen::Stream`
        // and return it.
    }

    fn on_data_received(&mut self, socket: HydrogenSocket, buffer: Vec<u8>) {
        // Called when a complete, consumer defined, chunk of data has been read.
    }

    fn on_connection_removed(&mut self, fd: RawFd, err: Error) {
        // Called when a connection has been removed from the watch list, with the
        // `std::io::Error` as the reason removed.
    }
}


fn main() {
    hydrogen::begin(Server, hydrogen::Config {
        addr: "0.0.0.0".to_string(),
        port: 1337,
        max_threads: 8,
        pre_allocated: 100000
    });
}

std::net::TcpStream

extern crate hydrogen;

use std::cell::UnsafeCell;
use std::io::{Read, Write, Error, ErrorKind};
use std::net::{TcpStream, Shutdown};
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::Arc;

use hydrogen::{Stream as HydrogenStream, HydrogenSocket};


pub struct Stream {
    inner: TcpStream
}

impl Stream {
    pub fn from_tcp_stream(tcp_stream: TcpStream) -> Stream {
        tcp_stream.set_nonblocking(true);
        Stream {
            inner: tcp_stream
        }
    }
}

impl HydrogenStream for Stream {
    // This method is called when epoll reports data is available for reading.
    fn recv(&mut self) -> Result<Vec<Vec<u8>>, Error> {
        let mut msgs = Vec::<Vec<u8>>::new();

        // Our socket is set to non-blocking, we need to read until
        // there is an error or the system returns WouldBlock.
        // TcpStream offers no guarantee it will return in non-blocking mode.
        // Double check OS specifics on this when using.
        // https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read
        let mut total_read = Vec::<u8>::new();
        loop {
            let mut buf = [0u8; 4098];
            let read_result = self.inner.read(&mut buf);
            if read_result.is_err() {
                let err = read_result.unwrap_err();
                if err.kind() == ErrorKind::WouldBlock {
                    break;
                }

                return Err(err);
            }

            let num_read = read_result.unwrap();
            total_read.extend_from_slice(&buf[0..num_read]);
        }

        // Multiple frames, or "msgs", could have been gathered here. Break up
        // your frames here and save remainer somewhere to come back to on the
        // next reads....
        //
        // Frame break out code goes here
        //

        msgs.push(total_read);

        return Ok(msgs);
    }

    // This method is called when a previous attempt to write has returned `ErrorKind::WouldBlock`
    // and epoll has reported that the socket is now writable.
    fn send(&mut self, buf: &[u8]) -> Result<(), Error> {
        self.inner.write_all(buf)
    }

    // This method is called when connection has been reported as reset by epoll, or when any
    // `std::io::Error` has been returned.
    fn shutdown(&mut self) -> Result<(), Error> {
        self.inner.shutdown(Shutdown::Both)
    }
}

impl AsRawFd for Stream {
    fn as_raw_fd(&self) -> RawFd { self.inner.as_raw_fd() }
}

// The following will be our server that handles all reported events
struct Server;
impl hydrogen::Handler for Server {
    fn on_server_created(&mut self, fd: RawFd) {
        // Do any secific flag/option setting on the underlying listening fd.
        // This will be the fd that accepts all incoming connections.
    }

    fn on_new_connection(&mut self, fd: RawFd) -> Arc<UnsafeCell<HydrogenStream>> {
        // With the passed fd, create your type that implements `hydrogen::Stream`
        // and return it.
    }

    fn on_data_received(&mut self, socket: HydrogenSocket, buffer: Vec<u8>) {
        // Called when a complete, consumer defined, chunk of data has been read.
    }

    fn on_connection_removed(&mut self, fd: RawFd, err: Error) {
        // Called when a connection has been removed from the watch list, with the
        // `std::io::Error` as the reason removed.
    }
}


fn main() {
    hydrogen::begin(Server, hydrogen::Config {
        addr: "0.0.0.0".to_string(),
        port: 1337,
        max_threads: 8,
        pre_allocated: 100000
    });
}

Author

Nathan Sizemore, [email protected]

License

hydrogen is available under the MPL-2.0 license. See the LICENSE file for more info.

You might also like...
Rust edit distance routines accelerated using SIMD. Supports fast Hamming, Levenshtein, restricted Damerau-Levenshtein, etc. distance calculations and string search.

triple_accel Rust edit distance routines accelerated using SIMD. Supports fast Hamming, Levenshtein, restricted Damerau-Levenshtein, etc. distance cal

Rust native ready-to-use NLP pipelines and transformer-based models (BERT, DistilBERT, GPT2,...)

rust-bert Rust native Transformer-based models implementation. Port of Hugging Face's Transformers library, using the tch-rs crate and pre-processing

👄 The most accurate natural language detection library in the Rust ecosystem, suitable for long and short text alike
👄 The most accurate natural language detection library in the Rust ecosystem, suitable for long and short text alike

Table of Contents What does this library do? Why does this library exist? Which languages are supported? How good is it? Why is it better than other l

Snips NLU rust implementation

Snips NLU Rust Installation Add it to your Cargo.toml: [dependencies] snips-nlu-lib = { git = "https://github.com/snipsco/snips-nlu-rs", branch = "mas

A fast, low-resource Natural Language Processing and Text Correction library written in Rust.

nlprule A fast, low-resource Natural Language Processing and Error Correction library written in Rust. nlprule implements a rule- and lookup-based app

A fast implementation of Aho-Corasick in Rust.

aho-corasick A library for finding occurrences of many patterns at once with SIMD acceleration in some cases. This library provides multiple pattern s

Natural Language Processing for Rust

rs-natural Natural language processing library written in Rust. Still very much a work in progress. Basically an experiment, but hey maybe something c

finalfusion embeddings in Rust

Introduction finalfusion is a crate for reading, writing, and using embeddings in Rust. finalfusion primarily works with its own format which supports

Rust-tokenizer offers high-performance tokenizers for modern language models, including WordPiece, Byte-Pair Encoding (BPE) and Unigram (SentencePiece) models

rust-tokenizers Rust-tokenizer offers high-performance tokenizers for modern language models, including WordPiece, Byte-Pair Encoding (BPE) and Unigra

Owner
Nathan Sizemore
Nathan Sizemore
Blazingly fast framework for in-process microservices on top of Tower ecosystem

norpc = not remote procedure call Motivation Developing an async application is often a very difficult task but building an async application as a set

Akira Hayakawa 15 Dec 12, 2022
Rust-nlp is a library to use Natural Language Processing algorithm with RUST

nlp Rust-nlp Implemented algorithm Distance Levenshtein (Explanation) Jaro / Jaro-Winkler (Explanation) Phonetics Soundex (Explanation) Metaphone (Exp

Simon Paitrault 34 Dec 20, 2022
Fast suffix arrays for Rust (with Unicode support).

suffix Fast linear time & space suffix arrays for Rust. Supports Unicode! Dual-licensed under MIT or the UNLICENSE. Documentation https://docs.rs/suff

Andrew Gallant 207 Dec 26, 2022
Elastic tabstops for Rust.

tabwriter is a crate that implements elastic tabstops. It provides both a library for wrapping Rust Writers and a small program that exposes the same

Andrew Gallant 212 Dec 16, 2022
An efficient and powerful Rust library for word wrapping text.

Textwrap Textwrap is a library for wrapping and indenting text. It is most often used by command-line programs to format dynamic output nicely so it l

Martin Geisler 322 Dec 26, 2022
⏮ ⏯ ⏭ A Rust library to easily read forwards, backwards or randomly through the lines of huge files.

EasyReader The main goal of this library is to allow long navigations through the lines of large files, freely moving forwards and backwards or gettin

Michele Federici 81 Dec 6, 2022
An implementation of regular expressions for Rust. This implementation uses finite automata and guarantees linear time matching on all inputs.

regex A Rust library for parsing, compiling, and executing regular expressions. Its syntax is similar to Perl-style regular expressions, but lacks a f

The Rust Programming Language 2.6k Jan 8, 2023
Natural language detection library for Rust. Try demo online: https://www.greyblake.com/whatlang/

Whatlang Natural language detection for Rust with focus on simplicity and performance. Content Features Get started Documentation Supported languages

Sergey Potapov 805 Dec 28, 2022
Multilingual implementation of RAKE algorithm for Rust

RAKE.rs The library provides a multilingual implementation of Rapid Automatic Keyword Extraction (RAKE) algorithm for Rust. How to Use Append rake to

Navid 26 Dec 16, 2022
A Rust library for generically joining iterables with a separator

joinery A Rust library for generically joining iterables with a separator. Provides the tragically missing string join functionality to rust. extern c

Nathan West 72 Dec 16, 2022