From f340de2fe96c8d76ea0fea7246e2b7ff6bd6f27a Mon Sep 17 00:00:00 2001 From: Ryan Miguel Date: Wed, 14 May 2025 15:16:13 -0700 Subject: [PATCH] Include additional info about the integrations used --- .gitignore | 4 + README.md | 8 +- run.py | 224 ++++++++++++++++++++++++++++++++++++++++------------- 3 files changed, 179 insertions(+), 57 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2e80603 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +# ignore any artifacts from running this tool +pagerduty_export* +*.zip +*.tgz diff --git a/README.md b/README.md index e7aa566..5c86ce5 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,8 @@ After running, the script creates a `pagerduty_export/` folder containing: | `schedules.csv` | On-call schedules with layers and user counts. | | `escalation_policies.csv` | Escalation Policies with their linked users and schedules. | | `services.csv` | Services with their alert integrations and linked Escalation Policies. | +| `integration_types.csv` | Summary stats of the integration types used across all services. | +| `integration_details.csv` | Integrations with their type, vendor, and associated service ids. | The resulting set of `.csv` files can be reviewed by you, before sharing with us directly. @@ -58,13 +60,13 @@ Anonymous mode **keeps integration names intact** so we can still understand wha ### Step 2: Run the script ```bash -python3 resource-discovery.py --token YOUR_PD_API_TOKEN +python3 run.py --token YOUR_PD_API_TOKEN ``` To anonymise your export: ```bash -python3 resource-discovery.py --token YOUR_PD_API_TOKEN --anonymise +python3 run.py --token YOUR_PD_API_TOKEN --anonymise ``` ## Sharing with us @@ -72,7 +74,7 @@ python3 resource-discovery.py --token YOUR_PD_API_TOKEN --anonymise Once you’ve run the script: 1. Review the contents of the `.csv` files to ensure you’re happy with the data we’ve pulled. -2. Send the contents of the `pagerduty_export/` folder back to us. +2. Send the contents of the `pagerduty_export_YYYYmmdd_HHMM/` folder back to us. 3. We’ll review the data and share back a detailed migration plan tailored to your setup This process helps us get ahead of any custom configurations or edge cases, and ensures your switch to incident.io is smooth, fast, and well supported.# pd-resource-discovery diff --git a/run.py b/run.py index 4699d5e..2ab6bf1 100644 --- a/run.py +++ b/run.py @@ -2,6 +2,7 @@ import requests import csv import os +from datetime import datetime BASE_URL = "https://api.pagerduty.com" API_HEADERS_TEMPLATE = { @@ -16,7 +17,9 @@ def get_data(endpoint, key, headers, params=None): page_params = {"limit": 100, "offset": offset} if params: page_params.update(params) - resp = requests.get(f"{BASE_URL}/{endpoint}", headers=headers, params=page_params) + resp = requests.get( + f"{BASE_URL}/{endpoint}", headers=headers, params=page_params + ) resp.raise_for_status() result = resp.json() chunk = result.get(key, []) @@ -42,17 +45,40 @@ def anonymize_name(entity_type, index): def extract_summary(api_key, anonymize=False): + runtime = datetime.now().strftime("%Y%m%d_%H%M") headers = dict(API_HEADERS_TEMPLATE) headers["Authorization"] = f"Token token={api_key}" - os.makedirs("pagerduty_export", exist_ok=True) + os.makedirs(f"pagerduty_export_{runtime}", exist_ok=True) - name_maps = {key: {} for key in ["users", "services", "escalation_policies", "schedules", "teams"]} - id_maps = {key: {} for key in ["users", "services", "escalation_policies", "schedules", "teams"]} + name_maps = { + key: {} + for key in [ + "users", + "services", + "escalation_policies", + "schedules", + "teams", + "integrations", + ] + } + id_maps = { + key: {} + for key in [ + "users", + "services", + "escalation_policies", + "schedules", + "teams", + "integrations", + ] + } name_counters = {key: 1 for key in name_maps} def get_anon_name(entity_type, original): if original not in name_maps[entity_type]: - name_maps[entity_type][original] = anonymize_name(entity_type[:-1].capitalize(), name_counters[entity_type]) + name_maps[entity_type][original] = anonymize_name( + entity_type[:-1].capitalize(), name_counters[entity_type] + ) name_counters[entity_type] += 1 return name_maps[entity_type][original] @@ -63,12 +89,18 @@ def get_anon_name(entity_type, original): tid = t.get("id") name = t.get("name", "") id_maps["teams"][tid] = name - team_data.append({ - "id": tid, - "name": get_anon_name("teams", name) if anonymize else name, - "description": "" if anonymize else t.get("description", "") - }) - write_csv("pagerduty_export/teams.csv", team_data, ["id", "name", "description"]) + team_data.append( + { + "id": tid, + "name": get_anon_name("teams", name) if anonymize else name, + "description": "" if anonymize else t.get("description", ""), + } + ) + write_csv( + f"pagerduty_export_{runtime}/teams.csv", + team_data, + ["id", "name", "description"], + ) print("🔍 Fetching users...") users = get_data("users", "users", headers) @@ -78,14 +110,20 @@ def get_anon_name(entity_type, original): name = u.get("name", "") id_maps["users"][uid] = name team_id = u.get("teams", [{}])[0].get("id", "") if u.get("teams") else "" - user_data.append({ - "id": uid, - "name": get_anon_name("users", name), # Always anonymize users - "email": "hidden@example.com", # Always redact email - "role": u.get("role", ""), - "team_id": team_id - }) - write_csv("pagerduty_export/users.csv", user_data, ["id", "name", "email", "role", "team_id"]) + user_data.append( + { + "id": uid, + "name": get_anon_name("users", name), # Always anonymize users + "email": "hidden@example.com", # Always redact email + "role": u.get("role", ""), + "team_id": team_id, + } + ) + write_csv( + f"pagerduty_export_{runtime}/users.csv", + user_data, + ["id", "name", "email", "role", "team_id"], + ) print("🔍 Fetching schedules...") schedules = get_data("schedules", "schedules", headers) @@ -97,16 +135,21 @@ def get_anon_name(entity_type, original): layers = s.get("schedule_layers", []) total_users = sum(len(layer.get("users", [])) for layer in layers) team_id = s.get("teams", [{}])[0].get("id", "") if s.get("teams") else "" - schedule_data.append({ - "id": sid, - "name": get_anon_name("schedules", name) if anonymize else name, - "time_zone": s.get("time_zone", ""), - "num_layers": len(layers), - "total_users": total_users, - "team_id": team_id - }) - write_csv("pagerduty_export/schedules.csv", schedule_data, - ["id", "name", "time_zone", "num_layers", "total_users", "team_id"]) + schedule_data.append( + { + "id": sid, + "name": get_anon_name("schedules", name) if anonymize else name, + "time_zone": s.get("time_zone", ""), + "num_layers": len(layers), + "total_users": total_users, + "team_id": team_id, + } + ) + write_csv( + f"pagerduty_export_{runtime}/schedules.csv", + schedule_data, + ["id", "name", "time_zone", "num_layers", "total_users", "team_id"], + ) print("🔍 Fetching escalation policies...") policies = get_data("escalation_policies", "escalation_policies", headers) @@ -123,22 +166,35 @@ def get_anon_name(entity_type, original): target_id = target.get("id") if target_type == "user_reference" and target_id in id_maps["users"]: user_ids.append(target_id) - elif target_type == "schedule_reference" and target_id in id_maps["schedules"]: + elif ( + target_type == "schedule_reference" + and target_id in id_maps["schedules"] + ): schedule_ids.append(target_id) team_id = p.get("teams", [{}])[0].get("id", "") if p.get("teams") else "" - esc_data.append({ - "id": pid, - "name": get_anon_name("escalation_policies", name) if anonymize else name, - "user_ids": ", ".join(user_ids), - "schedule_ids": ", ".join(schedule_ids), - "team_id": team_id - }) - write_csv("pagerduty_export/escalation_policies.csv", esc_data, - ["id", "name", "user_ids", "schedule_ids", "team_id"]) + esc_data.append( + { + "id": pid, + "name": ( + get_anon_name("escalation_policies", name) if anonymize else name + ), + "user_ids": ", ".join(user_ids), + "schedule_ids": ", ".join(schedule_ids), + "team_id": team_id, + } + ) + write_csv( + f"pagerduty_export_{runtime}/escalation_policies.csv", + esc_data, + ["id", "name", "user_ids", "schedule_ids", "team_id"], + ) print("🔍 Fetching services...") - services = get_data("services", "services", headers, params={"include[]": "integrations"}) + services = get_data( + "services", "services", headers, params={"include[]": "integrations"} + ) service_data = [] + integration_details = [] for svc in services: sid = svc.get("id") name = svc.get("name", "") @@ -146,24 +202,84 @@ def get_anon_name(entity_type, original): ep_id = svc.get("escalation_policy", {}).get("id", "") team_id = svc.get("teams", [{}])[0].get("id", "") if svc.get("teams") else "" integrations = [i.get("summary", "") for i in svc.get("integrations", [])] - service_data.append({ - "id": sid, - "name": get_anon_name("services", name) if anonymize else name, - "escalation_policy_id": ep_id, - "integrations": ", ".join(integrations), - "team_id": team_id - }) - write_csv("pagerduty_export/services.csv", service_data, - ["id", "name", "escalation_policy_id", "integrations", "team_id"]) + service_data.append( + { + "id": sid, + "name": get_anon_name("services", name) if anonymize else name, + "escalation_policy_id": ep_id, + "integrations": ", ".join(integrations), + "team_id": team_id, + } + ) + for intg in svc.get("integrations", []): + intg_id = intg.get("id") + intg_name = intg.get("summary", "") + intg_vendor = intg.get("vendor", None) + id_maps["integrations"][intg_id] = intg_name + integration_details.append( + { + "id": intg_id, + "summary": ( + get_anon_name("integrations", intg_name) + if anonymize + else intg_name + ), + "integration_type": intg.get("type", ""), + "vendor": intg_vendor.get("summary", "") if intg_vendor else "", + "service": sid, + } + ) + # Write service data to CSV + write_csv( + f"pagerduty_export_{runtime}/services.csv", + service_data, + ["id", "name", "escalation_policy_id", "integrations", "team_id"], + ) - print("\n✅ Export complete! Files saved to ./pagerduty_export\n") + # Provide a summary of integration types + integration_types = [i.get("integration_type", "") for i in integration_details] + write_csv( + f"pagerduty_export_{runtime}/integration_types.csv", + [ + {"integration_type": t, "count": integration_types.count(t)} + for t in set(integration_types) + ], + ["integration_type", "count"], + ) + + # Write integration details to a separate CSV + write_csv( + f"pagerduty_export_{runtime}/integration_details.csv", + integration_details, + ["id", "summary", "integration_type", "vendor", "service"], + ) + + print(f"\n✅ Export complete! Files saved to ./pagerduty_export_{runtime}\n") if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Export PagerDuty configuration to CSV") - parser.add_argument("--token", required=True, help="Your PagerDuty API token (read-only)") - parser.add_argument("--anonymise", action="store_true", - help="Replace names with generic labels for services, schedules, policies, and teams") + parser = argparse.ArgumentParser( + description="Export PagerDuty configuration to CSV" + ) + # add mutually exclusive arguments for sourcing the token + group = parser.add_mutually_exclusive_group() + group.add_argument("-t", "--token", help="Your PagerDuty API token (read-only)") + group.add_argument( + "-e", + "--env", + help="Name of environment variable containing your PagerDuty API token (read-only). Default is PAGERDUTY_TOKEN", + default="PAGERDUTY_TOKEN", + ) + parser.add_argument( + "-a", + "--anonymise", + action="store_true", + help="Replace names with generic labels for services, schedules, policies, and teams", + ) args = parser.parse_args() + if args.env: + args.token = os.environ.get(args.env) + if not args.token: + raise ValueError(f"Environment variable '{args.env}' not set or empty.") extract_summary(api_key=args.token, anonymize=args.anonymise)