Skip to content

Commit b9bf1f5

Browse files
committed
Analyst sees that GTFS-RT scraping is enqueued in PubSub
1 parent 7b4155f commit b9bf1f5

File tree

4 files changed

+127
-112
lines changed

4 files changed

+127
-112
lines changed
Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,29 @@
1-
resource "google_workflows_workflow" "gtfs-rt-feed-archiver" {
2-
name = "gtfs-rt-feed-archiver"
3-
region = "us-west2"
4-
project = "cal-itp-data-infra-staging"
5-
description = "GTFS-RT Feed Archiver"
1+
resource "google_pubsub_topic" "gtfs-rt-feed-archiver" {
2+
name = "gtfs-rt-feed-archiver"
3+
project = "cal-itp-data-infra-staging"
4+
}
5+
6+
resource "google_eventarc_trigger" "gtfs-rt-feed-archiver" {
7+
name = "gtfs-rt-feed-archiver"
8+
location = "us-west2"
9+
project = "cal-itp-data-infra-staging"
10+
611
service_account = data.terraform_remote_state.iam.outputs.google_service_account_workflow-service-account_email
7-
source_contents = templatefile("${local.workflow_path}/gtfs-rt-feed-archiver.yaml", {})
812

9-
call_log_level = "LOG_ALL_CALLS"
10-
execution_history_level = "EXECUTION_HISTORY_DETAILED"
13+
matching_criteria {
14+
attribute = "type"
15+
value = "google.cloud.pubsub.topic.v1.messagePublished"
16+
}
1117

12-
user_env_vars = {
13-
"CALITP_BUCKET__GTFS_RT_RAW" = data.terraform_remote_state.gcs.outputs.google_storage_bucket_calitp-staging-gtfs-rt-raw-v2_name
18+
destination {
19+
workflow = google_workflows_workflow.gtfs-rt-feed-archiver.id
1420
}
15-
}
1621

17-
resource "google_cloud_tasks_queue" "gtfs-rt-feed-archiver" {
18-
name = "gtfs-rt-feed-archiver-1"
19-
location = "us-west2"
22+
transport {
23+
pubsub {
24+
topic = google_pubsub_topic.gtfs-rt-feed-archiver.id
25+
}
26+
}
2027
}
2128

2229
resource "google_workflows_workflow" "gtfs-rt-archiver" {
@@ -31,9 +38,7 @@ resource "google_workflows_workflow" "gtfs-rt-archiver" {
3138
execution_history_level = "EXECUTION_HISTORY_DETAILED"
3239

3340
user_env_vars = {
34-
"GTFS_RT_ARCHIVER__TASK_QUEUE" = google_cloud_tasks_queue.gtfs-rt-feed-archiver.id,
35-
"GTFS_RT_ARCHIVER__CHILD_WORKFLOW" = google_workflows_workflow.gtfs-rt-feed-archiver.name,
36-
"GTFS_RT_ARCHIVER__SERVICE_ACCOUNT" = data.terraform_remote_state.iam.outputs.google_service_account_workflow-service-account_email,
41+
"GTFS_RT_ARCHIVER__TOPIC" = google_pubsub_topic.gtfs-rt-feed-archiver.id
3742
}
3843
}
3944

@@ -49,15 +54,26 @@ resource "google_cloud_scheduler_job" "gtfs-rt-archiver" {
4954
http_target {
5055
http_method = "POST"
5156
uri = "https://workflowexecutions.googleapis.com/v1/${google_workflows_workflow.gtfs-rt-archiver.id}/executions"
52-
body = base64encode(
53-
jsonencode({
54-
argument = jsonencode({ limit = 1 }),
55-
callLogLevel = "CALL_LOG_LEVEL_UNSPECIFIED"
56-
})
57-
)
57+
body = base64encode(jsonencode({ argument = jsonencode({ limit = 1 }) }))
5858

5959
oauth_token {
6060
service_account_email = data.terraform_remote_state.iam.outputs.google_service_account_workflow-service-account_email
6161
}
6262
}
6363
}
64+
65+
resource "google_workflows_workflow" "gtfs-rt-feed-archiver" {
66+
name = "gtfs-rt-feed-archiver"
67+
region = "us-west2"
68+
project = "cal-itp-data-infra-staging"
69+
description = "GTFS-RT Feed Archiver"
70+
service_account = data.terraform_remote_state.iam.outputs.google_service_account_workflow-service-account_email
71+
source_contents = templatefile("${local.workflow_path}/gtfs-rt-feed-archiver.yaml", {})
72+
73+
call_log_level = "LOG_ALL_CALLS"
74+
execution_history_level = "EXECUTION_HISTORY_DETAILED"
75+
76+
user_env_vars = {
77+
"CALITP_BUCKET__GTFS_RT_RAW" = data.terraform_remote_state.gcs.outputs.google_storage_bucket_calitp-staging-gtfs-rt-raw-v2_name
78+
}
79+
}

iac/cal-itp-data-infra-staging/iam/us/project_iam_member.tf

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,10 +299,10 @@ resource "google_project_iam_member" "workflow-service-account" {
299299
"roles/storage.objectUser",
300300
"roles/bigquery.dataViewer",
301301
"roles/bigquery.jobUser",
302-
"roles/cloudtasks.enqueuer",
302+
"roles/pubsub.publisher",
303303
"roles/secretmanager.secretAccessor",
304304
"roles/workflows.invoker",
305-
"roles/iam.serviceAccountUser"
305+
"roles/iam.serviceAccountTokenCreator"
306306
])
307307
role = each.key
308308
member = "serviceAccount:${google_service_account.workflow-service-account.email}"

workflows/gtfs-rt-archiver.yaml

Lines changed: 42 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,12 @@ main:
55
assign:
66
- projectId: $${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
77
- location: $${sys.get_env("GOOGLE_CLOUD_LOCATION")}
8-
- taskQueue: $${sys.get_env("GTFS_RT_ARCHIVER__TASK_QUEUE")}
9-
- serviceAccount: $${sys.get_env("GTFS_RT_ARCHIVER__SERVICE_ACCOUNT")}
10-
- childWorkflow: $${sys.get_env("GTFS_RT_ARCHIVER__CHILD_WORKFLOW")}
8+
- topic: $${sys.get_env("GTFS_RT_ARCHIVER__TOPIC")}
119
- dataset: staging
1210
- table: int_transit_database__gtfs_datasets_dim
1311
- limit: $${int(default(map.get(args, "limit"), "-1"))}
14-
- startAt: $${time.format(sys.now())}
12+
- startTime: $${sys.now()}
13+
- startAt: $${time.format(startTime)}
1514
- runQuery:
1615
call: googleapis.bigquery.v2.jobs.query
1716
args:
@@ -21,39 +20,49 @@ main:
2120
useQueryCache: true
2221
timeoutMs: 30000
2322
query: $${
24-
"SELECT pipeline_url, type, base64_url, authorization_header_parameter_name, header_secret_key_name, authorization_url_parameter_name, url_secret_key_name " +
23+
"SELECT type, pipeline_url, authorization_header_parameter_name, header_secret_key_name, authorization_url_parameter_name, url_secret_key_name, base64_url " +
2524
"FROM `" + projectId + "." + dataset + "." + table + "` " +
2625
"WHERE _is_current = true " +
2726
"AND type IN ('service_alerts', 'trip_updates', 'vehicle_positions') " +
2827
if(limit < 0, "", "LIMIT " + limit)
2928
}
3029
result: queryResult
31-
- dispatchChildren:
32-
for:
33-
value: row
34-
in: $${queryResult.rows}
35-
steps:
36-
- prepareChildArgs:
37-
assign:
38-
- data:
39-
startAt: $${startAt}
40-
pipelineUrl: $${row.f[0].v}
41-
type: $${row.f[1].v}
42-
base64Url: $${row.f[2].v}
43-
authorizationHeaderParameterName: $${row.f[3].v}
44-
headerSecretKeyName: $${row.f[4].v}
45-
authorizationUrlParameterName: $${row.f[5].v}
46-
urlSecretKeyName: $${row.f[6].v}
47-
- exec:
48-
argument: $${json.encode_to_string(data)}
49-
- enqueueChild:
50-
call: googleapis.cloudtasks.v2.projects.locations.queues.tasks.create
51-
args:
52-
parent: $${taskQueue}
30+
- publishMessages:
31+
parallel:
32+
for:
33+
value: delay
34+
in: [0, 20, 40]
35+
steps:
36+
- sleep:
37+
call: sys.sleep
38+
args:
39+
seconds: $${delay}
40+
- prepareArgs:
41+
assign:
42+
- messages: []
43+
- prepareMessages:
44+
for:
45+
value: row
46+
in: $${queryResult.rows}
47+
steps:
48+
- prepareChildArgs:
49+
assign:
50+
- data: {}
51+
- data.startAt: $${startAt}
52+
- data.fetchSecond: $${delay}
53+
- data.type: $${row.f[0].v}
54+
- data.pipelineUrl: $${row.f[1].v}
55+
- data.authorizationHeaderParameterName: $${row.f[2].v}
56+
- data.headerSecretKeyName: $${row.f[3].v}
57+
- data.authorizationUrlParameterName: $${row.f[4].v}
58+
- data.urlSecretKeyName: $${row.f[5].v}
59+
- data.base64Url: $${row.f[6].v}
60+
- message: {}
61+
- message.data: $${base64.encode(json.encode(data))}
62+
- messages: $${list.concat(messages, message)}
63+
- publish:
64+
call: googleapis.pubsub.v1.projects.topics.publish
65+
args:
66+
topic: $${topic}
5367
body:
54-
task:
55-
httpRequest:
56-
body: $${base64.encode(json.encode(exec))}
57-
url: $${"https://workflowexecutions.googleapis.com/v1/projects/" + projectId + "/locations/" + location + "/workflows/" + childWorkflow + "/executions"}
58-
oauthToken:
59-
serviceAccountEmail: $${serviceAccount}
68+
messages: $${messages}
Lines changed: 44 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,81 +1,71 @@
11
main:
2-
params: [args]
2+
params: [event]
33
steps:
44
- init:
55
assign:
66
- projectId: $${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
77
- bucket: $${sys.get_env("CALITP_BUCKET__GTFS_RT_RAW")}
8-
- base64Url: $${args.base64Url}
9-
- pipelineUrl: $${args.pipelineUrl}
10-
- startAt: $${args.startAt}
11-
- type: $${args.type}
12-
- authorizationUrlParameterName: $${args.authorizationUrlParameterName}
13-
- urlSecretKeyName: $${args.urlSecretKeyName}
14-
- authorizationHeaderParameterName: $${args.authorizationHeaderParameterName}
15-
- headerSecretKeyName: $${args.headerSecretKeyName}
16-
- headers: {}
17-
- query: {}
18-
- fetchDate: $${text.split(startAt, "T")[0]}
19-
- fetchTime: $${text.split(startAt, "T")[1]}
8+
- arguments: $${json.decode(base64.decode(event.data.message.data))}
9+
- fetchDate: $${text.split(arguments.startAt, "T")[0]}
10+
- fetchTime: $${text.split(arguments.startAt, "T")[1]}
2011
- fetchHour: $${text.split(fetchTime, ":")[0]}
2112
- fetchMinute: $${text.split(fetchTime, ":")[1]}
13+
- headers: {}
14+
- query: {}
15+
- userAgentHeaders:
16+
User-Agent: 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36'
17+
priority: 'u=0, i'
18+
sec-ch-ua: '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"'
19+
sec-ch-ua-mobile: '?0'
20+
sec-ch-ua-platform: '"macOS"'
21+
2222
- applySecrets:
2323
switch:
24-
- condition: $${headerSecretKeyName != null}
24+
- condition: $${arguments.headerSecretKeyName != null}
2525
steps:
2626
- headerSecret:
2727
call: googleapis.secretmanager.v1.projects.secrets.versions.accessString
2828
args:
2929
project_id: $${projectId}
30-
secret_id: $${headerSecretKeyName}
30+
secret_id: $${arguments.headerSecretKeyName}
3131
result: headerSecret
3232
- assignHeaders:
3333
assign:
3434
- headers:
35-
$${authorizationHeaderParameterName}: $${headerSecret}
36-
- condition: $${urlSecretKeyName != null}
35+
$${arguments.authorizationHeaderParameterName}: $${headerSecret}
36+
- condition: $${arguments.urlSecretKeyName != null}
3737
steps:
3838
- querySecret:
3939
call: googleapis.secretmanager.v1.projects.secrets.versions.accessString
4040
args:
4141
project_id: $${projectId}
42-
secret_id: $${urlSecretKeyName}
43-
result: fetchQuerySecret
42+
secret_id: $${arguments.urlSecretKeyName}
43+
result: querySecret
4444
- assignQuery:
4545
assign:
4646
- query:
47-
$${authorizationUrlParameterName}: $${fetchQuerySecret}
48-
- fetch:
49-
parallel:
50-
for:
51-
value: fetchSecond
52-
in: [0, 20, 40]
53-
steps:
54-
- wait:
55-
call: sys.sleep
56-
args:
57-
seconds: $${fetchSecond}
58-
- fetchUrl:
59-
call: http.get
60-
args:
61-
url: $${pipelineUrl}
62-
timeout: 20
63-
headers: $${headers}
64-
query: $${query}
65-
result: result
66-
- writeResult:
67-
call: http.post
68-
args:
69-
url: $${"https://storage.googleapis.com/upload/storage/v1/b/" + bucket + "/o"}
70-
auth:
71-
type: OAuth2
72-
query:
73-
name: $${
74-
type +
75-
"/dt=" + fetchDate +
76-
"/hour=" + fetchDate + "T" + fetchHour + ":00:00+00:00" +
77-
"/ts=" + fetchDate + "T" + fetchHour + ":" + fetchMinute + ":" + if(fetchSecond == 0, "00", fetchSecond) + "+00:00" +
78-
"/base64_url=" + base64Url +
79-
"/feed"
80-
}
81-
body: $${result.body}
47+
$${arguments.authorizationUrlParameterName}: $${querySecret}
48+
- fetchUrl:
49+
call: http.get
50+
args:
51+
url: $${arguments.pipelineUrl}
52+
timeout: 20
53+
headers: $${map.merge(userAgentHeaders, headers)}
54+
query: $${query}
55+
result: result
56+
- writeResult:
57+
call: http.post
58+
args:
59+
url: $${"https://storage.googleapis.com/upload/storage/v1/b/" + bucket + "/o"}
60+
auth:
61+
type: OAuth2
62+
query:
63+
name: $${
64+
arguments.type +
65+
"/dt=" + fetchDate +
66+
"/hour=" + fetchDate + "T" + fetchHour + ":00:00+00:00" +
67+
"/ts=" + fetchDate + "T" + fetchHour + ":" + fetchMinute + ":" + if(arguments.fetchSecond == 0, "00", arguments.fetchSecond) + "+00:00" +
68+
"/base64_url=" + arguments.base64Url +
69+
"/feed"
70+
}
71+
body: $${result.body}

0 commit comments

Comments
 (0)