From d8317eac03f6136c6b43f8713a2fac01cefeb0f2 Mon Sep 17 00:00:00 2001 From: jmjoy <918734043@qq.com> Date: Sun, 17 Jul 2022 15:44:04 +0800 Subject: [PATCH 1/4] Add context capture and continued methods. --- Cargo.toml | 3 +- examples/simple_trace_report.rs | 2 +- src/context/span.rs | 6 ++- src/context/trace_context.rs | 73 ++++++++++++++++++++++++++------- src/reporter/log.rs | 1 - tests/trace_context.rs | 63 +++++++++++++++++++++++++--- 6 files changed, 123 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5ed4d53..b1c1476 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,7 +45,7 @@ futures-util = "0.3.21" prost = "0.10.4" prost-derive = "0.10.1" thiserror = "1.0.31" -tokio = { version = "1.18.2", features = ["full"] } +tokio = { version = "1.18.2", features = ["parking_lot"] } tonic = { version = "0.7.2", features = ["codegen"] } tracing = "0.1.35" uuid = { version = "1.1.0", features = ["serde", "v4"] } @@ -54,6 +54,7 @@ uuid = { version = "1.1.0", features = ["serde", "v4"] } tonic-build = "0.7.2" [dev-dependencies] +tokio = { version = "1.18.2", features = ["rt-multi-thread"] } tokio-stream = { version = "0.1.8", features = ["net"] } [[test]] diff --git a/examples/simple_trace_report.rs b/examples/simple_trace_report.rs index bad1fff..f7f321b 100644 --- a/examples/simple_trace_report.rs +++ b/examples/simple_trace_report.rs @@ -26,7 +26,7 @@ async fn handle_request(tracer: Tracer) { { // Generate an Entry Span when a request is received. // An Entry Span is generated only once per context. - // You should assign a variable name to guard the span not be dropped immediately. + // Assign a variable name to guard the span not to be dropped immediately. let _span = ctx.create_entry_span("op1"); // Something... diff --git a/src/context/span.rs b/src/context/span.rs index 6d0bf21..c6c5e18 100644 --- a/src/context/span.rs +++ b/src/context/span.rs @@ -37,7 +37,7 @@ use super::{ /// { /// // Generate an Entry Span when a request is received. /// // An Entry Span is generated only once per context. -/// // You should assign a variable name to guard the span not be dropped immediately. +/// // Assign a variable name to guard the span not to be dropped immediately. /// let _span = ctx.create_entry_span("op1"); /// /// // Something... @@ -57,7 +57,7 @@ use super::{ /// // Auto report ctx when dropped. /// } /// ``` -#[derive(Clone)] +#[must_use = "assign a variable name to guard the span not be dropped immediately."] pub struct Span { index: usize, context: WeakTracingContext, @@ -118,6 +118,8 @@ impl Span { self.context.upgrade().expect("Context has dropped") } + // Notice: Perhaps in the future, `RwLock` can be used instead of `Mutex`, so `with_*` can be nested. + // (Although I can't find the meaning of such use at present.) pub fn with_span_object(&self, f: impl FnOnce(&SpanObject) -> T) -> T { self.upgrade_context() .with_active_span_stack(|stack| f(&stack[self.index])) diff --git a/src/context/trace_context.rs b/src/context/trace_context.rs index 014c75b..e56d8a9 100644 --- a/src/context/trace_context.rs +++ b/src/context/trace_context.rs @@ -33,12 +33,12 @@ use std::{ mem::take, sync::{ atomic::{AtomicI32, Ordering}, - Arc, Mutex, Weak, + Arc, Mutex, RwLock, Weak, }, }; struct Inner { - trace_id: String, + trace_id: RwLock, trace_segment_id: String, service: String, service_instance: String, @@ -49,8 +49,7 @@ struct Inner { primary_endpoint_name: Mutex, } -#[derive(Clone)] -#[must_use = "You should call `create_entry_span` after `TracingContext` created."] +#[must_use = "call `create_entry_span` after `TracingContext` created."] pub struct TracingContext { inner: Arc, tracer: WeakTracer, @@ -79,7 +78,7 @@ impl TracingContext { ) -> Self { TracingContext { inner: Arc::new(Inner { - trace_id: RandomGenerator::generate(), + trace_id: RwLock::new(RandomGenerator::generate()), trace_segment_id: RandomGenerator::generate(), service: service_name.to_string(), service_instance: instance_name.to_string(), @@ -104,7 +103,7 @@ impl TracingContext { ) -> Self { TracingContext { inner: Arc::new(Inner { - trace_id: context.parent_trace_id.clone(), + trace_id: RwLock::new(context.parent_trace_id.clone()), trace_segment_id: RandomGenerator::generate(), service: service_name.to_string(), service_instance: instance_name.to_string(), @@ -119,8 +118,16 @@ impl TracingContext { } #[inline] - pub fn trace_id(&self) -> &str { - &self.inner.trace_id + pub fn trace_id(&self) -> String { + self.with_trace_id(ToString::to_string) + } + + fn with_trace_id(&self, f: impl FnOnce(&String) -> T) -> T { + f(&*self.inner.trace_id.try_read().expect(LOCK_MSG)) + } + + fn with_trace_id_mut(&mut self, f: impl FnOnce(&mut String) -> T) -> T { + f(&mut *self.inner.trace_id.try_write().expect(LOCK_MSG)) } #[inline] @@ -182,8 +189,13 @@ impl TracingContext { self.with_active_span_stack(|stack| stack.last().map(f)) } - // TODO Using for capture and continued. - #[allow(dead_code)] + pub(crate) fn with_active_span_mut( + &mut self, + f: impl FnOnce(&mut SpanObject) -> T, + ) -> Option { + self.with_active_span_stack_mut(|stack| stack.last_mut().map(f)) + } + fn with_primary_endpoint_name(&self, f: impl FnOnce(&String) -> T) -> T { f(&*self.inner.primary_endpoint_name.try_lock().expect(LOCK_MSG)) } @@ -209,7 +221,7 @@ impl TracingContext { if let Some(segment_link) = &self.inner.segment_link { span.refs.push(SegmentReference { ref_type: RefType::CrossProcess as i32, - trace_id: self.inner.trace_id.clone(), + trace_id: self.trace_id(), parent_trace_segment_id: segment_link.parent_trace_segment_id.clone(), parent_span_id: segment_link.parent_span_id, parent_service: segment_link.parent_service.clone(), @@ -273,6 +285,40 @@ impl TracingContext { Span::new(index, self.downgrade()) } + /// Capture a snapshot for cross-thread propagation. + pub fn capture(&self) -> ContextSnapshot { + ContextSnapshot { + trace_id: self.trace_id(), + trace_segment_id: self.trace_segment_id().to_owned(), + span_id: self.peek_active_span_id().unwrap_or(-1), + parent_endpoint: self.with_primary_endpoint_name(Clone::clone), + } + } + + /// Build the reference between this segment and a cross-thread segment. + pub fn continued(&mut self, snapshot: ContextSnapshot) { + if snapshot.is_valid() { + self.with_trace_id_mut(|trace_id| *trace_id = snapshot.trace_id.clone()); + + let tracer = self.upgrade_tracer(); + + let segment_ref = SegmentReference { + ref_type: RefType::CrossThread as i32, + trace_id: snapshot.trace_id, + parent_trace_segment_id: snapshot.trace_segment_id, + parent_span_id: snapshot.span_id, + parent_service: tracer.service_name().to_owned(), + parent_service_instance: tracer.instance_name().to_owned(), + parent_endpoint: snapshot.parent_endpoint, + network_address_used_at_peer: Default::default(), + }; + + self.with_active_span_mut(|span| { + span.refs.push(segment_ref); + }); + } + } + /// Close span. We can't use closed span after finalize called. pub(crate) fn finalize_span(&mut self, index: usize) -> Result<(), ()> { let span = self.pop_active_span(index); @@ -290,7 +336,7 @@ impl TracingContext { /// /// Notice: The spans will taked, so this method shouldn't be called twice. pub(crate) fn convert_segment_object(&mut self) -> SegmentObject { - let trace_id = self.trace_id().to_owned(); + let trace_id = self.trace_id(); let trace_segment_id = self.trace_segment_id().to_owned(); let service = self.service().to_owned(); let service_instance = self.service_instance().to_owned(); @@ -368,12 +414,11 @@ impl WeakTracingContext { } } +#[derive(Debug)] pub struct ContextSnapshot { trace_id: String, trace_segment_id: String, span_id: i32, - // TODO Using for capture and continued. - #[allow(dead_code)] parent_endpoint: String, } diff --git a/src/reporter/log.rs b/src/reporter/log.rs index b08ef3c..39fe7e0 100644 --- a/src/reporter/log.rs +++ b/src/reporter/log.rs @@ -16,7 +16,6 @@ use super::Reporter; use crate::skywalking_proto::v3::SegmentObject; - use std::collections::LinkedList; use tonic::async_trait; diff --git a/tests/trace_context.rs b/tests/trace_context.rs index 3d376de..0cd992d 100644 --- a/tests/trace_context.rs +++ b/tests/trace_context.rs @@ -27,9 +27,10 @@ use skywalking::skywalking_proto::v3::{ SpanType, }; use std::collections::LinkedList; -use std::future; use std::sync::Mutex; use std::{cell::Ref, sync::Arc}; +use std::{future, thread}; +use tokio::runtime::Handle; /// Serialize from A should equal Serialize from B #[allow(dead_code)] @@ -236,9 +237,6 @@ fn crossprocess_test() { drop(span3); context2.with_spans(|spans| { - let span3 = spans.last().unwrap(); - return; - let span3 = spans.last().unwrap(); assert_eq!(span3.span_id, 0); assert_eq!(span3.parent_span_id, -1); @@ -261,6 +259,53 @@ fn crossprocess_test() { } } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn cross_threads_test() { + MockReporter::with_many( + |reporter| { + let tracer = Tracer::new("service", "instance", reporter); + let mut ctx1 = tracer.create_trace_context(); + let _span1 = ctx1.create_entry_span("op1"); + let _span2 = ctx1.create_local_span("op2"); + let snapshot = ctx1.capture(); + + let tracer_ = tracer.clone(); + thread::spawn(move || { + let mut ctx2 = tracer_.create_trace_context(); + let _span3 = ctx2.create_entry_span("op3"); + ctx2.continued(snapshot); + }) + .join() + .unwrap(); + + tracer + }, + |segments| { + let iter = segments.iter(); + let first = iter.nth(0).unwrap(); + let second = iter.nth(1).unwrap(); + + assert_eq!(first.trace_id, second.trace_id); + assert_eq!(first.spans.refs.len(), 1); + assert_eq!( + first.spans.refs[0], + SegmentReference { + ref_type: RefType::CrossThread as i32, + trace_id: second.trace_id.clone(), + parent_trace_segment_id: second.trace_segment_id.clone(), + parent_span_id: 1, + parent_service: "service".to_owned(), + parent_service_instance: "instance".to_owned(), + parent_endpoint: "op2".to_owned(), + ..Default::default() + } + ); + assert_eq!(second.spans.len(), 2); + }, + ) + .await; +} + #[derive(Default, Clone)] struct MockReporter { segments: Arc>>, @@ -268,6 +313,13 @@ struct MockReporter { impl MockReporter { async fn with(f1: impl FnOnce(MockReporter) -> Tracer, f2: impl FnOnce(&SegmentObject)) { + Self::with_many(f1, |segments| f2(&segments.front().unwrap())).await; + } + + async fn with_many( + f1: impl FnOnce(MockReporter) -> Tracer, + f2: impl FnOnce(&LinkedList), + ) { let reporter = MockReporter::default(); let tracer = f1(reporter.clone()); @@ -275,8 +327,7 @@ impl MockReporter { tracer.reporting(future::ready(())).await.unwrap(); let segments = reporter.segments.try_lock().unwrap(); - let segment = segments.front().unwrap(); - f2(segment); + f2(&*segments); } } From 59a39c398da6c945a871448ab8370e02c80d2735 Mon Sep 17 00:00:00 2001 From: jmjoy <918734043@qq.com> Date: Tue, 19 Jul 2022 00:10:12 +0800 Subject: [PATCH 2/4] Update e2e. --- e2e/data/expected_context.yaml | 35 +++++++++++++++++++++++++++++++++- e2e/src/main.rs | 12 ++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/e2e/data/expected_context.yaml b/e2e/data/expected_context.yaml index 162046e..2c95730 100644 --- a/e2e/data/expected_context.yaml +++ b/e2e/data/expected_context.yaml @@ -16,8 +16,30 @@ # under the License. # segmentItems: -- segmentSize: 1 +- segmentSize: 2 segments: + - segmentId: not null + spans: + - componentId: 11000 + endTime: gt 0 + isError: false + operationName: async-callback + parentSpanId: -1 + peer: '' + refs: + - networkAddress: '' + parentEndpoint: async-job + parentService: producer + parentServiceInstance: node_0 + parentSpanId: 2 + parentTraceSegmentId: not null + refType: CrossThread + traceId: not null + skipAnalysis: false + spanId: 0 + spanLayer: Http + spanType: Entry + startTime: gt 0 - segmentId: not null spans: - componentId: 11000 @@ -31,6 +53,17 @@ segmentItems: spanLayer: Http spanType: Exit startTime: gt 0 + - componentId: 11000 + endTime: gt 0 + isError: false + operationName: async-job + parentSpanId: 0 + peer: '' + skipAnalysis: false + spanId: 2 + spanLayer: Unknown + spanType: Local + startTime: gt 0 - componentId: 11000 endTime: gt 0 isError: false diff --git a/e2e/src/main.rs b/e2e/src/main.rs index ae2432b..5611d9f 100644 --- a/e2e/src/main.rs +++ b/e2e/src/main.rs @@ -50,6 +50,18 @@ async fn handle_ping( client.request(req).await.unwrap(); } + { + let _span3 = context.create_local_span("async-job"); + let snapshot = context.capture(); + + tokio::spawn(async move { + let mut context2 = tracer::create_trace_context(); + let _span3 = context2.create_entry_span("async-callback"); + context2.continued(snapshot); + }) + .await + .unwrap(); + } Ok(Response::new(Body::from("hoge"))) } From 7b2a672d6a8fe4c12e807be357904e6a9a69323c Mon Sep 17 00:00:00 2001 From: jmjoy <918734043@qq.com> Date: Tue, 19 Jul 2022 10:00:40 +0800 Subject: [PATCH 3/4] Update e2e/data/expected_context.yaml MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 吴晟 Wu Sheng --- e2e/data/expected_context.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/data/expected_context.yaml b/e2e/data/expected_context.yaml index 2c95730..9cd2fda 100644 --- a/e2e/data/expected_context.yaml +++ b/e2e/data/expected_context.yaml @@ -16,7 +16,7 @@ # under the License. # segmentItems: -- segmentSize: 2 +- segmentSize: gt 2 segments: - segmentId: not null spans: From ecba7f857cd6b95cf675ed19b0141324065a5f3f Mon Sep 17 00:00:00 2001 From: jmjoy <918734043@qq.com> Date: Tue, 19 Jul 2022 10:22:27 +0800 Subject: [PATCH 4/4] Fix e2e. --- e2e/data/expected_context.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/data/expected_context.yaml b/e2e/data/expected_context.yaml index 9cd2fda..f4ff6e1 100644 --- a/e2e/data/expected_context.yaml +++ b/e2e/data/expected_context.yaml @@ -16,7 +16,7 @@ # under the License. # segmentItems: -- segmentSize: gt 2 +- segmentSize: gt 1 segments: - segmentId: not null spans: