diff --git a/gcloud/bigquery/client.py b/gcloud/bigquery/client.py
index 068bdf1967a1..221a8c41a022 100644
--- a/gcloud/bigquery/client.py
+++ b/gcloud/bigquery/client.py
@@ -21,6 +21,7 @@
 from gcloud.bigquery.job import CopyJob
 from gcloud.bigquery.job import ExtractTableToStorageJob
 from gcloud.bigquery.job import LoadTableFromStorageJob
+from gcloud.bigquery.job import RunQueryJob
 
 
 class Client(JSONClient):
@@ -153,3 +154,17 @@ def extract_table_to_storage(self, name, source, *destination_uris):
         """
         return ExtractTableToStorageJob(name, source, destination_uris,
                                         client=self)
+
+    def run_query(self, name, query):
+        """Construct a job for running a SQL query.
+
+        :type name: string
+        :param name: Name of the job.
+
+        :type query: string
+        :param query: SQL query to be executed.
+
+        :rtype: :class:`gcloud.bigquery.job.RunQueryJob`
+        :returns: a new ``RunQueryJob`` instance
+        """
+        return RunQueryJob(name, query, client=self)
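
For orientation, a minimal sketch of how the new factory is meant to be called; the client construction and the names below are illustrative, not part of this patch:

    from gcloud.bigquery.client import Client

    client = Client(project='my-project')  # hypothetical project name
    job = client.run_query('count_persons', 'select count(*) from persons')
    job.begin()  # POSTs the resource built by RunQueryJob._build_resource()
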
+ """ + def __init__(self, name, query, client): + super(RunQueryJob, self).__init__(name, client) + self.query = query + self._configuration = _QueryConfiguration() + + allow_large_results = _TypedProperty('allow_large_results', bool) + """See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.allowLargeResults + """ + + create_disposition = CreateDisposition('create_disposition') + """See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.createDisposition + """ + + default_dataset = _TypedProperty('default_dataset', Dataset) + """See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.default_dataset + """ + + destination_table = _TypedProperty('destination_table', Table) + """See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.destinationTable + """ + + flatten_results = _TypedProperty('flatten_results', bool) + """See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.flattenResults + """ + + priority = QueryPriority('priority') + """See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.priority + """ + + use_query_cache = _TypedProperty('use_query_cache', bool) + """See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.useQueryCache + """ + + write_disposition = WriteDisposition('write_disposition') + """See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.writeDisposition + """ + + def _destination_table_resource(self): + if self.destination_table is not None: + return { + 'projectId': self.destination_table.project, + 'datasetId': self.destination_table.dataset_name, + 'tableId': self.destination_table.name, + } + + def _populate_config_resource(self, configuration): + """Helper for _build_resource: copy config properties to resource""" + if self.allow_large_results is not None: + configuration['allowLargeResults'] = self.allow_large_results + if self.create_disposition is not None: + configuration['createDisposition'] = self.create_disposition + if self.default_dataset is not None: + configuration['defaultDataset'] = { + 'projectId': self.default_dataset.project, + 'datasetId': self.default_dataset.name, + } + if self.destination_table is not None: + table_res = self._destination_table_resource() + configuration['destinationTable'] = table_res + if self.flatten_results is not None: + configuration['flattenResults'] = self.flatten_results + if self.priority is not None: + configuration['priority'] = self.priority + if self.use_query_cache is not None: + configuration['useQueryCache'] = self.use_query_cache + if self.write_disposition is not None: + configuration['writeDisposition'] = self.write_disposition + + def _build_resource(self): + """Generate a resource for :meth:`begin`.""" + + resource = { + 'jobReference': { + 'projectId': self.project, + 'jobId': self.name, + }, + 'configuration': { + 'query': { + 'query': self.query, + }, + }, + } + configuration = resource['configuration']['query'] + self._populate_config_resource(configuration) + + return resource + + def _scrub_local_properties(self, cleaned): + """Helper: handle subclass properties in cleaned.""" + configuration = cleaned['configuration']['query'] + dest_remote = configuration.get('destinationTable') + + if dest_remote is None: + if self.destination_table is not None: + del self.destination_table + else: + dest_local = self._destination_table_resource() + if dest_remote != 
diff --git a/gcloud/bigquery/test_client.py b/gcloud/bigquery/test_client.py
index e1f0d4a3a287..16007c54b80e 100644
--- a/gcloud/bigquery/test_client.py
+++ b/gcloud/bigquery/test_client.py
@@ -186,6 +186,20 @@ def test_extract_table_to_storage(self):
         self.assertEqual(job.source, source)
         self.assertEqual(list(job.destination_uris), [DESTINATION])
 
+    def test_run_query(self):
+        from gcloud.bigquery.job import RunQueryJob
+        PROJECT = 'PROJECT'
+        JOB = 'job_name'
+        QUERY = 'select count(*) from persons'
+        creds = _Credentials()
+        http = object()
+        client = self._makeOne(project=PROJECT, credentials=creds, http=http)
+        job = client.run_query(JOB, QUERY)
+        self.assertTrue(isinstance(job, RunQueryJob))
+        self.assertTrue(job._client is client)
+        self.assertEqual(job.name, JOB)
+        self.assertEqual(job.query, QUERY)
+
 
 class _Credentials(object):
diff --git a/gcloud/bigquery/test_job.py b/gcloud/bigquery/test_job.py
index cbed14d4f1a6..950fa1d7902b 100644
--- a/gcloud/bigquery/test_job.py
+++ b/gcloud/bigquery/test_job.py
@@ -1,3 +1,4 @@
+# pylint: disable=C0302
 # Copyright 2015 Google Inc. All rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -1086,12 +1087,282 @@ def test_reload_w_alternate_client(self):
         self._verifyResourceProperties(job, RESOURCE)
 
 
+class TestRunQueryJob(unittest2.TestCase, _Base):
+    JOB_TYPE = 'query'
+    QUERY = 'select count(*) from persons'
+
+    def _getTargetClass(self):
+        from gcloud.bigquery.job import RunQueryJob
+        return RunQueryJob
+
+    def _verifyBooleanResourceProperties(self, job, config):
+
+        if 'allowLargeResults' in config:
+            self.assertEqual(job.allow_large_results,
+                             config['allowLargeResults'])
+        else:
+            self.assertTrue(job.allow_large_results is None)
+        if 'flattenResults' in config:
+            self.assertEqual(job.flatten_results,
+                             config['flattenResults'])
+        else:
+            self.assertTrue(job.flatten_results is None)
+        if 'useQueryCache' in config:
+            self.assertEqual(job.use_query_cache,
+                             config['useQueryCache'])
+        else:
+            self.assertTrue(job.use_query_cache is None)
+
+    def _verifyResourceProperties(self, job, resource):
+        self._verifyReadonlyResourceProperties(job, resource)
+
+        config = resource.get('configuration', {}).get('query')
+        self._verifyBooleanResourceProperties(job, config)
+
+        if 'createDisposition' in config:
+            self.assertEqual(job.create_disposition,
+                             config['createDisposition'])
+        else:
+            self.assertTrue(job.create_disposition is None)
+        if 'defaultDataset' in config:
+            dataset = job.default_dataset
+            ds_ref = {
+                'projectId': dataset.project,
+                'datasetId': dataset.name,
+            }
+            self.assertEqual(ds_ref, config['defaultDataset'])
+        else:
+            self.assertTrue(job.default_dataset is None)
+        if 'destinationTable' in config:
+            table = job.destination_table
+            tb_ref = {
+                'projectId': table.project,
+                'datasetId': table.dataset_name,
+                'tableId': table.name,
+            }
+            self.assertEqual(tb_ref, config['destinationTable'])
+        else:
+            self.assertTrue(job.destination_table is None)
+        if 'priority' in config:
+            self.assertEqual(job.priority,
+                             config['priority'])
+        else:
+            self.assertTrue(job.priority is None)
+        if 'writeDisposition' in config:
+            self.assertEqual(job.write_disposition,
+                             config['writeDisposition'])
+        else:
+            self.assertTrue(job.write_disposition is None)
+
+    def test_ctor(self):
+        client = _Client(self.PROJECT)
+        job = self._makeOne(self.JOB_NAME, self.QUERY, client)
+        self.assertEqual(job.query, self.QUERY)
+        self.assertTrue(job._client is client)
+        self.assertEqual(
+            job.path,
+            '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME))
+
+        self._verifyInitialReadonlyProperties(job)
+
+        # set/read from resource['configuration']['query']
+        self.assertTrue(job.allow_large_results is None)
+        self.assertTrue(job.create_disposition is None)
+        self.assertTrue(job.default_dataset is None)
+        self.assertTrue(job.destination_table is None)
+        self.assertTrue(job.flatten_results is None)
+        self.assertTrue(job.priority is None)
+        self.assertTrue(job.use_query_cache is None)
+        self.assertTrue(job.write_disposition is None)
+
+    def test_begin_w_bound_client(self):
+        PATH = 'projects/%s/jobs' % self.PROJECT
+        RESOURCE = self._makeResource()
+        # Ensure None for missing server-set props
+        del RESOURCE['statistics']['creationTime']
+        del RESOURCE['etag']
+        del RESOURCE['selfLink']
+        del RESOURCE['user_email']
+        conn = _Connection(RESOURCE)
+        client = _Client(project=self.PROJECT, connection=conn)
+        job = self._makeOne(self.JOB_NAME, self.QUERY, client)
+
+        job.begin()
+
+        self.assertEqual(len(conn._requested), 1)
+        req = conn._requested[0]
+        self.assertEqual(req['method'], 'POST')
+        self.assertEqual(req['path'], '/%s' % PATH)
+        SENT = {
+            'jobReference': {
+                'projectId': self.PROJECT,
+                'jobId': self.JOB_NAME,
+            },
+            'configuration': {
+                'query': {
+                    'query': self.QUERY,
+                },
+            },
+        }
+        self.assertEqual(req['data'], SENT)
+        self._verifyResourceProperties(job, RESOURCE)
+
+    def test_begin_w_alternate_client(self):
+        from gcloud.bigquery.dataset import Dataset
+        from gcloud.bigquery.table import Table
+        PATH = 'projects/%s/jobs' % self.PROJECT
+        TABLE = 'TABLE'
+        DS_NAME = 'DATASET'
+        RESOURCE = self._makeResource(ended=True)
+        QUERY_CONFIGURATION = {
+            'query': self.QUERY,
+            'allowLargeResults': True,
+            'createDisposition': 'CREATE_NEVER',
+            'defaultDataset': {
+                'projectId': self.PROJECT,
+                'datasetId': DS_NAME,
+            },
+            'destinationTable': {
+                'projectId': self.PROJECT,
+                'datasetId': DS_NAME,
+                'tableId': TABLE,
+            },
+            'flattenResults': True,
+            'priority': 'INTERACTIVE',
+            'useQueryCache': True,
+            'writeDisposition': 'WRITE_TRUNCATE',
+        }
+        RESOURCE['configuration']['query'] = QUERY_CONFIGURATION
+        conn1 = _Connection()
+        client1 = _Client(project=self.PROJECT, connection=conn1)
+        conn2 = _Connection(RESOURCE)
+        client2 = _Client(project=self.PROJECT, connection=conn2)
+        job = self._makeOne(self.JOB_NAME, self.QUERY, client1)
+
+        dataset = Dataset(DS_NAME, client1)
+        table = Table(TABLE, dataset)
+
+        job.allow_large_results = True
+        job.create_disposition = 'CREATE_NEVER'
+        job.default_dataset = dataset
+        job.destination_table = table
+        job.flatten_results = True
+        job.priority = 'INTERACTIVE'
+        job.use_query_cache = True
+        job.write_disposition = 'WRITE_TRUNCATE'
+
+        job.begin(client=client2)
+
+        self.assertEqual(len(conn1._requested), 0)
+        self.assertEqual(len(conn2._requested), 1)
+        req = conn2._requested[0]
+        self.assertEqual(req['method'], 'POST')
+        self.assertEqual(req['path'], '/%s' % PATH)
+        SENT = {
+            'jobReference': {
+                'projectId': self.PROJECT,
+                'jobId': self.JOB_NAME,
+            },
+            'configuration': {
+                'query': QUERY_CONFIGURATION,
+            },
+        }
+        self.assertEqual(req['data'], SENT)
+        self._verifyResourceProperties(job, RESOURCE)
+
+    def test_exists_miss_w_bound_client(self):
+        PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)
+        conn = _Connection()
+        client = _Client(project=self.PROJECT, connection=conn)
+        job = self._makeOne(self.JOB_NAME, self.QUERY, client)
+
+        self.assertFalse(job.exists())
+
+        self.assertEqual(len(conn._requested), 1)
+        req = conn._requested[0]
+        self.assertEqual(req['method'], 'GET')
+        self.assertEqual(req['path'], '/%s' % PATH)
+        self.assertEqual(req['query_params'], {'fields': 'id'})
+
+    def test_exists_hit_w_alternate_client(self):
+        PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)
+        conn1 = _Connection()
+        client1 = _Client(project=self.PROJECT, connection=conn1)
+        conn2 = _Connection({})
+        client2 = _Client(project=self.PROJECT, connection=conn2)
+        job = self._makeOne(self.JOB_NAME, self.QUERY, client1)
+
+        self.assertTrue(job.exists(client=client2))
+
+        self.assertEqual(len(conn1._requested), 0)
+        self.assertEqual(len(conn2._requested), 1)
+        req = conn2._requested[0]
+        self.assertEqual(req['method'], 'GET')
+        self.assertEqual(req['path'], '/%s' % PATH)
+        self.assertEqual(req['query_params'], {'fields': 'id'})
+
+    def test_reload_w_bound_client(self):
+        from gcloud.bigquery.dataset import Dataset
+        from gcloud.bigquery.table import Table
+        PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)
+        DS_NAME = 'DATASET'
+        DEST_TABLE = 'dest_table'
+        RESOURCE = self._makeResource()
+        conn = _Connection(RESOURCE)
+        client = _Client(project=self.PROJECT, connection=conn)
+        job = self._makeOne(self.JOB_NAME, self.QUERY, client)
+
+        dataset = Dataset(DS_NAME, client)
+        table = Table(DEST_TABLE, dataset)
+        job.destination_table = table
+
+        job.reload()
+
+        self.assertEqual(job.destination_table, None)
+
+        self.assertEqual(len(conn._requested), 1)
+        req = conn._requested[0]
+        self.assertEqual(req['method'], 'GET')
+        self.assertEqual(req['path'], '/%s' % PATH)
+        self._verifyResourceProperties(job, RESOURCE)
+
+    def test_reload_w_alternate_client(self):
+        PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)
+        DS_NAME = 'DATASET'
+        DEST_TABLE = 'dest_table'
+        RESOURCE = self._makeResource()
+        q_config = RESOURCE['configuration']['query']
+        q_config['destinationTable'] = {
+            'projectId': self.PROJECT,
+            'datasetId': DS_NAME,
+            'tableId': DEST_TABLE,
+        }
+        conn1 = _Connection()
+        client1 = _Client(project=self.PROJECT, connection=conn1)
+        conn2 = _Connection(RESOURCE)
+        client2 = _Client(project=self.PROJECT, connection=conn2)
+        job = self._makeOne(self.JOB_NAME, self.QUERY, client1)
+
+        job.reload(client=client2)
+
+        self.assertEqual(len(conn1._requested), 0)
+        self.assertEqual(len(conn2._requested), 1)
+        req = conn2._requested[0]
+        self.assertEqual(req['method'], 'GET')
+        self.assertEqual(req['path'], '/%s' % PATH)
+        self._verifyResourceProperties(job, RESOURCE)
+
+
 class _Client(object):
 
     def __init__(self, project='project', connection=None):
         self.project = project
         self.connection = connection
 
+    def dataset(self, name):
+        from gcloud.bigquery.dataset import Dataset
+        return Dataset(name, client=self)
+
 
 class _Table(object):
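
The two reload tests pin down the sync behavior of ``_scrub_local_properties``: after ``reload()``, ``destination_table`` mirrors the server's resource. A sketch of the bound-client case (names hypothetical):

    job.destination_table = table  # set locally only
    job.reload()                   # server config has no 'destinationTable'
    assert job.destination_table is None

The alternate-client test exercises the other branch, where the server reports a table and the property is rebuilt via ``client.dataset(...).table(...)``; that is why the stub ``_Client`` grows a ``dataset()`` factory here.
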
diff --git a/system_tests/bigquery.py b/system_tests/bigquery.py
index 46bc6050fa13..1ae32c36bdf5 100644
--- a/system_tests/bigquery.py
+++ b/system_tests/bigquery.py
@@ -301,6 +301,8 @@ def test_load_table_from_storage_then_dump_table(self):
         if job.state not in ('DONE', 'done'):
             time.sleep(10)
 
+        self.assertTrue(job.state in ('DONE', 'done'))
+
         rows, _, _ = table.fetch_data()
         by_age = operator.itemgetter(1)
         self.assertEqual(sorted(rows, key=by_age),
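
Taken together, the new job class supports the same begin-and-poll pattern this system test uses for load jobs. A minimal sketch for a query job, assuming the same polling style (interval and names are illustrative):

    import time

    job = client.run_query('count_persons', 'select count(*) from persons')
    job.begin()
    while job.state not in ('DONE', 'done'):
        time.sleep(10)
        job.reload()  # refreshes job.state from the service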