diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 27f4355..1d608ce 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -75,6 +75,7 @@ jobs: matrix: features: - "" + - "--features management" - "--features vendored" - "--all-features" runs-on: ubuntu-20.04 @@ -103,6 +104,6 @@ jobs: - name: Install protoc run: sudo apt-get install -y protobuf-compiler=3.6.1.3-2ubuntu5 - name: Install Rust toolchain - run: rustup toolchain install stable + run: rustup toolchain install nightly-2022-07-30 - name: Run docs - run: cargo rustdoc --release -- -D warnings + run: cargo +nightly-2022-07-30 rustdoc --release --all-features -- --cfg docsrs diff --git a/Cargo.toml b/Cargo.toml index 4468738..d1e5db2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,9 +31,12 @@ description = "Apache SkyWalking Rust Agent" license = "Apache-2.0" homepage = "https://skywalking.apache.org/" repository = "https://github.com/apache/skywalking-rust" +rust-version = "1.59" [features] +management = ["hostname", "systemstat"] vendored = ["protobuf-src"] + mock = [] # For internal integration testing only, do not use. [dependencies] @@ -43,10 +46,13 @@ bytes = "1.2.1" cfg-if = "1.0.0" futures-core = "0.3.21" futures-util = "0.3.21" +hostname = { version = "0.3.1", optional = true } +once_cell = "1.14.0" portable-atomic = { version = "0.3.13", features = ["float"] } prost = "0.11.0" prost-derive = "0.11.0" serde = { version = "1.0.143", features = ["derive"] } +systemstat = { version = "0.2.0", optional = true } thiserror = "1.0.32" tokio = { version = "1.20.1", features = ["parking_lot"] } tonic = { version = "0.8.0", features = ["codegen"] } @@ -73,6 +79,19 @@ required-features = ["mock"] name = "metrics" required-features = ["mock"] +[[test]] +name = "management" +required-features = ["management"] + [[example]] name = "simple_trace_report" path = "examples/simple_trace_report.rs" + +[[example]] +name = "simple_management_report" +path = "examples/simple_management_report.rs" +required-features = ["management"] + +[package.metadata.docs.rs] +rustdoc-args = ["--cfg", "docsrs"] +all-features = true diff --git a/README.md b/README.md index 97211d4..af00725 100644 --- a/README.md +++ b/README.md @@ -55,6 +55,31 @@ LogRecord is the simple builder for the LogData, which is the Log format of Skyw - **Gauge** API represents a single numerical value. - **Histogram** API represents a summary sample observations with customized buckets. +## Management + +Reporting the extra information of the instance. + +### Report instance properties + +The method `insert_os_info` of `skywalking::management::instance::Properties` will insert the predefined os info. +In addition, you can use `insert`, `update`, and `remove` to customize your instance information. + +The predefined os info: + +| Key | Value | +| ------------------------ | ------------------------------ | +| hostname | The hostname of os. | +| ipv4 (probably multiple) | The ipv4 addresses of network. | +| language | rust | +| OS Name | Linux / Windows / Mac OS X | +| Process No. | The ID of Process. | + +### Keep alive + +Keep the instance alive in the backend analysis. +Only recommend to do separate keepAlive report when no trace and metrics needs to be reported. +Otherwise, it is duplicated. + # Example ```rust, no_run diff --git a/build.rs b/build.rs index 7ef9a20..e83a7e5 100644 --- a/build.rs +++ b/build.rs @@ -23,11 +23,13 @@ fn main() -> Result<(), Box> { .type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]") .compile( &[ - "./skywalking-data-collect-protocol/language-agent/Tracing.proto", "./skywalking-data-collect-protocol/language-agent/Meter.proto", + "./skywalking-data-collect-protocol/language-agent/Tracing.proto", "./skywalking-data-collect-protocol/logging/Logging.proto", + "./skywalking-data-collect-protocol/management/Management.proto", ], &["./skywalking-data-collect-protocol"], )?; + Ok(()) } diff --git a/examples/simple_management_report.rs b/examples/simple_management_report.rs new file mode 100644 index 0000000..107aa95 --- /dev/null +++ b/examples/simple_management_report.rs @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +use skywalking::{ + management::{instance::Properties, manager::Manager}, + reporter::grpc::GrpcReporter, +}; +use std::{error::Error, time::Duration}; +use tokio::signal; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Connect to skywalking oap server. + let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?; + + // Spawn the reporting in background, with listening the graceful shutdown + // signal. + let handle = reporter + .reporting() + .await + .with_graceful_shutdown(async move { + signal::ctrl_c().await.expect("failed to listen for event"); + }) + .spawn(); + + let manager = Manager::new("service", "instance", reporter); + + // Report instance properties. + let mut props = Properties::default(); + props.insert_os_info(); + manager.report_properties(props); + + // Keep alive + manager.keep_alive(Duration::from_secs(10)); + + handle.await?; + + Ok(()) +} diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 09cf6d7..32766df 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -16,5 +16,5 @@ # under the License. # [toolchain] -channel = "1.57.0" +channel = "1.59.0" components = ["rustfmt", "clippy"] diff --git a/src/lib.rs b/src/lib.rs index 7d4cc0c..65999d9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,10 +16,14 @@ #![warn(rust_2018_idioms)] #![warn(clippy::dbg_macro, clippy::print_stdout)] #![doc = include_str!("../README.md")] +#![cfg_attr(docsrs, feature(doc_cfg))] pub mod common; pub(crate) mod error; pub mod logging; +#[cfg(feature = "management")] +#[cfg_attr(docsrs, doc(cfg(feature = "management")))] +pub mod management; pub mod metrics; pub mod reporter; pub mod skywalking_proto; diff --git a/src/logging/logger.rs b/src/logging/logger.rs index f1bb2dc..a831058 100644 --- a/src/logging/logger.rs +++ b/src/logging/logger.rs @@ -52,14 +52,14 @@ pub struct Logger { impl Logger { /// New with service info and reporter. pub fn new( - service_name: impl ToString, - instance_name: impl ToString, + service_name: impl Into, + instance_name: impl Into, reporter: impl Report + Send + Sync + 'static, ) -> Self { Self { inner: Arc::new(Inner { - service_name: service_name.to_string(), - instance_name: instance_name.to_string(), + service_name: service_name.into(), + instance_name: instance_name.into(), reporter: Box::new(reporter), }), } @@ -78,6 +78,6 @@ impl Logger { self.service_name().to_owned(), self.instance_name().to_owned(), ); - self.inner.reporter.report(CollectItem::Log(data)); + self.inner.reporter.report(CollectItem::Log(Box::new(data))); } } diff --git a/src/logging/record.rs b/src/logging/record.rs index 02f5a83..02b5e81 100644 --- a/src/logging/record.rs +++ b/src/logging/record.rs @@ -68,26 +68,24 @@ impl LogRecord { } #[inline] - pub fn endpoint(mut self, endpoint: impl ToString) -> Self { - self.endpoint = endpoint.to_string(); + pub fn endpoint(mut self, endpoint: impl Into) -> Self { + self.endpoint = endpoint.into(); self } - pub fn add_tag(mut self, key: impl ToString, value: impl ToString) -> Self { - self.tags.push((key.to_string(), value.to_string())); + pub fn add_tag(mut self, key: impl Into, value: impl Into) -> Self { + self.tags.push((key.into(), value.into())); self } pub fn add_tags(mut self, tags: I) -> Self where - K: ToString, - V: ToString, + K: Into, + V: Into, I: IntoIterator, { - self.tags.extend( - tags.into_iter() - .map(|(k, v)| (k.to_string(), v.to_string())), - ); + self.tags + .extend(tags.into_iter().map(|(k, v)| (k.into(), v.into()))); self } @@ -107,8 +105,8 @@ impl LogRecord { self } - pub fn content(mut self, content: impl ToString) -> Self { - self.content = content.to_string(); + pub fn content(mut self, content: impl Into) -> Self { + self.content = content.into(); self } diff --git a/src/management/instance.rs b/src/management/instance.rs new file mode 100644 index 0000000..6b72f02 --- /dev/null +++ b/src/management/instance.rs @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use crate::skywalking_proto::v3::{InstanceProperties, KeyStringValuePair}; +use once_cell::sync::Lazy; +use std::{collections::HashMap, process}; +use systemstat::{IpAddr, Platform, System}; + +static IPS: Lazy> = Lazy::new(|| { + System::new() + .networks() + .ok() + .map(|networks| { + networks + .values() + .filter(|network| { + network.name != "lo" + && !network.name.starts_with("docker") + && !network.name.starts_with("br-") + }) + .flat_map(|network| { + network.addrs.iter().filter_map(|addr| match addr.addr { + IpAddr::V4(addr) => Some(addr.to_string()), + _ => None, + }) + }) + .collect() + }) + .unwrap_or_default() +}); + +static HOST_NAME: Lazy> = Lazy::new(|| { + hostname::get() + .ok() + .and_then(|hostname| hostname.into_string().ok()) +}); + +const OS_NAME: Option<&str> = if cfg!(target_os = "linux") { + Some("Linux") +} else if cfg!(target_os = "windows") { + Some("Windows") +} else if cfg!(target_os = "macos") { + Some("Mac OS X") +} else { + None +}; + +#[derive(Debug, Default)] +pub struct Properties { + inner: HashMap>, +} + +impl Properties { + pub const KEY_HOST_NAME: &'static str = "hostname"; + pub const KEY_IPV4: &'static str = "ipv4"; + pub const KEY_LANGUAGE: &'static str = "language"; + pub const KEY_OS_NAME: &'static str = "OS Name"; + pub const KEY_PROCESS_NO: &'static str = "Process No."; +} + +impl Properties { + #[inline] + pub fn new() -> Self { + Default::default() + } + + pub fn insert(&mut self, key: impl Into, value: impl Into) { + self.inner.entry(key.into()).or_default().push(value.into()); + } + + pub fn update(&mut self, key: &str, value: impl Into) { + if let Some(values) = self.inner.get_mut(key) { + *values = vec![value.into()]; + } + } + + pub fn remove(&mut self, key: &str) { + self.inner.remove(key); + } + + pub fn insert_os_info(&mut self) { + for (key, value) in build_os_info() { + self.insert(key, value); + } + } + + pub(crate) fn convert_to_instance_properties( + self, + service_name: String, + instance_name: String, + ) -> InstanceProperties { + let mut properties = Vec::new(); + for (key, values) in self.inner { + for value in values { + properties.push(KeyStringValuePair { + key: key.clone(), + value, + }); + } + } + + InstanceProperties { + service: service_name, + service_instance: instance_name, + properties, + layer: Default::default(), + } + } +} + +fn build_os_info() -> Vec<(String, String)> { + let mut items = Vec::new(); + + if let Some(os_name) = OS_NAME.as_ref() { + items.push((Properties::KEY_OS_NAME.to_string(), os_name.to_string())); + } + + if let Some(host_name) = HOST_NAME.as_ref() { + items.push((Properties::KEY_HOST_NAME.to_string(), host_name.clone())); + } + + for ip in IPS.iter() { + items.push((Properties::KEY_IPV4.to_string(), ip.to_string())); + } + + items.push(( + Properties::KEY_PROCESS_NO.to_string(), + process::id().to_string(), + )); + + items.push((Properties::KEY_LANGUAGE.to_string(), "rust".to_string())); + + items +} diff --git a/src/management/manager.rs b/src/management/manager.rs new file mode 100644 index 0000000..fe45f79 --- /dev/null +++ b/src/management/manager.rs @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use super::instance::Properties; +use crate::reporter::{CollectItem, DynReport, Report}; +use std::{ + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; +use tokio::{ + spawn, + task::{JoinError, JoinHandle}, + time, +}; + +pub struct Manager { + service_name: String, + instance_name: String, + reporter: Arc, +} + +impl Manager { + /// New with service info and reporter. + pub fn new( + service_name: impl Into, + instance_name: impl Into, + reporter: impl Report + Send + Sync + 'static, + ) -> Self { + Self { + service_name: service_name.into(), + instance_name: instance_name.into(), + reporter: Arc::new(reporter), + } + } + + pub fn service_name(&self) -> &str { + &self.service_name + } + + pub fn instance_name(&self) -> &str { + &self.instance_name + } + + pub fn report_properties(&self, properties: Properties) { + let props = properties + .convert_to_instance_properties(self.service_name.clone(), self.instance_name.clone()); + self.reporter.report(CollectItem::Instance(Box::new(props))); + } + + pub fn keep_alive(&self, interval: Duration) -> KeepAlive { + let service_name = self.service_name.clone(); + let instance_name = self.instance_name.clone(); + let reporter = self.reporter.clone(); + let handle = spawn(async move { + let mut ticker = time::interval(interval); + loop { + ticker.tick().await; + + reporter.report(CollectItem::Ping(Box::new( + crate::skywalking_proto::v3::InstancePingPkg { + service: service_name.clone(), + service_instance: instance_name.clone(), + layer: Default::default(), + }, + ))); + } + }); + KeepAlive { handle } + } +} + +pub struct KeepAlive { + handle: JoinHandle<()>, +} + +impl Future for KeepAlive { + type Output = Result<(), JoinError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.handle).poll(cx) + } +} diff --git a/src/management/mod.rs b/src/management/mod.rs new file mode 100644 index 0000000..831bad1 --- /dev/null +++ b/src/management/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +pub mod instance; +pub mod manager; diff --git a/src/metrics/meter.rs b/src/metrics/meter.rs index ea97852..2ccf401 100644 --- a/src/metrics/meter.rs +++ b/src/metrics/meter.rs @@ -48,21 +48,19 @@ pub struct MeterId { } impl MeterId { - fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self { - self.labels.push((key.to_string(), value.to_string())); + fn add_label(mut self, key: impl Into, value: impl Into) -> Self { + self.labels.push((key.into(), value.into())); self } fn add_labels(mut self, tags: I) -> Self where - K: ToString, - V: ToString, + K: Into, + V: Into, I: IntoIterator, { - self.labels.extend( - tags.into_iter() - .map(|(k, v)| (k.to_string(), v.to_string())), - ); + self.labels + .extend(tags.into_iter().map(|(k, v)| (k.into(), v.into()))); self } } @@ -86,10 +84,10 @@ pub struct Counter { impl Counter { #[inline] - pub fn new(name: impl ToString) -> Self { + pub fn new(name: impl Into) -> Self { Self { id: MeterId { - name: name.to_string(), + name: name.into(), typ: MeterType::Counter, labels: vec![], }, @@ -100,7 +98,7 @@ impl Counter { } #[inline] - pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self { + pub fn add_label(mut self, key: impl Into, value: impl Into) -> Self { self.id = self.id.add_label(key, value); self } @@ -108,8 +106,8 @@ impl Counter { #[inline] pub fn add_labels(mut self, tags: I) -> Self where - K: ToString, - V: ToString, + K: Into, + V: Into, I: IntoIterator, { self.id = self.id.add_labels(tags); @@ -173,10 +171,10 @@ pub struct Gauge { impl f64> Gauge { #[inline] - pub fn new(name: impl ToString, getter: G) -> Self { + pub fn new(name: impl Into, getter: G) -> Self { Self { id: MeterId { - name: name.to_string(), + name: name.into(), typ: MeterType::Gauge, labels: vec![], }, @@ -185,7 +183,7 @@ impl f64> Gauge { } #[inline] - pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self { + pub fn add_label(mut self, key: impl Into, value: impl Into) -> Self { self.id = self.id.add_label(key, value); self } @@ -193,8 +191,8 @@ impl f64> Gauge { #[inline] pub fn add_labels(mut self, tags: I) -> Self where - K: ToString, - V: ToString, + K: Into, + V: Into, I: IntoIterator, { self.id = self.id.add_labels(tags); @@ -253,10 +251,10 @@ pub struct Histogram { } impl Histogram { - pub fn new(name: impl ToString, mut steps: Vec) -> Self { + pub fn new(name: impl Into, mut steps: Vec) -> Self { Self { id: MeterId { - name: name.to_string(), + name: name.into(), typ: MeterType::Histogram, labels: vec![], }, @@ -269,7 +267,7 @@ impl Histogram { } #[inline] - pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self { + pub fn add_label(mut self, key: impl Into, value: impl Into) -> Self { self.id = self.id.add_label(key, value); self } @@ -277,8 +275,8 @@ impl Histogram { #[inline] pub fn add_labels(mut self, tags: I) -> Self where - K: ToString, - V: ToString, + K: Into, + V: Into, I: IntoIterator, { self.id = self.id.add_labels(tags); diff --git a/src/metrics/metricer.rs b/src/metrics/metricer.rs index 81b8351..ad8f3de 100644 --- a/src/metrics/metricer.rs +++ b/src/metrics/metricer.rs @@ -42,13 +42,13 @@ pub struct Metricer { impl Metricer { /// New with service info and reporter. pub fn new( - service_name: impl ToString, - instance_name: impl ToString, + service_name: impl Into, + instance_name: impl Into, reporter: impl Report + Send + Sync + 'static, ) -> Self { Self { - service_name: service_name.to_string(), - instance_name: instance_name.to_string(), + service_name: service_name.into(), + instance_name: instance_name.into(), reporter: Box::new(reporter), meter_map: Default::default(), report_interval: Duration::from_secs(20), @@ -86,7 +86,7 @@ impl Metricer { for trans in metricer_.meter_map.values() { metricer_ .reporter - .report(CollectItem::Meter(trans.transform(&metricer_))); + .report(CollectItem::Meter(Box::new(trans.transform(&metricer_)))); } }) .await; diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs index fcea63d..5b0843a 100644 --- a/src/reporter/grpc.rs +++ b/src/reporter/grpc.rs @@ -14,6 +14,8 @@ // limitations under the License. // +#[cfg(feature = "management")] +use crate::skywalking_proto::v3::management_service_client::ManagementServiceClient; use crate::{ reporter::{CollectItem, Report}, skywalking_proto::v3::{ @@ -104,6 +106,9 @@ struct Inner { trace_client: Mutex>, log_client: Mutex>, meter_client: Mutex>, + #[cfg(feature = "management")] + #[cfg_attr(docsrs, doc(cfg(feature = "management")))] + management_client: Mutex>, producer: P, consumer: Mutex>, is_reporting: AtomicBool, @@ -138,6 +143,8 @@ impl GrpcReporter { inner: Arc::new(Inner { trace_client: Mutex::new(TraceSegmentReportServiceClient::new(channel.clone())), log_client: Mutex::new(LogReportServiceClient::new(channel.clone())), + #[cfg(feature = "management")] + management_client: Mutex::new(ManagementServiceClient::new(channel.clone())), meter_client: Mutex::new(MeterReportServiceClient::new(channel)), producer, consumer: Mutex::new(Some(consumer)), @@ -216,13 +223,43 @@ impl ReporterAndBuffer { // TODO Implement batch collect in future. match item { CollectItem::Trace(item) => { - self.trace_buffer.push_back(item); + self.trace_buffer.push_back(*item); } CollectItem::Log(item) => { - self.log_buffer.push_back(item); + self.log_buffer.push_back(*item); } CollectItem::Meter(item) => { - self.meter_buffer.push_back(item); + self.meter_buffer.push_back(*item); + } + #[cfg(feature = "management")] + CollectItem::Instance(item) => { + if let Err(e) = self + .inner + .management_client + .lock() + .await + .report_instance_properties(*item) + .await + { + if let Some(status_handle) = &self.status_handle { + status_handle(e); + } + } + } + #[cfg(feature = "management")] + CollectItem::Ping(item) => { + if let Err(e) = self + .inner + .management_client + .lock() + .await + .keep_alive(*item) + .await + { + if let Some(status_handle) = &self.status_handle { + status_handle(e); + } + } } } diff --git a/src/reporter/mod.rs b/src/reporter/mod.rs index 79735bf..921cb02 100644 --- a/src/reporter/mod.rs +++ b/src/reporter/mod.rs @@ -17,6 +17,8 @@ pub mod grpc; pub mod print; +#[cfg(feature = "management")] +use crate::skywalking_proto::v3::{InstancePingPkg, InstanceProperties}; use crate::skywalking_proto::v3::{LogData, MeterData, SegmentObject}; use serde::{Deserialize, Serialize}; use std::{ops::Deref, sync::Arc}; @@ -25,9 +27,15 @@ use tokio::sync::OnceCell; #[derive(Debug, Serialize, Deserialize)] #[non_exhaustive] pub enum CollectItem { - Trace(SegmentObject), - Log(LogData), - Meter(MeterData), + Trace(Box), + Log(Box), + Meter(Box), + #[cfg(feature = "management")] + #[cfg_attr(docsrs, doc(cfg(feature = "management")))] + Instance(Box), + #[cfg(feature = "management")] + #[cfg_attr(docsrs, doc(cfg(feature = "management")))] + Ping(Box), } pub(crate) type DynReport = dyn Report + Send + Sync + 'static; diff --git a/src/reporter/print.rs b/src/reporter/print.rs index e5b640f..6124c46 100644 --- a/src/reporter/print.rs +++ b/src/reporter/print.rs @@ -57,6 +57,22 @@ impl Report for PrintReporter { println!("meter data={:?}", data); } } + #[cfg(feature = "management")] + CollectItem::Instance(data) => { + if self.use_stderr { + eprintln!("instance properties data={:?}", data); + } else { + println!("instance properties data={:?}", data); + } + } + #[cfg(feature = "management")] + CollectItem::Ping(data) => { + if self.use_stderr { + eprintln!("ping data={:?}", data); + } else { + println!("ping data={:?}", data); + } + } } } } diff --git a/src/skywalking_proto/v3/mod.rs b/src/skywalking_proto/v3/mod.rs index 8623eaa..dc74965 100644 --- a/src/skywalking_proto/v3/mod.rs +++ b/src/skywalking_proto/v3/mod.rs @@ -21,8 +21,8 @@ impl SpanObject { /// Add logs to the span. pub fn add_log(&mut self, message: I) where - K: ToString, - V: ToString, + K: Into, + V: Into, I: IntoIterator, { let log = Log { @@ -32,8 +32,8 @@ impl SpanObject { .map(|v| { let (key, value) = v; KeyStringValuePair { - key: key.to_string(), - value: value.to_string(), + key: key.into(), + value: value.into(), } }) .collect(), @@ -42,10 +42,10 @@ impl SpanObject { } /// Add tag to the span. - pub fn add_tag(&mut self, key: impl ToString, value: impl ToString) { + pub fn add_tag(&mut self, key: impl Into, value: impl Into) { self.tags.push(KeyStringValuePair { - key: key.to_string(), - value: value.to_string(), + key: key.into(), + value: value.into(), }); } } diff --git a/src/trace/span.rs b/src/trace/span.rs index ce49c6b..84d97fa 100644 --- a/src/trace/span.rs +++ b/src/trace/span.rs @@ -141,15 +141,15 @@ impl Span { /// Add logs to the span. pub fn add_log(&mut self, message: I) where - K: ToString, - V: ToString, + K: Into, + V: Into, I: IntoIterator, { self.with_span_object_mut(|span| span.add_log(message)) } /// Add tag to the span. - pub fn add_tag(&mut self, key: impl ToString, value: impl ToString) { + pub fn add_tag(&mut self, key: impl Into, value: impl Into) { self.with_span_object_mut(|span| span.add_tag(key, value)) } } diff --git a/src/trace/trace_context.rs b/src/trace/trace_context.rs index 659b3de..d0daa94 100644 --- a/src/trace/trace_context.rs +++ b/src/trace/trace_context.rs @@ -119,15 +119,15 @@ impl std::fmt::Debug for TracingContext { impl TracingContext { /// Generate a new trace context. pub(crate) fn new( - service_name: impl ToString, - instance_name: impl ToString, + service_name: impl Into, + instance_name: impl Into, tracer: WeakTracer, ) -> Self { TracingContext { trace_id: RandomGenerator::generate(), trace_segment_id: RandomGenerator::generate(), - service: service_name.to_string(), - service_instance: instance_name.to_string(), + service: service_name.into(), + service_instance: instance_name.into(), next_span_id: Default::default(), span_stack: Default::default(), primary_endpoint_name: Default::default(), diff --git a/src/trace/tracer.rs b/src/trace/tracer.rs index 963d270..599ec95 100644 --- a/src/trace/tracer.rs +++ b/src/trace/tracer.rs @@ -55,14 +55,14 @@ pub struct Tracer { impl Tracer { /// New with service info and reporter. pub fn new( - service_name: impl ToString, - instance_name: impl ToString, + service_name: impl Into, + instance_name: impl Into, reporter: impl Report + Send + Sync + 'static, ) -> Self { Self { inner: Arc::new(Inner { - service_name: service_name.to_string(), - instance_name: instance_name.to_string(), + service_name: service_name.into(), + instance_name: instance_name.into(), reporter: Box::new(reporter), }), } @@ -90,7 +90,7 @@ impl Tracer { let segment_object = context.convert_to_segment_object(); self.inner .reporter - .report(CollectItem::Trace(segment_object)); + .report(CollectItem::Trace(Box::new(segment_object))); } fn downgrade(&self) -> WeakTracer { diff --git a/tests/logging.rs b/tests/logging.rs index e909bf6..53483d2 100644 --- a/tests/logging.rs +++ b/tests/logging.rs @@ -161,7 +161,7 @@ impl Report for MockReporter { fn report(&self, item: CollectItem) { match item { CollectItem::Log(data) => { - self.items.try_lock().unwrap().push_back(data); + self.items.try_lock().unwrap().push_back(*data); } _ => {} } diff --git a/tests/management.rs b/tests/management.rs new file mode 100644 index 0000000..5b10419 --- /dev/null +++ b/tests/management.rs @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use skywalking::{ + management::{instance::Properties, manager::Manager}, + reporter::{CollectItem, Report}, + skywalking_proto::v3::{InstancePingPkg, InstanceProperties, KeyStringValuePair}, +}; +use std::{ + collections::LinkedList, + process, + sync::{Arc, Mutex}, + time::Duration, +}; +use tokio::time::sleep; + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn management() { + let reporter = Arc::new(MockReporter::default()); + let manager = Manager::new("service_name", "instance_name", reporter.clone()); + manager.keep_alive(Duration::from_secs(60)); + + { + let mut props = Properties::new(); + props.insert_os_info(); + manager.report_properties(props); + + let actual_props = reporter.pop_ins_props(); + assert_eq!(actual_props.service, "service_name".to_owned()); + assert_eq!(actual_props.service_instance, "instance_name".to_owned()); + assert_eq!( + kvs_get_value(&actual_props.properties, Properties::KEY_LANGUAGE), + "rust" + ); + assert_eq!( + kvs_get_value(&actual_props.properties, Properties::KEY_HOST_NAME), + hostname::get().unwrap() + ); + assert_eq!( + kvs_get_value(&actual_props.properties, Properties::KEY_PROCESS_NO), + process::id().to_string() + ); + } + + { + sleep(Duration::from_secs(1)).await; + assert_eq!( + reporter.pop_ping(), + InstancePingPkg { + service: "service_name".to_owned(), + service_instance: "instance_name".to_owned(), + ..Default::default() + } + ); + } +} + +fn kvs_get_value<'a>(kvs: &'a [KeyStringValuePair], key: &str) -> &'a str { + &kvs.iter().find(|kv| kv.key == key).unwrap().value +} + +#[derive(Default, Clone)] +struct MockReporter { + props_items: Arc>>, + ping_items: Arc>>, +} + +impl MockReporter { + fn pop_ins_props(&self) -> InstanceProperties { + self.props_items.try_lock().unwrap().pop_back().unwrap() + } + + fn pop_ping(&self) -> InstancePingPkg { + self.ping_items.try_lock().unwrap().pop_back().unwrap() + } +} + +impl Report for MockReporter { + fn report(&self, item: CollectItem) { + match item { + CollectItem::Instance(data) => { + self.props_items.try_lock().unwrap().push_back(*data); + } + CollectItem::Ping(data) => { + self.ping_items.try_lock().unwrap().push_back(*data); + } + _ => {} + } + } +} diff --git a/tests/metrics.rs b/tests/metrics.rs index f1d95d0..43246ec 100644 --- a/tests/metrics.rs +++ b/tests/metrics.rs @@ -166,7 +166,7 @@ impl Report for MockReporter { fn report(&self, item: CollectItem) { match item { CollectItem::Meter(data) => { - self.items.try_lock().unwrap().push_back(data); + self.items.try_lock().unwrap().push_back(*data); } _ => {} } diff --git a/tests/trace_context.rs b/tests/trace_context.rs index b967e53..4603ad9 100644 --- a/tests/trace_context.rs +++ b/tests/trace_context.rs @@ -387,6 +387,6 @@ impl Report for MockReporter { CollectItem::Trace(segment) => segment, _ => unreachable!(), }; - self.segments.try_lock().unwrap().push_back(segment); + self.segments.try_lock().unwrap().push_back(*segment); } }