Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions google/cloud/bigquery/job/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1585,6 +1585,11 @@ def is_job_done():
self._retry_do_query = retry_do_query
self._job_retry = job_retry

# If the job hasn't been created, create it now. Related:
# https://github.com/googleapis/python-bigquery/issues/1940
if self.state is None:
self._begin(retry=retry, **done_kwargs)

# Refresh the job status with jobs.get because some of the
# exceptions thrown by jobs.getQueryResults like timeout and
# rateLimitExceeded errors are ambiguous. We want to know if
Expand Down
82 changes: 82 additions & 0 deletions tests/unit/job/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,85 @@ def test_result_dry_run(self):
self.assertIsNone(result.job_id)
self.assertIsNone(result.query_id)

# If the job doesn't exist, create the job first. Issue:
# https://github.com/googleapis/python-bigquery/issues/1940
def test_result_begin_job_if_not_exist(self):
begun_resource = self._make_resource()
query_running_resource = {
"jobComplete": True,
"jobReference": {
"projectId": self.PROJECT,
"jobId": self.JOB_ID,
"location": "US",
},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"status": {"state": "RUNNING"},
}
query_done_resource = {
"jobComplete": True,
"jobReference": {
"projectId": self.PROJECT,
"jobId": self.JOB_ID,
"location": "US",
},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"status": {"state": "DONE"},
}
done_resource = copy.deepcopy(begun_resource)
done_resource["status"] = {"state": "DONE"}
connection = make_connection(
begun_resource,
query_running_resource,
query_done_resource,
done_resource,
)
client = _make_client(project=self.PROJECT, connection=connection)
job = self._make_one(self.JOB_ID, self.QUERY, client)
job._properties["jobReference"]["location"] = "US"

job.result()

create_job_call = mock.call(
method="POST",
path=f"/projects/{self.PROJECT}/jobs",
data={
"jobReference": {
"jobId": self.JOB_ID,
"projectId": self.PROJECT,
"location": "US",
},
"configuration": {
"query": {"useLegacySql": False, "query": self.QUERY},
},
},
timeout=None,
)
reload_call = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}",
query_params={"projection": "full", "location": "US"},
timeout=DEFAULT_GET_JOB_TIMEOUT,
)
get_query_results_call = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}",
query_params={
"maxResults": 0,
"location": "US",
},
timeout=None,
)

# assert connection.api_request.call_args == []
connection.api_request.assert_has_calls(
[
create_job_call,
reload_call,
get_query_results_call,
reload_call,
]
)

def test_result_with_done_job_calls_get_query_results(self):
query_resource_done = {
"jobComplete": True,
Expand Down Expand Up @@ -1379,6 +1458,7 @@ def test_result_w_timeout_doesnt_raise(self):
client = _make_client(project=self.PROJECT, connection=connection)
job = self._make_one(self.JOB_ID, self.QUERY, client)
job._properties["jobReference"]["location"] = "US"
job._properties["status"] = {"state": "RUNNING"}

with freezegun.freeze_time("1970-01-01 00:00:00", tick=False):
job.result(
Expand Down Expand Up @@ -1429,6 +1509,7 @@ def test_result_w_timeout_raises_concurrent_futures_timeout(self):
client = _make_client(project=self.PROJECT, connection=connection)
job = self._make_one(self.JOB_ID, self.QUERY, client)
job._properties["jobReference"]["location"] = "US"
job._properties["status"] = {"state": "RUNNING"}

with freezegun.freeze_time(
"1970-01-01 00:00:00", auto_tick_seconds=1.0
Expand Down Expand Up @@ -2319,5 +2400,6 @@ def test_iter(self):
connection = make_connection(begun_resource, query_resource, done_resource)
client = _make_client(project=self.PROJECT, connection=connection)
job = self._make_one(self.JOB_ID, self.QUERY, client)
job._properties["status"] = {"state": "RUNNING"}

self.assertIsInstance(iter(job), types.GeneratorType)