Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -336,31 +336,143 @@ def test_reconcile_enables_user_auth_and_cname(ss):
ss, container_fs, host_fs, commands, config_db = ss
# Set module-level flags directly (they're read inside reconcile)
ss.GNMI_VERIFY_ENABLED = True
ss.GNMI_CLIENT_CNAME = "AME Infra CA o6"
ss.GNMI_CLIENT_CERTS = [{"cname": "fake-infra-ca.test.example.com", "role": "gnmi_show_readonly"}]

# Precondition: empty DB
assert config_db == {}

ss.reconcile_config_db_once()

# user_auth must be set to 'cert'
assert config_db.get("GNMI|gnmi", {}).get("user_auth") == "cert"
# CNAME hash must exist with role=gnmi_show_readonly (default GNMI_CLIENT_ROLE)
cname_key = f"GNMI_CLIENT_CERT|{ss.GNMI_CLIENT_CNAME}"
assert config_db.get(cname_key, {}).get("role") == "gnmi_show_readonly"
assert config_db.get("TELEMETRY|gnmi", {}).get("user_auth") == "cert"
# CNAME hash must exist with role=gnmi_show_readonly
assert config_db.get("GNMI_CLIENT_CERT|fake-infra-ca.test.example.com", {}).get("role") == "gnmi_show_readonly"


def test_reconcile_disabled_removes_cname(ss):
ss, container_fs, host_fs, commands, config_db = ss
ss.GNMI_VERIFY_ENABLED = False
ss.GNMI_CLIENT_CNAME = "AME Infra CA o6"
ss.GNMI_CLIENT_CERTS = [{"cname": "fake-infra-ca.test.example.com", "role": "gnmi_show_readonly"}]

# Seed an existing entry to be removed
config_db[f"GNMI_CLIENT_CERT|{ss.GNMI_CLIENT_CNAME}"] = {"role": "gnmi_show_readonly"}
config_db["GNMI_CLIENT_CERT|fake-infra-ca.test.example.com"] = {"role": "gnmi_show_readonly"}

ss.reconcile_config_db_once()

assert f"GNMI_CLIENT_CERT|{ss.GNMI_CLIENT_CNAME}" not in config_db
assert "GNMI_CLIENT_CERT|fake-infra-ca.test.example.com" not in config_db

def test_reconcile_multiple_cnames(ss):
ss, container_fs, host_fs, commands, config_db = ss
ss.GNMI_VERIFY_ENABLED = True
ss.GNMI_CLIENT_CERTS = [
{"cname": "fake-client.test.example.com", "role": "admin"},
{"cname": "fake-server.test.example.com", "role": '["gnmi_show_readonly","admin"]'},
]
assert config_db == {}
ss.reconcile_config_db_once()

assert config_db.get("TELEMETRY|gnmi", {}).get("user_auth") == "cert"
assert config_db.get("GNMI_CLIENT_CERT|fake-client.test.example.com", {}).get("role") == "admin"
assert config_db.get("GNMI_CLIENT_CERT|fake-server.test.example.com", {}).get("role") == '["gnmi_show_readonly","admin"]'

# ─────────────────────────── Tests for _parse_client_certs ───────────────────────────

class TestParseClientCerts:
"""Tests for _parse_client_certs() env-var parsing."""

@pytest.fixture(autouse=True)
def _fresh_module(self, monkeypatch):
if "systemd_stub" in sys.modules:
del sys.modules["systemd_stub"]
self.monkeypatch = monkeypatch

def _import_with_env(self, env_vars):
"""Set env vars, re-import systemd_stub, and return the parsed GNMI_CLIENT_CERTS."""
for k, v in env_vars.items():
if v is None:
self.monkeypatch.delenv(k, raising=False)
else:
self.monkeypatch.setenv(k, v)
# Clear stale env vars not in the dict
for k in ("GNMI_CLIENT_CERTS", "TELEMETRY_CLIENT_CNAME", "GNMI_CLIENT_ROLE"):
if k not in env_vars:
self.monkeypatch.delenv(k, raising=False)
if "systemd_stub" in sys.modules:
del sys.modules["systemd_stub"]
ss = importlib.import_module("systemd_stub")
return ss.GNMI_CLIENT_CERTS

def test_valid_json_array(self):
certs = self._import_with_env({
"GNMI_CLIENT_CERTS": '[{"cname": "client.gbl", "role": "admin"}]'
})
assert certs == [{"cname": "client.gbl", "role": "admin"}]

