Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2e/data/expected_context.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ segmentItems:
peer: consumer:8082
skipAnalysis: false
spanId: 1
spanLayer: Http
spanLayer: Unknown
spanType: Exit
startTime: gt 0
- componentId: 11000
Expand Down
4 changes: 2 additions & 2 deletions e2e/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
# specific language governing permissions and limitations
# under the License.
#
FROM rust:1.63
RUN apt-get update && apt-get install -y cmake protobuf-compiler=3.12.4-1
FROM rust:1.65
RUN apt-get update && apt-get install -y cmake protobuf-compiler
WORKDIR /build
COPY . /build/
RUN cargo build --release --workspace
Expand Down
1 change: 1 addition & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@

pub mod random_generator;
pub(crate) mod system_time;
pub(crate) mod wait_group;
59 changes: 59 additions & 0 deletions src/common/wait_group.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

use std::sync::{Arc, Condvar, Mutex};

#[derive(Clone)]
pub(crate) struct WaitGroup {
inner: Arc<Inner>,
}

struct Inner {
var: Condvar,
count: Mutex<usize>,
}

impl Default for WaitGroup {
fn default() -> Self {
Self {
inner: Arc::new(Inner {
var: Condvar::new(),
count: Mutex::new(0),
}),
}
}
}

impl WaitGroup {
pub(crate) fn add(&self, n: usize) {
*self.inner.count.lock().unwrap() += n;
}

pub(crate) fn done(&self) {
let mut count = self.inner.count.lock().unwrap();
*count -= 1;
if *count == 0 {
self.inner.var.notify_all();
}
}

pub(crate) fn wait(self) {
let mut count = self.inner.count.lock().unwrap();
while *count > 0 {
count = self.inner.var.wait(count).unwrap();
}
}
}
5 changes: 4 additions & 1 deletion src/logging/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use crate::{
log_data_body::Content, JsonLog, KeyStringValuePair, LogData, LogDataBody, LogTags,
TextLog, TraceContext, YamlLog,
},
trace::{span::Span, trace_context::TracingContext},
trace::{
span::{AbstractSpan, Span},
trace_context::TracingContext,
},
};
use std::time::{SystemTime, UNIX_EPOCH};

Expand Down
166 changes: 153 additions & 13 deletions src/trace/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,47 @@
//! Span from Google Dapper Paper.

use crate::{
common::system_time::{fetch_time, TimePeriod},
common::{
system_time::{fetch_time, TimePeriod},
wait_group::WaitGroup,
},
proto::v3::{SpanLayer, SpanObject, SpanType},
trace::trace_context::SpanStack,
trace::trace_context::{SpanStack, SpanUid},
};
use std::{fmt::Formatter, mem::take, sync::Arc};
use std::{
fmt::{self, Formatter},
mem::take,
sync::{Arc, Weak},
};

/// [AbstractSpan] contains methods handle [SpanObject].
pub trait AbstractSpan {
/// Get immutable span object reference.
fn span_object(&self) -> &SpanObject;

/// Mutable with inner span object.
fn span_object_mut(&mut self) -> &mut SpanObject;

/// Get span id.
fn span_id(&self) -> i32 {
self.span_object().span_id
}

/// Add logs to the span.
fn add_log<K, V, I>(&mut self, message: I)
where
K: Into<String>,
V: Into<String>,
I: IntoIterator<Item = (K, V)>,
{
self.span_object_mut().add_log(message)
}

/// Add tag to the span.
fn add_tag(&mut self, key: impl Into<String>, value: impl Into<String>) {
self.span_object_mut().add_tag(key, value)
}
}

/// Span is a concept that represents trace information for a single RPC.
/// The Rust SDK supports Entry Span to represent inbound to a service
Expand Down Expand Up @@ -61,13 +97,14 @@ use std::{fmt::Formatter, mem::take, sync::Arc};
/// ```
#[must_use = "assign a variable name to guard the span not be dropped immediately."]
pub struct Span {
index: usize,
uid: SpanUid,
obj: Option<SpanObject>,
wg: WaitGroup,
stack: Arc<SpanStack>,
}

