Merged
2 changes: 2 additions & 0 deletions .github/workflows/check-alerts.yml
@@ -68,3 +68,5 @@ jobs:
# NOTE: Should be a blank string for pull requests
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
HUD_API_TOKEN: ${{ secrets.HUD_API_TOKEN }}
AWS_INFRA_ALERTS_LAMBDA_URL: ${{ secrets.AWS_INFRA_ALERTS_LAMBDA_URL }}
QUEUE_ALERT_AWS_LAMBDA_TOKEN: ${{ secrets.QUEUE_ALERT_AWS_LAMBDA_TOKEN }}
172 changes: 166 additions & 6 deletions tools/torchci/queue_alert.py
@@ -1,8 +1,11 @@
import argparse
import datetime
import json
import os
import re
from functools import lru_cache
from pathlib import Path
from typing import Any, Dict, List, NamedTuple
from typing import Any, Callable, Dict, List, NamedTuple

import requests
from setuptools import distutils # type: ignore[import]
@@ -39,6 +42,39 @@
]


class AWSAlertRule(NamedTuple):
machines: list[str]
rule: Callable[[int, int], bool]
team: str


# Rules for alerts sent via the new AWS alerting system in
# https://github.com/pytorch/alerting-infra. Each machine gets its own alert
# even when multiple machines share the same rule
AWS_ALERT_RULES = [
AWSAlertRule(
machines=[
Contributor:
Does this work with a regex like linux.rocm.*? It seems unwieldy to maintain this list here.

@clee2000 (Contributor Author), Dec 2, 2025:
It would have to query the list of all runners; otherwise it doesn't know what to close when the queue is gone. Maybe I can do that in a separate PR, and you can tell me if you like that better? (A sketch of the regex-based approach appears after the rules list below.)

"linux.rocm.gfx942.docker-cache",
"linux.rocm.gpu.2",
"linux.rocm.gpu.4",
"linux.rocm.gpu.gfx1100",
"linux.rocm.gpu.gfx942.1",
"linux.rocm.gpu.gfx942.1.b",
"linux.rocm.gpu.gfx942.2",
"linux.rocm.gpu.gfx942.4",
"linux.rocm.gpu.gfx942.4.b",
"linux.rocm.gpu.gfx942.8",
"linux.rocm.gpu.mi250",
"linux.rocm.gpu.mi250.1",
"linux.rocm.gpu.mi250.4",
"linux.rocm.gpu.mi355.1",
],
rule=lambda count, seconds: count > 20 and seconds > 1 * 60 * 60,
team="rocm-queue",
),
]

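Read concretely, the rocm rule above fires only when more than 20 jobs are queued and the average queue time exceeds one hour; for example:

AWS_ALERT_RULES[0].rule(25, 2 * 60 * 60)  # True: >20 jobs, avg queue over an hour
AWS_ALERT_RULES[0].rule(25, 30 * 60)      # False: average queue time under an hour
AWS_ALERT_RULES[0].rule(5, 2 * 60 * 60)   # False: 20 or fewer jobs in queue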

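Following up on the review thread above, a hypothetical sketch of the regex-based alternative: it would need a separate query for the full set of runner labels so that RESOLVED alerts can still be emitted for machines whose queues have drained. The helper name and the runner-label query are assumptions, not part of this PR.

def expand_machine_patterns(
    patterns: list[str], all_runner_labels: list[str]
) -> list[str]:
    # Hypothetical helper: expand patterns such as "linux.rocm.*" against the
    # full set of known runner labels, so alerts can still be closed for
    # machines that no longer appear in the queue results (the concern raised
    # in the review thread). Fetching all_runner_labels would need its own
    # query and is not implemented in this PR.
    expanded: list[str] = []
    for pattern in patterns:
        compiled = re.compile(pattern)
        expanded.extend(
            label for label in all_runner_labels if compiled.fullmatch(label)
        )
    return expanded
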
class QueueInfo(NamedTuple):
machine: str
count: int
@@ -85,6 +121,17 @@ def gen_issue(queues: List[QueueInfo]) -> Any:
return issue


@lru_cache
def get_queues() -> List[Dict[str, Any]]:
# %7B%7D = encoded {}
url = (
"https://hud.pytorch.org/api/clickhouse/queued_jobs_by_label?parameters=%7B%7D"
Contributor:
Will this API continue to work? I suspect it might not, given the bot protection we have turned on, and it will need the auth part like Dr.CI.

Contributor:
Ah, I can see that it sets HUD_API_TOKEN automatically if that env var is available, so this is good.

)
response = requests.get(url, headers=get_hud_headers())
response.raise_for_status()
return response.json()

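For context, get_hud_headers is imported from elsewhere in torchci and is not part of this diff. Based on the review note above that HUD_API_TOKEN is picked up automatically when set, a minimal sketch might look like the following; the header name and exact shape are assumptions:

def get_hud_headers() -> Dict[str, str]:
    # Sketch only: attach the HUD auth token when the env var is set so that
    # requests pass HUD's bot protection; send no auth header otherwise.
    # The actual header name used by HUD is an assumption here.
    token = os.environ.get("HUD_API_TOKEN")
    return {"Authorization": token} if token else {}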

def filter_long_queues(db_result: List[Dict[str, Any]]) -> List[QueueInfo]:
large_queue: List[QueueInfo] = []

@@ -106,11 +153,7 @@ def filter_long_queues(db_result: List[Dict[str, Any]]) -> List[QueueInfo]:


def queuing_alert(dry_run: bool) -> None:
# %7B%7D = encoded {}
url = (
"https://hud.pytorch.org/api/clickhouse/queued_jobs_by_label?parameters=%7B%7D"
)
response = requests.get(url, headers=get_hud_headers()).json()
response = get_queues()

large_queue = filter_long_queues(response)

@@ -145,6 +188,122 @@ def queuing_alert(dry_run: bool) -> None:
print("No new change for queuing alert")


class AWSAlert(NamedTuple):
queue_info: QueueInfo
alerting_rule: AWSAlertRule
status: str # "FIRING" or "RESOLVED"


def get_aws_alerts(
queues: List[Dict[str, Any]], alert_rules: list[AWSAlertRule]
) -> list[AWSAlert]:
"""
Given a list of queues and alerting rules, return a list of AWSAlert objects
representing the alerts that should be fired or resolved. This is only used
by aws_queue_alert_system and is separated out from the main function to
make it easier to test.
"""
alerts = []
machine_to_queue_map = {q["machine_type"]: q for q in queues}

for alerting_rule in alert_rules:
for machine in alerting_rule.machines:
queue = machine_to_queue_map.get(machine)
if queue is None or not alerting_rule.rule(
queue["count"], queue["avg_queue_s"]
):
# close the alert if it exists
alerts.append(
AWSAlert(
queue_info=QueueInfo(machine, 0, 0),
alerting_rule=alerting_rule,
status="RESOLVED",
)
)
continue
queue_info = QueueInfo(
machine,
queue["count"],
queue["avg_queue_s"] / 3600,
)
print(
f"Alerting rule {alerting_rule.team} matched machine {queue_info.machine} with {queue_info.count} in queue for {queue_info.hours} hours"
)
alerts.append(
AWSAlert(
queue_info=queue_info,
alerting_rule=alerting_rule,
status="FIRING",
)
)

return alerts


def aws_queue_alert_system(dry_run: bool) -> None:
def send_to_aws_alerting_lambda(
team: str,
title: str,
description: str,
alarm_id: str,
state: str,
dry_run: bool,
) -> None:
"""Helper for sending alerts to the AWS alerting lambda function"""
now = datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%dT%H:%M:%S.%fZ"
)
alert = {
"schema_version": 1,
"source": "test-infra-queue-alerts",
"state": state,
"title": title,
"description": description,
"summary": description,
"priority": "P2",
"occurred_at": now,
"teams": [team],
"identity": {
"alarm_id": f"queue_alert_{alarm_id}",
},
"links": {"dashboard_url": "https://hud.pytorch.org/metrics"},
}

data = json.dumps(alert).encode()
headers = {
"Content-Type": "application/json",
"x-test-infra-queue-alerts-signature": os.environ[
"QUEUE_ALERT_AWS_LAMBDA_TOKEN"
],
}
if dry_run:
print(f"Dry run, not sending alert: {json.dumps(alert, indent=2)}")
return
requests.post(
os.environ["AWS_INFRA_ALERTS_LAMBDA_URL"],
data=data,
headers=headers,
)

def get_alarm_id(team: str, machine: str) -> str:
return f"{team}_{machine.replace('.', '_')}"

# The title needs to be the same to close the alert
def gen_title(machine: str) -> str:
return f"[Pytorch] Machine {machine} has a long queue"

alerts = get_aws_alerts(get_queues(), AWS_ALERT_RULES)
for alert in alerts:
send_to_aws_alerting_lambda(
team=alert.alerting_rule.team,
title=gen_title(alert.queue_info.machine),
description=f"Machine {alert.queue_info.machine} has {alert.queue_info.count} jobs in queue for {round(alert.queue_info.hours, 2)} hours",
alarm_id=get_alarm_id(alert.alerting_rule.team, alert.queue_info.machine),
state=alert.status,
dry_run=dry_run,
)

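For reference, with dry_run=True nothing is posted and the helper just prints the payload. A FIRING alert for the rocm rule above would serialize to roughly the following (count, hours, and timestamp illustrative):

{
  "schema_version": 1,
  "source": "test-infra-queue-alerts",
  "state": "FIRING",
  "title": "[Pytorch] Machine linux.rocm.gpu.2 has a long queue",
  "description": "Machine linux.rocm.gpu.2 has 25 jobs in queue for 1.5 hours",
  "summary": "Machine linux.rocm.gpu.2 has 25 jobs in queue for 1.5 hours",
  "priority": "P2",
  "occurred_at": "2025-12-02T00:00:00.000000Z",
  "teams": ["rocm-queue"],
  "identity": {"alarm_id": "queue_alert_rocm-queue_linux_rocm_gpu_2"},
  "links": {"dashboard_url": "https://hud.pytorch.org/metrics"}
}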

def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
@@ -158,3 +317,4 @@ def parse_args() -> argparse.Namespace:
if __name__ == "__main__":
args = parse_args()
queuing_alert(args.dry_run)
aws_queue_alert_system(args.dry_run)
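
The parse_args hunk is truncated above, but given args.dry_run, a dry-run invocation would presumably look like this (flag name assumed from the attribute):

python -m torchci.queue_alert --dry-run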
75 changes: 74 additions & 1 deletion tools/torchci/tests/test_queue_alert.py
@@ -1,7 +1,13 @@
from typing import Any, Dict
from unittest import main, TestCase

from torchci.queue_alert import filter_long_queues, gen_update_comment, QueueInfo
from torchci.queue_alert import (
AWSAlertRule,
filter_long_queues,
gen_update_comment,
get_aws_alerts,
QueueInfo,
)


class TestGitHubPR(TestCase):
@@ -49,5 +55,72 @@ def test_gen_update_comment(self):
self.assertTrue("- machine2, 2 machines, 3 hours" not in comment)


class TestAWSAlert(TestCase):
def create_queue_row(
self, count: int, avg_queue_s: int, machine_type: str
) -> Dict[str, Any]:
# Helper function to create a queue row dict for testing
return {
"count": count,
"avg_queue_s": avg_queue_s,
"machine_type": machine_type,
}

def test_filter_aws_alerts(self):
# Test that the correct alerts are generated based on the rules and
# queue data, including resolved alerts and one rule with multiple machines
db_results = [
self.create_queue_row(1, 1, "machine1"),
self.create_queue_row(2, 2, "machine2"),
self.create_queue_row(3, 3, "machine3"),
]

rules = [
AWSAlertRule(
machines=["machine1", "machine2"],
rule=lambda count, seconds: count > 1 and seconds > 1,
team="team1",
),
AWSAlertRule(
machines=["machine3"],
rule=lambda count, seconds: count > 2 and seconds > 2,
team="team2",
),
]

alerts = get_aws_alerts(db_results, rules)
self.assertEqual(len(alerts), 3)
alerts.sort(key=lambda a: a.queue_info.machine)
self.assertEqual(alerts[0].status, "RESOLVED")
self.assertEqual(alerts[0].queue_info.machine, "machine1")
self.assertEqual(alerts[0].alerting_rule.team, "team1")
self.assertEqual(alerts[1].status, "FIRING")
self.assertEqual(alerts[2].status, "FIRING")

def test_filter_aws_alerts_multiple_teams(self):
# Two teams listed for the same machine, both should get an alert if the
# rule is satisfied

db_results = [
self.create_queue_row(1, 1, "machine1"),
]

rules = [
AWSAlertRule(
machines=["machine1"],
rule=lambda count, seconds: count > 0 and seconds > 0,
team="team1",
),
AWSAlertRule(
machines=["machine1"],
rule=lambda count, seconds: count > 0 and seconds > 0,
team="team2",
),
]

alerts = get_aws_alerts(db_results, rules)
self.assertEqual(len(alerts), 2)


if __name__ == "__main__":
main()
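
Assuming the tests live under tools/ as shown, they can be run with the standard unittest runner, e.g.:

cd tools && python -m unittest torchci.tests.test_queue_alert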