Skip to content

Commit 0303f82

Browse files
committed
WIP - Analyst sees that GTFS-RT feeds are updated via workflows
1 parent ea0f44c commit 0303f82

File tree

8 files changed

+192
-0
lines changed

8 files changed

+192
-0
lines changed

iac/cal-itp-data-infra-staging/gtfs-rt-archiver/us/.terraform.lock.hcl

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
provider "google" {
2+
project = "cal-itp-data-infra-staging"
3+
}
4+
5+
terraform {
6+
required_providers {
7+
google = {
8+
version = "~> 6.41.0"
9+
}
10+
}
11+
12+
backend "gcs" {
13+
bucket = "calitp-staging-gcp-components-tfstate"
14+
prefix = "cal-itp-data-infra-staging/gtfs-rt-archiver"
15+
}
16+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
locals {
2+
workflow_path = "${path.module}/../../../../workflows"
3+
}
4+
5+
data "terraform_remote_state" "iam" {
6+
backend = "gcs"
7+
8+
config = {
9+
bucket = "calitp-staging-gcp-components-tfstate"
10+
prefix = "cal-itp-data-infra-staging/iam"
11+
}
12+
}
13+
14+
data "terraform_remote_state" "gcs" {
15+
backend = "gcs"
16+
17+
config = {
18+
bucket = "calitp-staging-gcp-components-tfstate"
19+
prefix = "cal-itp-data-infra-staging/gcs"
20+
}
21+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
resource "google_workflows_workflow" "gtfs-rt-archiver" {
2+
name = "gtfs-rt-archiver"
3+
region = "us-west2"
4+
project = "cal-itp-data-infra-staging"
5+
description = "GTFS-RT Archiver"
6+
service_account = data.terraform_remote_state.iam.outputs.google_service_account_workflow-service-account_email
7+
source_contents = templatefile("${local.workflow_path}/gtfs-rt.yaml", { bucket = data.terraform_remote_state.gcs.outputs.google_storage_bucket_calitp-staging-gtfs-rt-raw-v2_name })
8+
}
9+
10+
resource "google_cloud_scheduler_job" "gtfs-rt-archiver" {
11+
project = "cal-itp-data-infra-staging"
12+
name = "gtfs-rt-archiver"
13+
description = "Cloud Scheduler GTFS-RT Archiver"
14+
schedule = "*/15 * * * *"
15+
time_zone = "America/Los_Angeles"
16+
attempt_deadline = "320s"
17+
region = "us-west2"
18+
19+
http_target {
20+
http_method = "POST"
21+
uri = "https://workflowexecutions.googleapis.com/v1/${google_workflows_workflow.gtfs-rt-archiver.id}/executions"
22+
body = base64encode(
23+
jsonencode({
24+
"argument" : "{}",
25+
"callLogLevel" : "CALL_LOG_LEVEL_UNSPECIFIED"
26+
})
27+
)
28+
29+
oauth_token {
30+
service_account_email = data.terraform_remote_state.iam.outputs.google_service_account_workflow-service-account_email
31+
}
32+
}
33+
}

iac/cal-itp-data-infra-staging/iam/us/outputs.tf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,10 @@ output "google_service_account_composer-service-account_name" {
190190
value = google_service_account.composer-service-account.name
191191
}
192192

193+
output "google_service_account_workflow-service-account_email" {
194+
value = google_service_account.workflow-service-account.email
195+
}
196+
193197
output "google_service_account_sftp-pod-service-account_id" {
194198
value = google_service_account.sftp-pod-service-account.id
195199
}

iac/cal-itp-data-infra-staging/iam/us/project_iam_member.tf

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,19 @@ resource "google_project_iam_member" "composer-service-account" {
294294
project = "cal-itp-data-infra-staging"
295295
}
296296

297+
resource "google_project_iam_member" "workflow-service-account" {
298+
for_each = toset([
299+
"roles/storage.objectUser",
300+
"roles/bigquery.dataViewer",
301+
"roles/bigquery.jobUser",
302+
"roles/secretmanager.secretAccessor",
303+
"roles/workflows.invoker"
304+
])
305+
role = each.key
306+
member = "serviceAccount:${google_service_account.workflow-service-account.email}"
307+
project = "cal-itp-data-infra-staging"
308+
}
309+
297310
resource "google_project_iam_member" "ms-entra-id-DOT_DDS_Data_Pipeline_and_Warehouse_Users" {
298311
for_each = toset([
299312
"roles/viewer",

iac/cal-itp-data-infra-staging/iam/us/service_account.tf

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,14 @@ resource "google_service_account" "composer-service-account" {
5858
project = "cal-itp-data-infra-staging"
5959
}
6060

61+
resource "google_service_account" "workflow-service-account" {
62+
account_id = "workflow-service-account"
63+
description = "Service account for Workflow"
64+
disabled = "false"
65+
display_name = "workflow"
66+
project = "cal-itp-data-infra-staging"
67+
}
68+
6169
resource "google_service_account" "sftp-pod-service-account" {
6270
account_id = "sftp-pod-service-account"
6371
description = "Service account for sftp server"

workflows/gtfs-rt.yaml

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
main:
2+
params: [args]
3+
steps:
4+
- init:
5+
assign:
6+
- projectId: $${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
7+
- limit: $${default(map.get(args, "limit"), "1")}
8+
- dataset: staging
9+
- table: int_transit_database__gtfs_datasets_dim
10+
- bucket: calitp-staging-gtfs-rt-raw-v2
11+
- runQuery:
12+
call: googleapis.bigquery.v2.jobs.query
13+
args:
14+
projectId: $${projectId}
15+
body:
16+
useLegacySql: false
17+
useQueryCache: true
18+
timeoutMs: 30000
19+
query: $${
20+
"SELECT pipeline_url, type, base64_url " +
21+
"FROM `" + projectId + "." + dataset + "." + table + "` " +
22+
"WHERE _is_current = true " +
23+
"AND type IN ('service_alerts', 'trip_updates', 'vehicle_positions') " +
24+
"LIMIT " + limit
25+
}
26+
result: queryResult
27+
- fetchInterval:
28+
parallel:
29+
for:
30+
value: fetchSecond
31+
in: [0, 20, 40]
32+
steps:
33+
- wait:
34+
call: sys.sleep
35+
args:
36+
seconds: $${fetchSecond}
37+
- fetchUrls:
38+
parallel:
39+
for:
40+
value: row
41+
in: $${queryResult.rows}
42+
steps:
43+
- fetchInit:
44+
assign:
45+
- fetchPipelineUrl: $${row.f[0].v}
46+
- fetchType: $${row.f[1].v}
47+
- fetchBase64Url: $${row.f[2].v}
48+
- fetchDate: $${text.split(time.format(sys.now()), "T")[0]}
49+
- fetchTime: $${text.split(time.format(sys.now()), "T")[1]}
50+
- fetchHour: $${text.split(fetchTime, ":")[0]}
51+
- fetchMinute: $${text.split(fetchTime, ":")[1]}
52+
- fetchHourTimestamp: $${fetchDate + "T" + fetchHour + ":00:00+00:00"}
53+
- fetchTimestamp: $${fetchDate + "T" + fetchHour + ":" + fetchMinute + ":" + if(fetchSecond == 0, "00", fetchSecond) + "+00:00"}
54+
- fetchUrl:
55+
call: http.get
56+
args:
57+
url: $${fetchPipelineUrl}
58+
timeout: 20
59+
result: fetchResult
60+
- writeResult:
61+
call: http.post
62+
args:
63+
url: $${"https://storage.googleapis.com/upload/storage/v1/b/" + bucket + "/o"}
64+
auth:
65+
type: OAuth2
66+
query:
67+
name: $${
68+
fetchType +
69+
"/dt=" + fetchDate +
70+
"/hour=" + fetchHourTimestamp +
71+
"/ts=" + fetchTimestamp +
72+
"/base64_url=" + fetchBase64Url +
73+
"/feed"
74+
}
75+
body: $${fetchResult.body}

0 commit comments

Comments
 (0)