diff --git a/gcloud/bigquery/client.py b/gcloud/bigquery/client.py
index 748b60a4d48b..068bdf1967a1 100644
--- a/gcloud/bigquery/client.py
+++ b/gcloud/bigquery/client.py
@@ -19,6 +19,7 @@
 from gcloud.bigquery.connection import Connection
 from gcloud.bigquery.dataset import Dataset
 from gcloud.bigquery.job import CopyJob
+from gcloud.bigquery.job import ExtractTableToStorageJob
 from gcloud.bigquery.job import LoadTableFromStorageJob
@@ -133,3 +134,22 @@ def copy_table(self, name, destination, *sources):
         :returns: a new ``CopyJob`` instance
         """
         return CopyJob(name, destination, sources, client=self)
+
+    def extract_table_to_storage(self, name, source, *destination_uris):
+        """Construct a job for extracting a table into Cloud Storage files.
+
+        :type name: string
+        :param name: Name of the job.
+
+        :type source: :class:`gcloud.bigquery.table.Table`
+        :param source: Table to be extracted.
+
+        :type destination_uris: sequence of string
+        :param destination_uris: URIs of Cloud Storage file(s) into which
+                                 table data is to be extracted.
+
+        :rtype: :class:`gcloud.bigquery.job.ExtractTableToStorageJob`
+        :returns: a new ``ExtractTableToStorageJob`` instance
+        """
+        return ExtractTableToStorageJob(name, source, destination_uris,
+                                        client=self)
diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py
index b6abe62fc713..734345463760 100644
--- a/gcloud/bigquery/job.py
+++ b/gcloud/bigquery/job.py
@@ -788,7 +788,7 @@ def _populate_config_resource(self, configuration):
             configuration['writeDisposition'] = self.write_disposition
 
     def _build_resource(self):
-        """Generate a resource for ``begin``."""
+        """Generate a resource for :meth:`begin`."""
         resource = {
             'jobReference': {
                 'projectId': self.project,
@@ -911,7 +911,7 @@ def _populate_config_resource(self, configuration):
             configuration['writeDisposition'] = self.write_disposition
 
     def _build_resource(self):
-        """Generate a resource for ``begin``."""
+        """Generate a resource for :meth:`begin`."""
 
         source_refs = [{
             'projectId': table.project,
@@ -939,3 +939,204 @@ def _build_resource(self):
         self._populate_config_resource(configuration)
 
         return resource
+
+
+class _ExtractConfiguration(object):
+    """User-settable configuration options for extract jobs."""
+    # None -> use server default.
+    _compression = None
+    _destination_format = None
+    _field_delimiter = None
+    _print_header = None
+
+
+class Compression(_Enum):
+    """Pseudo-enum for allowed values for ``compression`` properties.
+    """
+    GZIP = 'GZIP'
+    NONE = 'NONE'
+    ALLOWED = (GZIP, NONE)
+
+
+class DestinationFormat(_Enum):
+    """Pseudo-enum for allowed values for ``destination_format`` properties.
+    """
+    CSV = 'CSV'
+    NEWLINE_DELIMITED_JSON = 'NEWLINE_DELIMITED_JSON'
+    AVRO = 'AVRO'
+    ALLOWED = (CSV, NEWLINE_DELIMITED_JSON, AVRO)
+
+
+class ExtractTableToStorageJob(_BaseJob):
+    """Asynchronous job: extract data from a BQ table into Cloud Storage.
+
+    :type name: string
+    :param name: the name of the job
+
+    :type source: :class:`gcloud.bigquery.table.Table`
+    :param source: Table from which data is to be extracted.
+
+    :type destination_uris: list of string
+    :param destination_uris: URIs describing Cloud Storage blobs into which
+                             extracted data will be written.
+
+    :type client: :class:`gcloud.bigquery.client.Client`
+    :param client: A client which holds credentials and project configuration
+                   for the job (which requires a project).
+    """
+    def __init__(self, name, source, destination_uris, client):
+        super(ExtractTableToStorageJob, self).__init__(name, client)
+        self.source = source
+        self.destination_uris = destination_uris
+        self._configuration = _ExtractConfiguration()
+
+    @property
+    def compression(self):
+        """Compression to apply to destination blobs.
+
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.extract.compression
+
+        :rtype: string, or ``NoneType``
+        :returns: The value as set by the user, or None (the default).
+        """
+        return self._configuration._compression
+
+    @compression.setter
+    def compression(self, value):
+        """Update compression.
+
+        :type value: string
+        :param value: allowed value for :class:`Compression`.
+        """
+        Compression.validate(value)  # raises ValueError if invalid
+        self._configuration._compression = value
+
+    @compression.deleter
+    def compression(self):
+        """Delete compression."""
+        del self._configuration._compression
+
+    @property
+    def destination_format(self):
+        """Exported file format.
+
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.extract.destinationFormat
+
+        :rtype: string, or ``NoneType``
+        :returns: The value as set by the user, or None (the default).
+        """
+        return self._configuration._destination_format
+
+    @destination_format.setter
+    def destination_format(self, value):
+        """Update destination_format.
+
+        :type value: string
+        :param value: allowed value for :class:`DestinationFormat`.
+        """
+        DestinationFormat.validate(value)  # raises ValueError if invalid
+        self._configuration._destination_format = value
+
+    @destination_format.deleter
+    def destination_format(self):
+        """Delete destination_format."""
+        del self._configuration._destination_format
+
+    @property
+    def field_delimiter(self):
+        """Delimiter to use between fields in the exported data.
+
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.extract.fieldDelimiter
+
+        :rtype: string, or ``NoneType``
+        :returns: The value as set by the user, or None (the default).
+        """
+        return self._configuration._field_delimiter
+
+    @field_delimiter.setter
+    def field_delimiter(self, value):
+        """Update field_delimiter.
+
+        :type value: string
+        :param value: new field delimiter
+
+        :raises: ValueError for invalid value types.
+        """
+        if not isinstance(value, six.string_types):
+            raise ValueError("Pass a string")
+        self._configuration._field_delimiter = value
+
+    @field_delimiter.deleter
+    def field_delimiter(self):
+        """Delete field_delimiter."""
+        del self._configuration._field_delimiter
+
+    @property
+    def print_header(self):
+        """Write a header row into destination blobs.
+
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.extract.printHeader
+
+        :rtype: boolean, or ``NoneType``
+        :returns: The value as set by the user, or None (the default).
+        """
+        return self._configuration._print_header
+
+    @print_header.setter
+    def print_header(self, value):
+        """Update print_header.
+
+        :type value: boolean
+        :param value: new print_header
+
+        :raises: ValueError for invalid value types.
+        """
+        if not isinstance(value, bool):
+            raise ValueError("Pass a boolean")
+        self._configuration._print_header = value
+
+    @print_header.deleter
+    def print_header(self):
+        """Delete print_header."""
+        del self._configuration._print_header
+
+    def _populate_config_resource(self, configuration):
+        """Helper for _build_resource: copy config properties to resource"""
+        if self.compression is not None:
+            configuration['compression'] = self.compression
+        if self.destination_format is not None:
+            configuration['destinationFormat'] = self.destination_format
+        if self.field_delimiter is not None:
+            configuration['fieldDelimiter'] = self.field_delimiter
+        if self.print_header is not None:
+            configuration['printHeader'] = self.print_header
+
+    def _build_resource(self):
+        """Generate a resource for :meth:`begin`."""
+
+        source_ref = {
+            'projectId': self.source.project,
+            'datasetId': self.source.dataset_name,
+            'tableId': self.source.name,
+        }
+
+        resource = {
+            'jobReference': {
+                'projectId': self.project,
+                'jobId': self.name,
+            },
+            'configuration': {
+                'extract': {
+                    'sourceTable': source_ref,
+                    'destinationUris': self.destination_uris,
+                },
+            },
+        }
+        configuration = resource['configuration']['extract']
+        self._populate_config_resource(configuration)
+
+        return resource
diff --git a/gcloud/bigquery/test_client.py b/gcloud/bigquery/test_client.py
index 653a964c9faf..e1f0d4a3a287 100644
--- a/gcloud/bigquery/test_client.py
+++ b/gcloud/bigquery/test_client.py
@@ -167,6 +167,25 @@ def test_copy_table(self):
         self.assertEqual(list(job.sources), [source])
         self.assertTrue(job.destination is destination)
 
+    def test_extract_table_to_storage(self):
+        from gcloud.bigquery.job import ExtractTableToStorageJob
+        PROJECT = 'PROJECT'
+        JOB = 'job_name'
+        DATASET = 'dataset_name'
+        SOURCE = 'source_table'
+        DESTINATION = 'gs://bucket_name/object_name'
+        creds = _Credentials()
+        http = object()
+        client = self._makeOne(project=PROJECT, credentials=creds, http=http)
+        dataset = client.dataset(DATASET)
+        source = dataset.table(SOURCE)
+        job = client.extract_table_to_storage(JOB, source, DESTINATION)
+        self.assertTrue(isinstance(job, ExtractTableToStorageJob))
+        self.assertTrue(job._client is client)
+        self.assertEqual(job.name, JOB)
+        self.assertEqual(job.source, source)
+        self.assertEqual(list(job.destination_uris), [DESTINATION])
+
 
 class _Credentials(object):
 
diff --git a/gcloud/bigquery/test_job.py b/gcloud/bigquery/test_job.py
index edf621ad5740..b23b1dd6b9fc 100644
--- a/gcloud/bigquery/test_job.py
+++ b/gcloud/bigquery/test_job.py
@@ -979,6 +979,296 @@ def test_reload_w_alternate_client(self):
         self._verifyResourceProperties(job, RESOURCE)
 
 
+class TestExtractTableToStorageJob(unittest2.TestCase, _Base):
+    JOB_TYPE = 'extract'
+    SOURCE_TABLE = 'source_table'
+    DESTINATION_URI = 'gs://bucket_name/object_name'
+
+    def _getTargetClass(self):
+        from gcloud.bigquery.job import ExtractTableToStorageJob
+        return ExtractTableToStorageJob
+
+    def _verifyResourceProperties(self, job, resource):
+        self._verifyReadonlyResourceProperties(job, resource)
+
+        config = resource.get('configuration', {}).get('extract')
+
+        if 'compression' in config:
+            self.assertEqual(job.compression,
+                             config['compression'])
+        else:
+            self.assertTrue(job.compression is None)
+
+        if 'destinationFormat' in config:
+            self.assertEqual(job.destination_format,
+                             config['destinationFormat'])
+        else:
+            self.assertTrue(job.destination_format is None)
+
+        if 'fieldDelimiter' in config:
+            self.assertEqual(job.field_delimiter,
+                             config['fieldDelimiter'])
+        else:
+            self.assertTrue(job.field_delimiter is None)
+
+        if 'printHeader' in config:
+            self.assertEqual(job.print_header,
+                             config['printHeader'])
+        else:
+            self.assertTrue(job.print_header is None)
+
+    def test_ctor(self):
+        client = _Client(self.PROJECT)
+        source = _Table(self.SOURCE_TABLE)
+        job = self._makeOne(self.JOB_NAME, source, [self.DESTINATION_URI],
+                            client)
+        self.assertEqual(job.source, source)
+        self.assertEqual(job.destination_uris, [self.DESTINATION_URI])
+        self.assertTrue(job._client is client)
+        self.assertEqual(
+            job.path,
+            '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME))
+
+        self._verifyInitialReadonlyProperties(job)
+
+        # set/read from resource['configuration']['extract']
+        self.assertTrue(job.compression is None)
+        self.assertTrue(job.destination_format is None)
+        self.assertTrue(job.field_delimiter is None)
+        self.assertTrue(job.print_header is None)
+
+    def test_compression_setter_bad_value(self):
+        client = _Client(self.PROJECT)
+        source = _Table(self.SOURCE_TABLE)
+        job = self._makeOne(self.JOB_NAME, source, [self.DESTINATION_URI],
+                            client)
+        with self.assertRaises(ValueError):
+            job.compression = 'BOGUS'
+
+    def test_compression_setter_deleter(self):
+        client = _Client(self.PROJECT)
+        source = _Table(self.SOURCE_TABLE)
+        job = self._makeOne(self.JOB_NAME, source, [self.DESTINATION_URI],
+                            client)
+        job.compression = 'GZIP'
+        self.assertEqual(job.compression, 'GZIP')
+        del job.compression
+        self.assertTrue(job.compression is None)
+
+    def test_destination_format_setter_bad_value(self):
+        client = _Client(self.PROJECT)
+        source = _Table(self.SOURCE_TABLE)
+        job = self._makeOne(self.JOB_NAME, source, [self.DESTINATION_URI],
+                            client)
+        with self.assertRaises(ValueError):
+            job.destination_format = 'BOGUS'
+
+    def test_destination_format_setter_deleter(self):
+        client = _Client(self.PROJECT)
+        source = _Table(self.SOURCE_TABLE)
+        job = self._makeOne(self.JOB_NAME, source, [self.DESTINATION_URI],
+                            client)
+        job.destination_format = 'AVRO'
+        self.assertEqual(job.destination_format, 'AVRO')
+        del job.destination_format
+        self.assertTrue(job.destination_format is None)
+
+    def test_field_delimiter_setter_bad_value(self):
+        client = _Client(self.PROJECT)
+        source = _Table(self.SOURCE_TABLE)
+        job = self._makeOne(self.JOB_NAME, source, [self.DESTINATION_URI],
+                            client)
+        with self.assertRaises(ValueError):
+            job.field_delimiter = object()
+
+    def test_field_delimiter_setter_deleter(self):
+        client = _Client(self.PROJECT)
+        source = _Table(self.SOURCE_TABLE)
+        job = self._makeOne(self.JOB_NAME, source, [self.DESTINATION_URI],
+                            client)
+        job.field_delimiter = '|'
+        self.assertEqual(job.field_delimiter, '|')
+        del job.field_delimiter
+        self.assertTrue(job.field_delimiter is None)
+
+    def test_print_header_setter_bad_value(self):
+        client = _Client(self.PROJECT)
+        source = _Table(self.SOURCE_TABLE)
+        job = self._makeOne(self.JOB_NAME, source, [self.DESTINATION_URI],
+                            client)
+        with self.assertRaises(ValueError):
+            job.print_header = 'BOGUS'
+
+    def test_print_header_setter_deleter(self):
+        client = _Client(self.PROJECT)
+        source = _Table(self.SOURCE_TABLE)
+        job = self._makeOne(self.JOB_NAME, source, [self.DESTINATION_URI],
+                            client)
+        job.print_header = False
+        self.assertEqual(job.print_header, False)
+        del job.print_header
+        self.assertTrue(job.print_header is None)
+
+    def test_begin_w_bound_client(self):
+        PATH = 'projects/%s/jobs' % self.PROJECT
+        RESOURCE = self._makeResource()
+        # Ensure None for missing server-set props
+        del RESOURCE['statistics']['creationTime']
+        del RESOURCE['etag']
+        del RESOURCE['selfLink']
+        del RESOURCE['user_email']
+        conn = _Connection(RESOURCE)
+        client = _Client(project=self.PROJECT, connection=conn)
+        source = _Table(self.SOURCE_TABLE)
+        job = self._makeOne(self.JOB_NAME, source, [self.DESTINATION_URI],
+                            client)
+
+        job.begin()
+
+        self.assertEqual(len(conn._requested), 1)
+        req = conn._requested[0]
+        self.assertEqual(req['method'], 'POST')
+        self.assertEqual(req['path'], '/%s' % PATH)
+        SENT = {
+            'jobReference': {
+                'projectId': self.PROJECT,
+                'jobId': self.JOB_NAME,
+            },
+            'configuration': {
+                'extract': {
+                    'sourceTable': {
+                        'projectId': self.PROJECT,
+                        'datasetId': self.DS_NAME,
+                        'tableId': self.SOURCE_TABLE,
+                    },
+                    'destinationUris': [self.DESTINATION_URI],
+                },
+            },
+        }
+        self.assertEqual(req['data'], SENT)
+        self._verifyResourceProperties(job, RESOURCE)
+
+    def test_begin_w_alternate_client(self):
+        PATH = 'projects/%s/jobs' % self.PROJECT
+        RESOURCE = self._makeResource(ended=True)
+        EXTRACT_CONFIGURATION = {
+            'sourceTable': {
+                'projectId': self.PROJECT,
+                'datasetId': self.DS_NAME,
+                'tableId': self.SOURCE_TABLE,
+            },
+            'destinationUris': [self.DESTINATION_URI],
+            'compression': 'GZIP',
+            'destinationFormat': 'NEWLINE_DELIMITED_JSON',
+            'fieldDelimiter': '|',
+            'printHeader': False,
+        }
+        RESOURCE['configuration']['extract'] = EXTRACT_CONFIGURATION
+        conn1 = _Connection()
+        client1 = _Client(project=self.PROJECT, connection=conn1)
+        conn2 = _Connection(RESOURCE)
+        client2 = _Client(project=self.PROJECT, connection=conn2)
+        source = _Table(self.SOURCE_TABLE)
+        job = self._makeOne(self.JOB_NAME, source, [self.DESTINATION_URI],
+                            client1)
+
+        job.compression = 'GZIP'
+        job.destination_format = 'NEWLINE_DELIMITED_JSON'
+        job.field_delimiter = '|'
+        job.print_header = False
+
+        job.begin(client=client2)
+
+        self.assertEqual(len(conn1._requested), 0)
+        self.assertEqual(len(conn2._requested), 1)
+        req = conn2._requested[0]
+        self.assertEqual(req['method'], 'POST')
+        self.assertEqual(req['path'], '/%s' % PATH)
+        SENT = {
+            'jobReference': {
+                'projectId': self.PROJECT,
+                'jobId': self.JOB_NAME,
+            },
+            'configuration': {
+                'extract': EXTRACT_CONFIGURATION,
+            },
+        }
+        self.assertEqual(req['data'], SENT)
+        self._verifyResourceProperties(job, RESOURCE)
+
+    def test_exists_miss_w_bound_client(self):
+        PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)
+        conn = _Connection()
+        client = _Client(project=self.PROJECT, connection=conn)
+        source = _Table(self.SOURCE_TABLE)
+        job = self._makeOne(self.JOB_NAME, source, [self.DESTINATION_URI],
+                            client)
+
+        self.assertFalse(job.exists())
+
+        self.assertEqual(len(conn._requested), 1)
+        req = conn._requested[0]
+        self.assertEqual(req['method'], 'GET')
+        self.assertEqual(req['path'], '/%s' % PATH)
+        self.assertEqual(req['query_params'], {'fields': 'id'})
+
+    def test_exists_hit_w_alternate_client(self):
+        PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)
+        conn1 = _Connection()
+        client1 = _Client(project=self.PROJECT, connection=conn1)
+        conn2 = _Connection({})
+        client2 = _Client(project=self.PROJECT, connection=conn2)
+        source = _Table(self.SOURCE_TABLE)
+        job = self._makeOne(self.JOB_NAME, source, [self.DESTINATION_URI],
+                            client1)
+
+        self.assertTrue(job.exists(client=client2))
+
+        self.assertEqual(len(conn1._requested), 0)
+        self.assertEqual(len(conn2._requested), 1)
+        req = conn2._requested[0]
+        self.assertEqual(req['method'], 'GET')
+        self.assertEqual(req['path'], '/%s' % PATH)
+        self.assertEqual(req['query_params'], {'fields': 'id'})
+
+    def test_reload_w_bound_client(self):
+        PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)
+        RESOURCE = self._makeResource()
+        conn = _Connection(RESOURCE)
+        client = _Client(project=self.PROJECT, connection=conn)
+        source = _Table(self.SOURCE_TABLE)
+        job = self._makeOne(self.JOB_NAME, source, [self.DESTINATION_URI],
+                            client)
+
+        job.reload()
+
+        self.assertEqual(len(conn._requested), 1)
+        req = conn._requested[0]
+        self.assertEqual(req['method'], 'GET')
+        self.assertEqual(req['path'], '/%s' % PATH)
+        self._verifyResourceProperties(job, RESOURCE)
+
+    def test_reload_w_alternate_client(self):
+        PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)
+        RESOURCE = self._makeResource()
+        conn1 = _Connection()
+        client1 = _Client(project=self.PROJECT, connection=conn1)
+        conn2 = _Connection(RESOURCE)
+        client2 = _Client(project=self.PROJECT, connection=conn2)
+        source = _Table(self.SOURCE_TABLE)
+        job = self._makeOne(self.JOB_NAME, source, [self.DESTINATION_URI],
+                            client1)
+
+        job.reload(client=client2)
+
+        self.assertEqual(len(conn1._requested), 0)
+        self.assertEqual(len(conn2._requested), 1)
+        req = conn2._requested[0]
+        self.assertEqual(req['method'], 'GET')
+        self.assertEqual(req['path'], '/%s' % PATH)
+        self._verifyResourceProperties(job, RESOURCE)
+
+
 class _Client(object):
 
     def __init__(self, project='project', connection=None):
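
Usage note (not part of the patch): a minimal sketch of driving the new API end to end, assuming hypothetical project, dataset, table, and bucket names; `Client`, `dataset`, `table`, and `begin` are the existing gcloud.bigquery entry points exercised by the tests above.

    from gcloud.bigquery.client import Client

    client = Client(project='my-project')  # hypothetical project ID
    source = client.dataset('my_dataset').table('my_table')

    # Construct the (not-yet-started) extract job; the job name and the
    # destination URI are hypothetical placeholders.
    job = client.extract_table_to_storage(
        'extract-job-1', source, 'gs://my-bucket/export.csv')

    # Optional settings; the enum-backed setters raise ValueError for
    # values outside Compression.ALLOWED / DestinationFormat.ALLOWED.
    job.compression = 'GZIP'
    job.destination_format = 'CSV'
    job.field_delimiter = ','
    job.print_header = True

    job.begin()  # POSTs the resource assembled by _build_resource()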