Skip to content

Commit 68103f9

Browse files
authored
Split tracer inner segment sender and receiver into traits. (#37)
1 parent 6232828 commit 68103f9

File tree

4 files changed

+153
-33
lines changed

4 files changed

+153
-33
lines changed

src/context/tracer.rs

Lines changed: 136 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,22 @@ use crate::{
1919
context::trace_context::TracingContext, reporter::DynReporter, reporter::Reporter,
2020
skywalking_proto::v3::SegmentObject,
2121
};
22+
use std::error::Error;
2223
use std::future::Future;
2324
use std::pin::Pin;
25+
use std::sync::atomic::{AtomicBool, Ordering};
2426
use std::sync::Weak;
2527
use std::task::{Context, Poll};
2628
use std::{collections::LinkedList, sync::Arc};
2729
use tokio::sync::OnceCell;
28-
use tokio::task::JoinError;
2930
use tokio::{
3031
sync::{
3132
mpsc::{self},
3233
Mutex,
3334
},
3435
task::JoinHandle,
3536
};
37+
use tonic::async_trait;
3638

3739
static GLOBAL_TRACER: OnceCell<Tracer> = OnceCell::const_new();
3840

@@ -65,12 +67,67 @@ pub fn reporting(shutdown_signal: impl Future<Output = ()> + Send + Sync + 'stat
6567
global_tracer().reporting(shutdown_signal)
6668
}
6769

70+
pub trait SegmentSender: Send + Sync + 'static {
71+
fn send(&self, segment: SegmentObject) -> Result<(), Box<dyn Error>>;
72+
}
73+
74+
impl SegmentSender for () {
75+
fn send(&self, _segment: SegmentObject) -> Result<(), Box<dyn Error>> {
76+
Ok(())
77+
}
78+
}
79+
80+
impl SegmentSender for mpsc::UnboundedSender<SegmentObject> {
81+
fn send(&self, segment: SegmentObject) -> Result<(), Box<dyn Error>> {
82+
Ok(self.send(segment)?)
83+
}
84+
}
85+
86+
#[async_trait]
87+
pub trait SegmentReceiver: Send + Sync + 'static {
88+
async fn recv(&self) -> Result<Option<SegmentObject>, Box<dyn Error + Send>>;
89+
90+
async fn try_recv(&self) -> Result<Option<SegmentObject>, Box<dyn Error + Send>>;
91+
}
92+
93+
#[async_trait]
94+
impl SegmentReceiver for () {
95+
async fn recv(&self) -> Result<Option<SegmentObject>, Box<dyn Error + Send>> {
96+
Ok(None)
97+
}
98+
99+
async fn try_recv(&self) -> Result<Option<SegmentObject>, Box<dyn Error + Send>> {
100+
Ok(None)
101+
}
102+
}
103+
104+
#[async_trait]
105+
impl SegmentReceiver for Mutex<mpsc::UnboundedReceiver<SegmentObject>> {
106+
async fn recv(&self) -> Result<Option<SegmentObject>, Box<dyn Error + Send>> {
107+
Ok(self.lock().await.recv().await)
108+
}
109+
110+
async fn try_recv(&self) -> Result<Option<SegmentObject>, Box<dyn Error + Send>> {
111+
use mpsc::error::TryRecvError;
112+
113+
match self.lock().await.try_recv() {
114+
Ok(segment) => Ok(Some(segment)),
115+
Err(e) => match e {
116+
TryRecvError::Empty => Ok(None),
117+
TryRecvError::Disconnected => Err(Box::new(e)),
118+
},
119+
}
120+
}
121+
}
122+
68123
struct Inner {
69124
service_name: String,
70125
instance_name: String,
71-
segment_sender: mpsc::UnboundedSender<SegmentObject>,
72-
segment_receiver: Mutex<mpsc::UnboundedReceiver<SegmentObject>>,
126+
segment_sender: Box<dyn SegmentSender>,
127+
segment_receiver: Box<dyn SegmentReceiver>,
73128
reporter: Box<Mutex<DynReporter>>,
129+
is_reporting: AtomicBool,
130+
is_closed: AtomicBool,
74131
}
75132

