High performance I/O framework written by Rust inspired by Netty

Overview

Introduction

Retty is a High performance I/O framework written by Rust inspired by Netty

基于mio的IO多路复用高并发、高性能网络通信开发框架

Feature

  • Rayon 线程池包装 EventLoop / EventLoopGroup
  • IO多路复用模型
  • 内置Bytebuf数据容器
  • ChannelPipeline 模型
  • 默认支持TCP 未来支持UDP

还没写完,刚实现了一部分功能。 我会努力的。。。。。。

  • 2022-1-28 : 完成出入站handler分离

Channel_Handler_Context 分离

Channel_Handler_Context_Pipeline 分离

包装TCPStream 和 Channel

// todo: implement

  • 内置固定消息长度字段解码器
  • 内置HTTP 协议解码器
  • 内置WebSocket 协议解码器
  • 内置flatBuffer 解码器
  • 内置protoBuffer 解码器

Quick Start

: channel_active 新连接上线: {}", addr); channel_handler_ctx.write_and_flush(&mut format!("::: 欢迎你:==>{}", addr)); let attr = channel_handler_ctx.channel().get_attribute("User".to_string()); let attr = attr.lock().unwrap(); let attr = attr.downcast_ref::().unwrap(); println!("========================================================:att:::: {}", attr); } fn channel_inactive(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx) { println!("is_active:{}", channel_handler_ctx.channel().is_active()); println!("远端断开连接: Inactive: channel_id : {}", channel_handler_ctx.channel().id()) } fn channel_read(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx, message: &mut dyn Any) { let msg = message.downcast_ref::().unwrap(); println!("业务处理 Handler --> :收到消息:{}", msg); println!("reactor-excutor :{}", thread::current().name().unwrap()); channel_handler_ctx.write_and_flush(&mut format!("::: I Love You !!!! :==>{}", msg)); let attr = channel_handler_ctx.channel().get_attribute("User".to_string()); let attr = attr.lock().unwrap(); let attr = attr.downcast_ref::().unwrap(); println!("========================================================:att:::: {}", attr); } fn channel_exception(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx, error: RettyErrorKind) { channel_handler_ctx.fire_channel_exception(error); } } struct Decoder { excutor: Arc, } impl ChannelInboundHandler for Decoder { fn id(&self) -> String { return "decoder_handler".to_string(); } fn channel_active(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx) { // set attribute channel_handler_ctx.channel().set_attribute("User".to_string(), Box::new("lgphp".to_string())); println!("解码 Handler --> : channel_active 新连接上线: {}", channel_handler_ctx.channel().remote_addr().unwrap()); channel_handler_ctx.fire_channel_active(); } fn channel_inactive(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx) { channel_handler_ctx.fire_channel_inactive() } fn channel_read(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx, message: &mut dyn Any) { let mut buf = message.downcast_mut::().unwrap(); println!("解码 Handler --> 收到Bytebuf:"); // 解码 let _pkt_len = buf.read_u32_be(); let _ver = buf.read_u32_be(); let mut msg = buf.read_string_with_u8_be_len(); channel_handler_ctx.fire_channel_read(&mut msg); } fn channel_exception(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx, error: RettyErrorKind) { channel_handler_ctx.fire_channel_exception(error); } } impl Decoder { fn new() -> Self { Decoder { excutor: Arc::new(rayon_core::ThreadPoolBuilder::new().num_threads(1).build().unwrap()) } } } /// /// 入站异常handler 通常在最后一个 /// struct InboundExceptionHandler {} impl ChannelInboundHandler for InboundExceptionHandler { fn id(&self) -> String { String::from("InboundExceptionHandler") } fn channel_active(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx) {} fn channel_inactive(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx) {} fn channel_read(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx, message: &mut dyn Any) {} fn channel_exception(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx, error: RettyErrorKind) { let mut ch = channel_handler_ctx.channel(); // 处理 ReadIdleTimeout if error.kind == ErrorKind::TimedOut { println!("channel_id:{} 在 {}", ch.id(), format!("{} ms 没有读到数据! , error_message:{}", ch.read_idle_timeout_ms(), error.message)); ch.close() } } } impl InboundExceptionHandler { fn new() -> Self { InboundExceptionHandler {} } } struct Encoder { excutor: Arc, } impl ChannelOutboundHandler for Encoder { fn id(&self) -> String { return "encoder_handler".to_string(); } fn channel_write(&mut self, channel_handler_ctx: &mut ChannelOutboundHandlerCtx, message: &mut dyn Any) { let msg = message.downcast_ref::().unwrap(); println!("回执消息,编码器 :====>Encoder Handler:{}", msg); let mut buf = ByteBuf::new_with_capacity(0); let re = format!("回执消息,编码器 :====>Encoder Handler:{}", msg); buf.write_u32_be((1 + re.as_bytes().len()) as u32); buf.write_string_with_u8_be_len(re); channel_handler_ctx.fire_channel_write(&mut buf); } } impl Encoder { fn new() -> Self { Encoder { excutor: Arc::new(rayon_core::ThreadPoolBuilder::new().num_threads(1).build().unwrap()) } } } fn main() { let mut bootstrap = Bootstrap::new_server_bootstrap(); bootstrap.worker_group(8) .bind("0.0.0.0", 1512) .opt_ttl_ms(1000) .opt_keep_alive_ms(30000) .opt_nodelay(false) .opt_send_buf_size(65535) .opt_recv_buf_size(65535) .opt_read_idle_timeout_ms(3000) .initialize_inbound_handler_pipeline(|| { let mut handler_pipe = ChannelInboundHandlerPipe::new(); let decoder_handler = Box::new(Decoder::new()); let biz_handler = Box::new(BizHandler::new()); let excetion_handler = Box::new(InboundExceptionHandler::new()); handler_pipe.add_last(Box::new(FirstIntegerLengthFieldDecoder::new())); handler_pipe.add_last(decoder_handler); handler_pipe.add_last(biz_handler); handler_pipe.add_last(excetion_handler); handler_pipe }) .initialize_outbound_handler_pipeline(|| { let mut handler_pipe = ChannelOutboundHandlerPipe::new(); let encoder_handler = Box::new(Encoder::new()); handler_pipe.add_last(encoder_handler); handler_pipe }).start(); // use default_event_loop let mut new_default_event_loop_group = EventLoopGroup::new_default_event_loop_group(9); new_default_event_loop_group.execute(|| { println!(" default_event_loop execute Task ..... is here") }); WaitGroup::new().clone().wait(); } ">
use std::any::Any;
use std::io::ErrorKind;
use std::sync::{Arc, Mutex};
use std::thread;

