Skip to content

Commit 6232828

Browse files
authored
Improve LogReporter and fix tests. (#36)
1 parent 9f777ed commit 6232828

File tree

11 files changed

+165
-57
lines changed

11 files changed

+165
-57
lines changed

.github/workflows/ci.yaml

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,18 @@ env:
3232

3333
jobs:
3434
CI:
35+
needs:
36+
- license-check
37+
- fmt
38+
- check-and-test
39+
- doc
40+
runs-on: ubuntu-latest
41+
steps:
42+
- uses: actions/checkout@v2
43+
with:
44+
submodules: 'recursive'
45+
46+
license-check:
3547
runs-on: ubuntu-latest
3648
steps:
3749
- uses: actions/checkout@v2
@@ -41,17 +53,50 @@ jobs:
4153
uses: apache/skywalking-eyes/header/@501a28d2fb4a9b962661987e50cf0219631b32ff
4254
with:
4355
config: .github/licenserc.yaml
44-
- name: Check dependencies License
56+
- name: Check Dependencies License
4557
uses: apache/skywalking-eyes/dependency/@501a28d2fb4a9b962661987e50cf0219631b32ff
4658
with:
4759
config: .github/licenserc.yaml
60+
61+
fmt:
62+
runs-on: ubuntu-latest
63+
steps:
64+
- uses: actions/checkout@v2
65+
with:
66+
submodules: 'recursive'
4867
- name: Install Rust toolchain
49-
run: rustup toolchain install stable --component clippy --component rustfmt
68+
run: rustup toolchain install stable --component rustfmt
5069
- name: Run format
5170
run: cargo fmt --all -- --check
71+
72+
check-and-test:
73+
strategy:
74+
fail-fast: false
75+
matrix:
76+
features:
77+
- ""
78+
- "--all-features"
79+
runs-on: ubuntu-latest
80+
steps:
81+
- uses: actions/checkout@v2
82+
with:
83+
submodules: 'recursive'
84+
- name: Install Rust toolchain
85+
run: rustup toolchain install stable --component clippy
86+
- name: Run check
87+
run: cargo check --workspace --release ${{ matrix.features }}
5288
- name: Run clippy
53-
run: cargo clippy --workspace --tests -- -D warnings
89+
run: cargo clippy --workspace --release ${{ matrix.features }} -- -D warnings
5490
- name: Run tests
55-
run: cargo test --workspace
91+
run: cargo test --workspace --release ${{ matrix.features }}
92+
93+
doc:
94+
runs-on: ubuntu-latest
95+
steps:
96+
- uses: actions/checkout@v2
97+
with:
98+
submodules: 'recursive'
99+
- name: Install Rust toolchain
100+
run: rustup toolchain install stable
56101
- name: Run docs
57-
run: cargo rustdoc --all-features -- -D warnings
102+
run: cargo rustdoc --release -- -D warnings

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ uuid = { version = "1.1.0", features = ["serde", "v4"] }
5555
tonic-build = "0.7.2"
5656

5757
[dev-dependencies]
58-
tokio = { version = "1.18.2", features = ["rt-multi-thread"] }
58+
tokio = { version = "1.18.2", features = ["rt-multi-thread", "signal"] }
5959
tokio-stream = { version = "0.1.8", features = ["net"] }
6060

6161
[[test]]

README.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
8181
tokio::spawn(handle_request(tracer.clone()));
8282
8383
// Start to report.
84-
let handle = tracer.reporting(async move {
85-
let _ = signal::ctrl_c().await;
86-
});
87-
88-
handle.await?;
84+
tracer
85+
.reporting(async move {
86+
let _ = signal::ctrl_c().await.unwrap();
87+
})
88+
.await?;
8989
9090
Ok(())
9191
}
@@ -104,6 +104,7 @@ use `cargo run --example simple_trace_report`
104104
which outputs executable, then run it.
105105

106106
# Release
107+
107108
The SkyWalking committer(PMC included) could follow [this doc](Release-guide.md) to release an official version.
108109

109110
# License

e2e/src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use skywalking::context::tracer::{self, Tracer};
2525
use skywalking::reporter::grpc::GrpcReporter;
2626
use std::convert::Infallible;
2727
use std::error::Error;
28-
use std::future::pending;
28+
use std::future;
2929
use std::net::SocketAddr;
3030
use structopt::StructOpt;
3131

@@ -153,12 +153,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
153153

154154
let handle = if opt.mode == "consumer" {
155155
tracer::set_global_tracer(Tracer::new("consumer", "node_0", reporter));
156-
let handle = tracer::reporting(pending());
156+
let handle = tracer::reporting(future::pending());
157157
run_consumer_service([0, 0, 0, 0]).await;
158158
handle
159159
} else if opt.mode == "producer" {
160160
tracer::set_global_tracer(Tracer::new("producer", "node_0", reporter));
161-
let handle = tracer::reporting(pending());
161+
let handle = tracer::reporting(future::pending());
162162
run_producer_service([0, 0, 0, 0]).await;
163163
handle
164164
} else {

examples/simple_trace_report.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
5454
tokio::spawn(handle_request(tracer.clone()));
5555

5656
// Start to report.
57-
let handle = tracer.reporting(async move {
58-
let _ = signal::ctrl_c().await;
59-
});
60-
61-
handle.await?;
57+
tracer
58+
.reporting(async move {
59+
let _ = signal::ctrl_c().await.unwrap();
60+
})
61+
.await?;
6262

6363
Ok(())
6464
}

