From 40339f2c1827aa6aaccc188b3d44ad21432d030c Mon Sep 17 00:00:00 2001 From: Jon Wayne Parrott Date: Mon, 14 Sep 2015 16:12:43 -0700 Subject: [PATCH 1/6] Updating all bigquery samples to new main pattern --- appengine/ndb/transactions/main.py | 4 - bigquery/samples/async_query.py | 46 +++++--- .../samples/export_data_to_cloud_storage.py | 66 +++++++----- bigquery/samples/getting_started.py | 22 ++-- bigquery/samples/list_datasets_projects.py | 29 ++--- bigquery/samples/load_data_by_post.py | 50 +++++---- bigquery/samples/load_data_from_csv.py | 100 +++++++++++------- bigquery/samples/streaming.py | 62 ++++++----- bigquery/samples/sync_query.py | 42 +++++--- bigquery/samples/utils.py | 2 + bigquery/tests/test_async_query.py | 6 -- .../test_export_data_to_cloud_storage.py | 6 -- bigquery/tests/test_getting_started.py | 6 -- bigquery/tests/test_list_datasets_projects.py | 6 -- bigquery/tests/test_load_data_from_csv.py | 6 -- bigquery/tests/test_streaming.py | 5 - bigquery/tests/test_sync_query.py | 5 - 17 files changed, 253 insertions(+), 210 deletions(-) diff --git a/appengine/ndb/transactions/main.py b/appengine/ndb/transactions/main.py index 4a077af565d..024c73a77cc 100644 --- a/appengine/ndb/transactions/main.py +++ b/appengine/ndb/transactions/main.py @@ -191,7 +191,3 @@ def add_note(): return ('Already there
Return' % flask.url_for('main_page', page_name=page_name)) return flask.redirect(flask.url_for('main_page', page_name=page_name)) - - -if __name__ == '__main__': - app.run() diff --git a/bigquery/samples/async_query.py b/bigquery/samples/async_query.py index a0b2f775f0c..63b7d58e4e3 100644 --- a/bigquery/samples/async_query.py +++ b/bigquery/samples/async_query.py @@ -11,13 +11,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import argparse import json import uuid -from bigquery.samples.utils import get_service -from bigquery.samples.utils import paging -from bigquery.samples.utils import poll_job -from six.moves import input +from utils import get_service, paging, poll_job # [START async_query] @@ -43,7 +41,7 @@ def async_query(service, project_id, query, batch=False, num_retries=5): # [START run] -def run(project_id, query_string, batch, num_retries, interval): +def main(project_id, query_string, batch, num_retries, interval): service = get_service() query_job = async_query(service, @@ -63,21 +61,35 @@ def run(project_id, query_string, batch, num_retries, interval): num_retries=num_retries, **query_job['jobReference']): - yield json.dumps(page['rows']) + print(json.dumps(page['rows'])) # [END run] # [START main] -def main(): - project_id = input("Enter the project ID: ") - query_string = input("Enter the Bigquery SQL Query: ") - batch = input("Run query as batch (y/n)?: ") in ( - 'True', 'true', 'y', 'Y', 'yes', 'Yes') - num_retries = int(input( - "Enter number of times to retry in case of 500 error: ")) - interval = input( - "Enter how often to poll the query for completion (seconds): ") +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='Loads data into BigQuery.') + parser.add_argument('project_id', help='Your Google Cloud project ID.') + parser.add_argument('query', help='BigQuery SQL Query.') + parser.add_argument( + '-b', '--batch', help='Run query in batch mode.', action='store_true') + parser.add_argument( + '-r', '--num_retries', + help='Number of times to retry in case of 500 error.', + type=int, + default=5) + parser.add_argument( + '-p', '--poll_interval', + help='How often to poll the query for completion (seconds).', + type=int, + default=1) - for result in run(project_id, query_string, batch, num_retries, interval): - print(result) + args = parser.parse_args() + + main( + args.project_id, + args.query, + args.batch, + args.num_retries, + args.poll_interval) # [END main] diff --git a/bigquery/samples/export_data_to_cloud_storage.py b/bigquery/samples/export_data_to_cloud_storage.py index 334a12d4298..0f22c2669ea 100644 --- a/bigquery/samples/export_data_to_cloud_storage.py +++ b/bigquery/samples/export_data_to_cloud_storage.py @@ -11,16 +11,15 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +import argparse import uuid -from bigquery.samples.utils import get_service -from bigquery.samples.utils import poll_job -from six.moves import input +from utils import get_service, poll_job # [START export_table] def export_table(service, cloud_storage_path, - projectId, datasetId, tableId, + project_id, dataset_id, table_id, export_format="CSV", num_retries=5): """ @@ -42,15 +41,15 @@ def export_table(service, cloud_storage_path, # don't accidentally duplicate export job_data = { 'jobReference': { - 'projectId': projectId, + 'projectId': project_id, 'jobId': str(uuid.uuid4()) }, 'configuration': { 'extract': { 'sourceTable': { - 'projectId': projectId, - 'datasetId': datasetId, - 'tableId': tableId, + 'projectId': project_id, + 'datasetId': dataset_id, + 'tableId': table_id, }, 'destinationUris': [cloud_storage_path], 'destinationFormat': export_format @@ -58,19 +57,18 @@ def export_table(service, cloud_storage_path, } } return service.jobs().insert( - projectId=projectId, + projectId=project_id, body=job_data).execute(num_retries=num_retries) # [END export_table] # [START run] -def run(cloud_storage_path, - projectId, datasetId, tableId, - num_retries, interval, export_format="CSV"): +def main(cloud_storage_path, project_id, dataset_id, table_id, + num_retries, interval, export_format="CSV"): bigquery = get_service() resource = export_table(bigquery, cloud_storage_path, - projectId, datasetId, tableId, + project_id, dataset_id, table_id, num_retries=num_retries, export_format=export_format) poll_job(bigquery, @@ -82,20 +80,34 @@ def run(cloud_storage_path, # [START main] -def main(): - projectId = input("Enter the project ID: ") - datasetId = input("Enter a dataset ID: ") - tableId = input("Enter a table name to copy: ") - cloud_storage_path = input( - "Enter a Google Cloud Storage URI: ") - interval = input( - "Enter how often to poll the job (in seconds): ") - num_retries = input( - "Enter the number of retries in case of 500 error: ") +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='Exports data from BigQuery to Google Cloud Storage.') + parser.add_argument('project_id', help='Your Google Cloud project ID.') + parser.add_argument('dataset_id', help='BigQuery dataset to export.') + parser.add_argument('table_id', help='BigQuery table to export.') + parser.add_argument( + 'gcs_path', + help=('Google Cloud Storage path to store the exported data. For ' + 'example, gs://mybucket/mydata.csv')) + parser.add_argument( + '-p', '--poll_interval', + help='How often to poll the query for completion (seconds).', + type=int, + default=1) + parser.add_argument( + '-r', '--num_retries', + help='Number of times to retry in case of 500 error.', + type=int, + default=5) - run(cloud_storage_path, - projectId, datasetId, tableId, - num_retries, interval) + args = parser.parse_args() - print('Done exporting!') + main( + args.gcs_path, + args.project_id, + args.dataset_id, + args.table_id, + args.num_retries, + args.poll_interval) # [END main] diff --git a/bigquery/samples/getting_started.py b/bigquery/samples/getting_started.py index 94aec36e41e..8c8780028e0 100644 --- a/bigquery/samples/getting_started.py +++ b/bigquery/samples/getting_started.py @@ -20,9 +20,10 @@ words. 
""" # [START all] +import argparse + from apiclient.discovery import build from apiclient.errors import HttpError - from oauth2client.client import GoogleCredentials @@ -38,10 +39,11 @@ def main(project_id): # [START run_query] query_request = bigquery_service.jobs() query_data = { - 'query': ('SELECT TOP(corpus, 10) as title, ' - 'COUNT(*) as unique_words ' - 'FROM [publicdata:samples.shakespeare];') - } + 'query': ( + 'SELECT TOP(corpus, 10) as title, ' + 'COUNT(*) as unique_words ' + 'FROM [publicdata:samples.shakespeare];') + } query_response = query_request.query( projectId=project_id, @@ -60,7 +62,11 @@ def main(project_id): if __name__ == '__main__': - # The id of the project to run queries under. - project_id = input("Enter the project ID: ") - main(project_id) + parser = argparse.ArgumentParser( + description='Queries the public BigQuery Shakespeare dataset.') + parser.add_argument('project_id', help='Your Google Cloud Project ID.') + + args = parser.parse_args() + + main(args.project_id) # [END all] diff --git a/bigquery/samples/list_datasets_projects.py b/bigquery/samples/list_datasets_projects.py index 0f97c668516..900528a0fd2 100644 --- a/bigquery/samples/list_datasets_projects.py +++ b/bigquery/samples/list_datasets_projects.py @@ -35,7 +35,21 @@ application-default-credentials#howtheywork [2] https://cloud.google.com/sdk/ [3] https://console.developers.google.com -""" # NOQA + +For more information on the BigQuery API you can visit: + + https://developers.google.com/bigquery/docs/overview + +For more information on the BigQuery API Python library surface you +can visit: + + https://developers.google.com/resources/api-libraries/documentation/ + bigquery/v2/python/latest/ + +For information on the Python Client Library visit: + + https://developers.google.com/api-client-library/python/start/get_started +""" import argparse from pprint import pprint @@ -81,19 +95,6 @@ def main(project_id): list_datasets(service, project_id) list_projects(service) - -# For more information on the BigQuery API you can visit: -# -# https://developers.google.com/bigquery/docs/overview -# -# For more information on the BigQuery API Python library surface you -# can visit: -# -# https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/python/latest/ -# -# For information on the Python Client Library visit: -# -# https://developers.google.com/api-client-library/python/start/get_started if __name__ == '__main__': parser = argparse.ArgumentParser( description='Lists BigQuery datasets and projects.') diff --git a/bigquery/samples/load_data_by_post.py b/bigquery/samples/load_data_by_post.py index 26b1e2236ff..1d72b77ece4 100644 --- a/bigquery/samples/load_data_by_post.py +++ b/bigquery/samples/load_data_by_post.py @@ -11,12 +11,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +import argparse import json -from bigquery.samples.utils import get_service, poll_job +from utils import get_service, poll_job import httplib2 from oauth2client.client import GoogleCredentials -from six.moves import input # [START make_post] @@ -71,33 +71,27 @@ def make_post(http, schema, data, projectId, datasetId, tableId): # [START main] -def main(): +def main(project_id, dataset_id, table_name, schema_path, data_path): credentials = GoogleCredentials.get_application_default() http = credentials.authorize(httplib2.Http()) - projectId = input('Enter the project ID: ') - datasetId = input('Enter a dataset ID: ') - tableId = input('Enter a table name to load the data to: ') - schema_path = input( - 'Enter the path to the schema file for the table: ') with open(schema_path, 'r') as schema_file: schema = schema_file.read() - data_path = input('Enter the path to the data file: ') - with open(data_path, 'r') as data_file: data = data_file.read() - resp, content = make_post(http, - schema, - data, - projectId, - datasetId, - tableId) + resp, content = make_post( + http, + schema, + data, + project_id, + dataset_id, + table_name) if resp.status == 200: job_resource = json.loads(content) - service = get_service(credentials) + service = get_service() poll_job(service, **job_resource['jobReference']) print("Success!") else: @@ -105,4 +99,24 @@ def main(): # [END main] if __name__ == '__main__': - main() + parser = argparse.ArgumentParser( + description='Loads data into BigQuery.') + parser.add_argument('project_id', help='Your Google Cloud project ID.') + parser.add_argument('dataset_id', help='A BigQuery dataset ID.') + parser.add_argument( + 'table_name', help='Name of the table to load data into.') + parser.add_argument( + 'schema_file', + help='Path to a schema file describing the table schema.') + parser.add_argument( + 'data_file', + help='Path to the data file.') + + args = parser.parse_args() + + main( + args.project_id, + args.dataset_id, + args.table_name, + args.schema_file, + args.data_file) diff --git a/bigquery/samples/load_data_from_csv.py b/bigquery/samples/load_data_from_csv.py index b8000785cbb..1068edd42bc 100644 --- a/bigquery/samples/load_data_from_csv.py +++ b/bigquery/samples/load_data_from_csv.py @@ -11,16 +11,16 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +import argparse import json import uuid -from bigquery.samples.utils import get_service, poll_job -from six.moves import input +from utils import get_service, poll_job # [START load_table] -def load_table(service, source_schema, source_csv, - projectId, datasetId, tableId, num_retries=5): +def load_table(service, project_id, dataset_id, table_name, source_schema, + source_path, num_retries=5): """ Starts a job to load a bigquery table from CSV @@ -40,70 +40,90 @@ def load_table(service, source_schema, source_csv, # don't accidentally duplicate query job_data = { 'jobReference': { - 'projectId': projectId, + 'projectId': project_id, 'job_id': str(uuid.uuid4()) }, 'configuration': { 'load': { - 'sourceUris': [source_csv], + 'sourceUris': [source_path], 'schema': { 'fields': source_schema }, 'destinationTable': { - 'projectId': projectId, - 'datasetId': datasetId, - 'tableId': tableId + 'projectId': project_id, + 'datasetId': dataset_id, + 'tableId': table_name } } } } return service.jobs().insert( - projectId=projectId, + projectId=project_id, body=job_data).execute(num_retries=num_retries) # [END load_table] # [START run] -def run(source_schema, source_csv, - projectId, datasetId, tableId, interval, num_retries): +def main(project_id, dataset_id, table_name, schema_file, data_path, + poll_interval, num_retries): service = get_service() - job = load_table(service, source_schema, source_csv, - projectId, datasetId, tableId, num_retries) + with open(schema_file, 'r') as f: + schema = json.load(f) + + job = load_table( + service, + project_id, + dataset_id, + table_name, + schema, + data_path, + num_retries) poll_job(service, job['jobReference']['projectId'], job['jobReference']['jobId'], - interval, + poll_interval, num_retries) # [END run] # [START main] -def main(): - projectId = input("Enter the project ID: ") - datasetId = input("Enter a dataset ID: ") - tableId = input("Enter a destination table name: ") - - schema_file_path = input( - "Enter the path to the table schema: ") - with open(schema_file_path, 'r') as schema_file: - schema = json.load(schema_file) - - data_file_path = input( - "Enter the Cloud Storage path for the CSV file: ") - num_retries = input( - "Enter number of times to retry in case of 500 error: ") - interval = input( - "Enter how often to poll the query for completion (seconds): ") - run(schema, - data_file_path, - projectId, - datasetId, - tableId, - interval, - num_retries) - - print("Job complete!") +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='Loads data into BigQuery from a CSV file in Google ' + 'Cloud Storage.') + parser.add_argument('project_id', help='Your Google Cloud project ID.') + parser.add_argument('dataset_id', help='A BigQuery dataset ID.') + parser.add_argument( + 'table_name', help='Name of the table to load data into.') + parser.add_argument( + 'schema_file', + help='Path to a schema file describing the table schema.') + parser.add_argument( + 'data_path', + help='Google Cloud Storage path to the CSV data, for example: ' + 'gs://mybucket/in.csv') + parser.add_argument( + '-p', '--poll_interval', + help='How often to poll the query for completion (seconds).', + type=int, + default=1) + parser.add_argument( + '-r', '--num_retries', + help='Number of times to retry in case of 500 error.', + type=int, + default=5) + + args = parser.parse_args() + + main( + args.project_id, + args.dataset_id, + args.table_name, + args.schema_file, + args.data_path, + args.poll_interval, + args.num_retries) # [END main] diff --git 
a/bigquery/samples/streaming.py b/bigquery/samples/streaming.py index 958efcf5f99..6abfe2eab47 100644 --- a/bigquery/samples/streaming.py +++ b/bigquery/samples/streaming.py @@ -11,20 +11,17 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import argparse import ast import json import uuid -from bigquery.samples.utils import get_service +from utils import get_service from six.moves import input # [START stream_row_to_bigquery] -def stream_row_to_bigquery(service, - project_id, - dataset_id, - table_id, - row, +def stream_row_to_bigquery(service, project_id, dataset_id, table_name, row, num_retries=5): # Generate a unique row id so retries # don't accidentally duplicate insert @@ -35,42 +32,53 @@ def stream_row_to_bigquery(service, return service.tabledata().insertAll( projectId=project_id, datasetId=dataset_id, - tableId=table_id, + tableId=table_name, body=insert_all_data).execute(num_retries=num_retries) # [END stream_row_to_bigquery] # [START run] -def run(project_id, dataset_id, table_id, rows, num_retries): +def main(project_id, dataset_id, table_name, num_retries): service = get_service() - for row in rows: - response = stream_row_to_bigquery(service, - project_id, - dataset_id, - table_id, - row, - num_retries) - yield json.dumps(response) -# [END run] + for row in get_rows(): + response = stream_row_to_bigquery( + service, project_id, dataset_id, table_name, row, num_retries) + print(json.dumps(response)) -# [START main] def get_rows(): line = input("Enter a row (python dict) into the table: ") while line: yield ast.literal_eval(line) line = input("Enter another row into the table \n" + "[hit enter to stop]: ") +# [END run] -def main(): - project_id = input("Enter the project ID: ") - dataset_id = input("Enter a dataset ID: ") - table_id = input("Enter a table ID : ") - num_retries = int(input( - "Enter number of times to retry in case of 500 error: ")) +# [START main] +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='Streams data into BigQuery from the command line.') + parser.add_argument('project_id', help='Your Google Cloud project ID.') + parser.add_argument('dataset_id', help='A BigQuery dataset ID.') + parser.add_argument( + 'table_name', help='Name of the table to load data into.') + parser.add_argument( + '-p', '--poll_interval', + help='How often to poll the query for completion (seconds).', + type=int, + default=1) + parser.add_argument( + '-r', '--num_retries', + help='Number of times to retry in case of 500 error.', + type=int, + default=5) + + args = parser.parse_args() - for result in run(project_id, dataset_id, table_id, - get_rows(), num_retries): - print(result) + main( + args.project_id, + args.dataset_id, + args.table_name, + args.num_retries) # [END main] diff --git a/bigquery/samples/sync_query.py b/bigquery/samples/sync_query.py index 56be93ef262..b8770fca073 100644 --- a/bigquery/samples/sync_query.py +++ b/bigquery/samples/sync_query.py @@ -11,10 +11,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +import argparse import json -from bigquery.samples.utils import get_service, paging -from six.moves import input +from utils import get_service, paging # [START sync_query] @@ -30,7 +30,7 @@ def sync_query(service, project_id, query, timeout=10000, num_retries=5): # [START run] -def run(project_id, query, timeout, num_retries): +def main(project_id, query, timeout, num_retries): service = get_service() response = sync_query(service, project_id, @@ -42,21 +42,33 @@ def run(project_id, query, timeout, num_retries): service.jobs().getQueryResults, num_retries=num_retries, **response['jobReference']): - yield json.dumps(page['rows']) + print(json.dumps(page['rows'])) # [END run] # [START main] -def main(): - project_id = input("Enter the project ID: ") - query_string = input("Enter the Bigquery SQL Query: ") - timeout = input( - "Enter how long to wait for the query to complete in milliseconds" - "\n (if longer than 10 seconds, use an asynchronous query): ") - num_retries = int(input( - "Enter how many times to retry in case of server error")) - - for result in run(project_id, query_string, timeout, num_retries): - print(result) +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='Loads data into BigQuery.') + parser.add_argument('project_id', help='Your Google Cloud project ID.') + parser.add_argument('query', help='BigQuery SQL Query.') + parser.add_argument( + '-t', '--timeout', + help='Number seconds to wait for a result', + type=int, + default=30) + parser.add_argument( + '-r', '--num_retries', + help='Number of times to retry in case of 500 error.', + type=int, + default=5) + + args = parser.parse_args() + + main( + args.project_id, + args.query, + args.timeout, + args.num_retries) # [END main] diff --git a/bigquery/samples/utils.py b/bigquery/samples/utils.py index 8a2f9af9557..fc2384510e3 100644 --- a/bigquery/samples/utils.py +++ b/bigquery/samples/utils.py @@ -47,6 +47,8 @@ def poll_job(service, projectId, jobId, interval=5.0, num_retries=5): time.sleep(float(interval)) job_resource = job_get.execute(num_retries=num_retries) + print('Job is {}.'.format(job_resource['status']['state'])) + return job_resource # [END poll_job] diff --git a/bigquery/tests/test_async_query.py b/bigquery/tests/test_async_query.py index 0792cb57072..a06072dc760 100644 --- a/bigquery/tests/test_async_query.py +++ b/bigquery/tests/test_async_query.py @@ -13,10 +13,8 @@ # import json import os -import unittest from bigquery.samples.async_query import main, run - import tests @@ -41,7 +39,3 @@ def test_async_query_runner(self): with tests.mock_input_answers( answers, target='bigquery.samples.async_query.input'): main() - - -if __name__ == '__main__': - unittest.main() diff --git a/bigquery/tests/test_export_data_to_cloud_storage.py b/bigquery/tests/test_export_data_to_cloud_storage.py index d5872d87241..dee1f2041ac 100644 --- a/bigquery/tests/test_export_data_to_cloud_storage.py +++ b/bigquery/tests/test_export_data_to_cloud_storage.py @@ -13,9 +13,6 @@ # """Tests for export_table_to_gcs.""" - -import unittest - from bigquery.samples.export_data_to_cloud_storage import run from tests import CloudBaseTest @@ -48,6 +45,3 @@ def test_export_table_avro(self): 5, 5, export_format="AVRO") - -if __name__ == '__main__': - unittest.main() diff --git a/bigquery/tests/test_getting_started.py b/bigquery/tests/test_getting_started.py index ee311173a3b..4fb115559ed 100644 --- a/bigquery/tests/test_getting_started.py +++ b/bigquery/tests/test_getting_started.py @@ -12,10 +12,8 @@ # limitations 
under the License. # import re -import unittest from bigquery.samples import getting_started - import tests @@ -26,7 +24,3 @@ def test_main(self): stdout = mock_stdout.getvalue() self.assertRegexpMatches(stdout, re.compile( r'Query Results:.hamlet', re.DOTALL)) - - -if __name__ == '__main__': - unittest.main() diff --git a/bigquery/tests/test_list_datasets_projects.py b/bigquery/tests/test_list_datasets_projects.py index 1cb916e3187..8abb120bc91 100644 --- a/bigquery/tests/test_list_datasets_projects.py +++ b/bigquery/tests/test_list_datasets_projects.py @@ -12,10 +12,8 @@ # limitations under the License. # import re -import unittest from bigquery.samples import list_datasets_projects - import tests @@ -29,7 +27,3 @@ def test_main(self): r'Project list:.*bigquery#projectList.*projects', re.DOTALL)) self.assertRegexpMatches(stdout, re.compile( r'Dataset list:.*datasets.*datasetId', re.DOTALL)) - - -if __name__ == '__main__': - unittest.main() diff --git a/bigquery/tests/test_load_data_from_csv.py b/bigquery/tests/test_load_data_from_csv.py index 53c8039b02c..aaea740f9ec 100644 --- a/bigquery/tests/test_load_data_from_csv.py +++ b/bigquery/tests/test_load_data_from_csv.py @@ -12,10 +12,8 @@ # limitations under the License. # """Tests for load_data_from_csv.""" - import json import os -import unittest from bigquery.samples.load_data_from_csv import run from tests import CloudBaseTest @@ -38,7 +36,3 @@ def test_load_table(self): self.constants['newTableId'], 5, 5) - - -if __name__ == '__main__': - unittest.main() diff --git a/bigquery/tests/test_streaming.py b/bigquery/tests/test_streaming.py index 791475881ac..52e2c72b439 100644 --- a/bigquery/tests/test_streaming.py +++ b/bigquery/tests/test_streaming.py @@ -14,7 +14,6 @@ """Tests for export_table_to_gcs.""" import json import os -import unittest from bigquery.samples.streaming import run from tests import CloudBaseTest @@ -36,7 +35,3 @@ def test_stream_row_to_bigquery(self): rows, 5): self.assertIsNotNone(json.loads(result)) - - -if __name__ == '__main__': - unittest.main() diff --git a/bigquery/tests/test_sync_query.py b/bigquery/tests/test_sync_query.py index e9f0637ac6b..884f01f75d9 100644 --- a/bigquery/tests/test_sync_query.py +++ b/bigquery/tests/test_sync_query.py @@ -12,7 +12,6 @@ # limitations under the License. # import json -import unittest from bigquery.samples.sync_query import run from tests import CloudBaseTest @@ -27,7 +26,3 @@ def test_sync_query(self): 5): self.assertIsNotNone(json.loads(result)) - - -if __name__ == '__main__': - unittest.main() From 39dab6b98c276ed9abfa91ee519493379caf5c4a Mon Sep 17 00:00:00 2001 From: Jon Wayne Parrott Date: Mon, 14 Sep 2015 16:37:57 -0700 Subject: [PATCH 2/6] Fixing bigquery tests --- bigquery/tests/test_async_query.py | 33 ++++++++----------- .../test_export_data_to_cloud_storage.py | 17 ++++++---- bigquery/tests/test_getting_started.py | 1 + bigquery/tests/test_list_datasets_projects.py | 2 ++ bigquery/tests/test_load_data_from_csv.py | 18 +++------- bigquery/tests/test_streaming.py | 22 ++++++++----- bigquery/tests/test_sync_query.py | 17 ++++++---- 7 files changed, 54 insertions(+), 56 deletions(-) diff --git a/bigquery/tests/test_async_query.py b/bigquery/tests/test_async_query.py index a06072dc760..ba3afefd54f 100644 --- a/bigquery/tests/test_async_query.py +++ b/bigquery/tests/test_async_query.py @@ -12,30 +12,23 @@ # limitations under the License. 
# import json -import os -from bigquery.samples.async_query import main, run +from bigquery.samples.async_query import main import tests class TestAsyncQuery(tests.CloudBaseTest): def test_async_query(self): - for result in run(self.constants['projectId'], - self.constants['query'], - False, - 5, - 5): - self.assertIsNotNone(json.loads(result)) - - -class TestAsyncRunner(tests.CloudBaseTest): - - def test_async_query_runner(self): - test_project_id = os.environ.get(tests.PROJECT_ID_ENV) - answers = [test_project_id, self.constants['query'], 'n', - '1', '1'] - - with tests.mock_input_answers( - answers, target='bigquery.samples.async_query.input'): - main() + with tests.capture_stdout() as stdout: + main( + self.constants['projectId'], + self.constants['query'], + False, + 5, + 5) + + value = stdout.getvalue().split('\n')[1] + + self.assertIsNotNone( + json.loads(value)) diff --git a/bigquery/tests/test_export_data_to_cloud_storage.py b/bigquery/tests/test_export_data_to_cloud_storage.py index dee1f2041ac..4bfa65740b5 100644 --- a/bigquery/tests/test_export_data_to_cloud_storage.py +++ b/bigquery/tests/test_export_data_to_cloud_storage.py @@ -13,35 +13,38 @@ # """Tests for export_table_to_gcs.""" -from bigquery.samples.export_data_to_cloud_storage import run +from bigquery.samples.export_data_to_cloud_storage import main from tests import CloudBaseTest class TestExportTableToGCS(CloudBaseTest): def test_export_table_csv(self): - run(self.constants['cloudStorageInputURI'], + main( + self.constants['cloudStorageInputURI'], self.constants['projectId'], self.constants['datasetId'], self.constants['newTableId'], 5, - 5, + 1, export_format="CSV") def test_export_table_json(self): - run(self.constants['cloudStorageInputURI'], + main( + self.constants['cloudStorageInputURI'], self.constants['projectId'], self.constants['datasetId'], self.constants['newTableId'], 5, - 5, + 1, export_format="NEWLINE_DELIMITED_JSON") def test_export_table_avro(self): - run(self.constants['cloudStorageInputURI'], + main( + self.constants['cloudStorageInputURI'], self.constants['projectId'], self.constants['datasetId'], self.constants['newTableId'], 5, - 5, + 1, export_format="AVRO") diff --git a/bigquery/tests/test_getting_started.py b/bigquery/tests/test_getting_started.py index 4fb115559ed..dbb4ff555a5 100644 --- a/bigquery/tests/test_getting_started.py +++ b/bigquery/tests/test_getting_started.py @@ -21,6 +21,7 @@ class TestGettingStarted(tests.CloudBaseTest): def test_main(self): with tests.capture_stdout() as mock_stdout: getting_started.main(self.constants['projectId']) + stdout = mock_stdout.getvalue() self.assertRegexpMatches(stdout, re.compile( r'Query Results:.hamlet', re.DOTALL)) diff --git a/bigquery/tests/test_list_datasets_projects.py b/bigquery/tests/test_list_datasets_projects.py index 8abb120bc91..7dfcf39821b 100644 --- a/bigquery/tests/test_list_datasets_projects.py +++ b/bigquery/tests/test_list_datasets_projects.py @@ -22,7 +22,9 @@ class TestListDatasetsProjects(tests.CloudBaseTest): def test_main(self): with tests.capture_stdout() as mock_stdout: list_datasets_projects.main(self.constants['projectId']) + stdout = mock_stdout.getvalue() + self.assertRegexpMatches(stdout, re.compile( r'Project list:.*bigquery#projectList.*projects', re.DOTALL)) self.assertRegexpMatches(stdout, re.compile( diff --git a/bigquery/tests/test_load_data_from_csv.py b/bigquery/tests/test_load_data_from_csv.py index aaea740f9ec..a800c4be3e7 100644 --- a/bigquery/tests/test_load_data_from_csv.py +++ 
b/bigquery/tests/test_load_data_from_csv.py @@ -12,27 +12,19 @@ # limitations under the License. # """Tests for load_data_from_csv.""" -import json import os -from bigquery.samples.load_data_from_csv import run +from bigquery.samples.load_data_from_csv import main from tests import CloudBaseTest class TestLoadDataFromCSV(CloudBaseTest): - - def setUp(self): - super(TestLoadDataFromCSV, self).setUp() - with open( - os.path.join(self.resource_path, 'schema.json'), - 'r') as schema_file: - self.schema = json.load(schema_file) - def test_load_table(self): - run(self.schema, - self.constants['cloudStorageInputURI'], + main( self.constants['projectId'], self.constants['datasetId'], self.constants['newTableId'], - 5, + os.path.join(self.resource_path, 'schema.json'), + self.constants['cloudStorageInputURI'], + 1, 5) diff --git a/bigquery/tests/test_streaming.py b/bigquery/tests/test_streaming.py index 52e2c72b439..e925ab64673 100644 --- a/bigquery/tests/test_streaming.py +++ b/bigquery/tests/test_streaming.py @@ -15,23 +15,27 @@ import json import os -from bigquery.samples.streaming import run -from tests import CloudBaseTest +from bigquery.samples import streaming +from tests import CloudBaseTest, capture_stdout class TestStreaming(CloudBaseTest): def test_stream_row_to_bigquery(self): - with open( os.path.join(self.resource_path, 'streamrows.json'), 'r') as rows_file: rows = json.load(rows_file) - for result in run(self.constants['projectId'], - self.constants['datasetId'], - self.constants['newTableId'], - rows, - 5): - self.assertIsNotNone(json.loads(result)) + streaming.get_rows = lambda: rows + + with capture_stdout() as stdout: + streaming.main( + self.constants['projectId'], + self.constants['datasetId'], + self.constants['newTableId'], + 5) + + results = stdout.getvalue().split('\n') + self.assertIsNotNone(json.loads(results[0])) diff --git a/bigquery/tests/test_sync_query.py b/bigquery/tests/test_sync_query.py index 884f01f75d9..271467773c0 100644 --- a/bigquery/tests/test_sync_query.py +++ b/bigquery/tests/test_sync_query.py @@ -13,16 +13,19 @@ # import json -from bigquery.samples.sync_query import run -from tests import CloudBaseTest +from bigquery.samples.sync_query import main +from tests import CloudBaseTest, capture_stdout class TestSyncQuery(CloudBaseTest): def test_sync_query(self): - for result in run(self.constants['projectId'], - self.constants['query'], - 5000, - 5): + with capture_stdout() as stdout: + main( + self.constants['projectId'], + self.constants['query'], + 30, + 5) - self.assertIsNotNone(json.loads(result)) + result = stdout.getvalue().split('\n')[0] + self.assertIsNotNone(json.loads(result)) From ad20eca2af1e7126f6d03cc10d95696a73c62793 Mon Sep 17 00:00:00 2001 From: Jon Wayne Parrott Date: Mon, 14 Sep 2015 16:40:01 -0700 Subject: [PATCH 3/6] Fixing pep8, removing mock_input_answers --- bigquery/samples/load_data_by_post.py | 2 +- bigquery/samples/streaming.py | 2 +- bigquery/tests/test_streaming.py | 2 +- bigquery/tests/test_sync_query.py | 2 +- tests/__init__.py | 2 -- tests/utils.py | 23 ----------------------- 6 files changed, 4 insertions(+), 29 deletions(-) diff --git a/bigquery/samples/load_data_by_post.py b/bigquery/samples/load_data_by_post.py index 1d72b77ece4..c9e822d32bc 100644 --- a/bigquery/samples/load_data_by_post.py +++ b/bigquery/samples/load_data_by_post.py @@ -14,9 +14,9 @@ import argparse import json -from utils import get_service, poll_job import httplib2 from oauth2client.client import GoogleCredentials +from utils import get_service, 
poll_job # [START make_post] diff --git a/bigquery/samples/streaming.py b/bigquery/samples/streaming.py index 6abfe2eab47..135af1402e9 100644 --- a/bigquery/samples/streaming.py +++ b/bigquery/samples/streaming.py @@ -16,8 +16,8 @@ import json import uuid -from utils import get_service from six.moves import input +from utils import get_service # [START stream_row_to_bigquery] diff --git a/bigquery/tests/test_streaming.py b/bigquery/tests/test_streaming.py index e925ab64673..c29efa06e63 100644 --- a/bigquery/tests/test_streaming.py +++ b/bigquery/tests/test_streaming.py @@ -16,7 +16,7 @@ import os from bigquery.samples import streaming -from tests import CloudBaseTest, capture_stdout +from tests import capture_stdout, CloudBaseTest class TestStreaming(CloudBaseTest): diff --git a/bigquery/tests/test_sync_query.py b/bigquery/tests/test_sync_query.py index 271467773c0..f5ed3eadf42 100644 --- a/bigquery/tests/test_sync_query.py +++ b/bigquery/tests/test_sync_query.py @@ -14,7 +14,7 @@ import json from bigquery.samples.sync_query import main -from tests import CloudBaseTest, capture_stdout +from tests import capture_stdout, CloudBaseTest class TestSyncQuery(CloudBaseTest): diff --git a/tests/__init__.py b/tests/__init__.py index 8c68e5c9bd7..84164816c8a 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -17,7 +17,6 @@ BUCKET_NAME_ENV, capture_stdout, CloudBaseTest, - mock_input_answers, PROJECT_ID_ENV, RESOURCE_PATH) @@ -27,7 +26,6 @@ 'BUCKET_NAME_ENV', 'capture_stdout', 'CloudBaseTest', - 'mock_input_answers', 'PROJECT_ID_ENV', 'RESOURCE_PATH' ] diff --git a/tests/utils.py b/tests/utils.py index 095eb66ae7c..a6aa6e69ebd 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -22,7 +22,6 @@ import tempfile import unittest -from mock import patch from nose.plugins.skip import SkipTest from six.moves import cStringIO @@ -39,28 +38,6 @@ os.path.abspath(os.path.dirname(__file__)), 'resources') -# TODO: This can be written as a much simpler context manager. -class mock_input_answers(object): - - def __init__(self, list_, target): - self.i = 0 - self.list_ = list_ - self.target = target - - def get_next_value(self, question): - ret = self.list_[self.i] - self.i += 1 - print('Responding to {} with {}'.format(question, ret)) - return u"{}".format(ret) - - def __enter__(self): - self.patch = patch(self.target, self.get_next_value) - self.patch.__enter__() - - def __exit__(self, exc_type, exc_value, traceback): - self.patch.__exit__(exc_type, exc_value, traceback) - - class CloudBaseTest(unittest.TestCase): def setUp(self): From e4f827c4c27c5ed753b678b391503aaeb4ce4dd5 Mon Sep 17 00:00:00 2001 From: Jon Wayne Parrott Date: Tue, 15 Sep 2015 10:40:39 -0700 Subject: [PATCH 4/6] Moving to module-based execution --- bigquery/README.md | 8 ++++++++ bigquery/samples/async_query.py | 2 +- bigquery/samples/export_data_to_cloud_storage.py | 2 +- bigquery/samples/load_data_by_post.py | 2 +- bigquery/samples/load_data_from_csv.py | 2 +- bigquery/samples/streaming.py | 2 +- bigquery/samples/sync_query.py | 2 +- bigquery/requirements.txt => requirements.txt | 8 ++++---- 8 files changed, 18 insertions(+), 10 deletions(-) rename bigquery/requirements.txt => requirements.txt (63%) diff --git a/bigquery/README.md b/bigquery/README.md index 75f7b9a6897..0385d7cbf63 100644 --- a/bigquery/README.md +++ b/bigquery/README.md @@ -2,6 +2,14 @@ This section contains samples for [Google BigQuery](https://cloud.google.com/bigquery). 
+## Running the samples + +These samples must be run as modules, for example: + +``` +$ python -m bigquery.samples.async_query [your-project-id] [your-query] +``` + ## Other Samples * [Using BigQuery from Google App Engine](../appengine/bigquery). diff --git a/bigquery/samples/async_query.py b/bigquery/samples/async_query.py index 63b7d58e4e3..00a280eb907 100644 --- a/bigquery/samples/async_query.py +++ b/bigquery/samples/async_query.py @@ -15,7 +15,7 @@ import json import uuid -from utils import get_service, paging, poll_job +from .utils import get_service, paging, poll_job # [START async_query] diff --git a/bigquery/samples/export_data_to_cloud_storage.py b/bigquery/samples/export_data_to_cloud_storage.py index 0f22c2669ea..f943deed006 100644 --- a/bigquery/samples/export_data_to_cloud_storage.py +++ b/bigquery/samples/export_data_to_cloud_storage.py @@ -14,7 +14,7 @@ import argparse import uuid -from utils import get_service, poll_job +from .utils import get_service, poll_job # [START export_table] diff --git a/bigquery/samples/load_data_by_post.py b/bigquery/samples/load_data_by_post.py index c9e822d32bc..4c20aed7e8d 100644 --- a/bigquery/samples/load_data_by_post.py +++ b/bigquery/samples/load_data_by_post.py @@ -16,7 +16,7 @@ import httplib2 from oauth2client.client import GoogleCredentials -from utils import get_service, poll_job +from .utils import get_service, poll_job # [START make_post] diff --git a/bigquery/samples/load_data_from_csv.py b/bigquery/samples/load_data_from_csv.py index 1068edd42bc..38bb342202d 100644 --- a/bigquery/samples/load_data_from_csv.py +++ b/bigquery/samples/load_data_from_csv.py @@ -15,7 +15,7 @@ import json import uuid -from utils import get_service, poll_job +from .utils import get_service, poll_job # [START load_table] diff --git a/bigquery/samples/streaming.py b/bigquery/samples/streaming.py index 135af1402e9..2e28bbe57a2 100644 --- a/bigquery/samples/streaming.py +++ b/bigquery/samples/streaming.py @@ -17,7 +17,7 @@ import uuid from six.moves import input -from utils import get_service +from .utils import get_service # [START stream_row_to_bigquery] diff --git a/bigquery/samples/sync_query.py b/bigquery/samples/sync_query.py index b8770fca073..cc7d6ad9d8d 100644 --- a/bigquery/samples/sync_query.py +++ b/bigquery/samples/sync_query.py @@ -14,7 +14,7 @@ import argparse import json -from utils import get_service, paging +from .utils import get_service, paging # [START sync_query] diff --git a/bigquery/requirements.txt b/requirements.txt similarity index 63% rename from bigquery/requirements.txt rename to requirements.txt index 3487fe93bac..396a8981627 100644 --- a/bigquery/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ -argparse==1.2.1 -google-api-python-client==1.3.2 -httplib2==0.9 -oauth2client==1.4.6 +argparse>=1.2.1 +google-api-python-client>=1.4.2 +httplib2>=0.9.1 +oauth2client>=1.5.1 py==1.4.26 pyasn1==0.1.7 pyasn1-modules==0.0.5 From 7b5f96c0c66f88cdae181495d33727561c1d42de Mon Sep 17 00:00:00 2001 From: Jon Wayne Parrott Date: Tue, 15 Sep 2015 10:50:40 -0700 Subject: [PATCH 5/6] Fixing inconsistences in style --- README.md | 10 ++++++++++ bigquery/samples/utils.py | 4 ++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 36ab4283270..2dbaaead8b2 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,16 @@ This repository holds the samples used in the python documentation on [cloud.goo For more detailed introduction to a product, check the README in the corresponding folder. 
+## Running the samples + +Most samples must be run as modules instead of directly, for example: + +``` +$ python -m bigquery.samples.async_query [your-project-id] [your-query] +``` + +Refer to the README in the corresponding folder for any special instructions. + ## Contributing changes * See [CONTRIBUTING.md](CONTRIBUTING.md) diff --git a/bigquery/samples/utils.py b/bigquery/samples/utils.py index fc2384510e3..696761f0120 100644 --- a/bigquery/samples/utils.py +++ b/bigquery/samples/utils.py @@ -33,12 +33,12 @@ def get_service(): # [START poll_job] -def poll_job(service, projectId, jobId, interval=5.0, num_retries=5): +def poll_job(service, project_id, job_id, interval=5.0, num_retries=5): """checks the status of a job every *interval* seconds""" import time - job_get = service.jobs().get(projectId=projectId, jobId=jobId) + job_get = service.jobs().get(projectId=project_id, jobId=job_id) job_resource = job_get.execute(num_retries=num_retries) while not job_resource['status']['state'] == 'DONE': From 2e5d57c30af1f23a6369dda419ad60ce991093fd Mon Sep 17 00:00:00 2001 From: Jon Wayne Parrott Date: Tue, 15 Sep 2015 12:55:41 -0700 Subject: [PATCH 6/6] Removing utils, putting depending code directly in samples --- bigquery/samples/async_query.py | 71 ++++++++++++++----- .../samples/export_data_to_cloud_storage.py | 59 +++++++++++---- bigquery/samples/list_datasets_projects.py | 2 + bigquery/samples/load_data_by_post.py | 42 ++++++++--- bigquery/samples/load_data_from_csv.py | 49 ++++++++++--- bigquery/samples/streaming.py | 18 +++-- bigquery/samples/sync_query.py | 42 +++++++---- bigquery/samples/utils.py | 68 ------------------ bigquery/tests/test_async_query.py | 2 +- .../test_export_data_to_cloud_storage.py | 6 +- 10 files changed, 217 insertions(+), 142 deletions(-) delete mode 100644 bigquery/samples/utils.py diff --git a/bigquery/samples/async_query.py b/bigquery/samples/async_query.py index 00a280eb907..ff50c03cb17 100644 --- a/bigquery/samples/async_query.py +++ b/bigquery/samples/async_query.py @@ -13,13 +13,15 @@ # import argparse import json +import time import uuid -from .utils import get_service, paging, poll_job +from googleapiclient import discovery +from oauth2client.client import GoogleCredentials # [START async_query] -def async_query(service, project_id, query, batch=False, num_retries=5): +def async_query(bigquery, project_id, query, batch=False, num_retries=5): # Generate a unique job_id so retries # don't accidentally duplicate query job_data = { @@ -34,34 +36,67 @@ def async_query(service, project_id, query, batch=False, num_retries=5): } } } - return service.jobs().insert( + return bigquery.jobs().insert( projectId=project_id, body=job_data).execute(num_retries=num_retries) # [END async_query] +# [START poll_job] +def poll_job(bigquery, job): + """Waits for a job to complete.""" + + print('Waiting for job to finish...') + + request = bigquery.jobs().get( + projectId=job['jobReference']['projectId'], + jobId=job['jobReference']['jobId']) + + while True: + result = request.execute(num_retries=2) + + if result['status']['state'] == 'DONE': + if 'errorResult' in result['status']: + raise RuntimeError(result['status']['errorResult']) + print('Job complete.') + return + + time.sleep(1) +# [END poll_job] + + # [START run] def main(project_id, query_string, batch, num_retries, interval): - service = get_service() + # [START build_service] + # Grab the application's default credentials from the environment. 
+ credentials = GoogleCredentials.get_application_default() - query_job = async_query(service, - project_id, - query_string, - batch, - num_retries) + # Construct the service object for interacting with the BigQuery API. + bigquery = discovery.build('bigquery', 'v2', credentials=credentials) + # [END build_service] - poll_job(service, - query_job['jobReference']['projectId'], - query_job['jobReference']['jobId'], - interval, - num_retries) + # Submit the job and wait for it to complete. + query_job = async_query( + bigquery, + project_id, + query_string, + batch, + num_retries) - for page in paging(service, - service.jobs().getQueryResults, - num_retries=num_retries, - **query_job['jobReference']): + poll_job(bigquery, query_job) + + # Page through the result set and print all results. + page_token = None + while True: + page = bigquery.jobs().getQueryResults( + pageToken=page_token, + **query_job['jobReference']).execute(num_retries=2) print(json.dumps(page['rows'])) + + page_token = page.get('pageToken') + if not page_token: + break # [END run] diff --git a/bigquery/samples/export_data_to_cloud_storage.py b/bigquery/samples/export_data_to_cloud_storage.py index f943deed006..bd0319f0613 100644 --- a/bigquery/samples/export_data_to_cloud_storage.py +++ b/bigquery/samples/export_data_to_cloud_storage.py @@ -12,13 +12,15 @@ # limitations under the License. # import argparse +import time import uuid -from .utils import get_service, poll_job +from googleapiclient import discovery +from oauth2client.client import GoogleCredentials # [START export_table] -def export_table(service, cloud_storage_path, +def export_table(bigquery, cloud_storage_path, project_id, dataset_id, table_id, export_format="CSV", num_retries=5): @@ -26,7 +28,7 @@ def export_table(service, cloud_storage_path, Starts an export job Args: - service: initialized and authorized bigquery + bigquery: initialized and authorized bigquery google-api-client object. cloud_storage_path: fully qualified path to a Google Cloud Storage location. @@ -56,26 +58,55 @@ def export_table(service, cloud_storage_path, } } } - return service.jobs().insert( + return bigquery.jobs().insert( projectId=project_id, body=job_data).execute(num_retries=num_retries) # [END export_table] +# [START poll_job] +def poll_job(bigquery, job): + """Waits for a job to complete.""" + + print('Waiting for job to finish...') + + request = bigquery.jobs().get( + projectId=job['jobReference']['projectId'], + jobId=job['jobReference']['jobId']) + + while True: + result = request.execute(num_retries=2) + + if result['status']['state'] == 'DONE': + if 'errorResult' in result['status']: + raise RuntimeError(result['status']['errorResult']) + print('Job complete.') + return + + time.sleep(1) +# [END poll_job] + + # [START run] def main(cloud_storage_path, project_id, dataset_id, table_id, num_retries, interval, export_format="CSV"): + # [START build_service] + # Grab the application's default credentials from the environment. + credentials = GoogleCredentials.get_application_default() + + # Construct the service object for interacting with the BigQuery API. 
+ bigquery = discovery.build('bigquery', 'v2', credentials=credentials) + # [END build_service] - bigquery = get_service() - resource = export_table(bigquery, cloud_storage_path, - project_id, dataset_id, table_id, - num_retries=num_retries, - export_format=export_format) - poll_job(bigquery, - resource['jobReference']['projectId'], - resource['jobReference']['jobId'], - interval, - num_retries) + job = export_table( + bigquery, + cloud_storage_path, + project_id, + dataset_id, + table_id, + num_retries=num_retries, + export_format=export_format) + poll_job(bigquery, job) # [END run] diff --git a/bigquery/samples/list_datasets_projects.py b/bigquery/samples/list_datasets_projects.py index 900528a0fd2..d8c22c73b79 100644 --- a/bigquery/samples/list_datasets_projects.py +++ b/bigquery/samples/list_datasets_projects.py @@ -69,6 +69,7 @@ def list_datasets(service, project): except HTTPError as err: print('Error in list_datasets: %s' % err.content) + raise err # [END list_datasets] @@ -84,6 +85,7 @@ def list_projects(service): except HTTPError as err: print('Error in list_projects: %s' % err.content) + raise err # [END list_projects] diff --git a/bigquery/samples/load_data_by_post.py b/bigquery/samples/load_data_by_post.py index 4c20aed7e8d..08d2bd2bc3b 100644 --- a/bigquery/samples/load_data_by_post.py +++ b/bigquery/samples/load_data_by_post.py @@ -13,14 +13,15 @@ # import argparse import json +import time +from googleapiclient import discovery import httplib2 from oauth2client.client import GoogleCredentials -from .utils import get_service, poll_job # [START make_post] -def make_post(http, schema, data, projectId, datasetId, tableId): +def make_post(http, schema, data, project_id, dataset_id, table_id): """ Creates an http POST request for loading data into a bigquery table @@ -34,7 +35,7 @@ def make_post(http, schema, data, projectId, datasetId, tableId): Returns: an http.request object """ url = ('https://www.googleapis.com/upload/bigquery/v2/projects/' + - projectId + '/jobs') + project_id + '/jobs') # Create the body of the request, separated by a boundary of xxx resource = ('--xxx\n' + 'Content-Type: application/json; charset=UTF-8\n' + '\n' + @@ -45,9 +46,9 @@ def make_post(http, schema, data, projectId, datasetId, tableId): ' "fields": ' + str(schema) + '\n' + ' },\n' + ' "destinationTable": {\n' + - ' "projectId": "' + projectId + '",\n' + - ' "datasetId": "' + datasetId + '",\n' + - ' "tableId": "' + tableId + '"\n' + + ' "projectId": "' + project_id + '",\n' + + ' "datasetId": "' + dataset_id + '",\n' + + ' "tableId": "' + table_id + '"\n' + ' }\n' + ' }\n' + ' }\n' + @@ -70,10 +71,34 @@ def make_post(http, schema, data, projectId, datasetId, tableId): # [END make_post] +# [START poll_job] +def poll_job(bigquery, job): + """Waits for a job to complete.""" + + print('Waiting for job to finish...') + + request = bigquery.jobs().get( + projectId=job['jobReference']['projectId'], + jobId=job['jobReference']['jobId']) + + while True: + result = request.execute(num_retries=2) + + if result['status']['state'] == 'DONE': + if 'errorResult' in result['status']: + raise RuntimeError(result['status']['errorResult']) + print('Job complete.') + return + + time.sleep(1) +# [END poll_job] + + # [START main] def main(project_id, dataset_id, table_name, schema_path, data_path): credentials = GoogleCredentials.get_application_default() http = credentials.authorize(httplib2.Http()) + bigquery = discovery.build('bigquery', 'v2', credentials=credentials) with open(schema_path, 'r') as schema_file: 
schema = schema_file.read() @@ -90,9 +115,8 @@ def main(project_id, dataset_id, table_name, schema_path, data_path): table_name) if resp.status == 200: - job_resource = json.loads(content) - service = get_service() - poll_job(service, **job_resource['jobReference']) + job = json.loads(content) + poll_job(bigquery, job) print("Success!") else: print("Http error code: {}".format(resp.status)) diff --git a/bigquery/samples/load_data_from_csv.py b/bigquery/samples/load_data_from_csv.py index 38bb342202d..084eecca63d 100644 --- a/bigquery/samples/load_data_from_csv.py +++ b/bigquery/samples/load_data_from_csv.py @@ -13,19 +13,21 @@ # import argparse import json +import time import uuid -from .utils import get_service, poll_job +from googleapiclient import discovery +from oauth2client.client import GoogleCredentials # [START load_table] -def load_table(service, project_id, dataset_id, table_name, source_schema, +def load_table(bigquery, project_id, dataset_id, table_name, source_schema, source_path, num_retries=5): """ Starts a job to load a bigquery table from CSV Args: - service: an initialized and authorized bigquery + bigquery: an initialized and authorized bigquery client google-api-client object source_schema: a valid bigquery schema, see https://cloud.google.com/bigquery/docs/reference/v2/tables @@ -58,22 +60,51 @@ def load_table(service, project_id, dataset_id, table_name, source_schema, } } - return service.jobs().insert( + return bigquery.jobs().insert( projectId=project_id, body=job_data).execute(num_retries=num_retries) # [END load_table] +# [START poll_job] +def poll_job(bigquery, job): + """Waits for a job to complete.""" + + print('Waiting for job to finish...') + + request = bigquery.jobs().get( + projectId=job['jobReference']['projectId'], + jobId=job['jobReference']['jobId']) + + while True: + result = request.execute(num_retries=2) + + if result['status']['state'] == 'DONE': + if 'errorResult' in result['status']: + raise RuntimeError(result['status']['errorResult']) + print('Job complete.') + return + + time.sleep(1) +# [END poll_job] + + # [START run] def main(project_id, dataset_id, table_name, schema_file, data_path, poll_interval, num_retries): - service = get_service() + # [START build_service] + # Grab the application's default credentials from the environment. + credentials = GoogleCredentials.get_application_default() + + # Construct the service object for interacting with the BigQuery API. 
+ bigquery = discovery.build('bigquery', 'v2', credentials=credentials) + # [END build_service] with open(schema_file, 'r') as f: schema = json.load(f) job = load_table( - service, + bigquery, project_id, dataset_id, table_name, @@ -81,11 +112,7 @@ def main(project_id, dataset_id, table_name, schema_file, data_path, data_path, num_retries) - poll_job(service, - job['jobReference']['projectId'], - job['jobReference']['jobId'], - poll_interval, - num_retries) + poll_job(bigquery, job) # [END run] diff --git a/bigquery/samples/streaming.py b/bigquery/samples/streaming.py index 2e28bbe57a2..06c1cfca052 100644 --- a/bigquery/samples/streaming.py +++ b/bigquery/samples/streaming.py @@ -16,12 +16,13 @@ import json import uuid +from googleapiclient import discovery +from oauth2client.client import GoogleCredentials from six.moves import input -from .utils import get_service # [START stream_row_to_bigquery] -def stream_row_to_bigquery(service, project_id, dataset_id, table_name, row, +def stream_row_to_bigquery(bigquery, project_id, dataset_id, table_name, row, num_retries=5): # Generate a unique row id so retries # don't accidentally duplicate insert @@ -29,7 +30,7 @@ def stream_row_to_bigquery(service, project_id, dataset_id, table_name, row, 'insertId': str(uuid.uuid4()), 'rows': [{'json': row}] } - return service.tabledata().insertAll( + return bigquery.tabledata().insertAll( projectId=project_id, datasetId=dataset_id, tableId=table_name, @@ -39,10 +40,17 @@ def stream_row_to_bigquery(service, project_id, dataset_id, table_name, row, # [START run] def main(project_id, dataset_id, table_name, num_retries): - service = get_service() + # [START build_service] + # Grab the application's default credentials from the environment. + credentials = GoogleCredentials.get_application_default() + + # Construct the service object for interacting with the BigQuery API. + bigquery = discovery.build('bigquery', 'v2', credentials=credentials) + # [END build_service] + for row in get_rows(): response = stream_row_to_bigquery( - service, project_id, dataset_id, table_name, row, num_retries) + bigquery, project_id, dataset_id, table_name, row, num_retries) print(json.dumps(response)) diff --git a/bigquery/samples/sync_query.py b/bigquery/samples/sync_query.py index cc7d6ad9d8d..5c08e2bc6cb 100644 --- a/bigquery/samples/sync_query.py +++ b/bigquery/samples/sync_query.py @@ -14,16 +14,17 @@ import argparse import json -from .utils import get_service, paging +from googleapiclient import discovery +from oauth2client.client import GoogleCredentials # [START sync_query] -def sync_query(service, project_id, query, timeout=10000, num_retries=5): +def sync_query(bigquery, project_id, query, timeout=10000, num_retries=5): query_data = { 'query': query, 'timeoutMs': timeout, } - return service.jobs().query( + return bigquery.jobs().query( projectId=project_id, body=query_data).execute(num_retries=num_retries) # [END sync_query] @@ -31,18 +32,33 @@ def sync_query(service, project_id, query, timeout=10000, num_retries=5): # [START run] def main(project_id, query, timeout, num_retries): - service = get_service() - response = sync_query(service, - project_id, - query, - timeout, - num_retries) + # [START build_service] + # Grab the application's default credentials from the environment. + credentials = GoogleCredentials.get_application_default() + + # Construct the service object for interacting with the BigQuery API. 
+ bigquery = discovery.build('bigquery', 'v2', credentials=credentials) + # [END build_service] + + query_job = sync_query( + bigquery, + project_id, + query, + timeout, + num_retries) + + # Page through the result set and print all results. + page_token = None + while True: + page = bigquery.jobs().getQueryResults( + pageToken=page_token, + **query_job['jobReference']).execute(num_retries=2) - for page in paging(service, - service.jobs().getQueryResults, - num_retries=num_retries, - **response['jobReference']): print(json.dumps(page['rows'])) + + page_token = page.get('pageToken') + if not page_token: + break # [END run] diff --git a/bigquery/samples/utils.py b/bigquery/samples/utils.py deleted file mode 100644 index 696761f0120..00000000000 --- a/bigquery/samples/utils.py +++ /dev/null @@ -1,68 +0,0 @@ -# Copyright 2015, Google, Inc. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -# [START get_service] -def get_service(): - """returns an initialized and authorized bigquery client""" - - from googleapiclient.discovery import build - from oauth2client.client import GoogleCredentials - - credentials = GoogleCredentials.get_application_default() - - # Create and return the service object for v2 of the bigquery api. You can - # browse available apis and versions here: - # https://developers.google.com/api-client-library/python/apis/ - # - # The build() method also takes care of injecting the necessary scopes - # to access the given service, when using application default credentials. - return build('bigquery', 'v2', credentials=credentials) -# [END get_service] - - -# [START poll_job] -def poll_job(service, project_id, job_id, interval=5.0, num_retries=5): - """checks the status of a job every *interval* seconds""" - - import time - - job_get = service.jobs().get(projectId=project_id, jobId=job_id) - job_resource = job_get.execute(num_retries=num_retries) - - while not job_resource['status']['state'] == 'DONE': - print('Job is {}, waiting {} seconds...' 
- .format(job_resource['status']['state'], interval)) - time.sleep(float(interval)) - job_resource = job_get.execute(num_retries=num_retries) - - print('Job is {}.'.format(job_resource['status']['state'])) - - return job_resource -# [END poll_job] - - -# [START paging] -def paging(service, request_func, num_retries=5, **kwargs): - """pages though the results of an asynchronous job""" - - has_next = True - while has_next: - response = request_func(**kwargs).execute(num_retries=num_retries) - if 'pageToken' in response: - kwargs['pageToken'] = response['pageToken'] - else: - has_next = False - yield response -# [END paging] diff --git a/bigquery/tests/test_async_query.py b/bigquery/tests/test_async_query.py index ba3afefd54f..8ea0abc293c 100644 --- a/bigquery/tests/test_async_query.py +++ b/bigquery/tests/test_async_query.py @@ -28,7 +28,7 @@ def test_async_query(self): 5, 5) - value = stdout.getvalue().split('\n')[1] + value = stdout.getvalue().strip().split('\n').pop() self.assertIsNotNone( json.loads(value)) diff --git a/bigquery/tests/test_export_data_to_cloud_storage.py b/bigquery/tests/test_export_data_to_cloud_storage.py index 4bfa65740b5..97501bea2ce 100644 --- a/bigquery/tests/test_export_data_to_cloud_storage.py +++ b/bigquery/tests/test_export_data_to_cloud_storage.py @@ -21,7 +21,7 @@ class TestExportTableToGCS(CloudBaseTest): def test_export_table_csv(self): main( - self.constants['cloudStorageInputURI'], + self.constants['cloudStorageOutputURI'], self.constants['projectId'], self.constants['datasetId'], self.constants['newTableId'], @@ -31,7 +31,7 @@ def test_export_table_csv(self): def test_export_table_json(self): main( - self.constants['cloudStorageInputURI'], + self.constants['cloudStorageOutputURI'], self.constants['projectId'], self.constants['datasetId'], self.constants['newTableId'], @@ -41,7 +41,7 @@ def test_export_table_json(self): def test_export_table_avro(self): main( - self.constants['cloudStorageInputURI'], + self.constants['cloudStorageOutputURI'], self.constants['projectId'], self.constants['datasetId'], self.constants['newTableId'],