use bytebuf_rs::bytebuf::ByteBuf;
use crossbeam::sync::WaitGroup;
use rayon_core::ThreadPool;
use uuid::Uuid;

use retty::core::bootstrap::Bootstrap;
use retty::core::eventloop::EventLoopGroup;
use retty::errors::RettyErrorKind;
use retty::handler::channel_handler_ctx::{ChannelInboundHandlerCtx, ChannelOutboundHandlerCtx};
use retty::handler::codec::first_integer_length_field_decoder::FirstIntegerLengthFieldDecoder;
use retty::handler::handler::{ChannelInboundHandler, ChannelOutboundHandler};
use retty::handler::handler_pipe::{ChannelInboundHandlerPipe, ChannelOutboundHandlerPipe};
struct BizHandler {
    excutor: Arc<ThreadPool>,
}

impl BizHandler {
    fn new() -> Self {
        BizHandler {
            excutor: Arc::new(rayon_core::ThreadPoolBuilder::new().num_threads(1).build().unwrap())
        }
    }
}

impl ChannelInboundHandler for BizHandler {
    fn id(&self) -> String {
        return "biz_handler".to_string();
    }

    fn channel_active(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx) {
        let addr = channel_handler_ctx.channel().remote_addr().unwrap();
        println!("业务处理 Handler --> : channel_active 新连接上线: {}", addr);
        channel_handler_ctx.write_and_flush(&mut format!("::: 欢迎你:==>{}", addr));
        let attr = channel_handler_ctx.channel().get_attribute("User".to_string());
        let attr = attr.lock().unwrap();
        let attr = attr.downcast_ref::<String>().unwrap();
        println!("========================================================:att:::: {}", attr);
    }

    fn channel_inactive(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx) {
        println!("is_active:{}", channel_handler_ctx.channel().is_active());
        println!("远端断开连接: Inactive: channel_id : {}", channel_handler_ctx.channel().id())
    }

    fn channel_read(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx, message: &mut dyn Any) {
        let msg = message.downcast_ref::<String>().unwrap();
        println!("业务处理 Handler  --> :收到消息:{}", msg);
        println!("reactor-excutor :{}", thread::current().name().unwrap());
        channel_handler_ctx.write_and_flush(&mut format!("::: I Love You !!!! :==>{}", msg));
        let attr = channel_handler_ctx.channel().get_attribute("User".to_string());
        let attr = attr.lock().unwrap();
        let attr = attr.downcast_ref::<String>().unwrap();
        println!("========================================================:att:::: {}", attr);
    }

    fn channel_exception(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx, error: RettyErrorKind) {
        channel_handler_ctx.fire_channel_exception(error);
    }
}


