Introduction
Retty is a High performance I/O framework written by Rust inspired by Netty
基于mio的IO多路复用高并发、高性能网络通信开发框架
Feature
- Rayon 线程池包装 EventLoop / EventLoopGroup
- IO多路复用模型
- 内置Bytebuf数据容器
- ChannelPipeline 模型
- 默认支持TCP 未来支持UDP
还没写完,刚实现了一部分功能。 我会努力的。。。。。。
- 2022-1-28 : 完成出入站handler分离
Channel_Handler_Context 分离
Channel_Handler_Context_Pipeline 分离
包装TCPStream 和 Channel
// todo: implement
- 内置固定消息长度字段解码器
- 内置HTTP 协议解码器
- 内置WebSocket 协议解码器
- 内置flatBuffer 解码器
- 内置protoBuffer 解码器
Quick Start
use std::any::Any; use std::io::ErrorKind; use std::sync::{Arc, Mutex}; use std::thread; use bytebuf_rs::bytebuf::ByteBuf; use crossbeam::sync::WaitGroup; use rayon_core::ThreadPool; use uuid::Uuid; use retty::core::bootstrap::Bootstrap; use retty::core::eventloop::EventLoopGroup; use retty::errors::RettyErrorKind; use retty::handler::channel_handler_ctx::{ChannelInboundHandlerCtx, ChannelOutboundHandlerCtx}; use retty::handler::codec::first_integer_length_field_decoder::FirstIntegerLengthFieldDecoder; use retty::handler::handler::{ChannelInboundHandler, ChannelOutboundHandler}; use retty::handler::handler_pipe::{ChannelInboundHandlerPipe, ChannelOutboundHandlerPipe}; struct BizHandler { excutor: Arc<ThreadPool>, } impl BizHandler { fn new() -> Self { BizHandler { excutor: Arc::new(rayon_core::ThreadPoolBuilder::new().num_threads(1).build().unwrap()) } } } impl ChannelInboundHandler for BizHandler { fn id(&self) -> String { return "biz_handler".to_string(); } fn channel_active(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx) { let addr = channel_handler_ctx.channel().remote_addr().unwrap(); println!("业务处理 Handler --> : channel_active 新连接上线: {}", addr); channel_handler_ctx.write_and_flush(&mut format!("::: 欢迎你:==>{}", addr)); let attr = channel_handler_ctx.channel().get_attribute("User".to_string()); let attr = attr.lock().unwrap(); let attr = attr.downcast_ref::<String>().unwrap(); println!("========================================================:att:::: {}", attr); } fn channel_inactive(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx) { println!("is_active:{}", channel_handler_ctx.channel().is_active()); println!("远端断开连接: Inactive: channel_id : {}", channel_handler_ctx.channel().id()) } fn channel_read(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx, message: &mut dyn Any) { let msg = message.downcast_ref::<String>().unwrap(); println!("业务处理 Handler --> :收到消息:{}", msg); println!("reactor-excutor :{}", thread::current().name().unwrap()); channel_handler_ctx.write_and_flush(&mut format!("::: I Love You !!!! :==>{}", msg)); let attr = channel_handler_ctx.channel().get_attribute("User".to_string()); let attr = attr.lock().unwrap(); let attr = attr.downcast_ref::<String>().unwrap(); println!("========================================================:att:::: {}", attr); } fn channel_exception(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx, error: RettyErrorKind) { channel_handler_ctx.fire_channel_exception(error); } } struct Decoder { excutor: Arc<ThreadPool>, } impl ChannelInboundHandler for Decoder { fn id(&self) -> String { return "decoder_handler".to_string(); } fn channel_active(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx) { // set attribute channel_handler_ctx.channel().set_attribute("User".to_string(), Box::new("lgphp".to_string())); println!("解码 Handler --> : channel_active 新连接上线: {}", channel_handler_ctx.channel().remote_addr().unwrap()); channel_handler_ctx.fire_channel_active(); } fn channel_inactive(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx) { channel_handler_ctx.fire_channel_inactive() } fn channel_read(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx, message: &mut dyn Any) { let mut buf = message.downcast_mut::().unwrap(); println!("解码 Handler --> 收到Bytebuf:"); // 解码 let _pkt_len = buf.read_u32_be(); let _ver = buf.read_u32_be(); let mut msg = buf.read_string_with_u8_be_len(); channel_handler_ctx.fire_channel_read(&mut msg); } fn channel_exception(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx, error: RettyErrorKind) { channel_handler_ctx.fire_channel_exception(error); } } impl Decoder { fn new() -> Self { Decoder { excutor: Arc::new(rayon_core::ThreadPoolBuilder::new().num_threads(1).build().unwrap()) } } } /// /// 入站异常handler 通常在最后一个 /// struct InboundExceptionHandler {} impl ChannelInboundHandler for InboundExceptionHandler { fn id(&self) -> String { String::from("InboundExceptionHandler") } fn channel_active(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx) {} fn channel_inactive(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx) {} fn channel_read(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx, message: &mut dyn Any) {} fn channel_exception(&mut self, channel_handler_ctx: &mut ChannelInboundHandlerCtx, error: RettyErrorKind) { let mut ch = channel_handler_ctx.channel(); // 处理 ReadIdleTimeout if error.kind == ErrorKind::TimedOut { println!("channel_id:{} 在 {}", ch.id(), format!("{} ms 没有读到数据! , error_message:{}", ch.read_idle_timeout_ms(), error.message)); ch.close() } } } impl InboundExceptionHandler { fn new() -> Self { InboundExceptionHandler {} } } struct Encoder { excutor: Arc<ThreadPool>, } impl ChannelOutboundHandler for Encoder { fn id(&self) -> String { return "encoder_handler".to_string(); } fn channel_write(&mut self, channel_handler_ctx: &mut ChannelOutboundHandlerCtx, message: &mut dyn Any) { let msg = message.downcast_ref::<String>().unwrap(); println!("回执消息,编码器 :====>Encoder Handler:{}", msg); let mut buf = ByteBuf::new_with_capacity(0); let re = format!("回执消息,编码器 :====>Encoder Handler:{}", msg); buf.write_u32_be((1 + re.as_bytes().len()) as u32); buf.write_string_with_u8_be_len(re); channel_handler_ctx.fire_channel_write(&mut buf); } } impl Encoder { fn new() -> Self { Encoder { excutor: Arc::new(rayon_core::ThreadPoolBuilder::new().num_threads(1).build().unwrap()) } } } fn main() { let mut bootstrap = Bootstrap::new_server_bootstrap(); bootstrap.worker_group(8) .bind("0.0.0.0", 1512) .opt_ttl_ms(1000) .opt_keep_alive_ms(30000) .opt_nodelay(false) .opt_send_buf_size(65535) .opt_recv_buf_size(65535) .opt_read_idle_timeout_ms(3000) .initialize_inbound_handler_pipeline(|| { let mut handler_pipe = ChannelInboundHandlerPipe::new(); let decoder_handler = Box::new(Decoder::new()); let biz_handler = Box::new(BizHandler::new()); let excetion_handler = Box::new(InboundExceptionHandler::new()); handler_pipe.add_last(Box::new(FirstIntegerLengthFieldDecoder::new())); handler_pipe.add_last(decoder_handler); handler_pipe.add_last(biz_handler); handler_pipe.add_last(excetion_handler); handler_pipe }) .initialize_outbound_handler_pipeline(|| { let mut handler_pipe = ChannelOutboundHandlerPipe::new(); let encoder_handler = Box::new(Encoder::new()); handler_pipe.add_last(encoder_handler); handler_pipe }).start(); // use default_event_loop let mut new_default_event_loop_group = EventLoopGroup::new_default_event_loop_group(9); new_default_event_loop_group.execute(|| { println!(" default_event_loop execute Task ..... is here") }); WaitGroup::new().clone().wait(); }