From b5a15ecd89bc2ea426a190bd4a632cf0fd709856 Mon Sep 17 00:00:00 2001 From: Tim Zhang Date: Thu, 17 Jul 2025 17:24:12 +0800 Subject: [PATCH] shim: re-impl Events trait for async RemotePublisher Fixes: https://github.com/containerd/rust-extensions/pull/318/files#r2212454388 Signed-off-by: Tim Zhang --- crates/shim/src/asynchronous/publisher.rs | 30 ++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/crates/shim/src/asynchronous/publisher.rs b/crates/shim/src/asynchronous/publisher.rs index a8909607..16a4d145 100644 --- a/crates/shim/src/asynchronous/publisher.rs +++ b/crates/shim/src/asynchronous/publisher.rs @@ -16,13 +16,14 @@ use std::os::unix::io::RawFd; +use async_trait::async_trait; use containerd_shim_protos::{ - api::Envelope, + api::{Empty, Envelope}, protobuf::MessageDyn, shim::events, - shim_async::{Client, EventsClient}, + shim_async::{Client, Events, EventsClient}, ttrpc, - ttrpc::context::Context, + ttrpc::{context::Context, r#async::TtrpcContext}, }; use log::{debug, error, warn}; use tokio::sync::mpsc; @@ -169,6 +170,29 @@ impl RemotePublisher { } } +#[async_trait] +impl Events for RemotePublisher { + async fn forward( + &self, + _ctx: &TtrpcContext, + req: events::ForwardRequest, + ) -> ttrpc::Result { + let item = Item { + ev: req.envelope().clone(), + ctx: Context::default(), + count: 0, + }; + + //if channel is full and send fail ,release it after 3 seconds + self.sender + .send_timeout(item, tokio::time::Duration::from_secs(3)) + .await + .map_err(|e| error::Error::Ttrpc(ttrpc::error::Error::Others(e.to_string())))?; + + Ok(Empty::default()) + } +} + #[cfg(test)] mod tests { use std::{