Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ futures-util = "0.3.21"
prost = "0.10.4"
prost-derive = "0.10.1"
thiserror = "1.0.31"
tokio = { version = "1.18.2", features = ["full"] }
tokio = { version = "1.18.2", features = ["parking_lot"] }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a library should use fewer features.

tonic = { version = "0.7.2", features = ["codegen"] }
tracing = "0.1.35"
uuid = { version = "1.1.0", features = ["serde", "v4"] }
Expand All @@ -54,6 +54,7 @@ uuid = { version = "1.1.0", features = ["serde", "v4"] }
tonic-build = "0.7.2"

[dev-dependencies]
tokio = { version = "1.18.2", features = ["rt-multi-thread"] }
tokio-stream = { version = "0.1.8", features = ["net"] }

[[test]]
Expand Down
35 changes: 34 additions & 1 deletion e2e/data/expected_context.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,30 @@
# under the License.
#
segmentItems:
- segmentSize: 1
- segmentSize: 2
segments:
- segmentId: not null
spans:
- componentId: 11000
endTime: gt 0
isError: false
operationName: async-callback
parentSpanId: -1
peer: ''
refs:
- networkAddress: ''
parentEndpoint: async-job
parentService: producer
parentServiceInstance: node_0
parentSpanId: 2
parentTraceSegmentId: not null
refType: CrossThread
traceId: not null
skipAnalysis: false
spanId: 0
spanLayer: Http
spanType: Entry
startTime: gt 0
- segmentId: not null
spans:
- componentId: 11000
Expand All @@ -31,6 +53,17 @@ segmentItems:
spanLayer: Http
spanType: Exit
startTime: gt 0
- componentId: 11000
endTime: gt 0
isError: false
operationName: async-job
parentSpanId: 0
peer: ''
skipAnalysis: false
spanId: 2
spanLayer: Unknown
spanType: Local
startTime: gt 0
- componentId: 11000
endTime: gt 0
isError: false
Expand Down
12 changes: 12 additions & 0 deletions e2e/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ async fn handle_ping(

client.request(req).await.unwrap();
}
{
let _span3 = context.create_local_span("async-job");
let snapshot = context.capture();

tokio::spawn(async move {
let mut context2 = tracer::create_trace_context();
let _span3 = context2.create_entry_span("async-callback");
context2.continued(snapshot);
})
.await
.unwrap();
}
Ok(Response::new(Body::from("hoge")))
}

