Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ jobs:
matrix:
features:
- ""
- "--features management"
- "--features vendored"
- "--all-features"
runs-on: ubuntu-20.04
Expand Down Expand Up @@ -103,6 +104,6 @@ jobs:
- name: Install protoc
run: sudo apt-get install -y protobuf-compiler=3.6.1.3-2ubuntu5
- name: Install Rust toolchain
run: rustup toolchain install stable
run: rustup toolchain install nightly-2022-07-30
- name: Run docs
run: cargo rustdoc --release -- -D warnings
run: cargo +nightly-2022-07-30 rustdoc --release --all-features -- --cfg docsrs
19 changes: 19 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ description = "Apache SkyWalking Rust Agent"
license = "Apache-2.0"
homepage = "https://skywalking.apache.org/"
repository = "https://github.com/apache/skywalking-rust"
rust-version = "1.59"

[features]
management = ["hostname", "systemstat"]
vendored = ["protobuf-src"]

mock = [] # For internal integration testing only, do not use.

[dependencies]
Expand All @@ -43,10 +46,13 @@ bytes = "1.2.1"
cfg-if = "1.0.0"
futures-core = "0.3.21"
futures-util = "0.3.21"
hostname = { version = "0.3.1", optional = true }
once_cell = "1.14.0"
portable-atomic = { version = "0.3.13", features = ["float"] }
prost = "0.11.0"
prost-derive = "0.11.0"
serde = { version = "1.0.143", features = ["derive"] }
systemstat = { version = "0.2.0", optional = true }
thiserror = "1.0.32"
tokio = { version = "1.20.1", features = ["parking_lot"] }
tonic = { version = "0.8.0", features = ["codegen"] }
Expand All @@ -73,6 +79,19 @@ required-features = ["mock"]
name = "metrics"
required-features = ["mock"]

[[test]]
name = "management"
required-features = ["management"]

[[example]]
name = "simple_trace_report"
path = "examples/simple_trace_report.rs"

[[example]]
name = "simple_management_report"
path = "examples/simple_management_report.rs"
required-features = ["management"]

[package.metadata.docs.rs]
rustdoc-args = ["--cfg", "docsrs"]
all-features = true
4 changes: 3 additions & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]")
.compile(
&[
"./skywalking-data-collect-protocol/language-agent/Tracing.proto",
"./skywalking-data-collect-protocol/language-agent/Meter.proto",
"./skywalking-data-collect-protocol/language-agent/Tracing.proto",
"./skywalking-data-collect-protocol/logging/Logging.proto",
"./skywalking-data-collect-protocol/management/Management.proto",
],
&["./skywalking-data-collect-protocol"],
)?;

Ok(())
}
54 changes: 54 additions & 0 deletions examples/simple_management_report.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

use skywalking::{
management::{instance::Properties, manager::Manager},
reporter::grpc::GrpcReporter,
};
use std::{error::Error, time::Duration};
use tokio::signal;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Connect to skywalking oap server.
let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;

// Spawn the reporting in background, with listening the graceful shutdown
// signal.
let handle = reporter
.reporting()
.await
.with_graceful_shutdown(async move {
signal::ctrl_c().await.expect("failed to listen for event");
})
.spawn();

let manager = Manager::new("service", "instance", reporter);

// Report instance properties.
let mut props = Properties::default();
props.insert_os_info();
manager.report_properties(props);

// Keep alive
manager.keep_alive(Duration::from_secs(10));

handle.await?;

Ok(())
}
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
# under the License.
#
[toolchain]
channel = "1.57.0"
channel = "1.59.0"
components = ["rustfmt", "clippy"]
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
#![warn(rust_2018_idioms)]
#![warn(clippy::dbg_macro, clippy::print_stdout)]
#![doc = include_str!("../README.md")]
#![cfg_attr(docsrs, feature(doc_cfg))]

