From ccc5cfca298f775780af48665a325c39f6ee5786 Mon Sep 17 00:00:00 2001 From: RandyLambert Date: Fri, 8 Jul 2022 18:27:29 +0800 Subject: [PATCH 1/9] [Feat] Change to unix domain socket * Use use tokio::net::UnixListener instead of tokio::net::Unix for unix socket communication Signed-off-by: shouxunsun Co-authored-by: Ti Chi Robot Co-authored-by: Yang Keao Co-authored-by: xixi Signed-off-by: RandyLambert --- .../src/cmd/interactive/handler.rs | 28 ++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/chaos-tproxy-controller/src/cmd/interactive/handler.rs b/chaos-tproxy-controller/src/cmd/interactive/handler.rs index 202ede2..9c9c527 100644 --- a/chaos-tproxy-controller/src/cmd/interactive/handler.rs +++ b/chaos-tproxy-controller/src/cmd/interactive/handler.rs @@ -7,7 +7,7 @@ use std::task::{Context, Poll}; use anyhow::Error; use futures::TryStreamExt; use http::{Method, Request, Response, StatusCode}; -use hyper::server::conn::{Connection, Http}; +use hyper::server::conn::{Http}; use hyper::service::Service; use hyper::Body; use tokio::select; @@ -16,7 +16,9 @@ use tokio::sync::Mutex; use tokio::task::JoinHandle; use tracing::instrument; -use crate::cmd::interactive::stdio::StdStream; +use tokio::net::{UnixListener}; +#[cfg(unix)] +use std::os::unix::io::{FromRawFd}; use crate::proxy::config::Config; use crate::proxy::exec::Proxy; use crate::raw_config::RawConfig; @@ -43,21 +45,27 @@ impl ConfigServer { pub fn serve_interactive(&mut self) { let mut rx = self.rx.take().unwrap(); let mut service = ConfigService(self.proxy.clone()); + self.task = Some(tokio::spawn(async move { - let rx_mut = &mut rx; + let rx_mut = &mut rx; + let unix_listener = UnixListener::from_std(unsafe {std::os::unix::net::UnixListener::from_raw_fd(3)}).unwrap(); + loop { - let stream = StdStream::default(); - let mut conn = Http::new().serve_connection(stream, &mut service); - let conn_mut = &mut conn; select! { _ = &mut *rx_mut => { tracing::trace!("catch signal in config server."); - Connection::graceful_shutdown(Pin::new(conn_mut)); return Ok(()); }, - ret = &mut *conn_mut => if let Err(e) = ret { - tracing::error!("{}",e); - } + stream = unix_listener.accept() => { + let (stream, _) = stream.unwrap(); + + let http = Http::new(); + let conn = http.serve_connection(stream, &mut service); + if let Err(e) = conn.await { + tracing::error!("{}",e); + return Err(anyhow::anyhow!("{}",e)); + } + }, }; } })); From 4f5d9bb4d892fc81ed132e46ab1216004947bb42 Mon Sep 17 00:00:00 2001 From: RandyLambert Date: Fri, 8 Jul 2022 20:24:29 +0800 Subject: [PATCH 2/9] [Fix] Delete serve_interactive return error * Delete serve_interactive return error Signed-off-by: RandyLambert --- chaos-tproxy-controller/src/cmd/interactive/handler.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/chaos-tproxy-controller/src/cmd/interactive/handler.rs b/chaos-tproxy-controller/src/cmd/interactive/handler.rs index 9c9c527..399deb0 100644 --- a/chaos-tproxy-controller/src/cmd/interactive/handler.rs +++ b/chaos-tproxy-controller/src/cmd/interactive/handler.rs @@ -63,7 +63,6 @@ impl ConfigServer { let conn = http.serve_connection(stream, &mut service); if let Err(e) = conn.await { tracing::error!("{}",e); - return Err(anyhow::anyhow!("{}",e)); } }, }; From 7d4795f3f84cc006ce103c5cde099bfe1e204911 Mon Sep 17 00:00:00 2001 From: RandyLambert Date: Mon, 1 Aug 2022 02:20:58 +0800 Subject: [PATCH 3/9] [Feat] Use unix domain socket with path * use unix domain socket with path Signed-off-by: RandyLambert --- chaos-tproxy-controller/src/cmd/command_line.rs | 4 ++++ chaos-tproxy-controller/src/cmd/interactive/handler.rs | 5 +++-- chaos-tproxy-controller/src/main.rs | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/chaos-tproxy-controller/src/cmd/command_line.rs b/chaos-tproxy-controller/src/cmd/command_line.rs index 975ae94..e8ad089 100644 --- a/chaos-tproxy-controller/src/cmd/command_line.rs +++ b/chaos-tproxy-controller/src/cmd/command_line.rs @@ -33,6 +33,10 @@ pub struct Opt { /// ipc path for sub proxy. #[structopt(long)] pub ipc_path: Option, + + /// ipc path to communicate with chaos-mesh. + #[structopt(short = "u", long = "unix-socket-path", default_value = "/chaos-tproxy.sock")] + pub unix_socket_path: PathBuf, } impl Opt { diff --git a/chaos-tproxy-controller/src/cmd/interactive/handler.rs b/chaos-tproxy-controller/src/cmd/interactive/handler.rs index 399deb0..49cd33a 100644 --- a/chaos-tproxy-controller/src/cmd/interactive/handler.rs +++ b/chaos-tproxy-controller/src/cmd/interactive/handler.rs @@ -15,6 +15,7 @@ use tokio::sync::oneshot::{channel, Receiver, Sender}; use tokio::sync::Mutex; use tokio::task::JoinHandle; use tracing::instrument; +use std::path::PathBuf; use tokio::net::{UnixListener}; #[cfg(unix)] @@ -42,13 +43,13 @@ impl ConfigServer { } } - pub fn serve_interactive(&mut self) { + pub fn serve_interactive(&mut self, unix_socket_path: PathBuf) { let mut rx = self.rx.take().unwrap(); let mut service = ConfigService(self.proxy.clone()); self.task = Some(tokio::spawn(async move { let rx_mut = &mut rx; - let unix_listener = UnixListener::from_std(unsafe {std::os::unix::net::UnixListener::from_raw_fd(3)}).unwrap(); + let unix_listener = UnixListener::bind(unix_socket_path).unwrap(); loop { select! { diff --git a/chaos-tproxy-controller/src/main.rs b/chaos-tproxy-controller/src/main.rs index fa7a832..e244d17 100644 --- a/chaos-tproxy-controller/src/main.rs +++ b/chaos-tproxy-controller/src/main.rs @@ -46,7 +46,7 @@ async fn main() -> anyhow::Result<()> { if opt.interactive { let mut config_server = ConfigServer::new(Proxy::new(opt.verbose).await); - config_server.serve_interactive(); + config_server.serve_interactive(opt.unix_socket_path.clone()); let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()])?; signals.wait().await?; From 26df9e24c1286db3a79543ef3fe7682ac2f7c5a9 Mon Sep 17 00:00:00 2001 From: RandyLambert Date: Sun, 7 Aug 2022 21:08:00 +0800 Subject: [PATCH 4/9] [Feat] Use unix domain socket with path * use unix domain socket with path Signed-off-by: RandyLambert Co-authored-by: Ti Chi Robot --- chaos-tproxy-controller/src/cmd/command_line.rs | 10 +++------- .../src/cmd/interactive/handler.rs | 6 +++--- chaos-tproxy-controller/src/main.rs | 9 +++++---- chaos-tproxy-proxy/src/lib.rs | 2 +- chaos-tproxy-proxy/src/signal.rs | 15 ++++++++++++--- 5 files changed, 24 insertions(+), 18 deletions(-) diff --git a/chaos-tproxy-controller/src/cmd/command_line.rs b/chaos-tproxy-controller/src/cmd/command_line.rs index e8ad089..b36058d 100644 --- a/chaos-tproxy-controller/src/cmd/command_line.rs +++ b/chaos-tproxy-controller/src/cmd/command_line.rs @@ -17,10 +17,6 @@ pub struct Opt { #[structopt(name = "FILE", parse(from_os_str))] pub input: Option, - /// Allows applying json config by stdin/stdout - #[structopt(short, long)] - pub interactive: bool, - // The number of occurrences of the `v/verbose` flag /// Verbose mode (-v, -vv, -vvv, etc.) #[structopt(short, long, parse(from_occurrences))] @@ -35,8 +31,8 @@ pub struct Opt { pub ipc_path: Option, /// ipc path to communicate with chaos-mesh. - #[structopt(short = "u", long = "unix-socket-path", default_value = "/chaos-tproxy.sock")] - pub unix_socket_path: PathBuf, + #[structopt(long = "interactive-path")] + pub interactive_path: Option, } impl Opt { @@ -54,7 +50,7 @@ impl Opt { } fn checked(self) -> Result { - if !self.interactive && !self.proxy && self.input.is_none() { + if !self.interactive_path.is_none() && !self.proxy && self.input.is_none() { return Err(anyhow!("config file is required when interactive mode and daemon mode is all disabled, use `-h | --help` for more details")); } Ok(self) diff --git a/chaos-tproxy-controller/src/cmd/interactive/handler.rs b/chaos-tproxy-controller/src/cmd/interactive/handler.rs index 49cd33a..841af16 100644 --- a/chaos-tproxy-controller/src/cmd/interactive/handler.rs +++ b/chaos-tproxy-controller/src/cmd/interactive/handler.rs @@ -19,7 +19,6 @@ use std::path::PathBuf; use tokio::net::{UnixListener}; #[cfg(unix)] -use std::os::unix::io::{FromRawFd}; use crate::proxy::config::Config; use crate::proxy::exec::Proxy; use crate::raw_config::RawConfig; @@ -43,13 +42,14 @@ impl ConfigServer { } } - pub fn serve_interactive(&mut self, unix_socket_path: PathBuf) { + pub fn serve_interactive(&mut self, interactive_path: PathBuf) { let mut rx = self.rx.take().unwrap(); let mut service = ConfigService(self.proxy.clone()); self.task = Some(tokio::spawn(async move { let rx_mut = &mut rx; - let unix_listener = UnixListener::bind(unix_socket_path).unwrap(); + tracing::info!("ConfigServer listener try binding {:?}", interactive_path); + let unix_listener = UnixListener::bind(interactive_path).unwrap(); loop { select! { diff --git a/chaos-tproxy-controller/src/main.rs b/chaos-tproxy-controller/src/main.rs index e244d17..9b2b750 100644 --- a/chaos-tproxy-controller/src/main.rs +++ b/chaos-tproxy-controller/src/main.rs @@ -6,6 +6,7 @@ use tokio::signal::unix::SignalKind; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{fmt, EnvFilter}; +use std::path::PathBuf; use crate::cmd::command_line::{get_config_from_opt, Opt}; use crate::cmd::interactive::handler::ConfigServer; @@ -38,17 +39,17 @@ async fn main() -> anyhow::Result<()> { let cfg = get_config_from_opt(&opt).await?; let mut proxy = Proxy::new(opt.verbose).await; proxy.reload(cfg.proxy_config).await?; - let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()])?; + let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()], PathBuf::new())?; signals.wait().await?; proxy.stop().await?; return Ok(()); } - if opt.interactive { + if opt.interactive_path.is_some() { let mut config_server = ConfigServer::new(Proxy::new(opt.verbose).await); - config_server.serve_interactive(opt.unix_socket_path.clone()); + config_server.serve_interactive(opt.interactive_path.clone().unwrap()); - let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()])?; + let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()], opt.interactive_path.clone().unwrap())?; signals.wait().await?; config_server.stop().await?; diff --git a/chaos-tproxy-proxy/src/lib.rs b/chaos-tproxy-proxy/src/lib.rs index 2e5f6c4..9e5adc6 100644 --- a/chaos-tproxy-proxy/src/lib.rs +++ b/chaos-tproxy-proxy/src/lib.rs @@ -29,7 +29,7 @@ pub async fn proxy_main(path: PathBuf) -> anyhow::Result<()> { server.serve(rx).await.unwrap(); }); - let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()])?; + let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()], PathBuf::new())?; signals.wait().await?; let _ = sender.send(()); diff --git a/chaos-tproxy-proxy/src/signal.rs b/chaos-tproxy-proxy/src/signal.rs index f353964..6ae9d0e 100644 --- a/chaos-tproxy-proxy/src/signal.rs +++ b/chaos-tproxy-proxy/src/signal.rs @@ -1,21 +1,30 @@ use futures::future::select_all; use tokio::signal::unix::{signal, Signal, SignalKind}; +use std::path::PathBuf; -pub struct Signals(Vec); +pub struct Signals +{ + pub signals :Vec, + interactive_path: PathBuf, +} impl Signals { pub fn from_kinds<'a>( kinds: impl 'a + IntoIterator, + interactive_path: PathBuf, ) -> anyhow::Result { let signals = kinds .into_iter() .map(|kind| signal(*kind)) .collect::, _>>()?; - Ok(Self(signals)) + Ok(Signals{signals, interactive_path}) } pub async fn wait(&mut self) -> anyhow::Result<()> { - select_all(self.0.iter_mut().map(|sig| Box::pin(sig.recv()))).await; + select_all(self.signals.iter_mut().map(|sig| Box::pin(sig.recv()))).await; + if self.interactive_path != PathBuf::new() { + std::fs::remove_file(self.interactive_path.clone()).unwrap(); + } Ok(()) } } From f0c54342d724ae6a8f44cab6629930c30c6242ac Mon Sep 17 00:00:00 2001 From: RandyLambert Date: Sun, 14 Aug 2022 23:22:21 +0800 Subject: [PATCH 5/9] [Feat] Use unix domain socket with path * use unix domain socket with path Signed-off-by: RandyLambert Co-authored-by: Ti Chi Robot --- chaos-tproxy-controller/src/cmd/command_line.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chaos-tproxy-controller/src/cmd/command_line.rs b/chaos-tproxy-controller/src/cmd/command_line.rs index b36058d..15f9eef 100644 --- a/chaos-tproxy-controller/src/cmd/command_line.rs +++ b/chaos-tproxy-controller/src/cmd/command_line.rs @@ -50,7 +50,7 @@ impl Opt { } fn checked(self) -> Result { - if !self.interactive_path.is_none() && !self.proxy && self.input.is_none() { + if self.interactive_path.is_none() && !self.proxy && self.input.is_none() { return Err(anyhow!("config file is required when interactive mode and daemon mode is all disabled, use `-h | --help` for more details")); } Ok(self) From 492d386c83ab11dd96804a9dc5d0619e2e03c9dd Mon Sep 17 00:00:00 2001 From: RandyLambert Date: Mon, 15 Aug 2022 00:55:21 +0800 Subject: [PATCH 6/9] [Feat] Use unix domain socket with path * use unix domain socket with path Signed-off-by: RandyLambert Co-authored-by: Ti Chi Robot --- chaos-tproxy-proxy/src/signal.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chaos-tproxy-proxy/src/signal.rs b/chaos-tproxy-proxy/src/signal.rs index 6ae9d0e..29872f4 100644 --- a/chaos-tproxy-proxy/src/signal.rs +++ b/chaos-tproxy-proxy/src/signal.rs @@ -23,7 +23,7 @@ impl Signals { pub async fn wait(&mut self) -> anyhow::Result<()> { select_all(self.signals.iter_mut().map(|sig| Box::pin(sig.recv()))).await; if self.interactive_path != PathBuf::new() { - std::fs::remove_file(self.interactive_path.clone()).unwrap(); + std::fs::remove_file(self.interactive_path.clone())?; } Ok(()) } From 6cde853e923fdb78135cadce5b326e6952f84784 Mon Sep 17 00:00:00 2001 From: RandyLambert Date: Wed, 31 Aug 2022 02:18:39 +0800 Subject: [PATCH 7/9] [Feat] Use unix domain socket with path * use unix domain socket with path Signed-off-by: RandyLambert --- chaos-tproxy-controller/src/main.rs | 10 ++++++---- chaos-tproxy-proxy/src/lib.rs | 2 +- chaos-tproxy-proxy/src/signal.rs | 15 +++------------ 3 files changed, 10 insertions(+), 17 deletions(-) diff --git a/chaos-tproxy-controller/src/main.rs b/chaos-tproxy-controller/src/main.rs index 9b2b750..c557d9e 100644 --- a/chaos-tproxy-controller/src/main.rs +++ b/chaos-tproxy-controller/src/main.rs @@ -6,7 +6,6 @@ use tokio::signal::unix::SignalKind; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{fmt, EnvFilter}; -use std::path::PathBuf; use crate::cmd::command_line::{get_config_from_opt, Opt}; use crate::cmd::interactive::handler::ConfigServer; @@ -39,7 +38,7 @@ async fn main() -> anyhow::Result<()> { let cfg = get_config_from_opt(&opt).await?; let mut proxy = Proxy::new(opt.verbose).await; proxy.reload(cfg.proxy_config).await?; - let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()], PathBuf::new())?; + let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()])?; signals.wait().await?; proxy.stop().await?; return Ok(()); @@ -49,11 +48,14 @@ async fn main() -> anyhow::Result<()> { let mut config_server = ConfigServer::new(Proxy::new(opt.verbose).await); config_server.serve_interactive(opt.interactive_path.clone().unwrap()); - let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()], opt.interactive_path.clone().unwrap())?; + let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()])?; signals.wait().await?; + // Currently we cannot graceful shutdown the config server. config_server.stop().await?; - // Currently we cannot graceful shutdown the config server. + // delete the unix socket file + std::fs::remove_file(opt.interactive_path.clone().unwrap())?; + exit(0); } Ok(()) diff --git a/chaos-tproxy-proxy/src/lib.rs b/chaos-tproxy-proxy/src/lib.rs index 9e5adc6..2e5f6c4 100644 --- a/chaos-tproxy-proxy/src/lib.rs +++ b/chaos-tproxy-proxy/src/lib.rs @@ -29,7 +29,7 @@ pub async fn proxy_main(path: PathBuf) -> anyhow::Result<()> { server.serve(rx).await.unwrap(); }); - let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()], PathBuf::new())?; + let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()])?; signals.wait().await?; let _ = sender.send(()); diff --git a/chaos-tproxy-proxy/src/signal.rs b/chaos-tproxy-proxy/src/signal.rs index 29872f4..f353964 100644 --- a/chaos-tproxy-proxy/src/signal.rs +++ b/chaos-tproxy-proxy/src/signal.rs @@ -1,30 +1,21 @@ use futures::future::select_all; use tokio::signal::unix::{signal, Signal, SignalKind}; -use std::path::PathBuf; -pub struct Signals -{ - pub signals :Vec, - interactive_path: PathBuf, -} +pub struct Signals(Vec); impl Signals { pub fn from_kinds<'a>( kinds: impl 'a + IntoIterator, - interactive_path: PathBuf, ) -> anyhow::Result { let signals = kinds .into_iter() .map(|kind| signal(*kind)) .collect::, _>>()?; - Ok(Signals{signals, interactive_path}) + Ok(Self(signals)) } pub async fn wait(&mut self) -> anyhow::Result<()> { - select_all(self.signals.iter_mut().map(|sig| Box::pin(sig.recv()))).await; - if self.interactive_path != PathBuf::new() { - std::fs::remove_file(self.interactive_path.clone())?; - } + select_all(self.0.iter_mut().map(|sig| Box::pin(sig.recv()))).await; Ok(()) } } From 8321a2e1645a25b51f74420545e5733afe44153e Mon Sep 17 00:00:00 2001 From: RandyLambert Date: Sun, 11 Sep 2022 21:27:29 +0800 Subject: [PATCH 8/9] [Feat] Use unix domain socket with path * use unix domain socket with path Signed-off-by: RandyLambert --- chaos-tproxy-controller/src/cmd/interactive/handler.rs | 10 +++++----- chaos-tproxy-controller/src/main.rs | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/chaos-tproxy-controller/src/cmd/interactive/handler.rs b/chaos-tproxy-controller/src/cmd/interactive/handler.rs index 841af16..9f22187 100644 --- a/chaos-tproxy-controller/src/cmd/interactive/handler.rs +++ b/chaos-tproxy-controller/src/cmd/interactive/handler.rs @@ -1,5 +1,6 @@ use std::convert::TryInto; use std::future::Future; +use std::path::PathBuf; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -7,17 +8,16 @@ use std::task::{Context, Poll}; use anyhow::Error; use futures::TryStreamExt; use http::{Method, Request, Response, StatusCode}; -use hyper::server::conn::{Http}; +use hyper::server::conn::Http; use hyper::service::Service; use hyper::Body; +use tokio::net::UnixListener; use tokio::select; use tokio::sync::oneshot::{channel, Receiver, Sender}; use tokio::sync::Mutex; use tokio::task::JoinHandle; use tracing::instrument; -use std::path::PathBuf; -use tokio::net::{UnixListener}; #[cfg(unix)] use crate::proxy::config::Config; use crate::proxy::exec::Proxy; @@ -42,12 +42,12 @@ impl ConfigServer { } } - pub fn serve_interactive(&mut self, interactive_path: PathBuf) { + pub fn serve_interactive(&mut self, interactive_path: PathBuf) { let mut rx = self.rx.take().unwrap(); let mut service = ConfigService(self.proxy.clone()); self.task = Some(tokio::spawn(async move { - let rx_mut = &mut rx; + let rx_mut = &mut rx; tracing::info!("ConfigServer listener try binding {:?}", interactive_path); let unix_listener = UnixListener::bind(interactive_path).unwrap(); diff --git a/chaos-tproxy-controller/src/main.rs b/chaos-tproxy-controller/src/main.rs index c557d9e..09e5e20 100644 --- a/chaos-tproxy-controller/src/main.rs +++ b/chaos-tproxy-controller/src/main.rs @@ -44,9 +44,9 @@ async fn main() -> anyhow::Result<()> { return Ok(()); } - if opt.interactive_path.is_some() { + if let Some(path) = opt.interactive_path { let mut config_server = ConfigServer::new(Proxy::new(opt.verbose).await); - config_server.serve_interactive(opt.interactive_path.clone().unwrap()); + config_server.serve_interactive(path.clone()); let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()])?; signals.wait().await?; @@ -54,7 +54,7 @@ async fn main() -> anyhow::Result<()> { config_server.stop().await?; // delete the unix socket file - std::fs::remove_file(opt.interactive_path.clone().unwrap())?; + std::fs::remove_file(path.clone())?; exit(0); } From 22bc2ce3b9be1151a4c329efbcf73b5091271bee Mon Sep 17 00:00:00 2001 From: RandyLambert Date: Sat, 17 Sep 2022 18:09:11 +0800 Subject: [PATCH 9/9] [Feat] Use unix domain socket with path * use unix domain socket with path Signed-off-by: RandyLambert --- .../src/cmd/interactive/handler.rs | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/chaos-tproxy-controller/src/cmd/interactive/handler.rs b/chaos-tproxy-controller/src/cmd/interactive/handler.rs index 9f22187..9f71d10 100644 --- a/chaos-tproxy-controller/src/cmd/interactive/handler.rs +++ b/chaos-tproxy-controller/src/cmd/interactive/handler.rs @@ -44,27 +44,29 @@ impl ConfigServer { pub fn serve_interactive(&mut self, interactive_path: PathBuf) { let mut rx = self.rx.take().unwrap(); - let mut service = ConfigService(self.proxy.clone()); - - self.task = Some(tokio::spawn(async move { + let proxy = self.proxy.clone(); + self.task = Some(tokio::task::spawn(async move { let rx_mut = &mut rx; tracing::info!("ConfigServer listener try binding {:?}", interactive_path); let unix_listener = UnixListener::bind(interactive_path).unwrap(); loop { + let mut service = ConfigService(proxy.clone()); select! { _ = &mut *rx_mut => { tracing::trace!("catch signal in config server."); return Ok(()); }, stream = unix_listener.accept() => { - let (stream, _) = stream.unwrap(); - - let http = Http::new(); - let conn = http.serve_connection(stream, &mut service); - if let Err(e) = conn.await { - tracing::error!("{}",e); - } + tokio::task::spawn(async move { + let (stream, _) = stream.unwrap(); + + let http = Http::new(); + let conn = http.serve_connection(stream, &mut service); + if let Err(e) = conn.await { + tracing::error!("{}",e); + } + }); }, }; }