Expand Down
2 changes: 1 addition & 1 deletion examples/simple_trace_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn handle_request(tracer: Tracer) {
{
// Generate an Entry Span when a request is received.
// An Entry Span is generated only once per context.
// You should assign a variable name to guard the span not be dropped immediately.
// Assign a variable name to guard the span not to be dropped immediately.
let _span = ctx.create_entry_span("op1");

// Something...
Expand Down
6 changes: 4 additions & 2 deletions src/context/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use super::{
/// {
/// // Generate an Entry Span when a request is received.
/// // An Entry Span is generated only once per context.
/// // You should assign a variable name to guard the span not be dropped immediately.
/// // Assign a variable name to guard the span not to be dropped immediately.
/// let _span = ctx.create_entry_span("op1");
///
/// // Something...
Expand All @@ -57,7 +57,7 @@ use super::{
/// // Auto report ctx when dropped.
/// }
/// ```
#[derive(Clone)]
#[must_use = "assign a variable name to guard the span not be dropped immediately."]
pub struct Span {
index: usize,
context: WeakTracingContext,
Expand Down Expand Up @@ -118,6 +118,8 @@ impl Span {
self.context.upgrade().expect("Context has dropped")
}

// Notice: Perhaps in the future, `RwLock` can be used instead of `Mutex`, so `with_*` can be nested.
// (Although I can't find the meaning of such use at present.)
pub fn with_span_object<T>(&self, f: impl FnOnce(&SpanObject) -> T) -> T {
self.upgrade_context()
.with_active_span_stack(|stack| f(&stack[self.index]))
Expand Down
73 changes: 59 additions & 14 deletions src/context/trace_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ use std::{
mem::take,
sync::{
atomic::{AtomicI32, Ordering},
Arc, Mutex, Weak,
Arc, Mutex, RwLock, Weak,
},
};

struct Inner {
trace_id: String,
trace_id: RwLock<String>,
trace_segment_id: String,
service: String,
service_instance: String,
Expand All @@ -49,8 +49,7 @@ struct Inner {
primary_endpoint_name: Mutex<String>,
}

#[derive(Clone)]
#[must_use = "You should call `create_entry_span` after `TracingContext` created."]
#[must_use = "call `create_entry_span` after `TracingContext` created."]
pub struct TracingContext {
inner: Arc<Inner>,
tracer: WeakTracer,
Expand Down Expand Up @@ -79,7 +78,7 @@ impl TracingContext {
) -> Self {
TracingContext {
inner: Arc::new(Inner {
trace_id: RandomGenerator::generate(),
trace_id: RwLock::new(RandomGenerator::generate()),
trace_segment_id: RandomGenerator::generate(),
service: service_name.to_string(),
service_instance: instance_name.to_string(),
Expand All @@ -104,7 +103,7 @@ impl TracingContext {
) -> Self {
TracingContext {
inner: Arc::new(Inner {
trace_id: context.parent_trace_id.clone(),
trace_id: RwLock::new(context.parent_trace_id.clone()),
trace_segment_id: RandomGenerator::generate(),
service: service_name.to_string(),
service_instance: instance_name.to_string(),
Expand All @@ -119,8 +118,16 @@ impl TracingContext {
}

#[inline]
pub fn trace_id(&self) -> &str {
&self.inner.trace_id
pub fn trace_id(&self) -> String {
self.with_trace_id(ToString::to_string)
}

fn with_trace_id<T>(&self, f: impl FnOnce(&String) -> T) -> T {
f(&*self.inner.trace_id.try_read().expect(LOCK_MSG))
}

fn with_trace_id_mut<T>(&mut self, f: impl FnOnce(&mut String) -> T) -> T {
f(&mut *self.inner.trace_id.try_write().expect(LOCK_MSG))
}

#[inline]
Expand Down Expand Up @@ -182,8 +189,13 @@ impl TracingContext {
self.with_active_span_stack(|stack| stack.last().map(f))
}

// TODO Using for capture and continued.
#[allow(dead_code)]
pub(crate) fn with_active_span_mut<T>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you share what does this method mean?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method accepts a closure that modifies the top element of the active span stack. Because Arc is used, reference counting has no mutable method, so Mutex is required for internal mutability, and with_* methods can easily handle try_lock and drop, , shielding these details.

&mut self,
f: impl FnOnce(&mut SpanObject) -> T,
) -> Option<T> {
self.with_active_span_stack_mut(|stack| stack.last_mut().map(f))
}

fn with_primary_endpoint_name<T>(&self, f: impl FnOnce(&String) -> T) -> T {
f(&*self.inner.primary_endpoint_name.try_lock().expect(LOCK_MSG))
}
Expand All @@ -209,7 +221,7 @@ impl TracingContext {
if let Some(segment_link) = &self.inner.segment_link {
span.refs.push(SegmentReference {
ref_type: RefType::CrossProcess as i32,
trace_id: self.inner.trace_id.clone(),
trace_id: self.trace_id(),
parent_trace_segment_id: segment_link.parent_trace_segment_id.clone(),
parent_span_id: segment_link.parent_span_id,
parent_service: segment_link.parent_service.clone(),
Expand Down Expand Up @@ -273,6 +285,40 @@ impl TracingContext {
Span::new(index, self.downgrade())
}

/// Capture a snapshot for cross-thread propagation.
pub fn capture(&self) -> ContextSnapshot {
ContextSnapshot {
trace_id: self.trace_id(),
trace_segment_id: self.trace_segment_id().to_owned(),
span_id: self.peek_active_span_id().unwrap_or(-1),
parent_endpoint: self.with_primary_endpoint_name(Clone::clone),
}
}

/// Build the reference between this segment and a cross-thread segment.
pub fn continued(&mut self, snapshot: ContextSnapshot) {
if snapshot.is_valid() {
self.with_trace_id_mut(|trace_id| *trace_id = snapshot.trace_id.clone());

let tracer = self.upgrade_tracer();

let segment_ref = SegmentReference {
ref_type: RefType::CrossThread as i32,
trace_id: snapshot.trace_id,
parent_trace_segment_id: snapshot.trace_segment_id,
parent_span_id: snapshot.span_id,
parent_service: tracer.service_name().to_owned(),
parent_service_instance: tracer.instance_name().to_owned(),
parent_endpoint: snapshot.parent_endpoint,
network_address_used_at_peer: Default::default(),
};

self.with_active_span_mut(|span| {
span.refs.push(segment_ref);
});
}
}

/// Close span. We can't use closed span after finalize called.
pub(crate) fn finalize_span(&mut self, index: usize) -> Result<(), ()> {
let span = self.pop_active_span(index);
Expand All @@ -290,7 +336,7 @@ impl TracingContext {
///
/// Notice: The spans will taked, so this method shouldn't be called twice.
pub(crate) fn convert_segment_object(&mut self) -> SegmentObject {
let trace_id = self.trace_id().to_owned();
let trace_id = self.trace_id();
let trace_segment_id = self.trace_segment_id().to_owned();
let service = self.service().to_owned();
let service_instance = self.service_instance().to_owned();
Expand Down Expand Up @@ -368,12 +414,11 @@ impl WeakTracingContext {
}
}

#[derive(Debug)]
pub struct ContextSnapshot {
trace_id: String,
trace_segment_id: String,
span_id: i32,
// TODO Using for capture and continued.
#[allow(dead_code)]
parent_endpoint: String,
}

Expand Down
1 change: 0 additions & 1 deletion src/reporter/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

use super::Reporter;
use crate::skywalking_proto::v3::SegmentObject;

use std::collections::LinkedList;
use tonic::async_trait;

Expand Down
63 changes: 57 additions & 6 deletions tests/trace_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ use skywalking::skywalking_proto::v3::{
SpanType,
};
use std::collections::LinkedList;
use std::future;
use std::sync::Mutex;
use std::{cell::Ref, sync::Arc};
use std::{future, thread};
use tokio::runtime::Handle;

/// Serialize from A should equal Serialize from B
#[allow(dead_code)]
Expand Down Expand Up @@ -236,9 +237,6 @@ fn crossprocess_test() {
drop(span3);

context2.with_spans(|spans| {
let span3 = spans.last().unwrap();
return;

let span3 = spans.last().unwrap();
assert_eq!(span3.span_id, 0);
assert_eq!(span3.parent_span_id, -1);
Expand All @@ -261,22 +259,75 @@ fn crossprocess_test() {
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn cross_threads_test() {
MockReporter::with_many(
|reporter| {
let tracer = Tracer::new("service", "instance", reporter);
let mut ctx1 = tracer.create_trace_context();
let _span1 = ctx1.create_entry_span("op1");
let _span2 = ctx1.create_local_span("op2");
let snapshot = ctx1.capture();

let tracer_ = tracer.clone();
thread::spawn(move || {
let mut ctx2 = tracer_.create_trace_context();
let _span3 = ctx2.create_entry_span("op3");
ctx2.continued(snapshot);
})
.join()
.unwrap();

tracer
},
|segments| {
let iter = segments.iter();
let first = iter.nth(0).unwrap();
let second = iter.nth(1).unwrap();

assert_eq!(first.trace_id, second.trace_id);
assert_eq!(first.spans.refs.len(), 1);
assert_eq!(
first.spans.refs[0],
SegmentReference {
ref_type: RefType::CrossThread as i32,
trace_id: second.trace_id.clone(),
parent_trace_segment_id: second.trace_segment_id.clone(),
parent_span_id: 1,
parent_service: "service".to_owned(),
parent_service_instance: "instance".to_owned(),
parent_endpoint: "op2".to_owned(),
..Default::default()
}
);
assert_eq!(second.spans.len(), 2);
},
)
.await;
}

#[derive(Default, Clone)]
struct MockReporter {
segments: Arc<Mutex<LinkedList<SegmentObject>>>,
}

impl MockReporter {
async fn with(f1: impl FnOnce(MockReporter) -> Tracer, f2: impl FnOnce(&SegmentObject)) {
Self::with_many(f1, |segments| f2(&segments.front().unwrap())).await;
}

async fn with_many(
f1: impl FnOnce(MockReporter) -> Tracer,
f2: impl FnOnce(&LinkedList<SegmentObject>),
) {
let reporter = MockReporter::default();

let tracer = f1(reporter.clone());

tracer.reporting(future::ready(())).await.unwrap();

let segments = reporter.segments.try_lock().unwrap();
let segment = segments.front().unwrap();
f2(segment);
f2(&*segments);
}
}

Expand Down