struct Decoder {
    excutor: Arc<ThreadPool>,
}

impl ChannelInboundHandler for Decoder {
    fn id(&self) -> String {
        return "decoder_handler".to_string();
    }

    fn channel_active(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx) {
        // set attribute
        channel_handler_ctx.channel().set_attribute("User".to_string(), Box::new("lgphp".to_string()));
        println!("解码 Handler --> : channel_active 新连接上线: {}", channel_handler_ctx.channel().remote_addr().unwrap());
        channel_handler_ctx.fire_channel_active();
    }

    fn channel_inactive(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx) {
        channel_handler_ctx.fire_channel_inactive()
    }

    fn channel_read(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx, message: &mut dyn Any) {
        let mut buf = message.downcast_mut::().unwrap();
        println!("解码 Handler --> 收到Bytebuf:");
        // 解码
        let _pkt_len = buf.read_u32_be();
        let _ver = buf.read_u32_be();
        let mut msg = buf.read_string_with_u8_be_len();
        channel_handler_ctx.fire_channel_read(&mut msg);
    }

    fn channel_exception(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx, error: RettyErrorKind) {
        channel_handler_ctx.fire_channel_exception(error);
    }
}

impl Decoder {
    fn new() -> Self {
        Decoder {
            excutor: Arc::new(rayon_core::ThreadPoolBuilder::new().num_threads(1).build().unwrap())
        }
    }
}

///
/// 入站异常handler 通常在最后一个
///
struct InboundExceptionHandler {}

impl ChannelInboundHandler for InboundExceptionHandler {
    fn id(&self) -> String {
        String::from("InboundExceptionHandler")
    }

    fn channel_active(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx) {}

    fn channel_inactive(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx) {}

    fn channel_read(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx, message: &mut dyn Any) {}

    fn channel_exception(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx, error: RettyErrorKind) {
        let mut ch = channel_handler_ctx.channel();

        // 处理 ReadIdleTimeout

        if error.kind == ErrorKind::TimedOut {
            println!("channel_id:{} 在 {}", ch.id(), format!("{} ms 没有读到数据! , error_message:{}", ch.read_idle_timeout_ms(), error.message));
            ch.close()
        }
    }
}

impl InboundExceptionHandler {
    fn new() -> Self {
        InboundExceptionHandler {}
    }
}


struct Encoder {
    excutor: Arc<ThreadPool>,
}

impl ChannelOutboundHandler for Encoder {
    fn id(&self) -> String {
        return "encoder_handler".to_string();
    }


    fn channel_write(&mut self, channel_handler_ctx: &mut ChannelOutboundHandlerCtx, message: &mut dyn Any) {
        let msg = message.downcast_ref::<String>().unwrap();
        println!("回执消息,编码器 :====>Encoder Handler:{}", msg);
        let mut buf = ByteBuf::new_with_capacity(0);
        let re = format!("回执消息,编码器 :====>Encoder Handler:{}", msg);
        buf.write_u32_be((1 + re.as_bytes().len()) as u32);
        buf.write_string_with_u8_be_len(re);
        channel_handler_ctx.fire_channel_write(&mut buf);
    }
}

impl Encoder {
    fn new() -> Self {
        Encoder {
            excutor: Arc::new(rayon_core::ThreadPoolBuilder::new().num_threads(1).build().unwrap())
        }
    }
}


fn main() {
    let mut bootstrap = Bootstrap::new_server_bootstrap();
    bootstrap.worker_group(8)
        .bind("0.0.0.0", 1512)
        .opt_ttl_ms(1000)
        .opt_keep_alive_ms(30000)
        .opt_nodelay(false)
        .opt_send_buf_size(65535)
        .opt_recv_buf_size(65535)
        .opt_read_idle_timeout_ms(3000)
        .initialize_inbound_handler_pipeline(|| {
            let mut handler_pipe = ChannelInboundHandlerPipe::new();
            let decoder_handler = Box::new(Decoder::new());
            let biz_handler = Box::new(BizHandler::new());
            let excetion_handler = Box::new(InboundExceptionHandler::new());
            handler_pipe.add_last(Box::new(FirstIntegerLengthFieldDecoder::new()));
            handler_pipe.add_last(decoder_handler);
            handler_pipe.add_last(biz_handler);
            handler_pipe.add_last(excetion_handler);
            handler_pipe
        })
        .initialize_outbound_handler_pipeline(|| {
            let mut handler_pipe = ChannelOutboundHandlerPipe::new();
            let encoder_handler = Box::new(Encoder::new());
            handler_pipe.add_last(encoder_handler);
            handler_pipe
        }).start();

    // use  default_event_loop
    let mut new_default_event_loop_group = EventLoopGroup::new_default_event_loop_group(9);
    new_default_event_loop_group.execute(|| {
        println!(" default_event_loop  execute Task ..... is here")
    });
    WaitGroup::new().clone().wait();
}
You might also like...
Fast Function Dispatch: Improving the performance of Rust's dynamic function calls

