diff --git a/.github/workflows/check-alerts.yml b/.github/workflows/check-alerts.yml
index 488d005e53..05931dce82 100644
--- a/.github/workflows/check-alerts.yml
+++ b/.github/workflows/check-alerts.yml
@@ -68,3 +68,5 @@ jobs:
           # NOTE: Should be a blank string for pull requests
           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
           HUD_API_TOKEN: ${{ secrets.HUD_API_TOKEN }}
+          AWS_INFRA_ALERTS_LAMBDA_URL: ${{ secrets.AWS_INFRA_ALERTS_LAMBDA_URL }}
+          QUEUE_ALERT_AWS_LAMBDA_TOKEN: ${{ secrets.QUEUE_ALERT_AWS_LAMBDA_TOKEN }}
diff --git a/tools/torchci/queue_alert.py b/tools/torchci/queue_alert.py
index 0ed99b7f40..503fd4d043 100644
--- a/tools/torchci/queue_alert.py
+++ b/tools/torchci/queue_alert.py
@@ -1,8 +1,11 @@
 import argparse
+import datetime
+import json
 import os
 import re
+from functools import lru_cache
 from pathlib import Path
-from typing import Any, Dict, List, NamedTuple
+from typing import Any, Callable, Dict, List, NamedTuple
 
 import requests
 from setuptools import distutils  # type: ignore[import]
@@ -39,6 +42,39 @@
 ]
 
 
+class AWSAlertRule(NamedTuple):
+    machines: list[str]
+    rule: Callable[[int, int], bool]
+    team: str
+
+
+# Rules for alerts sent via the new AWS alerting system in
+# https://github.com/pytorch/alerting-infra. Each machine gets its own alert
+# even if it is the same rule
+AWS_ALERT_RULES = [
+    AWSAlertRule(
+        machines=[
+            "linux.rocm.gfx942.docker-cache",
+            "linux.rocm.gpu.2",
+            "linux.rocm.gpu.4",
+            "linux.rocm.gpu.gfx1100",
+            "linux.rocm.gpu.gfx942.1",
+            "linux.rocm.gpu.gfx942.1.b",
+            "linux.rocm.gpu.gfx942.2",
+            "linux.rocm.gpu.gfx942.4",
+            "linux.rocm.gpu.gfx942.4.b",
+            "linux.rocm.gpu.gfx942.8",
+            "linux.rocm.gpu.mi250",
+            "linux.rocm.gpu.mi250.1",
+            "linux.rocm.gpu.mi250.4",
+            "linux.rocm.gpu.mi355.1",
+        ],
+        rule=lambda count, seconds: count > 20 and seconds > 1 * 60 * 60,
+        team="rocm-queue",
+    ),
+]
+
+
 class QueueInfo(NamedTuple):
     machine: str
     count: int
@@ -85,6 +121,17 @@ def gen_issue(queues: List[QueueInfo]) -> Any:
     return issue
 
 
+@lru_cache
+def get_queues() -> List[Dict[str, Any]]:
+    # %7B%7D = encoded {}
+    url = (
+        "https://hud.pytorch.org/api/clickhouse/queued_jobs_by_label?parameters=%7B%7D"
+    )
+    response = requests.get(url, headers=get_hud_headers())
+    response.raise_for_status()
+    return response.json()
+
+
 def filter_long_queues(db_result: List[Dict[str, Any]]) -> List[QueueInfo]:
     large_queue: List[QueueInfo] = []
 
@@ -106,11 +153,7 @@ def filter_long_queues(db_result: List[Dict[str, Any]]) -> List[QueueInfo]:
 
 
 def queuing_alert(dry_run: bool) -> None:
-    # %7B%7D = encoded {}
-    url = (
-        "https://hud.pytorch.org/api/clickhouse/queued_jobs_by_label?parameters=%7B%7D"
-    )
-    response = requests.get(url, headers=get_hud_headers()).json()
+    response = get_queues()
 
     large_queue = filter_long_queues(response)
 
@@ -145,6 +188,122 @@ def queuing_alert(dry_run: bool) -> None:
     print("No new change for queuing alert")
 
 
+class AWSAlert(NamedTuple):
+    queue_info: QueueInfo
+    alerting_rule: AWSAlertRule
+    status: str  # "FIRING" or "RESOLVED"
+
+
+def get_aws_alerts(
+    queues: List[Dict[str, Any]], alert_rules: list[AWSAlertRule]
+) -> list[AWSAlert]:
+    """
+    Given a list of queues and alerting rules, return a list of AWSAlert objects
+    representing the alerts that should be fired or resolved. This is only used
+    by aws_queue_alert_system and is separated out from the main function to
+    make it easier to test.
+ """ + alerts = [] + machine_to_queue_map = {q["machine_type"]: q for q in queues} + + for alerting_rule in alert_rules: + for machine in alerting_rule.machines: + queue = machine_to_queue_map.get(machine) + if queue is None or not alerting_rule.rule( + queue["count"], queue["avg_queue_s"] + ): + # close the alert if it exists + alerts.append( + AWSAlert( + queue_info=QueueInfo(machine, 0, 0), + alerting_rule=alerting_rule, + status="RESOLVED", + ) + ) + continue + queue_info = QueueInfo( + machine, + queue["count"], + queue["avg_queue_s"] / 3600, + ) + print( + f"Alerting rule {alerting_rule.team} matched machine {queue_info.machine} with {queue_info.count} in queue for {queue_info.hours} hours" + ) + alerts.append( + AWSAlert( + queue_info=queue_info, + alerting_rule=alerting_rule, + status="FIRING", + ) + ) + + return alerts + + +def aws_queue_alert_system(dry_run: bool) -> None: + def send_to_aws_alerting_lambda( + team: str, + title: str, + description: str, + alarm_id: str, + state: str, + dry_run: bool, + ) -> None: + """Helper for sending alerts to the AWS alerting lambda function""" + now = datetime.datetime.now(datetime.timezone.utc).strftime( + "%Y-%m-%dT%H:%M:%S.%fZ" + ) + alert = { + "schema_version": 1, + "source": "test-infra-queue-alerts", + "state": state, + "title": title, + "description": description, + "summary": description, + "priority": "P2", + "occurred_at": now, + "teams": [team], + "identity": { + "alarm_id": f"queue_alert_{alarm_id}", + }, + "links": {"dashboard_url": "https://hud.pytorch.org/metrics"}, + } + + data = json.dumps(alert).encode() + headers = { + "Content-Type": "application/json", + "x-test-infra-queue-alerts-signature": os.environ[ + "QUEUE_ALERT_AWS_LAMBDA_TOKEN" + ], + } + if dry_run: + print(f"Dry run, not sending alert: {json.dumps(alert, indent=2)}") + return + requests.post( + os.environ["AWS_INFRA_ALERTS_LAMBDA_URL"], + data=data, + headers=headers, + ) + + def get_alarm_id(team: str, machine: str) -> str: + return f"{team}_{machine.replace('.', '_')}" + + # The title needs to be the same to close the alert + def gen_title(machine: str) -> str: + return f"[Pytorch] Machine {machine} has a long queue" + + alerts = get_aws_alerts(get_queues(), AWS_ALERT_RULES) + for alert in alerts: + send_to_aws_alerting_lambda( + team=alert.alerting_rule.team, + title=gen_title(alert.queue_info.machine), + description=f"Machine {alert.queue_info.machine} has {alert.queue_info.count} jobs in queue for {round(alert.queue_info.hours, 2)} hours", + alarm_id=get_alarm_id(alert.alerting_rule.team, alert.queue_info.machine), + state=alert.status, + dry_run=dry_run, + ) + + def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser() parser.add_argument( @@ -158,3 +317,4 @@ def parse_args() -> argparse.Namespace: if __name__ == "__main__": args = parse_args() queuing_alert(args.dry_run) + aws_queue_alert_system(args.dry_run) diff --git a/tools/torchci/tests/test_queue_alert.py b/tools/torchci/tests/test_queue_alert.py index f100cbed34..8a75e645ff 100644 --- a/tools/torchci/tests/test_queue_alert.py +++ b/tools/torchci/tests/test_queue_alert.py @@ -1,7 +1,13 @@ from typing import Any, Dict from unittest import main, TestCase -from torchci.queue_alert import filter_long_queues, gen_update_comment, QueueInfo +from torchci.queue_alert import ( + AWSAlertRule, + filter_long_queues, + gen_update_comment, + get_aws_alerts, + QueueInfo, +) class TestGitHubPR(TestCase): @@ -49,5 +55,72 @@ def test_gen_update_comment(self): self.assertTrue("- machine2, 2 
 
 
+class TestAWSAlert(TestCase):
+    def create_queue_row(
+        self, count: int, avg_queue_s: int, machine_type: str
+    ) -> Dict[str, Any]:
+        # Helper function to create a queue row dict for testing
+        return {
+            "count": count,
+            "avg_queue_s": avg_queue_s,
+            "machine_type": machine_type,
+        }
+
+    def test_filter_aws_alerts(self):
+        # Test that the correct alerts are generated based on the rules and
+        # queue data, including resolved alerts and one rule with multiple machines
+        db_results = [
+            self.create_queue_row(1, 1, "machine1"),
+            self.create_queue_row(2, 2, "machine2"),
+            self.create_queue_row(3, 3, "machine3"),
+        ]
+
+        rules = [
+            AWSAlertRule(
+                machines=["machine1", "machine2"],
+                rule=lambda count, seconds: count > 1 and seconds > 1,
+                team="team1",
+            ),
+            AWSAlertRule(
+                machines=["machine3"],
+                rule=lambda count, seconds: count > 2 and seconds > 2,
+                team="team2",
+            ),
+        ]
+
+        alerts = get_aws_alerts(db_results, rules)
+        self.assertEqual(len(alerts), 3)
+        alerts.sort(key=lambda a: a.queue_info.machine)
+        self.assertEqual(alerts[0].status, "RESOLVED")
+        self.assertEqual(alerts[0].queue_info.machine, "machine1")
+        self.assertEqual(alerts[0].alerting_rule.team, "team1")
+        self.assertEqual(alerts[1].status, "FIRING")
+        self.assertEqual(alerts[2].status, "FIRING")
+
+    def test_filter_aws_alerts_(self):
+        # Two teams listed for the same machine; both should get an alert if the
+        # rule is satisfied
+
+        db_results = [
+            self.create_queue_row(1, 1, "machine1"),
+        ]
+
+        rules = [
+            AWSAlertRule(
+                machines=["machine1"],
+                rule=lambda count, seconds: count > 0 and seconds > 0,
+                team="team1",
+            ),
+            AWSAlertRule(
+                machines=["machine1"],
+                rule=lambda count, seconds: count > 0 and seconds > 0,
+                team="team2",
+            ),
+        ]
+
+        alerts = get_aws_alerts(db_results, rules)
+        self.assertEqual(len(alerts), 2)
+
+
 if __name__ == "__main__":
     main()
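For a quick local sanity check of the new rule evaluation, here is a minimal offline sketch (not part of the patch): it assumes torchci.queue_alert is importable (e.g. run from tools/), and the queue row below is made up rather than real HUD data.

# Hypothetical local check of get_aws_alerts against a hand-written queue row.
from torchci.queue_alert import AWS_ALERT_RULES, get_aws_alerts

# One fabricated row: 25 jobs queued for an average of 2 hours on a ROCm runner.
fake_queues = [
    {"machine_type": "linux.rocm.gpu.mi250", "count": 25, "avg_queue_s": 2 * 60 * 60},
]

# Machines covered by a rule but missing (or under threshold) come back RESOLVED;
# the row above exceeds the rocm-queue thresholds (>20 jobs, >1 hour), so it fires.
for alert in get_aws_alerts(fake_queues, AWS_ALERT_RULES):
    print(alert.status, alert.queue_info.machine, alert.queue_info.count)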