impl std::fmt::Debug for Span {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
impl fmt::Debug for Span {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Span")
.field(
"data",
Expand All @@ -83,10 +120,11 @@ impl std::fmt::Debug for Span {
const SKYWALKING_RUST_COMPONENT_ID: i32 = 11000;

impl Span {
pub(crate) fn new(index: usize, obj: SpanObject, stack: Arc<SpanStack>) -> Self {
pub(crate) fn new(uid: SpanUid, obj: SpanObject, wg: WaitGroup, stack: Arc<SpanStack>) -> Self {
Self {
index,
uid,
obj: Some(obj),
wg,
stack,
}
}
Expand Down Expand Up @@ -115,6 +153,77 @@ impl Span {
}
}

fn is_active_span(&self) -> bool {
let active_spans = &*self.stack.active();
active_spans
.last()
.map(|span| span.uid() == self.uid)
.unwrap_or_default()
}

/// The [Span] finish at current tracing context, but the current span is
/// still alive, until [AsyncSpan] dropped.
///
/// This method must be called:
///
/// 1. In original thread (tracing context).
/// 2. Current span is active span.
///
/// During alive, tags, logs and attributes of the span could be changed, in
/// any thread.
///
/// # Panics
///
/// Current span could by active span.
pub fn prepare_for_async(mut self) -> AsyncSpan {
if !self.is_active_span() {
panic!("current span isn't active span");
}

self.wg.add(1);

AsyncSpan {
uid: self.uid,
wg: self.wg.clone(),
obj: take(&mut self.obj),
stack: Arc::downgrade(&self.stack),
}
}
}

impl Drop for Span {
/// Set the end time as current time, pop from context active span stack,
/// and push to context spans.
fn drop(&mut self) {
self.stack.finalize_span(self.uid, take(&mut self.obj));
}
}

impl AbstractSpan for Span {
#[inline]
fn span_object(&self) -> &SpanObject {
self.obj.as_ref().unwrap()
}

#[inline]
fn span_object_mut(&mut self) -> &mut SpanObject {
self.obj.as_mut().unwrap()
}
}

/// Generated by [Span::prepare_for_async], tags, logs and attributes of the
/// span could be changed, in any thread.
///
/// It could be finished when dropped.
#[must_use = "assign a variable name to guard the active span not be dropped immediately."]
pub struct AsyncSpan {
uid: SpanUid,
obj: Option<SpanObject>,
wg: WaitGroup,
stack: Weak<SpanStack>,
}

impl AsyncSpan {
/// Get immutable span object reference.
#[inline]
pub fn span_object(&self) -> &SpanObject {
Expand Down Expand Up @@ -148,20 +257,51 @@ impl Span {
}
}

impl Drop for Span {
/// Set the end time as current time, pop from context active span stack,
/// and push to context spans.
impl fmt::Debug for AsyncSpan {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("AsyncSpan")
.field(
"data",
match self.obj {
Some(ref obj) => obj,
None => &"<none>",
},
)
.finish()
}
}

impl Drop for AsyncSpan {
/// Set the end time as current time.
fn drop(&mut self) {
self.stack
.finalize_span(self.index, take(&mut self.obj).unwrap());
.upgrade()
.expect("TracingContext has dropped")
.finalize_async_span(self.uid, take(&mut self.obj).unwrap());

self.wg.done();
}
}

impl AbstractSpan for AsyncSpan {
#[inline]
fn span_object(&self) -> &SpanObject {
self.obj.as_ref().unwrap()
}

#[inline]
fn span_object_mut(&mut self) -> &mut SpanObject {
self.obj.as_mut().unwrap()
}
}

#[cfg(test)]
mod tests {
use super::*;

trait AssertSend: Send {}
trait AssertSend: Send + 'static {}

impl AssertSend for Span {}

impl AssertSend for AsyncSpan {}
}
Loading