Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/codecov.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,6 @@ jobs:
toolchain: stable
override: true
- uses: actions-rs/[email protected]
with:
version: 0.22.0
- uses: codecov/[email protected]
17 changes: 10 additions & 7 deletions examples/simple_management_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@ async fn main() -> Result<(), Box<dyn Error>> {

let manager = Manager::new("service", "instance", reporter);

// Report instance properties.
let mut props = Properties::default();
props.insert_os_info();
manager.report_properties(props);

// Keep alive
manager.keep_alive(Duration::from_secs(10));
// Report instance properties and keep alive.
manager.report_and_keep_alive(
|| {
let mut props = Properties::default();
props.insert_os_info();
props
},
Duration::from_secs(30),
10,
);

handle.await?;

Expand Down
101 changes: 84 additions & 17 deletions src/management/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,40 +64,107 @@ impl Manager {

/// Report instance properties.
pub fn report_properties(&self, properties: Properties) {
let props = properties
.convert_to_instance_properties(self.service_name.clone(), self.instance_name.clone());
self.reporter.report(CollectItem::Instance(Box::new(props)));
Self::reporter_report_properties(
&self.reporter,
self.service_name.clone(),
self.instance_name.clone(),
properties,
);
}

/// Do keep alive (heartbeat), with the interval, will be run in background.
pub fn keep_alive(&self, interval: Duration) -> KeepAlive {
fn reporter_report_properties(
reporter: &Arc<DynReport>,
service_name: String,
instance_name: String,
properties: Properties,
) {
let props = properties.convert_to_instance_properties(service_name, instance_name);
reporter.report(CollectItem::Instance(Box::new(props)));
}

/// Do keep alive once.
pub fn keep_alive(&self) {
Self::reporter_keep_alive(
&self.reporter,
self.service_name.clone(),
self.instance_name.clone(),
);
}

fn reporter_keep_alive(reporter: &Arc<DynReport>, service_name: String, instance_name: String) {
reporter.report(CollectItem::Ping(Box::new(
crate::skywalking_proto::v3::InstancePingPkg {
service: service_name,
service_instance: instance_name,
layer: Default::default(),
},
)));
}

/// Continuously report instance properties and keep alive. Run in
/// background.
///
/// Parameter `heartbeat_period` represents agent heartbeat report period.
///
/// Parameter `properties_report_period_factor` represents agent sends the
/// instance properties to the backend every `heartbeat_period` *
/// `properties_report_period_factor` seconds.
pub fn report_and_keep_alive(
&self,
properties: impl Fn() -> Properties + Send + 'static,
heartbeat_period: Duration,
properties_report_period_factor: usize,
) -> ReportAndKeepAlive {
let service_name = self.service_name.clone();
let instance_name = self.instance_name.clone();
let reporter = self.reporter.clone();

let handle = spawn(async move {
let mut ticker = time::interval(interval);
let mut counter = 0;

let mut ticker = time::interval(heartbeat_period);
loop {
ticker.tick().await;

reporter.report(CollectItem::Ping(Box::new(
crate::skywalking_proto::v3::InstancePingPkg {
service: service_name.clone(),
service_instance: instance_name.clone(),
layer: Default::default(),
},
)));
if counter == 0 {
Self::reporter_report_properties(
&reporter,
service_name.clone(),
instance_name.clone(),
properties(),
);
} else {
Self::reporter_keep_alive(
&reporter,
service_name.clone(),
instance_name.clone(),
);
}

counter += 1;

if counter >= properties_report_period_factor {
counter = 0;
}
}
});
KeepAlive { handle }
ReportAndKeepAlive { handle }
}
}

/// Handle of [Manager::keep_alive].
pub struct KeepAlive {
/// Handle of [Manager::report_and_keep_alive].
pub struct ReportAndKeepAlive {
handle: JoinHandle<()>,
}

impl Future for KeepAlive {
impl ReportAndKeepAlive {
/// Get the inner tokio join handle.
pub fn handle(&self) -> &JoinHandle<()> {
&self.handle
}
}

impl Future for ReportAndKeepAlive {
type Output = Result<(), JoinError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand Down
90 changes: 77 additions & 13 deletions tests/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,23 @@ use tokio::time::sleep;
async fn management() {
let reporter = Arc::new(MockReporter::default());
let manager = Manager::new("service_name", "instance_name", reporter.clone());
manager.keep_alive(Duration::from_secs(60));
let handling = manager.report_and_keep_alive(
|| {
let mut props = Properties::new();
props.insert_os_info();
props
},
Duration::from_millis(100),
3,
);

{
let mut props = Properties::new();
props.insert_os_info();
manager.report_properties(props);
sleep(Duration::from_secs(1)).await;

handling.handle().abort();

sleep(Duration::from_secs(1)).await;

{
let actual_props = reporter.pop_ins_props();
assert_eq!(actual_props.service, "service_name".to_owned());
assert_eq!(actual_props.service_instance, "instance_name".to_owned());
Expand All @@ -56,7 +66,6 @@ async fn management() {
}

{
sleep(Duration::from_secs(1)).await;
assert_eq!(
reporter.pop_ping(),
InstancePingPkg {
Expand All @@ -66,38 +75,93 @@ async fn management() {
}
);
}

{
reporter.pop_ping();
}

{
reporter.pop_ins_props();
}

{
reporter.pop_ping();
}

{
reporter.pop_ping();
}
}

fn kvs_get_value<'a>(kvs: &'a [KeyStringValuePair], key: &str) -> &'a str {
&kvs.iter().find(|kv| kv.key == key).unwrap().value
}

#[derive(Debug)]
enum Item {
Properties(InstanceProperties),
PingPkg(InstancePingPkg),
}

impl Item {
fn unwrap_properties(self) -> InstanceProperties {
match self {
Item::Properties(props) => props,
Item::PingPkg(_) => panic!("isn't properties"),
}
}

fn unwrap_ping_pkg(self) -> InstancePingPkg {
match self {
Item::Properties(_) => panic!("isn't ping pkg"),
Item::PingPkg(p) => p,
}
}
}

#[derive(Default, Clone)]
struct MockReporter {
props_items: Arc<Mutex<LinkedList<InstanceProperties>>>,
ping_items: Arc<Mutex<LinkedList<InstancePingPkg>>>,
items: Arc<Mutex<LinkedList<Item>>>,
}

impl MockReporter {
fn pop_ins_props(&self) -> InstanceProperties {
self.props_items.try_lock().unwrap().pop_back().unwrap()
self.items
.try_lock()
.unwrap()
.pop_front()
.unwrap()
.unwrap_properties()
}

fn pop_ping(&self) -> InstancePingPkg {
self.ping_items.try_lock().unwrap().pop_back().unwrap()
self.items
.try_lock()
.unwrap()
.pop_front()
.unwrap()
.unwrap_ping_pkg()
}
}

impl Report for MockReporter {
fn report(&self, item: CollectItem) {
match item {
CollectItem::Instance(data) => {
self.props_items.try_lock().unwrap().push_back(*data);
self.items
.try_lock()
.unwrap()
.push_back(Item::Properties(*data));
}
CollectItem::Ping(data) => {
self.ping_items.try_lock().unwrap().push_back(*data);
self.items
.try_lock()
.unwrap()
.push_back(Item::PingPkg(*data));
}
_ => {
unreachable!("unknown collect item type");
}
_ => {}
}
}
}