Fast Function Dispatch: Improving the performance of Rust's dynamic function calls A safe, pragmatic toolkit for high-performance virtual function cal

TCP is so widely used, however QUIC may have a better performance.

TCP is so widely used, however QUIC may have a better performance. For softwares which use protocols built on TCP, this program helps them take FULL advantage of QUIC.

An end-to-end encrypted, anonymous IP-hiding, decentralized, audio/video/file sharing/offline messaging multi-device platform built for both communications and application security and performance.

An end-to-end encrypted, anonymous IP-hiding, decentralized, audio/video/file sharing/offline messaging multi-device platform built for both communications and application security and performance.

A high performence Socks5 proxy server with bind/reverse support implementation by Rust.

rsocx A high performence Socks5 proxy server with bind/reverse support implementation by Rust Features Async-std No unsafe code Single executable Linu

Actor framework for Rust.

Actix Actor framework for Rust Documentation User Guide API Documentation API Documentation (master branch) Features Async and sync actors Actor commu

rpcx microservice framework in Rust

rpcx-rs Rust library for rpcx rpc/microservice framework. Use the simplest style to explore Rust function as cross-platform rpc services. If you can w

Implementing Bendersnatch curve using Arkwork's framework in Rust.

This is a reference implementation of Bendersnatch curve using Arkwork's framework in Rust. The spec of the curve is available here. There was also a Python reference implementation here.

Rust implementation of PRECIS Framework: Preparation, Enforcement, and Comparison of Internationalized Strings in Application Protocols

Rust PRECIS Framework libray PRECIS Framework: Preparation, Enforcement, and Comparison of Internationalized Strings in Application Protocols as descr

A simple message based networking library for the bevy framework

Spicy Networking for Bevy bevy_spicy_networking is a solution to the "How do I connect multiple clients to a single server" problem in your bevy games

Owner
lgphp
true man!
lgphp
Drpc-Correct, high performance, robust, easy use Remote invocation framework

Drpc - Correct, high performance, robust, easy use Remote invocation framework

darkrpc 30 Dec 17, 2022
A high performance/low-overhead OpenMetrics library for Rust

* * * EXPERIMENTAL * * * discreet-metrics A high-performance/low-overhead metrics library aiming to conform with OpenMetrics and to satisfy the follow

null 2 Sep 14, 2022
A library-first, lightweight, high-performance, cloud-native supported API gateway🪐 by RUST

Preview version, will not guarantee the stability of the API! Do NOT use in production environment! A library-first, lightweight, high-performance, cl

Ideal World 4 May 7, 2023
High-performance link shortener

shorty High-performance link shortener written in Rust ?? Hosting In addition to being easy to build from source, shorty is available as a Docker imag

Caleb Denio 49 Jan 3, 2023
A high performance TCP SYN port scanner.

Armada A High-Performance TCP SYN scanner What is Armada? Armada is a high performance TCP SYN scanner. This is equivalent to the type of scanning tha

resync 259 Dec 19, 2022
Cloud Native high performance security and privacy SQL proxy.

Fern proxy With the advent of Cloud Native applications, architecture patterns evolved and emerged to take advantage of cloud computing, and build mor

Fern 12 Nov 7, 2022
🚀 10x easier, 🚀 10x cheaper, 🚀 high performance, 🚀 petabyte scale - Elasticsearch/Splunk/Datadog alternative for 🚀 (logs, metrics, traces).

?? 10x easier, ?? 10x cheaper, ?? petabyte scale - Elasticsearch/Splunk/Datadog alternative for ?? (logs, metrics, traces). ZincObserve ZincObserve is

Zinc Labs Inc. 80 Feb 22, 2023
The High Performance Proxy/Load Balancer

Silverwind-The Next Generation High-Performance Proxy English 简体中文 The Silverwind is a high-performance reverse proxy/load balancer. And it could be a

null 112 Apr 7, 2023
A high-performance, lightweight, and cross-platform QUIC library

TQUIC English | 中文 TQUIC is a high-performance, lightweight, and cross-platform library for the IETF QUIC protocol. Advantages High performance: TQUIC

Tencent 11 Oct 27, 2023
A high performance http proxy server & extensions platform & net packet capture tool

CthulhuRs A high performance http proxy server A browser extensions platform A net packet capture tool Demonstration Main features of CthulhuRs Inject

null 5 Apr 30, 2024