Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ the context after the span finished.
# Example

```rust, no_run
use skywalking::context::tracer::Tracer;
use skywalking::reporter::grpc::GrpcReporter;
use skywalking::trace::{reporter::grpc::GrpcTraceReporter, tracer::Tracer};
use std::error::Error;
use tokio::signal;

Expand Down Expand Up @@ -75,7 +74,7 @@ async fn handle_request(tracer: Tracer) {

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;
let reporter = GrpcTraceReporter::connect("http://0.0.0.0:11800").await?;
let tracer = Tracer::new("service", "instance", reporter);

tokio::spawn(handle_request(tracer.clone()));
Expand Down
5 changes: 4 additions & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.build_server(false)
.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]")
.compile(
&["./skywalking-data-collect-protocol/language-agent/Tracing.proto"],
&[
"./skywalking-data-collect-protocol/language-agent/Tracing.proto",
"./skywalking-data-collect-protocol/logging/Logging.proto",
],
&["./skywalking-data-collect-protocol"],
)?;
Ok(())
Expand Down
16 changes: 7 additions & 9 deletions e2e/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ use hyper::{
service::{make_service_fn, service_fn},
Body, Client, Method, Request, Response, Server, StatusCode,
};
use skywalking::{
context::{
propagation::{
context::SKYWALKING_HTTP_CONTEXT_HEADER_KEY, decoder::decode_propagation,
encoder::encode_propagation,
},
tracer::{self, Tracer},
use skywalking::trace::{
propagation::{
context::SKYWALKING_HTTP_CONTEXT_HEADER_KEY, decoder::decode_propagation,
encoder::encode_propagation,
},
reporter::grpc::GrpcReporter,
reporter::grpc::GrpcTraceReporter,
tracer::{self, Tracer},
};
use std::{convert::Infallible, error::Error, future, net::SocketAddr};
use structopt::StructOpt;
Expand Down Expand Up @@ -153,7 +151,7 @@ struct Opt {
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let opt = Opt::from_args();
let reporter = GrpcReporter::connect("http://collector:19876").await?;
let reporter = GrpcTraceReporter::connect("http://collector:19876").await?;

let handle = if opt.mode == "consumer" {
tracer::set_global_tracer(Tracer::new("consumer", "node_0", reporter));
Expand Down
4 changes: 2 additions & 2 deletions examples/simple_trace_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
//
use skywalking::{context::tracer::Tracer, reporter::grpc::GrpcReporter};
use skywalking::trace::{reporter::grpc::GrpcTraceReporter, tracer::Tracer};
use std::error::Error;
use tokio::signal;

Expand Down Expand Up @@ -47,7 +47,7 @@ async fn handle_request(tracer: Tracer) {

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;
let reporter = GrpcTraceReporter::connect("http://0.0.0.0:11800").await?;
let tracer = Tracer::new("service", "instance", reporter);

tokio::spawn(handle_request(tracer.clone()));
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
#![doc = include_str!("../README.md")]

pub mod common;
pub mod context;
pub(crate) mod error;
pub mod reporter;
pub mod logging;
pub mod skywalking_proto;
pub mod trace;

pub use error::{Error, Result};
17 changes: 17 additions & 0 deletions src/logging/logger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

pub struct Logger {}
18 changes: 18 additions & 0 deletions src/logging/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

pub mod logger;
pub mod record;
17 changes: 17 additions & 0 deletions src/logging/record.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

pub struct LogRecord {}
2 changes: 1 addition & 1 deletion src/skywalking_proto/v3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
use crate::context::system_time::{fetch_time, TimePeriod};
use crate::trace::system_time::{fetch_time, TimePeriod};

tonic::include_proto!("skywalking.v3");

Expand Down
1 change: 1 addition & 0 deletions src/context/mod.rs → src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//

pub mod propagation;
pub mod reporter;
pub mod span;
pub(crate) mod system_time;
pub mod trace_context;
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// limitations under the License.
//

use crate::context::propagation::context::PropagationContext;
use crate::trace::propagation::context::PropagationContext;
use base64::decode;

/// Decode context value packed in `sw8` header.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// limitations under the License.
//

use crate::context::trace_context::TracingContext;
use crate::trace::trace_context::TracingContext;
use base64::encode;

/// Encode TracingContext to carry current trace info to the destination of RPC
Expand Down
File renamed without changes.
8 changes: 4 additions & 4 deletions src/reporter/grpc.rs → src/trace/reporter/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// limitations under the License.
//

use super::Reporter;
use super::TraceReporter;
use crate::skywalking_proto::v3::{
trace_segment_report_service_client::TraceSegmentReportServiceClient, SegmentObject,
};
Expand All @@ -27,11 +27,11 @@ use tonic::{

type ReporterClient = TraceSegmentReportServiceClient<Channel>;

pub struct GrpcReporter {
pub struct GrpcTraceReporter {
client: ReporterClient,
}

impl GrpcReporter {
impl GrpcTraceReporter {
pub fn new(channel: Channel) -> Self {
let client = ReporterClient::new(channel);
Self { client }
Expand All @@ -46,7 +46,7 @@ impl GrpcReporter {
}

#[async_trait]
impl Reporter for GrpcReporter {
impl TraceReporter for GrpcTraceReporter {
async fn collect(&mut self, segments: LinkedList<SegmentObject>) -> Result<(), Box<dyn Error>> {
let stream = stream::iter(segments);
self.client.collect(stream).await?;
Expand Down
10 changes: 5 additions & 5 deletions src/reporter/log.rs → src/trace/reporter/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// limitations under the License.
//

use super::Reporter;
use super::TraceReporter;
use crate::skywalking_proto::v3::SegmentObject;
use std::{collections::LinkedList, error::Error};
use tonic::async_trait;
Expand All @@ -24,12 +24,12 @@ enum Used {
Tracing,
}

pub struct LogReporter {
pub struct LogTraceReporter {
tip: String,
used: Used,
}

impl LogReporter {
impl LogTraceReporter {
#[inline]
pub fn new() -> Self {
Default::default()
Expand All @@ -51,7 +51,7 @@ impl LogReporter {
}
}

impl Default for LogReporter {
impl Default for LogTraceReporter {
fn default() -> Self {
Self {
tip: "collect".to_string(),
Expand All @@ -61,7 +61,7 @@ impl Default for LogReporter {
}

#[async_trait]
impl Reporter for LogReporter {
impl TraceReporter for LogTraceReporter {
async fn collect(&mut self, segments: LinkedList<SegmentObject>) -> Result<(), Box<dyn Error>> {
for segment in segments {
match self.used {
Expand Down
6 changes: 3 additions & 3 deletions src/reporter/mod.rs → src/trace/reporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ use crate::skywalking_proto::v3::SegmentObject;
use std::{collections::LinkedList, error::Error, result::Result};
use tonic::async_trait;

pub(crate) type DynReporter = dyn Reporter + Send + Sync + 'static;
pub(crate) type DynTraceReporter = dyn TraceReporter + Send + Sync + 'static;

#[async_trait]
pub trait Reporter {
pub trait TraceReporter {
async fn collect(&mut self, segments: LinkedList<SegmentObject>) -> Result<(), Box<dyn Error>>;
}

#[async_trait]
impl Reporter for () {
impl TraceReporter for () {
async fn collect(
&mut self,
_segments: LinkedList<SegmentObject>,
Expand Down
2 changes: 1 addition & 1 deletion src/context/span.rs → src/trace/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use std::{
/// # Example
///
/// ```
/// use skywalking::context::tracer::Tracer;
/// use skywalking::trace::tracer::Tracer;
///
/// async fn handle_request(tracer: Tracer) {
/// let mut ctx = tracer.create_trace_context();
Expand Down
File renamed without changes.
3 changes: 2 additions & 1 deletion src/context/trace_context.rs → src/trace/trace_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ use super::{
};
use crate::{
common::random_generator::RandomGenerator,
context::propagation::context::PropagationContext,
error::LOCK_MSG,
skywalking_proto::v3::{
RefType, SegmentObject, SegmentReference, SpanLayer, SpanObject, SpanType,
},
trace::propagation::context::PropagationContext,
};
use std::{
fmt::Formatter,
Expand All @@ -35,6 +35,7 @@ use std::{

#[derive(Default)]
pub(crate) struct SpanStack {
// TODO Swith to use `try_rwlock` instead of `RwLock` for better performance.
pub(crate) finialized: RwLock<Vec<SpanObject>>,
pub(crate) active: RwLock<Vec<SpanObject>>,
}
Expand Down
16 changes: 9 additions & 7 deletions src/context/tracer.rs → src/trace/tracer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
//

use crate::{
context::trace_context::TracingContext,
reporter::{DynReporter, Reporter},
skywalking_proto::v3::SegmentObject,
trace::{
reporter::{DynTraceReporter, TraceReporter},
trace_context::TracingContext,
},
};
use std::{
collections::LinkedList,
Expand Down Expand Up @@ -123,7 +125,7 @@ struct Inner {
instance_name: String,
segment_sender: Box<dyn SegmentSender>,
segment_receiver: Box<dyn SegmentReceiver>,
reporter: Mutex<Box<DynReporter>>,
reporter: Mutex<Box<DynTraceReporter>>,
is_reporting: AtomicBool,
is_closed: AtomicBool,
}
Expand All @@ -139,7 +141,7 @@ impl Tracer {
pub fn new(
service_name: impl ToString,
instance_name: impl ToString,
reporter: impl Reporter + Send + Sync + 'static,
reporter: impl TraceReporter + Send + Sync + 'static,
) -> Self {
let (segment_sender, segment_receiver) = mpsc::unbounded_channel();
Self::new_with_channel(
Expand All @@ -154,7 +156,7 @@ impl Tracer {
pub fn new_with_channel(
service_name: impl ToString,
instance_name: impl ToString,
reporter: impl Reporter + Send + Sync + 'static,
reporter: impl TraceReporter + Send + Sync + 'static,
channel: (impl SegmentSender, impl SegmentReceiver),
) -> Self {
Self {
Expand All @@ -179,7 +181,7 @@ impl Tracer {
}

/// Set the reporter, only valid if [`Tracer::reporting`] not started.
pub fn set_reporter(&self, reporter: impl Reporter + Send + Sync + 'static) {
pub fn set_reporter(&self, reporter: impl TraceReporter + Send + Sync + 'static) {
if !self.inner.is_reporting.load(Ordering::Relaxed) {
if let Ok(mut lock) = self.inner.reporter.try_lock() {
*lock = Box::new(reporter);
Expand Down Expand Up @@ -284,7 +286,7 @@ impl Tracer {
}

async fn report_segment_object(
reporter: &Mutex<Box<DynReporter>>,
reporter: &Mutex<Box<DynTraceReporter>>,
segments: LinkedList<SegmentObject>,
) {
if let Err(err) = reporter.lock().await.collect(segments).await {
Expand Down
16 changes: 7 additions & 9 deletions tests/propagation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
//

#![allow(unused_imports)]
use skywalking::{
context::{
propagation::{
context::PropagationContext, decoder::decode_propagation, encoder::encode_propagation,
},
trace_context::TracingContext,
tracer::Tracer,
use skywalking::trace::{
propagation::{
context::PropagationContext, decoder::decode_propagation, encoder::encode_propagation,
},
reporter::log::LogReporter,
reporter::log::LogTraceReporter,
trace_context::TracingContext,
tracer::Tracer,
};
use std::sync::Arc;

Expand Down Expand Up @@ -68,7 +66,7 @@ fn invalid_sample() {

#[test]
fn basic_encode() {
let tracer = Tracer::new("mesh", "instance", LogReporter::new());
let tracer = Tracer::new("mesh", "instance", LogTraceReporter::new());
let tc = tracer.create_trace_context();
let res = encode_propagation(&tc, "/api/v1/health", "example.com:8080");
let res2 = decode_propagation(&res).unwrap();
Expand Down
Loading