Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,48 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
```

# Advanced APIs

## Async Span APIs

`Span::prepare_for_async` designed for async use cases.
When tags, logs, and attributes (including end time) of the span need to be set in another
thread or coroutine.

`TracingContext::wait` wait for all `AsyncSpan` finished.

```rust
use skywalking::{
trace::tracer::Tracer,
trace::span::AbstractSpan,
};

async fn handle(tracer: Tracer) {
let mut ctx = tracer.create_trace_context();

{
let span = ctx.create_entry_span("op1");

// Create AsyncSpan and drop span.
// Internally, span will occupy the position of finalized span stack.
let mut async_span = span.prepare_for_async();

// Start async route, catch async_span with `move` keyword.
tokio::spawn(async move {

async_span.add_tag("foo", "bar");

// Something...

// async_span will drop here, submit modifications to finalized spans stack.
});
}

// Wait for all `AsyncSpan` finished.
ctx.wait();
}
```

# How to compile?

If you have `skywalking-(VERSION).crate`, you can unpack it with the way as follows:
Expand Down
2 changes: 1 addition & 1 deletion e2e/data/expected_context.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ segmentItems:
peer: consumer:8082
skipAnalysis: false
spanId: 1
spanLayer: Http
spanLayer: Unknown
spanType: Exit
startTime: gt 0
- componentId: 11000
Expand Down
4 changes: 2 additions & 2 deletions e2e/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
# specific language governing permissions and limitations
# under the License.
#
FROM rust:1.63
RUN apt-get update && apt-get install -y cmake protobuf-compiler=3.12.4-1
FROM rust:1.65
RUN apt-get update && apt-get install -y cmake protobuf-compiler
WORKDIR /build
COPY . /build/
RUN cargo build --release --workspace
Expand Down
1 change: 1 addition & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@

pub mod random_generator;
pub(crate) mod system_time;
pub(crate) mod wait_group;
59 changes: 59 additions & 0 deletions src/common/wait_group.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

use std::sync::{Arc, Condvar, Mutex};

#[derive(Clone)]
pub(crate) struct WaitGroup {
inner: Arc<Inner>,
}

struct Inner {
var: Condvar,
count: Mutex<usize>,
}

impl Default for WaitGroup {
fn default() -> Self {
Self {
inner: Arc::new(Inner {
var: Condvar::new(),
count: Mutex::new(0),
}),
}
}
}

impl WaitGroup {
pub(crate) fn add(&self, n: usize) {
*self.inner.count.lock().unwrap() += n;
}

pub(crate) fn done(&self) {
let mut count = self.inner.count.lock().unwrap();
*count -= 1;
if *count == 0 {
self.inner.var.notify_all();
}
}

pub(crate) fn wait(self) {
let mut count = self.inner.count.lock().unwrap();
while *count > 0 {
count = self.inner.var.wait(count).unwrap();
}
}
}
5 changes: 4 additions & 1 deletion src/logging/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use crate::{
log_data_body::Content, JsonLog, KeyStringValuePair, LogData, LogDataBody, LogTags,
TextLog, TraceContext, YamlLog,
},
trace::{span::Span, trace_context::TracingContext},
trace::{
span::{AbstractSpan, Span},
trace_context::TracingContext,
},
};
use std::time::{SystemTime, UNIX_EPOCH};

Expand Down
174 changes: 140 additions & 34 deletions src/trace/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,47 @@
//! Span from Google Dapper Paper.

use crate::{
common::system_time::{fetch_time, TimePeriod},
common::{
system_time::{fetch_time, TimePeriod},
wait_group::WaitGroup,
},
proto::v3::{SpanLayer, SpanObject, SpanType},
trace::trace_context::SpanStack,
trace::trace_context::{SpanStack, SpanUid},
};
use std::{fmt::Formatter, mem::take, sync::Arc};
use std::{
fmt::{self, Formatter},
mem::take,
sync::{Arc, Weak},
};

/// [AbstractSpan] contains methods handle [SpanObject].
pub trait AbstractSpan {
/// Get immutable span object reference.
fn span_object(&self) -> &SpanObject;

/// Mutable with inner span object.
fn span_object_mut(&mut self) -> &mut SpanObject;

/// Get span id.
fn span_id(&self) -> i32 {
self.span_object().span_id
}

/// Add logs to the span.
fn add_log<K, V, I>(&mut self, message: I)
where
K: Into<String>,
V: Into<String>,
I: IntoIterator<Item = (K, V)>,
{
self.span_object_mut().add_log(message)
}

/// Add tag to the span.
fn add_tag(&mut self, key: impl Into<String>, value: impl Into<String>) {
self.span_object_mut().add_tag(key, value)
}
}

/// Span is a concept that represents trace information for a single RPC.
/// The Rust SDK supports Entry Span to represent inbound to a service
Expand Down Expand Up @@ -61,13 +97,14 @@ use std::{fmt::Formatter, mem::take, sync::Arc};
/// ```
#[must_use = "assign a variable name to guard the span not be dropped immediately."]
pub struct Span {
index: usize,
uid: SpanUid,
obj: Option<SpanObject>,
wg: WaitGroup,
stack: Arc<SpanStack>,
}

