Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 56 additions & 4 deletions agent/thymis_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
import subprocess
import sys
import uuid
from typing import Dict, List, Literal, Optional, Tuple, Union
from datetime import timezone
from typing import Any, Dict, List, Literal, Optional, Tuple, Union

import http_network_relay.edge_agent
import http_network_relay.edge_agent as ea
Expand Down Expand Up @@ -167,7 +168,10 @@ def load_controller_public_key_into_root_authorized_keys():

class AgentToRelayMessage(BaseModel):
# This is a custom message that the agent sends to the relay
inner: Union["EtRSwitchToNewConfigResultMessage",] = Field(discriminator="kind")
inner: Union[
"EtRSwitchToNewConfigResultMessage",
"EtRMetricsMessage",
] = Field(discriminator="kind")


class EtRSwitchToNewConfigResultMessage(BaseModel):
Expand All @@ -182,6 +186,14 @@ class EtRSwitchToNewConfigResultMessage(BaseModel):
stderr: str | None = None # in v3 final


class EtRMetricsMessage(BaseModel):
kind: Literal["metrics"] = "metrics"
timestamp: datetime.datetime
cpu_percent: float
ram_percent: float
disk_percent: float


class RelayToAgentMessage(BaseModel):
# This is a custom message that the relay sends to the agent
inner: Union[
Expand Down Expand Up @@ -226,7 +238,7 @@ class EdgeAgentToRelayStartMessage(ea.EtRStartMessage):
hardware_ids: Dict[str, str]
public_key: str
deployed_config_id: str
ip_addresses: List[str]
network_interfaces: List[Dict[str, Any]]
last_error: Optional[str] = None


Expand Down Expand Up @@ -665,14 +677,15 @@ def place_secrets_on_start(self, token: str):

async def on_connected(self):
self.systemd_notifier.status("Connected to relay")
asyncio.create_task(self.collect_and_send_metrics())

async def create_start_message(self, last_error: Optional[str] = None):
return EdgeAgentToRelayStartMessage(
token=self.token,
hardware_ids=self.detect_hardware_id(),
public_key=self.detect_public_key(),
deployed_config_id=self.detect_system_config()[0],
ip_addresses=self.detect_ip_addresses(),
network_interfaces=self.detect_network_interfaces(),
last_error=last_error,
)

Expand All @@ -681,6 +694,24 @@ async def on_connection_closed(self):
self.systemd_notifier.status("Connection closed, reconnecting...")
await super().on_connection_closed()

async def collect_and_send_metrics(self):
"""Collect system metrics every 60s and send to controller."""
while True:
try:
msg = AgentToRelayMessage(
inner=EtRMetricsMessage(
timestamp=datetime.datetime.now(timezone.utc),
cpu_percent=psutil.cpu_percent(interval=1),
ram_percent=psutil.virtual_memory().percent,
disk_percent=psutil.disk_usage("/").percent,
)
)
if self.websocket and not self.websocket.closed:
await self.websocket.send(msg.model_dump_json())
except Exception as e:
logger.error("Failed to collect/send metrics: %s", e)
await asyncio.sleep(60)

def update_config_commit(self, new_commit: str):
self.agent_metadata["configuration_commit"] = new_commit
metadata_path = find_file(AGENT_DATA_PATHS, AGENT_METADATA_FILENAME)
Expand Down Expand Up @@ -742,6 +773,27 @@ def extract_file_content(path):
}
return {key: value for key, value in hardware_ids.items() if value}

def detect_network_interfaces(self):
interfaces = {}
for interface, snics in psutil.net_if_addrs().items():
if interface == "lo":
continue
if interface not in interfaces:
interfaces[interface] = {
"interface": interface,
"ipv4_addresses": [],
"ipv6_addresses": [],
"mac_address": None,
}
for snic in snics:
if snic.family == socket.AF_INET:
interfaces[interface]["ipv4_addresses"].append(snic.address)
elif snic.family == socket.AF_INET6:
interfaces[interface]["ipv6_addresses"].append(snic.address)
elif snic.family == psutil.AF_LINK:
interfaces[interface]["mac_address"] = snic.address
return list(interfaces.values())