src/context/tracer.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@ use crate::{
2020
skywalking_proto::v3::SegmentObject,
2121
};
2222
use std::future::Future;
23+
use std::pin::Pin;
2324
use std::sync::Weak;
25+
use std::task::{Context, Poll};
2426
use std::{collections::LinkedList, sync::Arc};
2527
use tokio::sync::OnceCell;
28+
use tokio::task::JoinError;
2629
use tokio::{
2730
sync::{
2831
mpsc::{self},
@@ -58,9 +61,7 @@ pub fn create_trace_context_from_propagation(context: PropagationContext) -> Tra
5861
/// Start to reporting by global tracer, quit when shutdown_signal received.
5962
///
6063
/// Accept a `shutdown_signal` argument as a graceful shutdown signal.
61-
pub fn reporting(
62-
shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
63-
) -> JoinHandle<()> {
64+
pub fn reporting(shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static) -> Reporting {
6465
global_tracer().reporting(shutdown_signal)
6566
}
6667

@@ -142,8 +143,10 @@ impl Tracer {
142143
pub fn reporting(
143144
&self,
144145
shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
145-
) -> JoinHandle<()> {
146-
tokio::spawn(Self::do_reporting(self.clone(), shutdown_signal))
146+
) -> Reporting {
147+
Reporting {
148+
handle: tokio::spawn(self.clone().do_reporting(shutdown_signal)),
149+
}
147150
}
148151

149152
async fn do_reporting(self, shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static) {
@@ -216,6 +219,19 @@ impl WeakTracer {
216219
}
217220
}
218221

222+
/// Created by [Tracer::reporting].
223+
pub struct Reporting {
224+
handle: JoinHandle<()>,
225+
}
226+
227+
impl Future for Reporting {
228+
type Output = Result<(), JoinError>;
229+
230+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
231+
Pin::new(&mut self.handle).poll(cx)
232+
}
233+
}
234+
219235
#[cfg(test)]
220236
mod tests {
221237
use super::*;

src/reporter/grpc.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::skywalking_proto::v3::{
1919
trace_segment_report_service_client::TraceSegmentReportServiceClient, SegmentObject,
2020
};
2121
use futures_util::stream;
22-
use std::collections::LinkedList;
22+
use std::{collections::LinkedList, error::Error};
2323
use tonic::{
2424
async_trait,
2525
transport::{self, Channel, Endpoint},
@@ -47,7 +47,7 @@ impl GrpcReporter {
4747

4848
#[async_trait]
4949
impl Reporter for GrpcReporter {
50-
async fn collect(&mut self, segments: LinkedList<SegmentObject>) -> crate::Result<()> {
50+
async fn collect(&mut self, segments: LinkedList<SegmentObject>) -> Result<(), Box<dyn Error>> {
5151
let stream = stream::iter(segments);
5252
self.client.collect(stream).await?;
5353
Ok(())

src/reporter/log.rs

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,58 @@
1616

1717
use super::Reporter;
1818
use crate::skywalking_proto::v3::SegmentObject;
19-
use std::collections::LinkedList;
19+
use std::{collections::LinkedList, error::Error};
2020
use tonic::async_trait;
2121

22-
pub struct LogReporter;
22+
enum Used {
23+
Println,
24+
Tracing,
25+
}
26+
27+
pub struct LogReporter {
28+
tip: String,
29+
used: Used,
30+
}
31+
32+
impl LogReporter {
33+
#[inline]
34+
pub fn new() -> Self {
35+
Default::default()
36+
}
37+
38+
pub fn tip(mut self, tip: String) -> Self {
39+
self.tip = tip;
40+
self
41+
}
42+
43+
pub fn use_tracing(mut self) -> Self {
44+
self.used = Used::Tracing;
45+
self
46+
}
47+
48+
pub fn use_println(mut self) -> Self {
49+
self.used = Used::Println;
50+
self
51+
}
52+
}
53+
54+
impl Default for LogReporter {
55+
fn default() -> Self {
56+
Self {
57+
tip: "Collect".to_string(),
58+
used: Used::Println,
59+
}
60+
}
61+
}
2362

2463
#[async_trait]
2564
impl Reporter for LogReporter {
26-
async fn collect(&mut self, segments: LinkedList<SegmentObject>) -> crate::Result<()> {
65+
async fn collect(&mut self, segments: LinkedList<SegmentObject>) -> Result<(), Box<dyn Error>> {
2766
for segment in segments {
28-
tracing::info!(?segment, "Do trace");
67+
match self.used {
68+
Used::Println => println!("{} segment={:?}", self.tip, segment),
69+
Used::Tracing => tracing::info!(?segment, "{}", self.tip),
70+
}
2971
}
3072
Ok(())
3173
}

src/reporter/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@ pub mod log;
1919

2020
use crate::skywalking_proto::v3::SegmentObject;
2121
use std::collections::LinkedList;
22+
use std::{error::Error, result::Result};
2223
use tonic::async_trait;
2324

2425
pub(crate) type DynReporter = dyn Reporter + Send + Sync + 'static;
2526

2627
#[async_trait]
2728
pub trait Reporter {
28-
async fn collect(&mut self, segments: LinkedList<SegmentObject>) -> crate::Result<()>;
29+
async fn collect(&mut self, segments: LinkedList<SegmentObject>) -> Result<(), Box<dyn Error>>;
2930
}

tests/propagation.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ fn invalid_sample() {
6464

6565
#[test]
6666
fn basic_encode() {
67-
let tracer = Tracer::new("mesh", "instance", LogReporter);
67+
let tracer = Tracer::new("mesh", "instance", LogReporter::new());
6868
let tc = tracer.create_trace_context();
6969
let res = encode_propagation(&tc, "/api/v1/health", "example.com:8080");
7070
let res2 = decode_propagation(&res).unwrap();

0 commit comments

Comments
 (0)