pub mod common;
pub(crate) mod error;
pub mod logging;
#[cfg(feature = "management")]
#[cfg_attr(docsrs, doc(cfg(feature = "management")))]
pub mod management;
pub mod metrics;
pub mod reporter;
pub mod skywalking_proto;
Expand Down
10 changes: 5 additions & 5 deletions src/logging/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ pub struct Logger {
impl Logger {
/// New with service info and reporter.
pub fn new(
service_name: impl ToString,
instance_name: impl ToString,
service_name: impl Into<String>,
instance_name: impl Into<String>,
reporter: impl Report + Send + Sync + 'static,
) -> Self {
Self {
inner: Arc::new(Inner {
service_name: service_name.to_string(),
instance_name: instance_name.to_string(),
service_name: service_name.into(),
instance_name: instance_name.into(),
reporter: Box::new(reporter),
}),
}
Expand All @@ -78,6 +78,6 @@ impl Logger {
self.service_name().to_owned(),
self.instance_name().to_owned(),
);
self.inner.reporter.report(CollectItem::Log(data));
self.inner.reporter.report(CollectItem::Log(Box::new(data)));
}
}
22 changes: 10 additions & 12 deletions src/logging/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,26 +68,24 @@ impl LogRecord {
}

#[inline]
pub fn endpoint(mut self, endpoint: impl ToString) -> Self {
self.endpoint = endpoint.to_string();
pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
self.endpoint = endpoint.into();
self
}

pub fn add_tag(mut self, key: impl ToString, value: impl ToString) -> Self {
self.tags.push((key.to_string(), value.to_string()));
pub fn add_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.tags.push((key.into(), value.into()));
self
}

pub fn add_tags<K, V, I>(mut self, tags: I) -> Self
where
K: ToString,
V: ToString,
K: Into<String>,
V: Into<String>,
I: IntoIterator<Item = (K, V)>,
{
self.tags.extend(
tags.into_iter()
.map(|(k, v)| (k.to_string(), v.to_string())),
);
self.tags
.extend(tags.into_iter().map(|(k, v)| (k.into(), v.into())));
self
}

Expand All @@ -107,8 +105,8 @@ impl LogRecord {
self
}

pub fn content(mut self, content: impl ToString) -> Self {
self.content = content.to_string();
pub fn content(mut self, content: impl Into<String>) -> Self {
self.content = content.into();
self
}

Expand Down
147 changes: 147 additions & 0 deletions src/management/instance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

use crate::skywalking_proto::v3::{InstanceProperties, KeyStringValuePair};
use once_cell::sync::Lazy;
use std::{collections::HashMap, process};
use systemstat::{IpAddr, Platform, System};

pub static IPS: Lazy<Vec<String>> = Lazy::new(|| {
System::new()
.networks()
.ok()
.map(|networks| {
networks
.values()
.filter(|network| {
network.name != "lo"
&& !network.name.starts_with("docker")
&& !network.name.starts_with("br-")
})
.flat_map(|network| {
network.addrs.iter().filter_map(|addr| match addr.addr {
IpAddr::V4(addr) => Some(addr.to_string()),
_ => None,
})
})
.collect()
})
.unwrap_or_default()
});

pub static HOST_NAME: Lazy<Option<String>> = Lazy::new(|| {
hostname::get()
.ok()
.and_then(|hostname| hostname.into_string().ok())
});

pub const OS_NAME: Option<&str> = if cfg!(target_os = "linux") {
Some("Linux")
} else if cfg!(target_os = "windows") {
Some("Windows")
} else if cfg!(target_os = "macos") {
Some("Mac OS X")
} else {
None
};

#[derive(Debug, Default)]
pub struct Properties {
inner: HashMap<String, Vec<String>>,
}

impl Properties {
pub const KEY_HOST_NAME: &'static str = "hostname";
pub const KEY_IPV4: &'static str = "ipv4";
pub const KEY_LANGUAGE: &'static str = "language";
pub const KEY_OS_NAME: &'static str = "OS Name";
pub const KEY_PROCESS_NO: &'static str = "Process No.";
}

impl Properties {
#[inline]
pub fn new() -> Self {
Default::default()
}

pub fn insert(&mut self, key: impl Into<String>, value: impl Into<String>) {
self.inner.entry(key.into()).or_default().push(value.into());
}

pub fn update(&mut self, key: &str, value: impl Into<String>) {
if let Some(values) = self.inner.get_mut(key) {
*values = vec![value.into()];
}
}

pub fn remove(&mut self, key: &str) {
self.inner.remove(key);
}

pub fn insert_os_info(&mut self) {
for (key, value) in build_os_info() {
self.insert(key, value);
}
}

pub(crate) fn convert_to_instance_properties(
self,
service_name: String,
instance_name: String,
) -> InstanceProperties {
let mut properties = Vec::new();
for (key, values) in self.inner {
for value in values {
properties.push(KeyStringValuePair {
key: key.clone(),
value,
});
}
}

InstanceProperties {
service: service_name,
service_instance: instance_name,
properties,
layer: Default::default(),
}
}
}

fn build_os_info() -> Vec<(String, String)> {
let mut items = Vec::new();

if let Some(os_name) = OS_NAME.as_ref() {
items.push((Properties::KEY_OS_NAME.to_string(), os_name.to_string()));
}

if let Some(host_name) = HOST_NAME.as_ref() {
items.push((Properties::KEY_HOST_NAME.to_string(), host_name.clone()));
}

for ip in IPS.iter() {
items.push((Properties::KEY_IPV4.to_string(), ip.to_string()));
}

items.push((
Properties::KEY_PROCESS_NO.to_string(),
process::id().to_string(),
));

items.push((Properties::KEY_LANGUAGE.to_string(), "rust".to_string()));

items
}
Loading