From 0646dd73dc9d027b438dcbb316c8760c025c3725 Mon Sep 17 00:00:00 2001 From: Yun Li Date: Fri, 19 Sep 2025 07:03:38 +0000 Subject: [PATCH 1/5] Integrate SHOW API probe into telemetry watchdog --- .../docker-telemetry-watchdog/Dockerfile.j2 | 3 +- .../watchdog/src/cmd_list.json | 4 + .../watchdog/src/main.rs | 259 +++++++++++++++++- 3 files changed, 250 insertions(+), 16 deletions(-) create mode 100644 dockers/docker-telemetry-watchdog/watchdog/src/cmd_list.json diff --git a/dockers/docker-telemetry-watchdog/Dockerfile.j2 b/dockers/docker-telemetry-watchdog/Dockerfile.j2 index 49fdc798276..9943ec1297d 100644 --- a/dockers/docker-telemetry-watchdog/Dockerfile.j2 +++ b/dockers/docker-telemetry-watchdog/Dockerfile.j2 @@ -1,5 +1,5 @@ {% from "dockers/dockerfile-macros.j2" import install_debian_packages, install_python_wheels, copy_files %} -ARG BASE=docker-config-engine-bookworm-{{DOCKER_USERNAME}}:{{DOCKER_USERTAG}} +ARG BASE=docker-sonic-gnmi-{{DOCKER_USERNAME}}:{{DOCKER_USERTAG}} FROM $BASE AS builder @@ -36,6 +36,7 @@ COPY ["supervisord.conf", "/etc/supervisor/conf.d/"] # Copy the compiled Rust binary from the builder stage COPY --from=builder /watchdog/target/release/telemetry_watchdog /usr/bin/telemetry_watchdog RUN chmod +x /usr/bin/telemetry_watchdog +COPY --from=builder /watchdog/src/cmd_list.json /cmd_list.json FROM $BASE diff --git a/dockers/docker-telemetry-watchdog/watchdog/src/cmd_list.json b/dockers/docker-telemetry-watchdog/watchdog/src/cmd_list.json new file mode 100644 index 00000000000..8efcf11417e --- /dev/null +++ b/dockers/docker-telemetry-watchdog/watchdog/src/cmd_list.json @@ -0,0 +1,4 @@ +{ + "xpaths": [ + ] +} \ No newline at end of file diff --git a/dockers/docker-telemetry-watchdog/watchdog/src/main.rs b/dockers/docker-telemetry-watchdog/watchdog/src/main.rs index 9e5149539c6..1b47dcb3b59 100644 --- a/dockers/docker-telemetry-watchdog/watchdog/src/main.rs +++ b/dockers/docker-telemetry-watchdog/watchdog/src/main.rs @@ -1,5 +1,9 @@ use 
std::io::{BufRead, BufReader, Write}; use std::net::{TcpListener, TcpStream}; +use std::time::{Duration, Instant}; +use std::process::{Command, Stdio}; +use std::fs; +use std::env; use serde::Serialize; use redis::Commands; @@ -10,6 +14,221 @@ const DEFAULT_TELEMETRY_SERVICE_PORT: u16 = 50051; #[derive(Serialize)] struct HealthStatus { check_telemetry_port: String, + xpath_commands: Vec, +} + +#[derive(Serialize, Clone)] +struct CommandResult { + xpath: String, + success: bool, + error: Option, + duration_ms: u128, +} + +const CMD_LIST_JSON: &str = "/cmd_list.json"; // absolute path inside container +const XPATH_ENV_VAR: &str = "TELEMETRY_WATCHDOG_XPATHS"; // comma-separated list +const XPATH_ENV_BLACKLIST: &str = "TELEMETRY_WATCHDOG_XPATHS_BLACKLIST"; // comma-separated list to exclude +const CMD_TIMEOUT_ENV_VAR: &str = "TELEMETRY_WATCHDOG_CMD_TIMEOUT_SECS"; // per-command timeout seconds +const GNMI_BASE_CMD: &str = "gnmi_get"; // assumed in PATH +const TARGET_NAME_ENV_VAR: &str = "TELEMETRY_WATCHDOG_TARGET_NAME"; // optional override for target_name +const DEFAULT_TARGET_NAME: &str = "server.ndastreaming.ap.gbl"; +const DEFAULT_CA_CRT: &str = "/etc/sonic/telemetry/dsmsroot.cer"; +const DEFAULT_SERVER_CRT: &str = "/etc/sonic/telemetry/streamingtelemetryserver.cer"; +const DEFAULT_SERVER_KEY: &str = "/etc/sonic/telemetry/streamingtelemetryserver.key"; + +// Configuration: +// 1. JSON file (/cmd_list.json) optional. Format: +// { +// "xpaths": [ +// "reboot-cause/history" +// ] +// } +// 2. Environment variable TELEMETRY_WATCHDOG_XPATHS optional. Comma-separated list of xpaths. +// Both sources are merged; duplicates removed (first occurrence kept). 
+// During the probe request, after verifying the GNMI port is reachable, each xpath results in a command: +// gnmi_get -xpath_target SHOW -xpath -target_addr 127.0.0.1: -logtostderr [ -ca -cert -key -target_name | -insecure true ] +// Cert paths and client_auth/target_name are pulled from Redis hashes (TELEMETRY|certs, TELEMETRY|gnmi). +// client_auth now: ONLY explicit Redis value "true" (case-insensitive) enables TLS client auth; anything else (missing/other value) -> insecure. +// Any failure (spawn error or non-zero exit status) causes overall HTTP 500 with per-xpath results in JSON body. + +fn load_xpath_list() -> Vec { + let mut list: Vec = Vec::new(); + + // JSON file expected format: { "xpaths": ["reboot-cause/history", "lldp/neighbors"] } + match fs::read_to_string(CMD_LIST_JSON) { + Ok(content) => { + #[derive(serde::Deserialize)] + struct JsonCfg { xpaths: Option> } + match serde_json::from_str::(&content) { + Ok(cfg) => { + if let Some(mut xs) = cfg.xpaths { list.append(&mut xs); } + }, + Err(e) => eprintln!("Failed to parse {}: {}", CMD_LIST_JSON, e), + } + }, + Err(e) => eprintln!("Could not read {}: {} (will continue with env var only)", CMD_LIST_JSON, e), + } + + if let Ok(env_val) = env::var(XPATH_ENV_VAR) { + for part in env_val.split(',') { let trimmed = part.trim(); if !trimmed.is_empty() { list.push(trimmed.to_string()); } } + } + + // dedupe while preserving order + let mut seen = std::collections::HashSet::new(); + list.retain(|x| seen.insert(x.clone())); + // apply blacklist from env + if let Ok(blacklist) = env::var(XPATH_ENV_BLACKLIST) { + let mut blk = std::collections::HashSet::new(); + for part in blacklist.split(',') { + let trimmed = part.trim(); + if !trimmed.is_empty() { blk.insert(trimmed.to_string()); } + } + if !blk.is_empty() { + list.retain(|x| !blk.contains(x)); + } + } + list +} + +struct TelemetrySecurityConfig { + use_client_auth: bool, + ca_crt: String, + server_crt: String, + server_key: String, +} + +fn 
run_gnmi_for_xpath(xpath: &str, port: u16, sec: &TelemetrySecurityConfig, target_name: &str, timeout: Duration) -> CommandResult { + // Build full command: gnmi_get -xpath_target SHOW -xpath -target_addr 127.0.0.1: -logtostderr -insecure true + let addr = format!("127.0.0.1:{port}"); + let start = Instant::now(); + let mut cmd = Command::new(GNMI_BASE_CMD); + cmd.arg("-xpath_target").arg("SHOW") + .arg("-xpath").arg(xpath) + .arg("-target_addr").arg(addr) + .arg("-logtostderr") + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + if sec.use_client_auth { + cmd.arg("-ca").arg(&sec.ca_crt) + .arg("-cert").arg(&sec.server_crt) + .arg("-key").arg(&sec.server_key) + .arg("-target_name").arg(target_name); + } else { + // no client auth -> insecure mode + cmd.arg("-insecure").arg("true"); + } + // Enforce timeout + let mut child = match cmd.spawn() { + Ok(c) => c, + Err(e) => { + let dur = start.elapsed().as_millis(); + eprintln!("Failed to spawn gnmi_get for {}: {}", xpath, e); + return CommandResult { xpath: xpath.to_string(), success: false, error: Some(e.to_string()), duration_ms: dur }; + } + }; + + let output = { + let start_wait = Instant::now(); + loop { + match child.try_wait() { + Ok(Some(_status)) => { + // Process exited; collect output + match child.wait_with_output() { + Ok(out) => break Ok(out), + Err(e) => break Err(e), + } + } + Ok(None) => { + if start_wait.elapsed() >= timeout { + // kill on timeout + let _ = child.kill(); + let _ = child.wait(); + break Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "gnmi_get timed out")); + } + std::thread::sleep(Duration::from_millis(50)); + } + Err(e) => { + break Err(e); + } + } + } + }; + let dur = start.elapsed().as_millis(); + + match output { + Ok(out) => { + if out.status.success() { + println!("gnmi_get success xpath={}", xpath); + CommandResult { xpath: xpath.to_string(), success: true, error: None, duration_ms: dur } + } else { + let stderr = 
String::from_utf8_lossy(&out.stderr).to_string(); + eprintln!("gnmi_get failed xpath={} status={:?} err={}", xpath, out.status.code(), stderr); + CommandResult { xpath: xpath.to_string(), success: false, error: Some(stderr), duration_ms: dur } + } + }, + Err(e) => { + eprintln!("Failed to spawn gnmi_get for {}: {}", xpath, e); + CommandResult { xpath: xpath.to_string(), success: false, error: Some(e.to_string()), duration_ms: dur } + } + } +} + +fn get_security_config() -> TelemetrySecurityConfig { + // Redis DB 4 hashes: + // TELEMETRY|certs: ca_crt, server_crt, server_key + // TELEMETRY|gnmi: client_auth, target_name (target_name new; if absent we still proceed) + let client = match redis::Client::open("redis://127.0.0.1:6379/4") { + Ok(c) => c, + Err(e) => { + eprintln!("Redis client error (security): {e}"); + return TelemetrySecurityConfig { use_client_auth: false, ca_crt: DEFAULT_CA_CRT.to_string(), server_crt: DEFAULT_SERVER_CRT.to_string(), server_key: DEFAULT_SERVER_KEY.to_string() }; + } + }; + let mut conn = match client.get_connection() { + Ok(c) => c, + Err(e) => { + eprintln!("Redis connection error (security): {e}"); + return TelemetrySecurityConfig { use_client_auth: false, ca_crt: DEFAULT_CA_CRT.to_string(), server_crt: DEFAULT_SERVER_CRT.to_string(), server_key: DEFAULT_SERVER_KEY.to_string() }; + } + }; + + let mut get_field = |hash: &str, field: &str| -> Option { + let r: redis::RedisResult> = conn.hget(hash, field); + match r { Ok(v) => v, Err(e) => { eprintln!("Redis HGET error {hash}.{field}: {e}"); None } } + }; + + let ca_crt = get_field("TELEMETRY|certs", "ca_crt") + .filter(|v| !v.trim().is_empty()) + .unwrap_or_else(|| DEFAULT_CA_CRT.to_string()); + let server_crt = get_field("TELEMETRY|certs", "server_crt") + .filter(|v| !v.trim().is_empty()) + .unwrap_or_else(|| DEFAULT_SERVER_CRT.to_string()); + let server_key = get_field("TELEMETRY|certs", "server_key") + .filter(|v| !v.trim().is_empty()) + .unwrap_or_else(|| 
DEFAULT_SERVER_KEY.to_string()); + let client_auth_opt = get_field("TELEMETRY|gnmi", "client_auth"); + // Only explicit "true" turns on client auth; everything else (including None) -> false + let use_client_auth = matches!(client_auth_opt.as_ref(), Some(v) if v.eq_ignore_ascii_case("true")); + TelemetrySecurityConfig { use_client_auth, ca_crt, server_crt, server_key } +} + +fn get_target_name() -> String { + match env::var(TARGET_NAME_ENV_VAR) { + Ok(v) if !v.trim().is_empty() => v.trim().to_string(), + _ => DEFAULT_TARGET_NAME.to_string(), + } +} + +fn read_timeout() -> Duration { + const DEFAULT_SECS: u64 = 5; + match env::var(CMD_TIMEOUT_ENV_VAR) { + Ok(val) => match val.trim().parse::() { + Ok(secs) if secs > 0 => Duration::from_secs(secs), + _ => Duration::from_secs(DEFAULT_SECS), + }, + Err(_) => Duration::from_secs(DEFAULT_SECS), + } } fn get_gnmi_port() -> u16 { @@ -108,22 +327,32 @@ fn main() { let telemetry_enabled = is_telemetry_enabled(); - let (result_string, http_status) = if !telemetry_enabled { - ("SKIPPED: feature disabled".to_string(), "HTTP/1.1 200 OK") + let mut http_status = "HTTP/1.1 200 OK"; + let check_port_result; + let mut cmd_results: Vec = Vec::new(); + + if !telemetry_enabled { + check_port_result = "SKIPPED: feature disabled".to_string(); } else { - let port_result = check_telemetry_port(); - let ok = port_result.starts_with("OK"); - let status_line = if ok { - "HTTP/1.1 200 OK" - } else { - "HTTP/1.1 500 Internal Server Error" - }; - (port_result, status_line) - }; - - let status = HealthStatus { - check_telemetry_port: result_string, - }; + check_port_result = check_telemetry_port(); + if !check_port_result.starts_with("OK") { http_status = "HTTP/1.1 500 Internal Server Error"; } + + // Only run xpath commands if port is OK + if http_status == "HTTP/1.1 200 OK" { + let xpaths = load_xpath_list(); + let port = get_gnmi_port(); + let sec_cfg = get_security_config(); + let timeout = read_timeout(); + let target_name = 
get_target_name(); + for xp in xpaths { + let res = run_gnmi_for_xpath(&xp, port, &sec_cfg, &target_name, timeout); + if !res.success { http_status = "HTTP/1.1 500 Internal Server Error"; } + cmd_results.push(res); + } + } + } + + let status = HealthStatus { check_telemetry_port: check_port_result, xpath_commands: cmd_results }; let json_body = serde_json::to_string(&status).unwrap(); let response = format!( From 6d9a9e29c6d4d5472cac374aff81761cb712c33d Mon Sep 17 00:00:00 2001 From: Yun Li Date: Fri, 19 Sep 2025 10:03:58 +0000 Subject: [PATCH 2/5] Build telemetry-watchdog after gnmi --- rules/docker-telemetry-watchdog.mk | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rules/docker-telemetry-watchdog.mk b/rules/docker-telemetry-watchdog.mk index e8f6500f264..4f22e2ca63f 100644 --- a/rules/docker-telemetry-watchdog.mk +++ b/rules/docker-telemetry-watchdog.mk @@ -4,7 +4,7 @@ DOCKER_TELEMETRY_WATCHDOG_STEM = docker-telemetry-watchdog DOCKER_TELEMETRY_WATCHDOG = $(DOCKER_TELEMETRY_WATCHDOG_STEM).gz DOCKER_TELEMETRY_WATCHDOG_DBG = $(DOCKER_TELEMETRY_WATCHDOG_STEM)-$(DBG_IMAGE_MARK).gz -$(DOCKER_TELEMETRY_WATCHDOG)_LOAD_DOCKERS = $(DOCKER_CONFIG_ENGINE_BOOKWORM) +$(DOCKER_TELEMETRY_WATCHDOG)_LOAD_DOCKERS = $(DOCKER_GNMI) $(DOCKER_TELEMETRY_WATCHDOG)_PATH = $(DOCKERS_PATH)/$(DOCKER_TELEMETRY_WATCHDOG_STEM) From 3430b63f531a5f5641f4d41049a3adf7bdbf409b Mon Sep 17 00:00:00 2001 From: Yun Li Date: Mon, 22 Sep 2025 11:46:58 +0000 Subject: [PATCH 3/5] Fix comments --- .../watchdog/src/cmd_list.json | 56 ++++ .../watchdog/src/main.rs | 307 ++++++++++-------- rules/docker-telemetry-watchdog.mk | 5 +- 3 files changed, 228 insertions(+), 140 deletions(-) diff --git a/dockers/docker-telemetry-watchdog/watchdog/src/cmd_list.json b/dockers/docker-telemetry-watchdog/watchdog/src/cmd_list.json index 8efcf11417e..db138c5c552 100644 --- a/dockers/docker-telemetry-watchdog/watchdog/src/cmd_list.json +++ b/dockers/docker-telemetry-watchdog/watchdog/src/cmd_list.json @@ 
-1,4 +1,60 @@ { "xpaths": [ + "buffer_pool/persistent-watermark", + "buffer_pool/watermark", + "clock", + "clock/timezones", + "dropcounters/capabilities", + "dropcounters/configuration", + "dropcounters/counts", + "ecn", + "headroom-pool/persistent-watermark", + "headroom-pool/watermark", + "interfaces/alias", + "interfaces/counters", + "interfaces/counters/rif", + "interfaces/description", + "interfaces/fec/status", + "interfaces/flap", + "interfaces/neighbor/expected", + "interfaces/naming_mode", + "interfaces/portchannel", + "interfaces/status", + "interfaces/switchport/config", + "interfaces/switchport/status", + "interfaces/transceiver/error-status", + "interfaces/transceiver/presence", + "interfaces/transceiver/lpmode", + "ipv6/bgp/neighbors", + "ipv6/bgp/network", + "ipv6/bgp/summary", + "ipv6/fib", + "ipv6/interfaces", + "ipv6/link-local-mode", + "ipv6/prefix-list", + "ipv6/protocol", + "ipv6/route", + "lldp/neighbors", + "lldp/table", + "mac", + "mac/aging-time", + "mmu", + "processes", + "processes/memory", + "processes/summary", + "queue/counters", + "queue/wredcounters", + "queue/watermark", + "queue/watermark/all", + "queue/watermark/unicast", + "queue/watermark/multicast", + "reboot-cause/history", + "services", + "srv6/stats", + "system-memory", + "uptime", + "version", + "vlan/brief", + "watermark/telemetry/interval" ] } \ No newline at end of file diff --git a/dockers/docker-telemetry-watchdog/watchdog/src/main.rs b/dockers/docker-telemetry-watchdog/watchdog/src/main.rs index 1b47dcb3b59..2715a2e3912 100644 --- a/dockers/docker-telemetry-watchdog/watchdog/src/main.rs +++ b/dockers/docker-telemetry-watchdog/watchdog/src/main.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::io::{BufRead, BufReader, Write}; use std::net::{TcpListener, TcpStream}; use std::time::{Duration, Instant}; @@ -15,6 +16,7 @@ const DEFAULT_TELEMETRY_SERVICE_PORT: u16 = 50051; struct HealthStatus { check_telemetry_port: String, xpath_commands: Vec, + 
xpath_load_errors: Vec, } #[derive(Serialize, Clone)] @@ -30,6 +32,8 @@ const XPATH_ENV_VAR: &str = "TELEMETRY_WATCHDOG_XPATHS"; // comma-separated list const XPATH_ENV_BLACKLIST: &str = "TELEMETRY_WATCHDOG_XPATHS_BLACKLIST"; // comma-separated list to exclude const CMD_TIMEOUT_ENV_VAR: &str = "TELEMETRY_WATCHDOG_CMD_TIMEOUT_SECS"; // per-command timeout seconds const GNMI_BASE_CMD: &str = "gnmi_get"; // assumed in PATH +// optional: set to "false" to skip show api (gnmi xpath) probing +const SHOW_API_PROBE_ENV_VAR: &str = "TELEMETRY_WATCHDOG_SHOW_API_PROBE_ENABLED"; const TARGET_NAME_ENV_VAR: &str = "TELEMETRY_WATCHDOG_TARGET_NAME"; // optional override for target_name const DEFAULT_TARGET_NAME: &str = "server.ndastreaming.ap.gbl"; const DEFAULT_CA_CRT: &str = "/etc/sonic/telemetry/dsmsroot.cer"; @@ -45,49 +49,58 @@ const DEFAULT_SERVER_KEY: &str = "/etc/sonic/telemetry/streamingtelemetryserver. // } // 2. Environment variable TELEMETRY_WATCHDOG_XPATHS optional. Comma-separated list of xpaths. // Both sources are merged; duplicates removed (first occurrence kept). -// During the probe request, after verifying the GNMI port is reachable, each xpath results in a command: -// gnmi_get -xpath_target SHOW -xpath -target_addr 127.0.0.1: -logtostderr [ -ca -cert -key -target_name | -insecure true ] -// Cert paths and client_auth/target_name are pulled from Redis hashes (TELEMETRY|certs, TELEMETRY|gnmi). -// client_auth now: ONLY explicit Redis value "true" (case-insensitive) enables TLS client auth; anything else (missing/other value) -> insecure. -// Any failure (spawn error or non-zero exit status) causes overall HTTP 500 with per-xpath results in JSON body. 
- -fn load_xpath_list() -> Vec { - let mut list: Vec = Vec::new(); - - // JSON file expected format: { "xpaths": ["reboot-cause/history", "lldp/neighbors"] } - match fs::read_to_string(CMD_LIST_JSON) { - Ok(content) => { - #[derive(serde::Deserialize)] - struct JsonCfg { xpaths: Option> } - match serde_json::from_str::(&content) { - Ok(cfg) => { - if let Some(mut xs) = cfg.xpaths { list.append(&mut xs); } - }, - Err(e) => eprintln!("Failed to parse {}: {}", CMD_LIST_JSON, e), - } - }, - Err(e) => eprintln!("Could not read {}: {} (will continue with env var only)", CMD_LIST_JSON, e), +// During probe: for each xpath (after port OK) run command: +// gnmi_get -xpath_target SHOW -xpath <xpath> -target_addr 127.0.0.1:<port> -logtostderr \ +// [ -ca <ca_crt> -cert <server_crt> -key <server_key> -target_name <target_name> | -insecure true ] +// Cert paths + client_auth come from Redis (TELEMETRY|certs, TELEMETRY|gnmi). +// client_auth: ONLY explicit Redis value "true" (case-insensitive) enables TLS; anything else -> insecure. +// Any failure (spawn error / non-zero exit) sets HTTP 500; body lists per-xpath results. +// SHOW probe control: env TELEMETRY_WATCHDOG_SHOW_API_PROBE_ENABLED="false" (case-insensitive) skips gnmi_get xpaths (default enabled).
+ +fn load_xpath_list() -> (Vec, Vec) { + let mut set: HashSet = HashSet::new(); + let mut errors: Vec = Vec::new(); + // JSON file format example: + // { "xpaths": ["reboot-cause/history", "lldp/neighbors"] } + if let Ok(content) = fs::read_to_string(CMD_LIST_JSON) { + #[derive(serde::Deserialize)] + struct JsonCfg { xpaths: Option> } + match serde_json::from_str::(&content) { + Ok(cfg) => { + if let Some(xs) = cfg.xpaths { for x in xs { if !x.is_empty() { set.insert(x); } } } + }, + Err(e) => { + let msg = format!("Failed to parse {CMD_LIST_JSON}: {e}"); + errors.push(msg); + }, + } + } else { + if let Err(e) = fs::read_to_string(CMD_LIST_JSON) { + let msg = format!( + "Could not read {CMD_LIST_JSON}: {e} (will continue with env var only)" + ); + errors.push(msg); + } } if let Ok(env_val) = env::var(XPATH_ENV_VAR) { - for part in env_val.split(',') { let trimmed = part.trim(); if !trimmed.is_empty() { list.push(trimmed.to_string()); } } + for part in env_val.split(',') { + let t = part.trim(); + if !t.is_empty() { set.insert(t.to_string()); } + } } - // dedupe while preserving order - let mut seen = std::collections::HashSet::new(); - list.retain(|x| seen.insert(x.clone())); - // apply blacklist from env + // Apply blacklist: remove those entries if let Ok(blacklist) = env::var(XPATH_ENV_BLACKLIST) { - let mut blk = std::collections::HashSet::new(); - for part in blacklist.split(',') { - let trimmed = part.trim(); - if !trimmed.is_empty() { blk.insert(trimmed.to_string()); } - } - if !blk.is_empty() { - list.retain(|x| !blk.contains(x)); + if !blacklist.trim().is_empty() { + for part in blacklist.split(',') { + let t = part.trim(); + if !t.is_empty() { set.remove(t); } + } } } - list + + (set.into_iter().collect(), errors) } struct TelemetrySecurityConfig { @@ -97,8 +110,30 @@ struct TelemetrySecurityConfig { server_key: String, } -fn run_gnmi_for_xpath(xpath: &str, port: u16, sec: &TelemetrySecurityConfig, target_name: &str, timeout: Duration) -> CommandResult { 
- // Build full command: gnmi_get -xpath_target SHOW -xpath -target_addr 127.0.0.1: -logtostderr -insecure true +// Unified helper: open a Redis connection to DB 4 and fetch one hash field. +// Returns None on any error (client creation, connection, or HGET failure). +fn redis_hget(hash: &str, field: &str) -> Option { + let client = match redis::Client::open("redis://127.0.0.1:6379/4") { + Ok(c) => c, + Err(e) => { eprintln!("Redis client error {hash}.{field}: {e}"); return None; } + }; + let mut conn = match client.get_connection() { + Ok(c) => c, + Err(e) => { eprintln!("Redis connection error {hash}.{field}: {e}"); return None; } + }; + match conn.hget::<_, _, Option>(hash, field) { + Ok(v) => v, + Err(e) => { eprintln!("Redis HGET error {hash}.{field}: {e}"); None } + } +} + +fn run_gnmi_for_xpath( + xpath: &str, + port: u16, + sec: &TelemetrySecurityConfig, + target_name: &str, + timeout: Duration, +) -> CommandResult { let addr = format!("127.0.0.1:{port}"); let start = Instant::now(); let mut cmd = Command::new(GNMI_BASE_CMD); @@ -107,7 +142,9 @@ fn run_gnmi_for_xpath(xpath: &str, port: u16, sec: &TelemetrySecurityConfig, tar .arg("-target_addr").arg(addr) .arg("-logtostderr") .stdin(Stdio::null()) - .stdout(Stdio::piped()) + // We ignore stdout to avoid blocking if gnmi_get prints a lot. + .stdout(Stdio::null()) + // Keep stderr for error diagnostics. 
.stderr(Stdio::piped()); if sec.use_client_auth { cmd.arg("-ca").arg(&sec.ca_crt) @@ -124,52 +161,88 @@ fn run_gnmi_for_xpath(xpath: &str, port: u16, sec: &TelemetrySecurityConfig, tar Err(e) => { let dur = start.elapsed().as_millis(); eprintln!("Failed to spawn gnmi_get for {}: {}", xpath, e); - return CommandResult { xpath: xpath.to_string(), success: false, error: Some(e.to_string()), duration_ms: dur }; + return CommandResult { + xpath: xpath.to_string(), + success: false, + error: Some(e.to_string()), + duration_ms: dur, + }; } }; - let output = { + let wait_result = { let start_wait = Instant::now(); loop { match child.try_wait() { - Ok(Some(_status)) => { - // Process exited; collect output - match child.wait_with_output() { - Ok(out) => break Ok(out), - Err(e) => break Err(e), - } - } + Ok(Some(status)) => break Ok(status), Ok(None) => { if start_wait.elapsed() >= timeout { - // kill on timeout let _ = child.kill(); let _ = child.wait(); - break Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "gnmi_get timed out")); + break Err(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "gnmi_get timed out", + )); } std::thread::sleep(Duration::from_millis(50)); } - Err(e) => { - break Err(e); - } + Err(e) => break Err(e), } } }; + // After status known, pull stderr (if any). + let stderr_string = match child.stderr.take() { + Some(mut stderr_pipe) => { + use std::io::Read; + let mut buf = Vec::new(); + // Best-effort read; ignore read errors. 
+ if let Err(e) = stderr_pipe.read_to_end(&mut buf) { + eprintln!("Failed to read gnmi_get stderr: {e}"); + } + String::from_utf8_lossy(&buf).to_string() + } + None => String::new(), + }; let dur = start.elapsed().as_millis(); - match output { - Ok(out) => { - if out.status.success() { - println!("gnmi_get success xpath={}", xpath); - CommandResult { xpath: xpath.to_string(), success: true, error: None, duration_ms: dur } + match wait_result { + Ok(status) => { + if status.success() { + CommandResult { + xpath: xpath.to_string(), + success: true, + error: None, + duration_ms: dur, + } } else { - let stderr = String::from_utf8_lossy(&out.stderr).to_string(); - eprintln!("gnmi_get failed xpath={} status={:?} err={}", xpath, out.status.code(), stderr); - CommandResult { xpath: xpath.to_string(), success: false, error: Some(stderr), duration_ms: dur } + // Possibly large stderr; truncate if huge (optional threshold 16KB). + let mut truncated = if stderr_string.len() > 16 * 1024 { + format!( + "{}...[truncated {} bytes]", + &stderr_string[..16 * 1024], + stderr_string.len() - 16 * 1024 + ) + } else { + stderr_string.clone() + }; + let exit_code = status.code().map(|c| c.to_string()).unwrap_or_else(|| "unknown".to_string()); + truncated.push_str(&format!(" (exit code: {exit_code})")); + CommandResult { + xpath: xpath.to_string(), + success: false, + error: Some(truncated), + duration_ms: dur, + } } }, Err(e) => { eprintln!("Failed to spawn gnmi_get for {}: {}", xpath, e); - CommandResult { xpath: xpath.to_string(), success: false, error: Some(e.to_string()), duration_ms: dur } + CommandResult { + xpath: xpath.to_string(), + success: false, + error: Some(e.to_string()), + duration_ms: dur, + } } } } @@ -177,38 +250,17 @@ fn run_gnmi_for_xpath(xpath: &str, port: u16, sec: &TelemetrySecurityConfig, tar fn get_security_config() -> TelemetrySecurityConfig { // Redis DB 4 hashes: // TELEMETRY|certs: ca_crt, server_crt, server_key - // TELEMETRY|gnmi: client_auth, target_name 
(target_name new; if absent we still proceed) - let client = match redis::Client::open("redis://127.0.0.1:6379/4") { - Ok(c) => c, - Err(e) => { - eprintln!("Redis client error (security): {e}"); - return TelemetrySecurityConfig { use_client_auth: false, ca_crt: DEFAULT_CA_CRT.to_string(), server_crt: DEFAULT_SERVER_CRT.to_string(), server_key: DEFAULT_SERVER_KEY.to_string() }; - } - }; - let mut conn = match client.get_connection() { - Ok(c) => c, - Err(e) => { - eprintln!("Redis connection error (security): {e}"); - return TelemetrySecurityConfig { use_client_auth: false, ca_crt: DEFAULT_CA_CRT.to_string(), server_crt: DEFAULT_SERVER_CRT.to_string(), server_key: DEFAULT_SERVER_KEY.to_string() }; - } - }; - - let mut get_field = |hash: &str, field: &str| -> Option { - let r: redis::RedisResult> = conn.hget(hash, field); - match r { Ok(v) => v, Err(e) => { eprintln!("Redis HGET error {hash}.{field}: {e}"); None } } - }; - - let ca_crt = get_field("TELEMETRY|certs", "ca_crt") + // TELEMETRY|gnmi: client_auth + let ca_crt = redis_hget("TELEMETRY|certs", "ca_crt") .filter(|v| !v.trim().is_empty()) .unwrap_or_else(|| DEFAULT_CA_CRT.to_string()); - let server_crt = get_field("TELEMETRY|certs", "server_crt") + let server_crt = redis_hget("TELEMETRY|certs", "server_crt") .filter(|v| !v.trim().is_empty()) .unwrap_or_else(|| DEFAULT_SERVER_CRT.to_string()); - let server_key = get_field("TELEMETRY|certs", "server_key") + let server_key = redis_hget("TELEMETRY|certs", "server_key") .filter(|v| !v.trim().is_empty()) .unwrap_or_else(|| DEFAULT_SERVER_KEY.to_string()); - let client_auth_opt = get_field("TELEMETRY|gnmi", "client_auth"); - // Only explicit "true" turns on client auth; everything else (including None) -> false + let client_auth_opt = redis_hget("TELEMETRY|gnmi", "client_auth"); let use_client_auth = matches!(client_auth_opt.as_ref(), Some(v) if v.eq_ignore_ascii_case("true")); TelemetrySecurityConfig { use_client_auth, ca_crt, server_crt, server_key } } @@ -221,7 
+273,7 @@ fn get_target_name() -> String { } fn read_timeout() -> Duration { - const DEFAULT_SECS: u64 = 5; + const DEFAULT_SECS: u64 = 10; match env::var(CMD_TIMEOUT_ENV_VAR) { Ok(val) => match val.trim().parse::() { Ok(secs) if secs > 0 => Duration::from_secs(secs), @@ -231,34 +283,26 @@ fn read_timeout() -> Duration { } } -fn get_gnmi_port() -> u16 { - let client = match redis::Client::open("redis://127.0.0.1:6379/4") { - Ok(c) => c, - Err(e) => { - eprintln!("Redis client error (port): {e}"); - return DEFAULT_TELEMETRY_SERVICE_PORT; - } - }; - let mut conn = match client.get_connection() { - Ok(c) => c, - Err(e) => { - eprintln!("Redis connection error (port): {e}"); - return DEFAULT_TELEMETRY_SERVICE_PORT; - } - }; +fn is_show_api_probe_enabled() -> bool { + match env::var(SHOW_API_PROBE_ENV_VAR) { + Ok(v) if v.eq_ignore_ascii_case("false") => false, + _ => true, // default enabled + } +} - let res: redis::RedisResult> = conn.hget("TELEMETRY|gnmi", "port"); - match res { - Ok(Some(p)) => p.parse::().unwrap_or_else(|_| { - eprintln!("Redis: TELEMETRY|gnmi.port not a valid u16: {p}"); +fn get_gnmi_port() -> u16 { + match redis_hget("TELEMETRY|gnmi", "port") { + Some(p) => p.parse::().unwrap_or_else(|_| { + eprintln!( + "Redis: TELEMETRY|gnmi.port not a valid u16: {p}" + ); DEFAULT_TELEMETRY_SERVICE_PORT }), - Ok(None) => { - eprintln!("Redis: TELEMETRY|gnmi.port missing; defaulting to {}", DEFAULT_TELEMETRY_SERVICE_PORT); - DEFAULT_TELEMETRY_SERVICE_PORT - } - Err(e) => { - eprintln!("Redis HGET error (port): {e}"); + None => { + eprintln!( + "Redis: TELEMETRY|gnmi.port missing; defaulting to {}", + DEFAULT_TELEMETRY_SERVICE_PORT + ); DEFAULT_TELEMETRY_SERVICE_PORT } } @@ -267,32 +311,12 @@ fn get_gnmi_port() -> u16 { // Connects to Redis DB 4 and returns true if the telemetry feature is enabled. // If Redis is unavailable or the field is missing, default to enabled (fail-open). 
fn is_telemetry_enabled() -> bool { - let client = match redis::Client::open("redis://127.0.0.1:6379/4") { - Ok(c) => c, - Err(e) => { - eprintln!("Redis client error (feature): {e}"); - return true; - } - }; - let mut conn = match client.get_connection() { - Ok(c) => c, - Err(e) => { - eprintln!("Redis connection error (feature): {e}"); - return true; - } - }; - - let res: redis::RedisResult> = conn.hget("FEATURE|telemetry", "state"); - match res { - Ok(Some(state)) => !state.eq_ignore_ascii_case("disabled"), - Ok(None) => { + match redis_hget("FEATURE|telemetry", "state") { + Some(state) => !state.eq_ignore_ascii_case("disabled"), + None => { eprintln!("Redis: FEATURE|telemetry.state missing; defaulting to enabled"); true } - Err(e) => { - eprintln!("Redis HGET error (feature): {e}"); - true - } } } @@ -330,6 +354,7 @@ fn main() { let mut http_status = "HTTP/1.1 200 OK"; let check_port_result; let mut cmd_results: Vec = Vec::new(); + let mut load_json_errors: Vec = Vec::new(); if !telemetry_enabled { check_port_result = "SKIPPED: feature disabled".to_string(); @@ -338,8 +363,12 @@ fn main() { if !check_port_result.starts_with("OK") { http_status = "HTTP/1.1 500 Internal Server Error"; } // Only run xpath commands if port is OK - if http_status == "HTTP/1.1 200 OK" { - let xpaths = load_xpath_list(); + if http_status == "HTTP/1.1 200 OK" && is_show_api_probe_enabled() { + let (xpaths, xpath_load_errors) = load_xpath_list(); + if !xpath_load_errors.is_empty() { + load_json_errors.extend(xpath_load_errors); + http_status = "HTTP/1.1 500 Internal Server Error"; + } let port = get_gnmi_port(); let sec_cfg = get_security_config(); let timeout = read_timeout(); @@ -352,7 +381,7 @@ fn main() { } } - let status = HealthStatus { check_telemetry_port: check_port_result, xpath_commands: cmd_results }; + let status = HealthStatus { check_telemetry_port: check_port_result, xpath_commands: cmd_results, xpath_load_errors: load_json_errors }; let json_body = 
serde_json::to_string(&status).unwrap(); let response = format!( diff --git a/rules/docker-telemetry-watchdog.mk b/rules/docker-telemetry-watchdog.mk index 4f22e2ca63f..6ebd53083bc 100644 --- a/rules/docker-telemetry-watchdog.mk +++ b/rules/docker-telemetry-watchdog.mk @@ -4,7 +4,10 @@ DOCKER_TELEMETRY_WATCHDOG_STEM = docker-telemetry-watchdog DOCKER_TELEMETRY_WATCHDOG = $(DOCKER_TELEMETRY_WATCHDOG_STEM).gz DOCKER_TELEMETRY_WATCHDOG_DBG = $(DOCKER_TELEMETRY_WATCHDOG_STEM)-$(DBG_IMAGE_MARK).gz -$(DOCKER_TELEMETRY_WATCHDOG)_LOAD_DOCKERS = $(DOCKER_GNMI) +$(DOCKER_TELEMETRY_WATCHDOG)_LOAD_DOCKERS += $(DOCKER_CONFIG_ENGINE_BOOKWORM) +$(DOCKER_TELEMETRY_WATCHDOG)_LOAD_DOCKERS += $(DOCKER_GNMI) +# Explicit Make dependency to ensure gnmi is built before telemetry-watchdog +$(DOCKER_TELEMETRY_WATCHDOG)_DEPENDS += $(DOCKER_GNMI) $(DOCKER_TELEMETRY_WATCHDOG)_PATH = $(DOCKERS_PATH)/$(DOCKER_TELEMETRY_WATCHDOG_STEM) From c567150d315d39fe8b3eff89bff99345c999bf11 Mon Sep 17 00:00:00 2001 From: Yun Li Date: Mon, 22 Sep 2025 12:03:03 +0000 Subject: [PATCH 4/5] Fix the line which is too long --- dockers/docker-telemetry-watchdog/watchdog/src/main.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/dockers/docker-telemetry-watchdog/watchdog/src/main.rs b/dockers/docker-telemetry-watchdog/watchdog/src/main.rs index 2715a2e3912..ed3113f5ce2 100644 --- a/dockers/docker-telemetry-watchdog/watchdog/src/main.rs +++ b/dockers/docker-telemetry-watchdog/watchdog/src/main.rs @@ -381,7 +381,11 @@ fn main() { } } - let status = HealthStatus { check_telemetry_port: check_port_result, xpath_commands: cmd_results, xpath_load_errors: load_json_errors }; + let status = HealthStatus { + check_telemetry_port: check_port_result, + xpath_commands: cmd_results, + xpath_load_errors: load_json_errors + }; let json_body = serde_json::to_string(&status).unwrap(); let response = format!( From 341bb6086c5ec9d049e58ab3d2d9c44119fe59ab Mon Sep 17 00:00:00 2001 From: Yun Li Date: Tue, 23 Sep 
2025 01:19:52 +0000 Subject: [PATCH 5/5] Fix the build error and two more comments --- .../watchdog/src/main.rs | 35 ++++++++++--------- rules/docker-telemetry-watchdog.mk | 5 +-- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/dockers/docker-telemetry-watchdog/watchdog/src/main.rs b/dockers/docker-telemetry-watchdog/watchdog/src/main.rs index ed3113f5ce2..6e649186fdb 100644 --- a/dockers/docker-telemetry-watchdog/watchdog/src/main.rs +++ b/dockers/docker-telemetry-watchdog/watchdog/src/main.rs @@ -39,6 +39,8 @@ const DEFAULT_TARGET_NAME: &str = "server.ndastreaming.ap.gbl"; const DEFAULT_CA_CRT: &str = "/etc/sonic/telemetry/dsmsroot.cer"; const DEFAULT_SERVER_CRT: &str = "/etc/sonic/telemetry/streamingtelemetryserver.cer"; const DEFAULT_SERVER_KEY: &str = "/etc/sonic/telemetry/streamingtelemetryserver.key"; +// Max stderr we keep per gnmi_get (bytes) before truncation. +const STDERR_TRUNCATE_LIMIT: usize = 16 * 1024; // 16KB // Configuration: // 1. JSON file (/cmd_list.json) optional. 
Format: @@ -62,20 +64,21 @@ fn load_xpath_list() -> (Vec, Vec) { let mut errors: Vec = Vec::new(); // JSON file format example: // { "xpaths": ["reboot-cause/history", "lldp/neighbors"] } - if let Ok(content) = fs::read_to_string(CMD_LIST_JSON) { - #[derive(serde::Deserialize)] - struct JsonCfg { xpaths: Option> } - match serde_json::from_str::(&content) { - Ok(cfg) => { - if let Some(xs) = cfg.xpaths { for x in xs { if !x.is_empty() { set.insert(x); } } } - }, - Err(e) => { - let msg = format!("Failed to parse {CMD_LIST_JSON}: {e}"); - errors.push(msg); - }, + match fs::read_to_string(CMD_LIST_JSON) { + Ok(content) => { + #[derive(serde::Deserialize)] + struct JsonCfg { xpaths: Option> } + match serde_json::from_str::(&content) { + Ok(cfg) => { + if let Some(xs) = cfg.xpaths { for x in xs { if !x.is_empty() { set.insert(x); } } } + }, + Err(e) => { + let msg = format!("Failed to parse {CMD_LIST_JSON}: {e}"); + errors.push(msg); + }, + } } - } else { - if let Err(e) = fs::read_to_string(CMD_LIST_JSON) { + Err(e) => { let msg = format!( "Could not read {CMD_LIST_JSON}: {e} (will continue with env var only)" ); @@ -216,11 +219,11 @@ fn run_gnmi_for_xpath( } } else { // Possibly large stderr; truncate if huge (optional threshold 16KB). 
- let mut truncated = if stderr_string.len() > 16 * 1024 { + let mut truncated = if stderr_string.len() > STDERR_TRUNCATE_LIMIT { format!( "{}...[truncated {} bytes]", - &stderr_string[..16 * 1024], - stderr_string.len() - 16 * 1024 + &stderr_string[..STDERR_TRUNCATE_LIMIT], + stderr_string.len() - STDERR_TRUNCATE_LIMIT ) } else { stderr_string.clone() diff --git a/rules/docker-telemetry-watchdog.mk b/rules/docker-telemetry-watchdog.mk index 6ebd53083bc..4f22e2ca63f 100644 --- a/rules/docker-telemetry-watchdog.mk +++ b/rules/docker-telemetry-watchdog.mk @@ -4,10 +4,7 @@ DOCKER_TELEMETRY_WATCHDOG_STEM = docker-telemetry-watchdog DOCKER_TELEMETRY_WATCHDOG = $(DOCKER_TELEMETRY_WATCHDOG_STEM).gz DOCKER_TELEMETRY_WATCHDOG_DBG = $(DOCKER_TELEMETRY_WATCHDOG_STEM)-$(DBG_IMAGE_MARK).gz -$(DOCKER_TELEMETRY_WATCHDOG)_LOAD_DOCKERS += $(DOCKER_CONFIG_ENGINE_BOOKWORM) -$(DOCKER_TELEMETRY_WATCHDOG)_LOAD_DOCKERS += $(DOCKER_GNMI) -# Explicit Make dependency to ensure gnmi is built before telemetry-watchdog -$(DOCKER_TELEMETRY_WATCHDOG)_DEPENDS += $(DOCKER_GNMI) +$(DOCKER_TELEMETRY_WATCHDOG)_LOAD_DOCKERS = $(DOCKER_GNMI) $(DOCKER_TELEMETRY_WATCHDOG)_PATH = $(DOCKERS_PATH)/$(DOCKER_TELEMETRY_WATCHDOG_STEM)