def test_valid_json_multiple_entries(self):
certs = self._import_with_env({
"GNMI_CLIENT_CERTS": '[{"cname": "a.gbl", "role": "admin"}, {"cname": "b.gbl", "role": "readonly"}]'
})
assert len(certs) == 2
assert certs[0] == {"cname": "a.gbl", "role": "admin"}
assert certs[1] == {"cname": "b.gbl", "role": "readonly"}

def test_non_array_json_falls_back_to_legacy(self):
certs = self._import_with_env({
"GNMI_CLIENT_CERTS": '{"cname": "c.gbl", "role": "admin"}',
"TELEMETRY_CLIENT_CNAME": "legacy.gbl",
"GNMI_CLIENT_ROLE": "readonly",
})
assert certs == [{"cname": "legacy.gbl", "role": "readonly"}]

def test_invalid_json_falls_back_to_legacy(self):
certs = self._import_with_env({
"GNMI_CLIENT_CERTS": "not-json!",
"TELEMETRY_CLIENT_CNAME": "fallback.gbl",
})
assert certs == [{"cname": "fallback.gbl", "role": "gnmi_show_readonly"}]

def test_entry_not_dict_falls_back(self):
certs = self._import_with_env({
"GNMI_CLIENT_CERTS": '["not-a-dict"]',
"TELEMETRY_CLIENT_CNAME": "fb.gbl",
})
assert certs == [{"cname": "fb.gbl", "role": "gnmi_show_readonly"}]

def test_entry_missing_role_falls_back(self):
certs = self._import_with_env({
"GNMI_CLIENT_CERTS": '[{"cname": "x.gbl"}]',
"TELEMETRY_CLIENT_CNAME": "fb.gbl",
})
assert certs == [{"cname": "fb.gbl", "role": "gnmi_show_readonly"}]

def test_entry_empty_cname_falls_back(self):
certs = self._import_with_env({
"GNMI_CLIENT_CERTS": '[{"cname": " ", "role": "admin"}]',
"TELEMETRY_CLIENT_CNAME": "fb.gbl",
})
assert certs == [{"cname": "fb.gbl", "role": "gnmi_show_readonly"}]

def test_legacy_single_entry(self):
certs = self._import_with_env({
"TELEMETRY_CLIENT_CNAME": "legacy.gbl",
"GNMI_CLIENT_ROLE": "admin",
})
assert certs == [{"cname": "legacy.gbl", "role": "admin"}]

def test_legacy_default_role(self):
certs = self._import_with_env({
"TELEMETRY_CLIENT_CNAME": "legacy.gbl",
})
assert certs == [{"cname": "legacy.gbl", "role": "gnmi_show_readonly"}]

def test_no_env_returns_empty(self):
certs = self._import_with_env({})
assert certs == []

def test_whitespace_stripped(self):
certs = self._import_with_env({
"GNMI_CLIENT_CERTS": '[{"cname": " client.gbl ", "role": " admin "}]'
})
assert certs == [{"cname": "client.gbl", "role": "admin"}]


# ─────────────────────────── Tests for _get_branch_name ───────────────────────────
Expand Down
77 changes: 54 additions & 23 deletions dockers/docker-telemetry-sidecar/systemd_stub.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#!/usr/bin/env python3
from __future__ import annotations

import json
import os
import re
import subprocess
import time
import argparse
from typing import List
from typing import Dict, List