76133
/// Skywalking tracer.
@@ -87,14 +144,30 @@ impl Tracer {
87144
reporter: impl Reporter + Send + Sync + 'static,
88145
) -> Self {
89146
let (segment_sender, segment_receiver) = mpsc::unbounded_channel();
147+
Self::new_with_channel(
148+
service_name,
149+
instance_name,
150+
reporter,
151+
(segment_sender, Mutex::new(segment_receiver)),
152+
)
153+
}
90154

155+
/// New with service info, reporter, and custom channel.
156+
pub fn new_with_channel(
157+
service_name: impl ToString,
158+
instance_name: impl ToString,
159+
reporter: impl Reporter + Send + Sync + 'static,
160+
channel: (impl SegmentSender, impl SegmentReceiver),
161+
) -> Self {
91162
Self {
92163
inner: Arc::new(Inner {
93164
service_name: service_name.to_string(),
94165
instance_name: instance_name.to_string(),
95-
segment_sender,
96-
segment_receiver: Mutex::new(segment_receiver),
166+
segment_sender: Box::new(channel.0),
167+
segment_receiver: Box::new(channel.1),
97168
reporter: Box::new(Mutex::new(reporter)),
169+
is_reporting: Default::default(),
170+
is_closed: Default::default(),
98171
}),
99172
}
100173
}
@@ -131,73 +204,97 @@ impl Tracer {
131204

132205
/// Finalize the trace context.
133206
pub(crate) fn finalize_context(&self, context: &mut TracingContext) {
207+
if self.inner.is_closed.load(Ordering::Relaxed) {
208+
tracing::warn!("tracer closed");
209+
return;
210+
}
211+
134212
let segment_object = context.convert_segment_object();
135-
if self.inner.segment_sender.send(segment_object).is_err() {
136-
tracing::error!("segment object channel has closed");
213+
if let Err(err) = self.inner.segment_sender.send(segment_object) {
214+
tracing::error!(?err, "send segment object failed");
137215
}
138216
}
139217

140218
/// Start to reporting, quit when shutdown_signal received.
141219
///
142220
/// Accept a `shutdown_signal` argument as a graceful shutdown signal.
221+
///
222+
/// # Panics
223+
///
224+
/// Panic if call more than once.
143225
pub fn reporting(
144226
&self,
145227
shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
146228
) -> Reporting {
229+
if self.inner.is_reporting.swap(true, Ordering::Relaxed) {
230+
panic!("reporting already called");
231+
}
232+
147233
Reporting {
148234
handle: tokio::spawn(self.clone().do_reporting(shutdown_signal)),
149235
}
150236
}
151237

152-
async fn do_reporting(self, shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static) {
238+
async fn do_reporting(
239+
self,
240+
shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
241+
) -> crate::Result<()> {
153242
let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel();
154243

155244
let handle = tokio::spawn(async move {
156245
loop {
157-
let mut segment_receiver = self.inner.segment_receiver.lock().await;
158-
let mut segments = LinkedList::new();
159-
160246
tokio::select! {
161-
segment = segment_receiver.recv() => {
162-
drop(segment_receiver);
163-
164-
if let Some(segment) = segment {
165-
// TODO Implement batch collect in future.
166-
segments.push_back(segment);
167-
Self::report_segment_object(&self.inner.reporter, segments).await;
168-
} else {
169-
break;
247+
segment = self.inner.segment_receiver.recv() => {
248+
match segment {
249+
Ok(Some(segment)) => {
250+
// TODO Implement batch collect in future.
251+
let mut segments = LinkedList::new();
252+
segments.push_back(segment);
253+
Self::report_segment_object(&self.inner.reporter, segments).await;
254+
}
255+
Ok(None) => break,
256+
Err(err) => return Err(err.into()),
170257
}
171258
}
172259
_ = shutdown_rx.recv() => break,
173260
}
174261
}
175262

263+
self.inner.is_closed.store(true, Ordering::Relaxed);
264+
176265
// Flush.
177-
let mut segment_receiver = self.inner.segment_receiver.lock().await;
178266
let mut segments = LinkedList::new();
179-
while let Ok(segment) = segment_receiver.try_recv() {
180-
segments.push_back(segment);
267+
loop {
268+
match self.inner.segment_receiver.try_recv().await {
269+
Ok(Some(segment)) => {
270+
segments.push_back(segment);
271+
}
272+
Ok(None) => break,
273+
Err(err) => return Err(err.into()),
274+
}
181275
}
182276
Self::report_segment_object(&self.inner.reporter, segments).await;
277+
278+
Ok::<_, crate::Error>(())
183279
});
184280

185281
shutdown_signal.await;
186282

187283
if shutdown_tx.send(()).is_err() {
188-
tracing::error!("Shutdown signal send failed");
189-
}
190-
if let Err(e) = handle.await {
191-
tracing::error!("Tokio handle join failed: {:?}", e);
284+
tracing::error!("shutdown signal send failed");
192285
}
286+
287+
handle.await??;
288+
289+
Ok(())
193290
}
194291

195292
async fn report_segment_object(
196293
reporter: &Mutex<DynReporter>,
197294
segments: LinkedList<SegmentObject>,
198295
) {
199-
if let Err(e) = reporter.lock().await.collect(segments).await {
200-
tracing::error!("Collect failed: {:?}", e);
296+
if let Err(err) = reporter.lock().await.collect(segments).await {
297+
tracing::error!(?err, "collect failed");
201298
}
202299
}
203300

@@ -221,22 +318,29 @@ impl WeakTracer {
221318

222319
/// Created by [Tracer::reporting].
223320
pub struct Reporting {
224-
handle: JoinHandle<()>,
321+
handle: JoinHandle<crate::Result<()>>,
225322
}
226323

227324
impl Future for Reporting {
228-
type Output = Result<(), JoinError>;
325+
type Output = crate::Result<()>;
229326

230327
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
231-
Pin::new(&mut self.handle).poll(cx)
328+
Pin::new(&mut self.handle).poll(cx).map(|r| r?)
232329
}
233330
}
234331

235332
#[cfg(test)]
236333
mod tests {
237334
use super::*;
335+
use std::future;
238336

239337
trait AssertSend: Send {}
240338

241339
impl AssertSend for Tracer {}
340+
341+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
342+
async fn custom_channel() {
343+
let tracer = Tracer::new_with_channel("service_name", "instance_name", (), ((), ()));
344+
tracer.reporting(future::ready(())).await.unwrap();
345+
}
242346
}

src/error/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,10 @@ pub enum Error {
3333

3434
#[error("tonic status: {0}")]
3535
TonicStatus(#[from] tonic::Status),
36+
37+
#[error("tokio join failed: {0}")]
38+
TokioJoin(#[from] tokio::task::JoinError),
39+
40+
#[error(transparent)]
41+
Other(#[from] Box<dyn std::error::Error + Send + 'static>),
3642
}

src/reporter/log.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl LogReporter {
5454
impl Default for LogReporter {
5555
fn default() -> Self {
5656
Self {
57-
tip: "Collect".to_string(),
57+
tip: "collect".to_string(),
5858
used: Used::Println,
5959
}
6060
}

src/reporter/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,13 @@ pub(crate) type DynReporter = dyn Reporter + Send + Sync + 'static;
2828
pub trait Reporter {
2929
async fn collect(&mut self, segments: LinkedList<SegmentObject>) -> Result<(), Box<dyn Error>>;
3030
}
31+
32+
#[async_trait]
33+
impl Reporter for () {
34+
async fn collect(
35+
&mut self,
36+
_segments: LinkedList<SegmentObject>,
37+
) -> Result<(), Box<dyn Error>> {
38+
Ok(())
39+
}
40+
}

0 commit comments

Comments
 (0)