Skip to content

Commit 3bb0598

Browse files
authored
Prefer BigQueryInsertJobOperator's project_id over hook's project_id for openlineage (#55948)
* Prefer operator's project_id over hook's project_id * Remove unused import * Formatting * Remove unnecessary arguments * Make mypy happy
1 parent 6a71094 commit 3bb0598

File tree

2 files changed

+37
-1
lines changed
  • providers/google

2 files changed

+37
-1
lines changed

providers/google/src/airflow/providers/google/cloud/openlineage/mixins.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,9 @@ def get_openlineage_facets_on_complete(self, _):
9797
run_facets: dict[str, RunFacet] = {
9898
"externalQuery": ExternalQueryRunFacet(externalQueryId=self.job_id, source="bigquery")
9999
}
100-
self._client = self.hook.get_client(project_id=self.hook.project_id, location=self.location)
100+
self._client = self.hook.get_client(
101+
project_id=self.project_id or self.hook.project_id, location=self.location
102+
)
101103
try:
102104
job_properties = self._client.get_job(job_id=self.job_id)._properties
103105

providers/google/tests/unit/google/cloud/openlineage/test_mixins.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,3 +1013,37 @@ def test_generate_column_lineage_facet(self):
10131013
),
10141014
}
10151015
)
1016+
1017+
def test_project_id_selection(self):
1018+
"""
1019+
Check if project_id set via an argument to the operator takes prevalence over project_id set in a
1020+
connection.
1021+
"""
1022+
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
1023+
1024+
class TestOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOperatorOpenLineageMixin):
1025+
def __init__(self, project_id: str | None = None, **_):
1026+
self.project_id = project_id
1027+
self.job_id = "foobar"
1028+
self.location = "foobar"
1029+
self.sql = "foobar"
1030+
1031+
# First test task where project_id is set explicitly
1032+
test = TestOperator(project_id="project_a")
1033+
test.hook = MagicMock()
1034+
test.hook.project_id = "project_b"
1035+
test._client = MagicMock()
1036+
1037+
test.get_openlineage_facets_on_complete(None)
1038+
_, kwargs = test.hook.get_client.call_args
1039+
assert kwargs["project_id"] == "project_a"
1040+
1041+
# Then test task where project_id is inherited from the hook
1042+
test = TestOperator()
1043+
test.hook = MagicMock()
1044+
test.hook.project_id = "project_b"
1045+
test._client = MagicMock()
1046+
1047+
test.get_openlineage_facets_on_complete(None)
1048+
_, kwargs = test.hook.get_client.call_args
1049+
assert kwargs["project_id"] == "project_b"

0 commit comments

Comments
 (0)