def detect_ip_addresses(self):
def get_ip_addresses(family):
for interface, snics in psutil.net_if_addrs().items():
Expand Down
33 changes: 33 additions & 0 deletions controller/tests/crud/test_agent_connection_extensions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import uuid
from datetime import datetime, timezone

from thymis_controller import db_models
from thymis_controller.crud import agent_connection as crud


def _make_di(db_session):
di = db_models.DeploymentInfo(
ssh_public_key=f"ssh-ed25519 AAAA{uuid.uuid4().hex}",
deployed_config_id="cfg",
)
db_session.add(di)
db_session.commit()
db_session.refresh(di)
return di


def test_get_by_deployment_info_returns_last_10(db_session):
di = _make_di(db_session)
# Create 15 connections
for i in range(15):
conn = db_models.AgentConnection(
deployment_info_id=di.id,
connected_at=datetime(2026, 1, i + 1, tzinfo=timezone.utc),
)
db_session.add(conn)
db_session.commit()

results = crud.get_by_deployment_info(db_session, di.id)
assert len(results) == 10
# Most recent first
assert results[0].connected_at > results[-1].connected_at
42 changes: 42 additions & 0 deletions controller/tests/crud/test_deployment_info_extensions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import uuid

from thymis_controller import db_models
from thymis_controller.crud import deployment_info as crud


def _make_di(db_session):
di = db_models.DeploymentInfo(
ssh_public_key=f"ssh-ed25519 AAAA{uuid.uuid4().hex}",
deployed_config_id="cfg",
)
db_session.add(di)
db_session.commit()
db_session.refresh(di)
return di


def test_update_location(db_session):
di = _make_di(db_session)
updated = crud.update_location(db_session, di.id, "Server Room A")
assert updated.location == "Server Room A"


def test_update_location_to_none(db_session):
di = _make_di(db_session)
crud.update_location(db_session, di.id, "Old Location")
updated = crud.update_location(db_session, di.id, None)
assert updated.location is None


def test_update_stores_network_interfaces(db_session):
di = _make_di(db_session)
ifaces = [
{
"interface": "eth0",
"ipv4_addresses": ["192.168.1.1"],
"ipv6_addresses": [],
"mac_address": "aa:bb:cc:dd:ee:ff",
}
]
updated = crud.update(db_session, di.id, network_interfaces=ifaces)
assert updated.network_interfaces == ifaces
118 changes: 118 additions & 0 deletions controller/tests/crud/test_device_metric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import uuid
from datetime import datetime, timedelta, timezone

from thymis_controller import db_models
from thymis_controller.crud import device_metric as crud


def _make_deployment_info(db_session):
di = db_models.DeploymentInfo(
ssh_public_key=f"ssh-ed25519 AAAA{uuid.uuid4().hex}",
deployed_config_id="test-config",
)
db_session.add(di)
db_session.commit()
db_session.refresh(di)
return di


def test_device_metric_model_exists():
"""DeviceMetric model has expected columns."""
metric = db_models.DeviceMetric(
deployment_info_id=uuid.uuid4(),
timestamp=datetime.now(timezone.utc),
cpu_percent=42.5,
ram_percent=67.3,
disk_percent=10.1,
)
assert metric.cpu_percent == 42.5
assert metric.ram_percent == 67.3
assert metric.disk_percent == 10.1


def test_create_metric(db_session):
di = _make_deployment_info(db_session)
now = datetime.now(timezone.utc)
metric = crud.create_metric(db_session, di.id, 50.0, 70.0, 30.0, now)
assert metric.id is not None
assert metric.cpu_percent == 50.0
assert metric.ram_percent == 70.0
assert metric.disk_percent == 30.0


