I want to wrap Bootstrapper
in a async trait, which is consistent of RefCell<Option<RawSender>>
and is not thread safe.
Here is a complete demo, TaskService
is an asynchronous trait of Task
abstraction. IpcSender
is wrap of Bootstrapper
, and IpcReceiver
is wrap of Receiver<Task>
. Both of them implement TaskService
to send or receive ipc request.
use std::{env, process};
use tokio_unix_ipc::{Bootstrapper, channel, Receiver, Sender};
use serde::Serialize;
use serde::Deserialize;
const ENV_VAR: &str = "PROC_CONNECT_TO";
#[derive(Serialize, Deserialize)]
pub enum Task {
Sum(Vec<i64>, Sender<i64>),
Shutdown
}
#[async_trait::async_trait]
pub trait TaskService {
async fn sum(&self, vec: Vec<i64>) -> i64;
async fn shutdown(&self);
}
#[tokio::main]
async fn main() {
if let Ok(path) = env::var(ENV_VAR) {
let server = IpcReceiver::new(path).await;
server.start().await;
} else {
let client = IpcSender::new();
let mut child = process::Command::new(env::current_exe().unwrap())
.env(ENV_VAR, client.bootstrap.path())
.spawn()
.unwrap();
let sum = client.sum(vec![1, 2, 3]).await;
println!("sum = {}", sum);
child.kill().ok();
child.wait().ok();
}
}
/// ipc_receiver
pub struct IpcReceiver {
pub recv: Receiver<Task>,
}
impl IpcReceiver {
pub async fn new(path: String) -> Self {
let receiver = Receiver::<Task>::connect(path).await.unwrap();
IpcReceiver {
recv: receiver,
}
}
pub async fn start(&self) {
loop {
let task = self.recv.recv().await.unwrap();
match task {
Task::Sum(vec, tx) => {
let sum = self.sum(vec).await;
tx.send(sum).await.unwrap();
},
Task::Shutdown => {
self.shutdown().await;
break
}
}
}
}
}
#[async_trait::async_trait]
impl TaskService for IpcReceiver {
async fn sum(&self, vec: Vec<i64>) -> i64 {
vec.into_iter().sum::<i64>()
}
async fn shutdown(&self) {
}
}
/// ipc_sender
pub struct IpcSender {
pub bootstrap: Bootstrapper,
}
impl IpcSender {
pub fn new() -> Self {
let bootstrapper = Bootstrapper::new().unwrap();
IpcSender {
bootstrap: bootstrapper
}
}
}
#[async_trait::async_trait]
impl TaskService for IpcSender {
async fn sum(&self, vec: Vec<i64>) -> i64 {
let (tx, rx) = channel().unwrap();
self.bootstrap.send(Task::Sum(vec, tx));
rx.recv().await.unwrap()
}
async fn shutdown(&self) {
self.bootstrap.send(Task::Shutdown).await.unwrap()
}
}
I wonder if Bootstrapper#sender
can change to Mutex
. Maybe I can modify it later.