from sonic_py_common.sidecar_common import (
get_bool_env_var, logger, SyncItem,
Expand All @@ -21,11 +22,46 @@

# CONFIG_DB reconcile env
GNMI_VERIFY_ENABLED = get_bool_env_var("TELEMETRY_CLIENT_CERT_VERIFY_ENABLED", default=False)
GNMI_CLIENT_CNAME = os.getenv("TELEMETRY_CLIENT_CNAME", "")
GNMI_CLIENT_ROLE = os.getenv("GNMI_CLIENT_ROLE", "gnmi_show_readonly")
def _parse_client_certs() -> List[Dict[str, str]]:
"""
Build the list of GNMI client cert entries from env vars.

Preferred: GNMI_CLIENT_CERTS (JSON array of {"cname": ..., "role": ...})
Fallback: TELEMETRY_CLIENT_CNAME / GNMI_CLIENT_ROLE (single entry, backward-compat)
"""
raw = os.getenv("GNMI_CLIENT_CERTS", "").strip()
if raw:
try:
entries = json.loads(raw)
if not isinstance(entries, list):
raise ValueError("GNMI_CLIENT_CERTS must be a JSON array")
normalized: List[Dict[str, str]] = []
for e in entries:
if not isinstance(e, dict):
raise ValueError(f"Each entry must be an object: {e!r}")
if "cname" not in e or "role" not in e:
raise ValueError(f"Each entry needs 'cname' and 'role': {e}")
cname = str(e.get("cname", "")).strip()
role = str(e.get("role", "")).strip()
if not cname or not role:
raise ValueError(f"'cname' and 'role' must be non-empty strings: {e}")
normalized.append({"cname": cname, "role": role})
return normalized
except (json.JSONDecodeError, ValueError) as exc:
logger.log_error(f"Bad GNMI_CLIENT_CERTS env var: {exc}; falling back to legacy")

# Legacy single-entry env vars
cname = os.getenv("TELEMETRY_CLIENT_CNAME", "").strip()
role = os.getenv("GNMI_CLIENT_ROLE", "gnmi_show_readonly").strip()
if cname:
return [{"cname": cname, "role": role}]
return []


GNMI_CLIENT_CERTS: List[Dict[str, str]] = _parse_client_certs()
Comment on lines +25 to +61
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New GNMI_CLIENT_CERTS parsing / legacy fallback logic in _parse_client_certs() is not covered by tests (e.g., valid JSON array, non-array JSON, invalid JSON fallback to legacy env vars). Adding focused unit tests would help prevent regressions since this runs at import time and affects CONFIG_DB reconciliation behavior.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added test case TestParseClientCerts


logger.log_notice(f"IS_V1_ENABLED={IS_V1_ENABLED}")
logger.log_notice(f"GNMI_CLIENT_ROLE={GNMI_CLIENT_ROLE}")
logger.log_notice(f"GNMI_CLIENT_CERTS={GNMI_CLIENT_CERTS}")

_TELEMETRY_SRC = (
"/usr/share/sonic/systemd_scripts/telemetry_v1.sh"
Expand Down Expand Up @@ -131,52 +167,47 @@ def _get_branch_name() -> str:


def _ensure_user_auth_cert() -> None:
cur = db_hget("GNMI|gnmi", "user_auth")
cur = db_hget("TELEMETRY|gnmi", "user_auth")
if cur != "cert":
if db_hset("GNMI|gnmi", "user_auth", "cert"):
logger.log_notice(f"Set GNMI|gnmi.user_auth=cert (was: {cur or '<unset>'})")
if db_hset("TELEMETRY|gnmi", "user_auth", "cert"):
logger.log_notice(f"Set TELEMETRY|gnmi.user_auth=cert (was: {cur or '<unset>'})")
else:
logger.log_error("Failed to set GNMI|gnmi.user_auth=cert")
logger.log_error("Failed to set TELEMETRY|gnmi.user_auth=cert")


def _ensure_cname_present(cname: str) -> None:
if not cname:
logger.log_warning("TELEMETRY_CLIENT_CNAME not set; skip CNAME creation")
return

def _ensure_cname_present(cname: str, role: str) -> None:
key = f"GNMI_CLIENT_CERT|{cname}"
entry = db_hgetall(key)
if not entry:
if db_hset(key, "role", GNMI_CLIENT_ROLE):
logger.log_notice(f"Created {key} with role={GNMI_CLIENT_ROLE}")
if db_hset(key, "role", role):
logger.log_notice(f"Created {key} with role={role}")
else:
logger.log_error(f"Failed to create {key}")


def _ensure_cname_absent(cname: str) -> None:
if not cname:
return
key = f"GNMI_CLIENT_CERT|{cname}"
if db_hgetall(key):
if db_del(key):
logger.log_notice(f"Removed {key}")
else:
logger.log_error(f"Failed to remove {key}")


def reconcile_config_db_once() -> None:
"""
Idempotent drift-correction for CONFIG_DB:
- When TELEMETRY_CLIENT_CERT_VERIFY_ENABLED=true:
* Ensure GNMI|gnmi.user_auth=cert
* Ensure GNMI_CLIENT_CERT|<CNAME> exists with role=<GNMI_CLIENT_ROLE>
- When false: ensure the CNAME row is absent
* Ensure TELEMETRY|gnmi.user_auth=cert
* Ensure every GNMI_CLIENT_CERT|<CNAME> entry exists with its role
- When false: ensure all CNAME rows are absent
"""
if GNMI_VERIFY_ENABLED:
_ensure_user_auth_cert()
_ensure_cname_present(GNMI_CLIENT_CNAME)
for entry in GNMI_CLIENT_CERTS:
_ensure_cname_present(entry["cname"], entry["role"])
else:
_ensure_cname_absent(GNMI_CLIENT_CNAME)
for entry in GNMI_CLIENT_CERTS:
_ensure_cname_absent(entry["cname"])

def ensure_sync() -> bool:
branch_name = _get_branch_name()
Expand Down
Loading
Loading