impl std::fmt::Debug for Span {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
impl fmt::Debug for Span {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Span")
.field(
"data",
Expand All @@ -83,10 +120,11 @@ impl std::fmt::Debug for Span {
const SKYWALKING_RUST_COMPONENT_ID: i32 = 11000;

impl Span {
pub(crate) fn new(index: usize, obj: SpanObject, stack: Arc<SpanStack>) -> Self {
pub(crate) fn new(uid: SpanUid, obj: SpanObject, wg: WaitGroup, stack: Arc<SpanStack>) -> Self {
Self {
index,
uid,
obj: Some(obj),
wg,
stack,
}
}
Expand Down Expand Up @@ -115,53 +153,121 @@ impl Span {
}
}

/// Get immutable span object reference.
fn is_active_span(&self) -> bool {
let active_spans = &*self.stack.active();
active_spans
.last()
.map(|span| span.uid() == self.uid)
.unwrap_or_default()
}

/// The [Span] finish at current tracing context, but the current span is
/// still alive, until [AsyncSpan] dropped.
///
/// This method must be called:
///
/// 1. In original thread (tracing context).
/// 2. Current span is active span.
///
/// During alive, tags, logs and attributes of the span could be changed, in
/// any thread.
///
/// # Panics
///
/// Current span could by active span.
pub fn prepare_for_async(mut self) -> AsyncSpan {
if !self.is_active_span() {
panic!("current span isn't active span");
}

self.wg.add(1);

AsyncSpan {
uid: self.uid,
wg: self.wg.clone(),
obj: take(&mut self.obj),
stack: Arc::downgrade(&self.stack),
}
}
}

impl Drop for Span {
/// Set the end time as current time, pop from context active span stack,
/// and push to context spans.
fn drop(&mut self) {
self.stack.finalize_span(self.uid, take(&mut self.obj));
}
}

impl AbstractSpan for Span {
#[inline]
pub fn span_object(&self) -> &SpanObject {
fn span_object(&self) -> &SpanObject {
self.obj.as_ref().unwrap()
}

/// Mutable with inner span object.
#[inline]
pub fn span_object_mut(&mut self) -> &mut SpanObject {
fn span_object_mut(&mut self) -> &mut SpanObject {
self.obj.as_mut().unwrap()
}
}

/// Get span id.
pub fn span_id(&self) -> i32 {
self.span_object().span_id
}

/// Add logs to the span.
pub fn add_log<K, V, I>(&mut self, message: I)
where
K: Into<String>,
V: Into<String>,
I: IntoIterator<Item = (K, V)>,
{
self.span_object_mut().add_log(message)
}
/// Generated by [Span::prepare_for_async], tags, logs and attributes of the
/// span could be changed, in any thread.
///
/// It could be finished when dropped.
#[must_use = "assign a variable name to guard the active span not be dropped immediately."]
pub struct AsyncSpan {
uid: SpanUid,
obj: Option<SpanObject>,
wg: WaitGroup,
stack: Weak<SpanStack>,
}

/// Add tag to the span.
pub fn add_tag(&mut self, key: impl Into<String>, value: impl Into<String>) {
self.span_object_mut().add_tag(key, value)
impl fmt::Debug for AsyncSpan {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("AsyncSpan")
.field(
"data",
match self.obj {
Some(ref obj) => obj,
None => &"<none>",
},
)
.finish()
}
}

impl Drop for Span {
/// Set the end time as current time, pop from context active span stack,
/// and push to context spans.
impl Drop for AsyncSpan {
/// Set the end time as current time.
fn drop(&mut self) {
self.stack
.finalize_span(self.index, take(&mut self.obj).unwrap());
.upgrade()
.expect("TracingContext has dropped")
.finalize_async_span(self.uid, take(&mut self.obj).unwrap());

self.wg.done();
}
}

impl AbstractSpan for AsyncSpan {
#[inline]
fn span_object(&self) -> &SpanObject {
self.obj.as_ref().unwrap()
}

#[inline]
fn span_object_mut(&mut self) -> &mut SpanObject {
self.obj.as_mut().unwrap()
}
}

#[cfg(test)]
mod tests {
use super::*;

trait AssertSend: Send {}
trait AssertSend: Send + 'static {}

impl AssertSend for Span {}

impl AssertSend for AsyncSpan {}
}
Loading