From b34ec4c98668aac4a033cdd5d877540661d9ba2f Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 16 Aug 2017 17:10:06 -0700 Subject: [PATCH] BigQuery: Poll via getQueryResults method. This modifies the QueryJob's Futures interface implementation to poll using getQueryResults instead of jobs.get. This was recommended by BigQuery engineers because getQueryResults does HTTP long-polling for closer to realtime results. --- .../google/cloud/bigquery/dbapi/cursor.py | 13 +-- bigquery/google/cloud/bigquery/job.py | 35 ++++---- bigquery/tests/system.py | 3 +- bigquery/tests/unit/test_dbapi_cursor.py | 3 +- bigquery/tests/unit/test_job.py | 80 ++++++++++++++++--- 5 files changed, 97 insertions(+), 37 deletions(-) diff --git a/bigquery/google/cloud/bigquery/dbapi/cursor.py b/bigquery/google/cloud/bigquery/dbapi/cursor.py index 167afb45e285..a5f04e15c674 100644 --- a/bigquery/google/cloud/bigquery/dbapi/cursor.py +++ b/bigquery/google/cloud/bigquery/dbapi/cursor.py @@ -154,20 +154,13 @@ def execute(self, operation, parameters=None, job_id=None): query_parameters=query_parameters) query_job.use_legacy_sql = False + # Wait for the query to finish. try: - query_results = query_job.result() + query_job = query_job.result() except google.cloud.exceptions.GoogleCloudError: raise exceptions.DatabaseError(query_job.errors) - # Force the iterator to run because the query_results doesn't - # have the total_rows populated. See: - # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3506 - query_iterator = query_results.fetch_data() - try: - six.next(iter(query_iterator)) - except StopIteration: - pass - + query_results = query_job.query_results() self._query_data = iter( query_results.fetch_data(max_results=self.arraysize)) self._set_rowcount(query_results) diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 48d440063fa3..a43aeecbb931 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -1085,6 +1085,7 @@ def __init__(self, name, query, client, self.udf_resources = udf_resources self.query_parameters = query_parameters self._configuration = _AsyncQueryConfiguration() + self._query_results = None allow_large_results = _TypedProperty('allow_large_results', bool) """See @@ -1284,23 +1285,25 @@ def query_results(self): :rtype: :class:`~google.cloud.bigquery.query.QueryResults` :returns: results instance """ - from google.cloud.bigquery.query import QueryResults - return QueryResults.from_query_job(self) + if not self._query_results: + self._query_results = self._client.get_query_results(self.name) + return self._query_results - def result(self, timeout=None): - """Start the job and wait for it to complete and get the result. + def done(self): + """Refresh the job and checks if it is complete. - :type timeout: int - :param timeout: How long to wait for job to complete before raising - a :class:`TimeoutError`. + :rtype: bool + :returns: True if the job is complete, False otherwise. + """ + # Do not refresh is the state is already done, as the job will not + # change once complete. + if self.state != _DONE_STATE: + self._query_results = self._client.get_query_results(self.name) - :rtype: :class:`~google.cloud.bigquery.query.QueryResults` - :returns: The query results. + # Only reload the job once we know the query is complete. + # This will ensure that fields such as the destination table are + # correctly populated. + if self._query_results.complete: + self.reload() - :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the job - failed or :class:`TimeoutError` if the job did not complete in the - given timeout. - """ - super(QueryJob, self).result(timeout=timeout) - # Return a QueryResults instance instead of returning the job. - return self.query_results() + return self.state == _DONE_STATE diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 3cff1b001731..fab7d4b175bd 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -1093,7 +1093,8 @@ def test_async_query_future(self): str(uuid.uuid4()), 'SELECT 1') query_job.use_legacy_sql = False - iterator = query_job.result(timeout=JOB_TIMEOUT).fetch_data() + query_job = query_job.result(timeout=JOB_TIMEOUT) + iterator = query_job.query_results().fetch_data() rows = list(iterator) self.assertEqual(rows, [(1,)]) diff --git a/bigquery/tests/unit/test_dbapi_cursor.py b/bigquery/tests/unit/test_dbapi_cursor.py index 49a332999f7e..7351db8f670b 100644 --- a/bigquery/tests/unit/test_dbapi_cursor.py +++ b/bigquery/tests/unit/test_dbapi_cursor.py @@ -42,7 +42,8 @@ def _mock_job( mock_job = mock.create_autospec(job.QueryJob) mock_job.error_result = None mock_job.state = 'DONE' - mock_job.result.return_value = self._mock_results( + mock_job.result.return_value = mock_job + mock_job.query_results.return_value = self._mock_results( rows=rows, schema=schema, num_dml_affected_rows=num_dml_affected_rows) return mock_job diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 81d07b122eb0..2a324b3ee347 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -171,6 +171,7 @@ def _makeResource(self, started=False, ended=False): } if ended: + resource['status'] = {'state': 'DONE'} resource['statistics']['load']['inputFiles'] = self.INPUT_FILES resource['statistics']['load']['inputFileBytes'] = self.INPUT_BYTES resource['statistics']['load']['outputBytes'] = self.OUTPUT_BYTES @@ -310,6 +311,37 @@ def test_ctor_w_schema(self): schema=[full_name, age]) self.assertEqual(job.schema, [full_name, age]) + def test_done(self): + client = _Client(self.PROJECT) + resource = self._makeResource(ended=True) + job = self._get_target_class().from_api_repr(resource, client) + self.assertTrue(job.done()) + + def test_result(self): + client = _Client(self.PROJECT) + resource = self._makeResource(ended=True) + job = self._get_target_class().from_api_repr(resource, client) + + result = job.result() + + self.assertIs(result, job) + + def test_result_invokes_begins(self): + begun_resource = self._makeResource() + done_resource = copy.deepcopy(begun_resource) + done_resource['status'] = {'state': 'DONE'} + connection = _Connection(begun_resource, done_resource) + client = _Client(self.PROJECT, connection=connection) + table = _Table() + job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client) + + job.result() + + self.assertEqual(len(connection._requested), 2) + begin_request, reload_request = connection._requested + self.assertEqual(begin_request['method'], 'POST') + self.assertEqual(reload_request['method'], 'GET') + def test_schema_setter_non_list(self): client = _Client(self.PROJECT) table = _Table() @@ -1421,6 +1453,10 @@ def _makeResource(self, started=False, ended=False): started, ended) config = resource['configuration']['query'] config['query'] = self.QUERY + + if ended: + resource['status'] = {'state': 'DONE'} + return resource def _verifyBooleanResourceProperties(self, job, config): @@ -1640,40 +1676,60 @@ def test_cancelled(self): self.assertTrue(job.cancelled()) + def test_done(self): + client = _Client(self.PROJECT) + resource = self._makeResource(ended=True) + job = self._get_target_class().from_api_repr(resource, client) + self.assertTrue(job.done()) + def test_query_results(self): from google.cloud.bigquery.query import QueryResults - client = _Client(self.PROJECT) + query_resource = {'jobComplete': True} + connection = _Connection(query_resource) + client = _Client(self.PROJECT, connection=connection) job = self._make_one(self.JOB_NAME, self.QUERY, client) results = job.query_results() self.assertIsInstance(results, QueryResults) - self.assertIs(results._job, job) - def test_result(self): + def test_query_results_w_cached_value(self): from google.cloud.bigquery.query import QueryResults client = _Client(self.PROJECT) job = self._make_one(self.JOB_NAME, self.QUERY, client) - job._properties['status'] = {'state': 'DONE'} + query_results = QueryResults(None, client) + job._query_results = query_results + + results = job.query_results() + + self.assertIs(results, query_results) + + def test_result(self): + client = _Client(self.PROJECT) + resource = self._makeResource(ended=True) + job = self._get_target_class().from_api_repr(resource, client) result = job.result() - self.assertIsInstance(result, QueryResults) - self.assertIs(result._job, job) + self.assertIs(result, job) def test_result_invokes_begins(self): begun_resource = self._makeResource() + incomplete_resource = {'jobComplete': False} + query_resource = {'jobComplete': True} done_resource = copy.deepcopy(begun_resource) done_resource['status'] = {'state': 'DONE'} - connection = _Connection(begun_resource, done_resource) + connection = _Connection( + begun_resource, incomplete_resource, query_resource, done_resource) client = _Client(self.PROJECT, connection=connection) job = self._make_one(self.JOB_NAME, self.QUERY, client) job.result() - self.assertEqual(len(connection._requested), 2) - begin_request, reload_request = connection._requested + self.assertEqual(len(connection._requested), 4) + begin_request, _, query_request, reload_request = connection._requested self.assertEqual(begin_request['method'], 'POST') + self.assertEqual(query_request['method'], 'GET') self.assertEqual(reload_request['method'], 'GET') def test_result_error(self): @@ -2088,6 +2144,12 @@ def dataset(self, name): return Dataset(name, client=self) + def get_query_results(self, job_id): + from google.cloud.bigquery.query import QueryResults + + resource = self._connection.api_request(method='GET') + return QueryResults.from_api_repr(resource, self) + class _Table(object):