def test_get_metrics_downsampled_returns_averaged_buckets(db_session):
di = _make_deployment_info(db_session)
base = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
# Two entries in same 1h bucket
crud.create_metric(db_session, di.id, 40.0, 60.0, 20.0, base)
crud.create_metric(
db_session, di.id, 60.0, 80.0, 40.0, base + timedelta(minutes=30)
)

results = crud.get_metrics_downsampled(
db_session,
di.id,
from_datetime=base - timedelta(minutes=1),
to_datetime=base + timedelta(hours=1),
granularity="1h",
)
assert len(results) == 1
assert abs(results[0]["cpu_percent"] - 50.0) < 0.01 # average of 40 and 60


def test_get_metrics_downsampled_1min_granularity(db_session):
di = _make_deployment_info(db_session)
base = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
crud.create_metric(db_session, di.id, 30.0, 50.0, 10.0, base)
crud.create_metric(
db_session, di.id, 50.0, 70.0, 30.0, base + timedelta(seconds=30)
)

results = crud.get_metrics_downsampled(
db_session,
di.id,
from_datetime=base - timedelta(seconds=1),
to_datetime=base + timedelta(minutes=1),
granularity="1min",
)
assert len(results) == 1
assert abs(results[0]["cpu_percent"] - 40.0) < 0.01 # average of 30 and 50


def test_get_metrics_downsampled_15min_granularity(db_session):
di = _make_deployment_info(db_session)
base = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
# Both entries fall in the same 15-minute bucket (12:00)
crud.create_metric(db_session, di.id, 20.0, 40.0, 10.0, base)
crud.create_metric(db_session, di.id, 40.0, 60.0, 30.0, base + timedelta(minutes=7))

results = crud.get_metrics_downsampled(
db_session,
di.id,
from_datetime=base - timedelta(seconds=1),
to_datetime=base + timedelta(minutes=15),
granularity="15min",
)
assert len(results) == 1
assert abs(results[0]["cpu_percent"] - 30.0) < 0.01 # average of 20 and 40


def test_delete_expired_metrics(db_session):
di = _make_deployment_info(db_session)
old = datetime(2025, 1, 1, tzinfo=timezone.utc)
recent = datetime.now(timezone.utc)
crud.create_metric(db_session, di.id, 1.0, 1.0, 1.0, old)
crud.create_metric(db_session, di.id, 2.0, 2.0, 2.0, recent)

cutoff = datetime(2026, 1, 1, tzinfo=timezone.utc)
deleted = crud.delete_expired_metrics(db_session, cutoff)
assert deleted == 1

remaining = crud.get_metrics_downsampled(
db_session,
di.id,
from_datetime=datetime(2020, 1, 1, tzinfo=timezone.utc),
to_datetime=datetime(2030, 1, 1, tzinfo=timezone.utc),
granularity="1h",
)
assert len(remaining) == 1
37 changes: 37 additions & 0 deletions controller/tests/routes/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""
Route-level conftest: provides a working test_client without needing
the Project fixture (which requires a real Engine, not a Session).
The new device-details endpoints don't use ProjectAD, so we can
override get_project with a simple None-returning mock.
"""
import pytest
from fastapi.testclient import TestClient
from thymis_controller.dependencies import (
get_db_session,
get_project,
require_valid_user_session,
)


@pytest.fixture(scope="function")
def test_client(db_session) -> TestClient:
"""Test client that overrides DB session and stubs out auth/project."""
from thymis_controller.main import app

def override_get_db():
try:
yield db_session
finally:
pass

def override_get_project():
return None

def override_authenticate():
return True

app.dependency_overrides[get_db_session] = override_get_db
app.dependency_overrides[get_project] = override_get_project
app.dependency_overrides[require_valid_user_session] = override_authenticate
yield TestClient(app)
app.dependency_overrides.clear()
Loading
Loading