diff --git a/gcloud/bigquery/client.py b/gcloud/bigquery/client.py index 25b5cdcd1567..748b60a4d48b 100644 --- a/gcloud/bigquery/client.py +++ b/gcloud/bigquery/client.py @@ -18,6 +18,7 @@ from gcloud.client import JSONClient from gcloud.bigquery.connection import Connection from gcloud.bigquery.dataset import Dataset +from gcloud.bigquery.job import CopyJob from gcloud.bigquery.job import LoadTableFromStorageJob @@ -115,3 +116,20 @@ def load_table_from_storage(self, name, destination, *source_uris): """ return LoadTableFromStorageJob(name, destination, source_uris, client=self) + + def copy_table(self, name, destination, *sources): + """Construct a job for copying one or more tables into another table. + + :type name: string + :param name: Name of the job. + + :type destination: :class:`gcloud.bigquery.table.Table` + :param destination: Table into which data is to be copied. + + :type sources: sequence of :class:`gcloud.bigquery.table.Table` + :param sources: tables to be copied. + + :rtype: :class:`gcloud.bigquery.job.CopyJob` + :returns: a new ``CopyJob`` instance + """ + return CopyJob(name, destination, sources, client=self) diff --git a/gcloud/bigquery/job.py b/gcloud/bigquery/job.py index c48a3bc96cab..b6abe62fc713 100644 --- a/gcloud/bigquery/job.py +++ b/gcloud/bigquery/job.py @@ -23,96 +23,20 @@ from gcloud.bigquery.table import _parse_schema_resource -class _Enum(object): - """Psedo-enumeration class. - - Subclasses must define ``ALLOWED`` as a class-level constant: it must - be a sequence of strings. - """ - @classmethod - def validate(cls, value): - """Check that ``value`` is one of the allowed values. - - :raises: ValueError if value is not allowed. - """ - if value not in cls.ALLOWED: - raise ValueError('Pass one of: %s' ', '.join(cls.ALLOWED)) - - -class CreateDisposition(_Enum): - """Pseudo-enum for allowed values for ``create_disposition`` properties. - """ - CREATE_IF_NEEDED = 'CREATE_IF_NEEDED' - CREATE_NEVER = 'CREATE_NEVER' - ALLOWED = (CREATE_IF_NEEDED, CREATE_NEVER) - - -class Encoding(_Enum): - """Pseudo-enum for allowed values for ``encoding`` properties.""" - UTF_8 = 'UTF-8' - ISO_8559_1 = 'ISO-8559-1' - ALLOWED = (UTF_8, ISO_8559_1) - - -class SourceFormat(_Enum): - """Pseudo-enum for allowed values for ``source_format`` properties.""" - CSV = 'CSV' - DATASTORE_BACKUP = 'DATASTORE_BACKUP' - NEWLINE_DELIMITED_JSON = 'NEWLINE_DELIMITED_JSON' - ALLOWED = (CSV, DATASTORE_BACKUP, NEWLINE_DELIMITED_JSON) - - -class WriteDisposition(_Enum): - """Pseudo-enum for allowed values for ``write_disposition`` properties.""" - WRITE_APPEND = 'WRITE_APPEND' - WRITE_TRUNCATE = 'WRITE_TRUNCATE' - WRITE_EMPTY = 'WRITE_EMPTY' - ALLOWED = (WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY) - - -class _LoadConfiguration(object): - """User-settable configuration options for load jobs.""" - # None -> use server default. - _allow_jagged_rows = None - _allow_quoted_newlines = None - _create_disposition = None - _encoding = None - _field_delimiter = None - _ignore_unknown_values = None - _max_bad_records = None - _quote_character = None - _skip_leading_rows = None - _source_format = None - _write_disposition = None - - -class LoadTableFromStorageJob(object): - """Asynchronous job for loading data into a BQ table from CloudStorage. +class _BaseJob(object): + """Base class for asynchronous jobs. :type name: string :param name: the name of the job - :type destination: :class:`gcloud.bigquery.table.Table` - :param destination: Table into which data is to be loaded. 
- - :type source_uris: sequence of string - :param source_uris: URIs of data files to be loaded. - :type client: :class:`gcloud.bigquery.client.Client` :param client: A client which holds credentials and project configuration for the dataset (which requires a project). - - :type schema: list of :class:`gcloud.bigquery.table.SchemaField` - :param schema: The job's schema """ - def __init__(self, name, destination, source_uris, client, schema=()): + def __init__(self, name, client): self.name = name - self.destination = destination self._client = client - self.source_uris = source_uris - self.schema = schema self._properties = {} - self._configuration = _LoadConfiguration() @property def project(self): @@ -132,29 +56,6 @@ def path(self): """ return '/projects/%s/jobs/%s' % (self.project, self.name) - @property - def schema(self): - """Table's schema. - - :rtype: list of :class:`SchemaField` - :returns: fields describing the schema - """ - return list(self._schema) - - @schema.setter - def schema(self, value): - """Update table's schema - - :type value: list of :class:`SchemaField` - :param value: fields describing the schema - - :raises: TypeError if 'value' is not a sequence, or ValueError if - any item in the sequence is not a SchemaField - """ - if not all(isinstance(field, SchemaField) for field in value): - raise ValueError('Schema items must be fields') - self._schema = tuple(value) - @property def etag(self): """ETag for the job resource. @@ -205,30 +106,299 @@ def created(self): return _datetime_from_microseconds(millis * 1000.0) @property - def started(self): - """Datetime at which the job was started. + def started(self): + """Datetime at which the job was started. + + :rtype: ``datetime.datetime``, or ``NoneType`` + :returns: the start time (None until set from the server). + """ + statistics = self._properties.get('statistics') + if statistics is not None: + millis = statistics.get('startTime') + if millis is not None: + return _datetime_from_microseconds(millis * 1000.0) + + @property + def ended(self): + """Datetime at which the job finished. + + :rtype: ``datetime.datetime``, or ``NoneType`` + :returns: the end time (None until set from the server). + """ + statistics = self._properties.get('statistics') + if statistics is not None: + millis = statistics.get('endTime') + if millis is not None: + return _datetime_from_microseconds(millis * 1000.0) + + @property + def error_result(self): + """Error information about the job as a whole. + + :rtype: mapping, or ``NoneType`` + :returns: the error information (None until set from the server). + """ + status = self._properties.get('status') + if status is not None: + return status.get('errorResult') + + @property + def errors(self): + """Information about individual errors generated by the job. + + :rtype: list of mappings, or ``NoneType`` + :returns: the error information (None until set from the server). + """ + status = self._properties.get('status') + if status is not None: + return status.get('errors') + + @property + def state(self): + """Status of the job. + + :rtype: string, or ``NoneType`` + :returns: the state (None until set from the server). + """ + status = self._properties.get('status') + if status is not None: + return status.get('state') + + def _require_client(self, client): + """Check client or verify over-ride. + + :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current dataset. 
+
+        :rtype: :class:`gcloud.bigquery.client.Client`
+        :returns: The client passed in or the currently bound client.
+        """
+        if client is None:
+            client = self._client
+        return client
+
+    def _scrub_local_properties(self, cleaned):
+        """Helper: handle subclass properties in cleaned."""
+        pass
+
+    def _set_properties(self, api_response):
+        """Update properties from resource in body of ``api_response``
+
+        :type api_response: httplib2.Response
+        :param api_response: response returned from an API call
+        """
+        cleaned = api_response.copy()
+        self._scrub_local_properties(cleaned)
+
+        statistics = cleaned.get('statistics', {})
+        if 'creationTime' in statistics:
+            statistics['creationTime'] = float(statistics['creationTime'])
+        if 'startTime' in statistics:
+            statistics['startTime'] = float(statistics['startTime'])
+        if 'endTime' in statistics:
+            statistics['endTime'] = float(statistics['endTime'])
+
+        self._properties.clear()
+        self._properties.update(cleaned)
+
+    def begin(self, client=None):
+        """API call: begin the job via a POST request
+
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert
+
+        :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
+        :param client: the client to use. If not passed, falls back to the
+                       ``client`` stored on the current dataset.
+        """
+        client = self._require_client(client)
+        path = '/projects/%s/jobs' % (self.project,)
+        api_response = client.connection.api_request(
+            method='POST', path=path, data=self._build_resource())
+        self._set_properties(api_response)
+
+    def exists(self, client=None):
+        """API call: test for the existence of the job via a GET request
+
+        See
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs/get
+
+        :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
+        :param client: the client to use. If not passed, falls back to the
+                       ``client`` stored on the current dataset.
+        """
+        client = self._require_client(client)
+
+        try:
+            client.connection.api_request(method='GET', path=self.path,
+                                          query_params={'fields': 'id'})
+        except NotFound:
+            return False
+        else:
+            return True
+
+    def reload(self, client=None):
+        """API call: refresh job properties via a GET request
+
+        See
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs/get
+
+        :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
+        :param client: the client to use. If not passed, falls back to the
+                       ``client`` stored on the current dataset.
+        """
+        client = self._require_client(client)
+
+        api_response = client.connection.api_request(
+            method='GET', path=self.path)
+        self._set_properties(api_response)
+
+    def cancel(self, client=None):
+        """API call: cancel job via a POST request
+
+        See
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs/cancel
+
+        :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
+        :param client: the client to use. If not passed, falls back to the
+                       ``client`` stored on the current dataset.
+        """
+        client = self._require_client(client)
+
+        api_response = client.connection.api_request(
+            method='POST', path='%s/cancel' % (self.path,))
+        self._set_properties(api_response)
+
+
+class _Enum(object):
+    """Pseudo-enumeration class.
+
+    Subclasses must define ``ALLOWED`` as a class-level constant: it must
+    be a sequence of strings.
+    """
+    @classmethod
+    def validate(cls, value):
+        """Check that ``value`` is one of the allowed values.
+
+        :raises: ValueError if value is not allowed.
+        """
+        if value not in cls.ALLOWED:
+            raise ValueError('Pass one of: %s' % ', '.join(cls.ALLOWED))
+
+
+class CreateDisposition(_Enum):
+    """Pseudo-enum for allowed values for ``create_disposition`` properties.
+
+    See:
+    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.createDisposition
+    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy.createDisposition
+    """
+    CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
+    CREATE_NEVER = 'CREATE_NEVER'
+    ALLOWED = (CREATE_IF_NEEDED, CREATE_NEVER)
+
+
+class Encoding(_Enum):
+    """Pseudo-enum for allowed values for ``encoding`` properties.
+
+    See:
+    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.encoding
+    """
+    UTF_8 = 'UTF-8'
+    ISO_8559_1 = 'ISO-8559-1'
+    ALLOWED = (UTF_8, ISO_8559_1)
+
+
+class SourceFormat(_Enum):
+    """Pseudo-enum for allowed values for ``source_format`` properties.
+
+    See:
+    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.sourceFormat
+    """
+    CSV = 'CSV'
+    DATASTORE_BACKUP = 'DATASTORE_BACKUP'
+    NEWLINE_DELIMITED_JSON = 'NEWLINE_DELIMITED_JSON'
+    ALLOWED = (CSV, DATASTORE_BACKUP, NEWLINE_DELIMITED_JSON)
+
+
+class WriteDisposition(_Enum):
+    """Pseudo-enum for allowed values for ``write_disposition`` properties.
+
+    See:
+    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.writeDisposition
+    https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy.writeDisposition
+    """
+    WRITE_APPEND = 'WRITE_APPEND'
+    WRITE_TRUNCATE = 'WRITE_TRUNCATE'
+    WRITE_EMPTY = 'WRITE_EMPTY'
+    ALLOWED = (WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY)
+
+
+class _LoadConfiguration(object):
+    """User-settable configuration options for load jobs."""
+    # None -> use server default.
+    _allow_jagged_rows = None
+    _allow_quoted_newlines = None
+    _create_disposition = None
+    _encoding = None
+    _field_delimiter = None
+    _ignore_unknown_values = None
+    _max_bad_records = None
+    _quote_character = None
+    _skip_leading_rows = None
+    _source_format = None
+    _write_disposition = None
+
+
+class LoadTableFromStorageJob(_BaseJob):
+    """Asynchronous job for loading data into a BQ table from Cloud Storage.
+
+    :type name: string
+    :param name: the name of the job
+
+    :type destination: :class:`gcloud.bigquery.table.Table`
+    :param destination: Table into which data is to be loaded.
+
+    :type source_uris: sequence of string
+    :param source_uris: URIs of data files to be loaded.
+
+    :type client: :class:`gcloud.bigquery.client.Client`
+    :param client: A client which holds credentials and project configuration
+                   for the dataset (which requires a project).
+
+    :type schema: list of :class:`gcloud.bigquery.table.SchemaField`
+    :param schema: The job's schema
+    """
+    def __init__(self, name, destination, source_uris, client, schema=()):
+        super(LoadTableFromStorageJob, self).__init__(name, client)
+        self.destination = destination
+        self.source_uris = source_uris
+        self.schema = schema
+        self._configuration = _LoadConfiguration()
+
+    @property
+    def schema(self):
+        """Table's schema.
-
-        :rtype: ``datetime.datetime``, or ``NoneType``
-        :returns: the start time (None until set from the server).
+ :rtype: list of :class:`SchemaField` + :returns: fields describing the schema """ - statistics = self._properties.get('statistics') - if statistics is not None: - millis = statistics.get('startTime') - if millis is not None: - return _datetime_from_microseconds(millis * 1000.0) + return list(self._schema) - @property - def ended(self): - """Datetime at which the job finished. + @schema.setter + def schema(self, value): + """Update table's schema - :rtype: ``datetime.datetime``, or ``NoneType`` - :returns: the end time (None until set from the server). + :type value: list of :class:`SchemaField` + :param value: fields describing the schema + + :raises: TypeError if 'value' is not a sequence, or ValueError if + any item in the sequence is not a SchemaField """ - statistics = self._properties.get('statistics') - if statistics is not None: - millis = statistics.get('endTime') - if millis is not None: - return _datetime_from_microseconds(millis * 1000.0) + if not all(isinstance(field, SchemaField) for field in value): + raise ValueError('Schema items must be fields') + self._schema = tuple(value) @property def input_file_bytes(self): @@ -274,43 +444,13 @@ def output_rows(self): if statistics is not None: return int(statistics['load']['outputRows']) - @property - def error_result(self): - """Error information about the job as a whole. - - :rtype: mapping, or ``NoneType`` - :returns: the error information (None until set from the server). - """ - status = self._properties.get('status') - if status is not None: - return status['errorResult'] - - @property - def errors(self): - """Information about individual errors generated by the job. - - :rtype: list of mappings, or ``NoneType`` - :returns: the error information (None until set from the server). - """ - status = self._properties.get('status') - if status is not None: - return status['errors'] - - @property - def state(self): - """Status of the job. - - :rtype: string, or ``NoneType`` - :returns: the state (None until set from the server). - """ - status = self._properties.get('status') - if status is not None: - return status['state'] - @property def allow_jagged_rows(self): """Allow rows with missing trailing commas for optional fields. + See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.allowJaggedRows + :rtype: boolean, or ``NoneType`` :returns: The value as set by the user, or None (the default). """ @@ -338,6 +478,9 @@ def allow_jagged_rows(self): def allow_quoted_newlines(self): """Allow rows with quoted newlines. + See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.allowQuotedNewlines + :rtype: boolean, or ``NoneType`` :returns: The value as set by the user, or None (the default). """ @@ -363,7 +506,10 @@ def allow_quoted_newlines(self): @property def create_disposition(self): - """Handling for missing destination table. + """Define how the back-end handles a missing destination table. + + See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.createDisposition :rtype: string, or ``NoneType`` :returns: The value as set by the user, or None (the default). @@ -389,6 +535,9 @@ def create_disposition(self): def encoding(self): """Encoding for source data. + See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.encoding + :rtype: string, or ``NoneType`` :returns: The value as set by the user, or None (the default). 
""" @@ -413,6 +562,9 @@ def encoding(self): def field_delimiter(self): """Allow rows with missing trailing commas for optional fields. + See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.fieldDelimiter + :rtype: string, or ``NoneType`` :returns: The value as set by the user, or None (the default). """ @@ -440,6 +592,9 @@ def field_delimiter(self): def ignore_unknown_values(self): """Ignore rows with extra columns beyond those specified by the schema. + See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.ignoreUnknownValues + :rtype: boolean, or ``NoneType`` :returns: The value as set by the user, or None (the default). """ @@ -467,6 +622,9 @@ def ignore_unknown_values(self): def max_bad_records(self): """Max number of bad records to be ignored. + See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.maxBadRecords + :rtype: integer, or ``NoneType`` :returns: The value as set by the user, or None (the default). """ @@ -494,6 +652,9 @@ def max_bad_records(self): def quote_character(self): """Character used to quote values. + See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.quote + :rtype: string, or ``NoneType`` :returns: The value as set by the user, or None (the default). """ @@ -521,6 +682,9 @@ def quote_character(self): def skip_leading_rows(self): """Count of leading rows to be skipped. + See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.skipLeadingRows + :rtype: integer, or ``NoneType`` :returns: The value as set by the user, or None (the default). """ @@ -548,6 +712,9 @@ def skip_leading_rows(self): def source_format(self): """Format of source data files. + See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.sourceFormat + :rtype: string, or ``NoneType`` :returns: The value as set by the user, or None (the default). """ @@ -572,6 +739,9 @@ def source_format(self): def write_disposition(self): """Allow rows with missing trailing commas for optional fields. + See: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.writeDisposition + :rtype: boolean, or ``NoneType`` :returns: The value as set by the user, or None (the default). """ @@ -592,20 +762,6 @@ def write_disposition(self): """Delete write_disposition.""" del self._configuration._write_disposition - def _require_client(self, client): - """Check client or verify over-ride. - - :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` - :param client: the client to use. If not passed, falls back to the - ``client`` stored on the current dataset. - - :rtype: :class:`gcloud.bigquery.client.Client` - :returns: The client passed in or the currently bound client. 
- """ - if client is None: - client = self._client - return client - def _populate_config_resource(self, configuration): """Helper for _build_resource: copy config properties to resource""" if self.allow_jagged_rows is not None: @@ -658,91 +814,128 @@ def _build_resource(self): return resource - def _set_properties(self, api_response): - """Update properties from resource in body of ``api_response`` - - :type api_response: httplib2.Response - :param api_response: response returned from an API call - """ - self._properties.clear() - cleaned = api_response.copy() + def _scrub_local_properties(self, cleaned): + """Helper: handle subclass properties in cleaned.""" schema = cleaned.pop('schema', {'fields': ()}) self.schema = _parse_schema_resource(schema) - statistics = cleaned.get('statistics', {}) - if 'creationTime' in statistics: - statistics['creationTime'] = float(statistics['creationTime']) - if 'startTime' in statistics: - statistics['startTime'] = float(statistics['startTime']) - if 'endTime' in statistics: - statistics['endTime'] = float(statistics['endTime']) - self._properties.update(cleaned) +class _CopyConfiguration(object): + """User-settable configuration options for copy jobs.""" + # None -> use server default. + _create_disposition = None + _write_disposition = None - def begin(self, client=None): - """API call: begin the job via a POST request + +class CopyJob(_BaseJob): + """Asynchronous job: copy data into a BQ table from other tables. + + :type name: string + :param name: the name of the job + + :type destination: :class:`gcloud.bigquery.table.Table` + :param destination: Table into which data is to be loaded. + + :type sources: list of :class:`gcloud.bigquery.table.Table` + :param sources: Table into which data is to be loaded. + + :type client: :class:`gcloud.bigquery.client.Client` + :param client: A client which holds credentials and project configuration + for the dataset (which requires a project). + """ + def __init__(self, name, destination, sources, client): + super(CopyJob, self).__init__(name, client) + self.destination = destination + self.sources = sources + self._configuration = _CopyConfiguration() + + @property + def create_disposition(self): + """Handling for missing destination table. See: - https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy.createDisposition - :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` - :param client: the client to use. If not passed, falls back to the - ``client`` stored on the current dataset. + :rtype: string, or ``NoneType`` + :returns: The value as set by the user, or None (the default). """ - client = self._require_client(client) - path = '/projects/%s/jobs' % (self.project,) - api_response = client.connection.api_request( - method='POST', path=path, data=self._build_resource()) - self._set_properties(api_response) - - def exists(self, client=None): - """API call: test for the existence of the job via a GET request + return self._configuration._create_disposition - See - https://cloud.google.com/bigquery/docs/reference/v2/jobs/get + @create_disposition.setter + def create_disposition(self, value): + """Update create_disposition. - :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType`` - :param client: the client to use. If not passed, falls back to the - ``client`` stored on the current dataset. 
+        :type value: string
+        :param value: allowed values for :class:`CreateDisposition`
         """
-        client = self._require_client(client)
+        CreateDisposition.validate(value)  # raises ValueError if invalid
+        self._configuration._create_disposition = value
 
-        try:
-            client.connection.api_request(method='GET', path=self.path,
-                                          query_params={'fields': 'id'})
-        except NotFound:
-            return False
-        else:
-            return True
+    @create_disposition.deleter
+    def create_disposition(self):
+        """Delete create_disposition."""
+        del self._configuration._create_disposition
 
-    def reload(self, client=None):
-        """API call: refresh job properties via a GET request
+    @property
+    def write_disposition(self):
+        """Handling for existing destination table.
 
-        See
-        https://cloud.google.com/bigquery/docs/reference/v2/jobs/get
+        See:
+        https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy.writeDisposition
 
-        :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
-        :param client: the client to use. If not passed, falls back to the
-                       ``client`` stored on the current dataset.
+        :rtype: string, or ``NoneType``
+        :returns: The value as set by the user, or None (the default).
         """
-        client = self._require_client(client)
+        return self._configuration._write_disposition
 
-        api_response = client.connection.api_request(
-            method='GET', path=self.path)
-        self._set_properties(api_response)
+    @write_disposition.setter
+    def write_disposition(self, value):
+        """Update write_disposition.
 
-    def cancel(self, client=None):
-        """API call: cancel job via a POST request
+        :type value: string
+        :param value: allowed values for :class:`WriteDisposition`.
+        """
+        WriteDisposition.validate(value)  # raises ValueError if invalid
+        self._configuration._write_disposition = value
 
-        See
-        https://cloud.google.com/bigquery/docs/reference/v2/jobs/cancel
+    @write_disposition.deleter
+    def write_disposition(self):
+        """Delete write_disposition."""
+        del self._configuration._write_disposition
 
-        :type client: :class:`gcloud.bigquery.client.Client` or ``NoneType``
-        :param client: the client to use. If not passed, falls back to the
-                       ``client`` stored on the current dataset.
- """ - client = self._require_client(client) + def _populate_config_resource(self, configuration): + """Helper for _build_resource: copy config properties to resource""" + if self.create_disposition is not None: + configuration['createDisposition'] = self.create_disposition + if self.write_disposition is not None: + configuration['writeDisposition'] = self.write_disposition - api_response = client.connection.api_request( - method='POST', path='%s/cancel' % self.path) - self._set_properties(api_response) + def _build_resource(self): + """Generate a resource for ``begin``.""" + + source_refs = [{ + 'projectId': table.project, + 'datasetId': table.dataset_name, + 'tableId': table.name, + } for table in self.sources] + + resource = { + 'jobReference': { + 'projectId': self.project, + 'jobId': self.name, + }, + 'configuration': { + 'copy': { + 'sourceTables': source_refs, + 'destinationTable': { + 'projectId': self.destination.project, + 'datasetId': self.destination.dataset_name, + 'tableId': self.destination.name, + }, + }, + }, + } + configuration = resource['configuration']['copy'] + self._populate_config_resource(configuration) + + return resource diff --git a/gcloud/bigquery/test_client.py b/gcloud/bigquery/test_client.py index 548814d7c744..653a964c9faf 100644 --- a/gcloud/bigquery/test_client.py +++ b/gcloud/bigquery/test_client.py @@ -147,6 +147,26 @@ def test_load_table_from_storage(self): self.assertEqual(list(job.source_uris), [SOURCE_URI]) self.assertTrue(job.destination is destination) + def test_copy_table(self): + from gcloud.bigquery.job import CopyJob + PROJECT = 'PROJECT' + JOB = 'job_name' + DATASET = 'dataset_name' + SOURCE = 'source_table' + DESTINATION = 'destination_table' + creds = _Credentials() + http = object() + client = self._makeOne(project=PROJECT, credentials=creds, http=http) + dataset = client.dataset(DATASET) + source = dataset.table(SOURCE) + destination = dataset.table(DESTINATION) + job = client.copy_table(JOB, destination, source) + self.assertTrue(isinstance(job, CopyJob)) + self.assertTrue(job._client is client) + self.assertEqual(job.name, JOB) + self.assertEqual(list(job.sources), [source]) + self.assertTrue(job.destination is destination) + class _Credentials(object): diff --git a/gcloud/bigquery/test_job.py b/gcloud/bigquery/test_job.py index 56a517d73d42..edf621ad5740 100644 --- a/gcloud/bigquery/test_job.py +++ b/gcloud/bigquery/test_job.py @@ -15,17 +15,13 @@ import unittest2 -class TestLoadTableFromStorageJob(unittest2.TestCase): +class _Base(object): PROJECT = 'project' SOURCE1 = 'http://example.com/source1.csv' DS_NAME = 'datset_name' TABLE_NAME = 'table_name' JOB_NAME = 'job_name' - def _getTargetClass(self): - from gcloud.bigquery.job import LoadTableFromStorageJob - return LoadTableFromStorageJob - def _makeOne(self, *args, **kw): return self._getTargetClass()(*args, **kw) @@ -40,21 +36,17 @@ def _setUpConstants(self): self.JOB_ID = '%s:%s' % (self.PROJECT, self.JOB_NAME) self.RESOURCE_URL = 'http://example.com/path/to/resource' self.USER_EMAIL = 'phred@example.com' - self.INPUT_FILES = 2 - self.INPUT_BYTES = 12345 - self.OUTPUT_BYTES = 23456 - self.OUTPUT_ROWS = 345 def _makeResource(self, started=False, ended=False): self._setUpConstants() resource = { 'configuration': { - 'load': { + self.JOB_TYPE: { }, }, 'statistics': { 'creationTime': self.WHEN_TS * 1000, - 'load': { + self.JOB_TYPE: { } }, 'etag': self.ETAG, @@ -72,43 +64,90 @@ def _makeResource(self, started=False, ended=False): if ended: resource['statistics']['endTime'] = 
(self.WHEN_TS + 1000) * 1000 - resource['statistics']['load']['inputFiles'] = self.INPUT_FILES - resource['statistics']['load']['inputFileBytes'] = self.INPUT_BYTES - resource['statistics']['load']['outputBytes'] = self.OUTPUT_BYTES - resource['statistics']['load']['outputRows'] = self.OUTPUT_ROWS return resource + def _verifyInitialReadonlyProperties(self, job): + # root elements of resource + self.assertEqual(job.etag, None) + self.assertEqual(job.job_id, None) + self.assertEqual(job.self_link, None) + self.assertEqual(job.user_email, None) + + # derived from resource['statistics'] + self.assertEqual(job.created, None) + self.assertEqual(job.started, None) + self.assertEqual(job.ended, None) + + # derived from resource['status'] + self.assertEqual(job.error_result, None) + self.assertEqual(job.errors, None) + self.assertEqual(job.state, None) + def _verifyReadonlyResourceProperties(self, job, resource): from datetime import timedelta self.assertEqual(job.job_id, self.JOB_ID) - if 'creationTime' in resource.get('statistics', {}): + statistics = resource.get('statistics', {}) + + if 'creationTime' in statistics: self.assertEqual(job.created, self.WHEN) else: self.assertEqual(job.created, None) - if 'startTime' in resource.get('statistics', {}): + + if 'startTime' in statistics: self.assertEqual(job.started, self.WHEN) else: self.assertEqual(job.started, None) - if 'endTime' in resource.get('statistics', {}): + + if 'endTime' in statistics: self.assertEqual(job.ended, self.WHEN + timedelta(seconds=1000)) else: self.assertEqual(job.ended, None) + if 'etag' in resource: self.assertEqual(job.etag, self.ETAG) else: self.assertEqual(job.etag, None) + if 'selfLink' in resource: self.assertEqual(job.self_link, self.RESOURCE_URL) else: self.assertEqual(job.self_link, None) + if 'user_email' in resource: self.assertEqual(job.user_email, self.USER_EMAIL) else: self.assertEqual(job.user_email, None) + +class TestLoadTableFromStorageJob(unittest2.TestCase, _Base): + JOB_TYPE = 'load' + + def _getTargetClass(self): + from gcloud.bigquery.job import LoadTableFromStorageJob + return LoadTableFromStorageJob + + def _setUpConstants(self): + super(TestLoadTableFromStorageJob, self)._setUpConstants() + self.INPUT_FILES = 2 + self.INPUT_BYTES = 12345 + self.OUTPUT_BYTES = 23456 + self.OUTPUT_ROWS = 345 + + def _makeResource(self, started=False, ended=False): + resource = super(TestLoadTableFromStorageJob, self)._makeResource( + started, ended) + + if ended: + resource['statistics']['load']['inputFiles'] = self.INPUT_FILES + resource['statistics']['load']['inputFileBytes'] = self.INPUT_BYTES + resource['statistics']['load']['outputBytes'] = self.OUTPUT_BYTES + resource['statistics']['load']['outputRows'] = self.OUTPUT_ROWS + + return resource + def _verifyBooleanConfigProperties(self, job, config): if 'allowJaggedRows' in config: self.assertEqual(job.allow_jagged_rows, @@ -188,6 +227,16 @@ def test_ctor(self): job.path, '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)) self.assertEqual(job.schema, []) + + self._verifyInitialReadonlyProperties(job) + + # derived from resource['statistics']['load'] + self.assertEqual(job.input_file_bytes, None) + self.assertEqual(job.input_files, None) + self.assertEqual(job.output_bytes, None) + self.assertEqual(job.output_rows, None) + + # set/read from resource['configuration']['load'] self.assertTrue(job.allow_jagged_rows is None) self.assertTrue(job.allow_quoted_newlines is None) self.assertTrue(job.create_disposition is None) @@ -200,28 +249,6 @@ def test_ctor(self): 
self.assertTrue(job.source_format is None) self.assertTrue(job.write_disposition is None) - # root elements of resource - self.assertEqual(job.etag, None) - self.assertEqual(job.job_id, None) - self.assertEqual(job.self_link, None) - self.assertEqual(job.user_email, None) - - # derived from resource['statistics'] - self.assertEqual(job.created, None) - self.assertEqual(job.started, None) - self.assertEqual(job.ended, None) - - # derived from resource['statistics']['load'] - self.assertEqual(job.input_file_bytes, None) - self.assertEqual(job.input_files, None) - self.assertEqual(job.output_bytes, None) - self.assertEqual(job.output_rows, None) - - # derived from resource['status'] - self.assertEqual(job.error_result, None) - self.assertEqual(job.errors, None) - self.assertEqual(job.state, None) - def test_ctor_w_schema(self): from gcloud.bigquery.table import SchemaField client = _Client(self.PROJECT) @@ -293,11 +320,6 @@ def test_props_set_by_server(self): load_stats['outputBytes'] = 23456 load_stats['outputRows'] = 345 - status = job._properties['status'] = {} - status['errorResult'] = ERROR_RESULT - status['errors'] = [ERROR_RESULT] - status['state'] = 'STATE' - self.assertEqual(job.etag, 'ETAG') self.assertEqual(job.job_id, JOB_ID) self.assertEqual(job.self_link, URL) @@ -312,6 +334,16 @@ def test_props_set_by_server(self): self.assertEqual(job.output_bytes, 23456) self.assertEqual(job.output_rows, 345) + status = job._properties['status'] = {} + + self.assertEqual(job.error_result, None) + self.assertEqual(job.errors, None) + self.assertEqual(job.state, None) + + status['errorResult'] = ERROR_RESULT + status['errors'] = [ERROR_RESULT] + status['state'] = 'STATE' + self.assertEqual(job.error_result, ERROR_RESULT) self.assertEqual(job.errors, [ERROR_RESULT]) self.assertEqual(job.state, 'STATE') @@ -703,6 +735,250 @@ def test_cancel_w_alternate_client(self): self._verifyResourceProperties(job, RESOURCE) +class TestCopyJob(unittest2.TestCase, _Base): + JOB_TYPE = 'copy' + SOURCE_TABLE = 'source_table' + DESTINATION_TABLE = 'destination_table' + + def _getTargetClass(self): + from gcloud.bigquery.job import CopyJob + return CopyJob + + def _verifyResourceProperties(self, job, resource): + self._verifyReadonlyResourceProperties(job, resource) + + config = resource.get('configuration', {}).get('copy') + + if 'createDisposition' in config: + self.assertEqual(job.create_disposition, + config['createDisposition']) + else: + self.assertTrue(job.create_disposition is None) + + if 'writeDisposition' in config: + self.assertEqual(job.write_disposition, + config['writeDisposition']) + else: + self.assertTrue(job.write_disposition is None) + + def test_ctor(self): + client = _Client(self.PROJECT) + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) + job = self._makeOne(self.JOB_NAME, destination, [source], client) + self.assertTrue(job.destination is destination) + self.assertEqual(job.sources, [source]) + self.assertTrue(job._client is client) + self.assertEqual( + job.path, + '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)) + + self._verifyInitialReadonlyProperties(job) + + # set/read from resource['configuration']['copy'] + self.assertTrue(job.create_disposition is None) + self.assertTrue(job.write_disposition is None) + + def test_create_disposition_setter_bad_value(self): + client = _Client(self.PROJECT) + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) + job = self._makeOne(self.JOB_NAME, destination, [source], client) + 
with self.assertRaises(ValueError): + job.create_disposition = 'BOGUS' + + def test_create_disposition_setter_deleter(self): + client = _Client(self.PROJECT) + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) + job = self._makeOne(self.JOB_NAME, destination, [source], client) + job.create_disposition = 'CREATE_IF_NEEDED' + self.assertEqual(job.create_disposition, 'CREATE_IF_NEEDED') + del job.create_disposition + self.assertTrue(job.create_disposition is None) + + def test_write_disposition_setter_bad_value(self): + client = _Client(self.PROJECT) + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) + job = self._makeOne(self.JOB_NAME, destination, [source], client) + with self.assertRaises(ValueError): + job.write_disposition = 'BOGUS' + + def test_write_disposition_setter_deleter(self): + client = _Client(self.PROJECT) + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) + job = self._makeOne(self.JOB_NAME, destination, [source], client) + job.write_disposition = 'WRITE_TRUNCATE' + self.assertEqual(job.write_disposition, 'WRITE_TRUNCATE') + del job.write_disposition + self.assertTrue(job.write_disposition is None) + + def test_begin_w_bound_client(self): + PATH = 'projects/%s/jobs' % self.PROJECT + RESOURCE = self._makeResource() + # Ensure None for missing server-set props + del RESOURCE['statistics']['creationTime'] + del RESOURCE['etag'] + del RESOURCE['selfLink'] + del RESOURCE['user_email'] + conn = _Connection(RESOURCE) + client = _Client(project=self.PROJECT, connection=conn) + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) + job = self._makeOne(self.JOB_NAME, destination, [source], client) + + job.begin() + + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s' % PATH) + SENT = { + 'jobReference': { + 'projectId': self.PROJECT, + 'jobId': self.JOB_NAME, + }, + 'configuration': { + 'copy': { + 'sourceTables': [{ + 'projectId': self.PROJECT, + 'datasetId': self.DS_NAME, + 'tableId': self.SOURCE_TABLE + }], + 'destinationTable': { + 'projectId': self.PROJECT, + 'datasetId': self.DS_NAME, + 'tableId': self.DESTINATION_TABLE, + }, + }, + }, + } + self.assertEqual(req['data'], SENT) + self._verifyResourceProperties(job, RESOURCE) + + def test_begin_w_alternate_client(self): + PATH = 'projects/%s/jobs' % self.PROJECT + RESOURCE = self._makeResource(ended=True) + COPY_CONFIGURATION = { + 'sourceTables': [{ + 'projectId': self.PROJECT, + 'datasetId': self.DS_NAME, + 'tableId': self.SOURCE_TABLE, + }], + 'destinationTable': { + 'projectId': self.PROJECT, + 'datasetId': self.DS_NAME, + 'tableId': self.DESTINATION_TABLE, + }, + 'createDisposition': 'CREATE_NEVER', + 'writeDisposition': 'WRITE_TRUNCATE', + } + RESOURCE['configuration']['copy'] = COPY_CONFIGURATION + conn1 = _Connection() + client1 = _Client(project=self.PROJECT, connection=conn1) + conn2 = _Connection(RESOURCE) + client2 = _Client(project=self.PROJECT, connection=conn2) + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) + job = self._makeOne(self.JOB_NAME, destination, [source], client1) + + job.create_disposition = 'CREATE_NEVER' + job.write_disposition = 'WRITE_TRUNCATE' + + job.begin(client=client2) + + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'POST') + 
self.assertEqual(req['path'], '/%s' % PATH) + SENT = { + 'jobReference': { + 'projectId': self.PROJECT, + 'jobId': self.JOB_NAME, + }, + 'configuration': { + 'copy': COPY_CONFIGURATION, + }, + } + self.assertEqual(req['data'], SENT) + self._verifyResourceProperties(job, RESOURCE) + + def test_exists_miss_w_bound_client(self): + PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + conn = _Connection() + client = _Client(project=self.PROJECT, connection=conn) + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) + job = self._makeOne(self.JOB_NAME, destination, [source], client) + + self.assertFalse(job.exists()) + + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['query_params'], {'fields': 'id'}) + + def test_exists_hit_w_alternate_client(self): + PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + conn1 = _Connection() + client1 = _Client(project=self.PROJECT, connection=conn1) + conn2 = _Connection({}) + client2 = _Client(project=self.PROJECT, connection=conn2) + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) + job = self._makeOne(self.JOB_NAME, destination, [source], client1) + + self.assertTrue(job.exists(client=client2)) + + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['query_params'], {'fields': 'id'}) + + def test_reload_w_bound_client(self): + PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + RESOURCE = self._makeResource() + conn = _Connection(RESOURCE) + client = _Client(project=self.PROJECT, connection=conn) + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) + job = self._makeOne(self.JOB_NAME, destination, [source], client) + + job.reload() + + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % PATH) + self._verifyResourceProperties(job, RESOURCE) + + def test_reload_w_alternate_client(self): + PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + RESOURCE = self._makeResource() + conn1 = _Connection() + client1 = _Client(project=self.PROJECT, connection=conn1) + conn2 = _Connection(RESOURCE) + client2 = _Client(project=self.PROJECT, connection=conn2) + source = _Table(self.SOURCE_TABLE) + destination = _Table(self.DESTINATION_TABLE) + job = self._makeOne(self.JOB_NAME, destination, [source], client1) + + job.reload(client=client2) + + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/%s' % PATH) + self._verifyResourceProperties(job, RESOURCE) + + class _Client(object): def __init__(self, project='project', connection=None): @@ -712,11 +988,13 @@ def __init__(self, project='project', connection=None): class _Table(object): - def __init__(self): - pass + def __init__(self, name=None): + self._name = name @property def name(self): + if self._name is not None: + return self._name return TestLoadTableFromStorageJob.TABLE_NAME @property
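
For reviewers, a minimal usage sketch of the API this patch adds. It is illustrative only: the project, dataset, and table names are hypothetical, it assumes working credentials on the environment, and the polling loop is a plain sketch (the patch adds no helper for waiting on a job).

    import time

    from gcloud.bigquery.client import Client

    client = Client(project='my-project')             # hypothetical project
    dataset = client.dataset('my_dataset')            # hypothetical dataset
    source = dataset.table('source_table')
    destination = dataset.table('destination_table')

    # copy_table() only constructs the CopyJob; nothing is sent yet.
    job = client.copy_table('copy-job-1', destination, source)

    # Config properties are validated locally by the pseudo-enum classes:
    # an unknown value raises ValueError before any API call is made.
    job.create_disposition = 'CREATE_IF_NEEDED'
    job.write_disposition = 'WRITE_TRUNCATE'

    job.begin()                # POST /projects/my-project/jobs

    while job.state != 'DONE':
        time.sleep(1)
        job.reload()           # GET refreshes statistics and status

    if job.error_result is not None:
        raise RuntimeError('Copy failed: %s' % (job.error_result,))

Note that ``begin()`` returns as soon as the server accepts the job; ``state``, ``started``, ``ended``, and ``error_result`` remain ``None`` until a server response populates them via ``_set_properties``.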