diff --git a/README.md b/README.md index 36ab4283270..2dbaaead8b2 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,16 @@ This repository holds the samples used in the python documentation on [cloud.goo For more detailed introduction to a product, check the README in the corresponding folder. +## Running the samples + +Most samples must be run as modules instead of directly, for example: + +``` +$ python -m bigquery.samples.async_query [your-project-id] [your-query] +``` + +Refer to the README in the corresponding folder for any special instructions. + ## Contributing changes * See [CONTRIBUTING.md](CONTRIBUTING.md) diff --git a/appengine/ndb/transactions/main.py b/appengine/ndb/transactions/main.py index 4a077af565d..024c73a77cc 100644 --- a/appengine/ndb/transactions/main.py +++ b/appengine/ndb/transactions/main.py @@ -191,7 +191,3 @@ def add_note(): return ('Already there
Return' % flask.url_for('main_page', page_name=page_name)) return flask.redirect(flask.url_for('main_page', page_name=page_name)) - - -if __name__ == '__main__': - app.run() diff --git a/bigquery/README.md b/bigquery/README.md index 75f7b9a6897..0385d7cbf63 100644 --- a/bigquery/README.md +++ b/bigquery/README.md @@ -2,6 +2,14 @@ This section contains samples for [Google BigQuery](https://cloud.google.com/bigquery). +## Running the samples + +These samples must be run as modules, for example: + +``` +$ python -m bigquery.samples.async_query [your-project-id] [your-query] +``` + ## Other Samples * [Using BigQuery from Google App Engine](../appengine/bigquery). diff --git a/bigquery/samples/async_query.py b/bigquery/samples/async_query.py index a0b2f775f0c..ff50c03cb17 100644 --- a/bigquery/samples/async_query.py +++ b/bigquery/samples/async_query.py @@ -11,17 +11,17 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import argparse import json +import time import uuid -from bigquery.samples.utils import get_service -from bigquery.samples.utils import paging -from bigquery.samples.utils import poll_job -from six.moves import input +from googleapiclient import discovery +from oauth2client.client import GoogleCredentials # [START async_query] -def async_query(service, project_id, query, batch=False, num_retries=5): +def async_query(bigquery, project_id, query, batch=False, num_retries=5): # Generate a unique job_id so retries # don't accidentally duplicate query job_data = { @@ -36,48 +36,95 @@ def async_query(service, project_id, query, batch=False, num_retries=5): } } } - return service.jobs().insert( + return bigquery.jobs().insert( projectId=project_id, body=job_data).execute(num_retries=num_retries) # [END async_query] +# [START poll_job] +def poll_job(bigquery, job): + """Waits for a job to complete.""" + + print('Waiting for job to finish...') + + request = bigquery.jobs().get( + projectId=job['jobReference']['projectId'], + jobId=job['jobReference']['jobId']) + + while True: + result = request.execute(num_retries=2) + + if result['status']['state'] == 'DONE': + if 'errorResult' in result['status']: + raise RuntimeError(result['status']['errorResult']) + print('Job complete.') + return + + time.sleep(1) +# [END poll_job] + + # [START run] -def run(project_id, query_string, batch, num_retries, interval): - service = get_service() - - query_job = async_query(service, - project_id, - query_string, - batch, - num_retries) - - poll_job(service, - query_job['jobReference']['projectId'], - query_job['jobReference']['jobId'], - interval, - num_retries) - - for page in paging(service, - service.jobs().getQueryResults, - num_retries=num_retries, - **query_job['jobReference']): - - yield json.dumps(page['rows']) +def main(project_id, query_string, batch, num_retries, interval): + # [START build_service] + # Grab the application's default credentials from the environment. + credentials = GoogleCredentials.get_application_default() + + # Construct the service object for interacting with the BigQuery API. + bigquery = discovery.build('bigquery', 'v2', credentials=credentials) + # [END build_service] + + # Submit the job and wait for it to complete. + query_job = async_query( + bigquery, + project_id, + query_string, + batch, + num_retries) + + poll_job(bigquery, query_job) + + # Page through the result set and print all results. 
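+    # getQueryResults returns a pageToken in its response whenever more
+    # rows remain; passing it back on the next request fetches the next page.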
+ page_token = None + while True: + page = bigquery.jobs().getQueryResults( + pageToken=page_token, + **query_job['jobReference']).execute(num_retries=2) + + print(json.dumps(page['rows'])) + + page_token = page.get('pageToken') + if not page_token: + break # [END run] # [START main] -def main(): - project_id = input("Enter the project ID: ") - query_string = input("Enter the Bigquery SQL Query: ") - batch = input("Run query as batch (y/n)?: ") in ( - 'True', 'true', 'y', 'Y', 'yes', 'Yes') - num_retries = int(input( - "Enter number of times to retry in case of 500 error: ")) - interval = input( - "Enter how often to poll the query for completion (seconds): ") - - for result in run(project_id, query_string, batch, num_retries, interval): - print(result) +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='Loads data into BigQuery.') + parser.add_argument('project_id', help='Your Google Cloud project ID.') + parser.add_argument('query', help='BigQuery SQL Query.') + parser.add_argument( + '-b', '--batch', help='Run query in batch mode.', action='store_true') + parser.add_argument( + '-r', '--num_retries', + help='Number of times to retry in case of 500 error.', + type=int, + default=5) + parser.add_argument( + '-p', '--poll_interval', + help='How often to poll the query for completion (seconds).', + type=int, + default=1) + + args = parser.parse_args() + + main( + args.project_id, + args.query, + args.batch, + args.num_retries, + args.poll_interval) # [END main] diff --git a/bigquery/samples/export_data_to_cloud_storage.py b/bigquery/samples/export_data_to_cloud_storage.py index 334a12d4298..bd0319f0613 100644 --- a/bigquery/samples/export_data_to_cloud_storage.py +++ b/bigquery/samples/export_data_to_cloud_storage.py @@ -11,23 +11,24 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import argparse +import time import uuid -from bigquery.samples.utils import get_service -from bigquery.samples.utils import poll_job -from six.moves import input +from googleapiclient import discovery +from oauth2client.client import GoogleCredentials # [START export_table] -def export_table(service, cloud_storage_path, - projectId, datasetId, tableId, +def export_table(bigquery, cloud_storage_path, + project_id, dataset_id, table_id, export_format="CSV", num_retries=5): """ Starts an export job Args: - service: initialized and authorized bigquery + bigquery: initialized and authorized bigquery google-api-client object. cloud_storage_path: fully qualified path to a Google Cloud Storage location. 
@@ -42,60 +43,102 @@ def export_table(service, cloud_storage_path, # don't accidentally duplicate export job_data = { 'jobReference': { - 'projectId': projectId, + 'projectId': project_id, 'jobId': str(uuid.uuid4()) }, 'configuration': { 'extract': { 'sourceTable': { - 'projectId': projectId, - 'datasetId': datasetId, - 'tableId': tableId, + 'projectId': project_id, + 'datasetId': dataset_id, + 'tableId': table_id, }, 'destinationUris': [cloud_storage_path], 'destinationFormat': export_format } } } - return service.jobs().insert( - projectId=projectId, + return bigquery.jobs().insert( + projectId=project_id, body=job_data).execute(num_retries=num_retries) # [END export_table] +# [START poll_job] +def poll_job(bigquery, job): + """Waits for a job to complete.""" + + print('Waiting for job to finish...') + + request = bigquery.jobs().get( + projectId=job['jobReference']['projectId'], + jobId=job['jobReference']['jobId']) + + while True: + result = request.execute(num_retries=2) + + if result['status']['state'] == 'DONE': + if 'errorResult' in result['status']: + raise RuntimeError(result['status']['errorResult']) + print('Job complete.') + return + + time.sleep(1) +# [END poll_job] + + # [START run] -def run(cloud_storage_path, - projectId, datasetId, tableId, - num_retries, interval, export_format="CSV"): - - bigquery = get_service() - resource = export_table(bigquery, cloud_storage_path, - projectId, datasetId, tableId, - num_retries=num_retries, - export_format=export_format) - poll_job(bigquery, - resource['jobReference']['projectId'], - resource['jobReference']['jobId'], - interval, - num_retries) +def main(cloud_storage_path, project_id, dataset_id, table_id, + num_retries, interval, export_format="CSV"): + # [START build_service] + # Grab the application's default credentials from the environment. + credentials = GoogleCredentials.get_application_default() + + # Construct the service object for interacting with the BigQuery API. + bigquery = discovery.build('bigquery', 'v2', credentials=credentials) + # [END build_service] + + job = export_table( + bigquery, + cloud_storage_path, + project_id, + dataset_id, + table_id, + num_retries=num_retries, + export_format=export_format) + poll_job(bigquery, job) # [END run] # [START main] -def main(): - projectId = input("Enter the project ID: ") - datasetId = input("Enter a dataset ID: ") - tableId = input("Enter a table name to copy: ") - cloud_storage_path = input( - "Enter a Google Cloud Storage URI: ") - interval = input( - "Enter how often to poll the job (in seconds): ") - num_retries = input( - "Enter the number of retries in case of 500 error: ") - - run(cloud_storage_path, - projectId, datasetId, tableId, - num_retries, interval) - - print('Done exporting!') +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='Exports data from BigQuery to Google Cloud Storage.') + parser.add_argument('project_id', help='Your Google Cloud project ID.') + parser.add_argument('dataset_id', help='BigQuery dataset to export.') + parser.add_argument('table_id', help='BigQuery table to export.') + parser.add_argument( + 'gcs_path', + help=('Google Cloud Storage path to store the exported data. 
For ' + 'example, gs://mybucket/mydata.csv')) + parser.add_argument( + '-p', '--poll_interval', + help='How often to poll the query for completion (seconds).', + type=int, + default=1) + parser.add_argument( + '-r', '--num_retries', + help='Number of times to retry in case of 500 error.', + type=int, + default=5) + + args = parser.parse_args() + + main( + args.gcs_path, + args.project_id, + args.dataset_id, + args.table_id, + args.num_retries, + args.poll_interval) # [END main] diff --git a/bigquery/samples/getting_started.py b/bigquery/samples/getting_started.py index 94aec36e41e..8c8780028e0 100644 --- a/bigquery/samples/getting_started.py +++ b/bigquery/samples/getting_started.py @@ -20,9 +20,10 @@ words. """ # [START all] +import argparse + from apiclient.discovery import build from apiclient.errors import HttpError - from oauth2client.client import GoogleCredentials @@ -38,10 +39,11 @@ def main(project_id): # [START run_query] query_request = bigquery_service.jobs() query_data = { - 'query': ('SELECT TOP(corpus, 10) as title, ' - 'COUNT(*) as unique_words ' - 'FROM [publicdata:samples.shakespeare];') - } + 'query': ( + 'SELECT TOP(corpus, 10) as title, ' + 'COUNT(*) as unique_words ' + 'FROM [publicdata:samples.shakespeare];') + } query_response = query_request.query( projectId=project_id, @@ -60,7 +62,11 @@ def main(project_id): if __name__ == '__main__': - # The id of the project to run queries under. - project_id = input("Enter the project ID: ") - main(project_id) + parser = argparse.ArgumentParser( + description='Queries the public BigQuery Shakespeare dataset.') + parser.add_argument('project_id', help='Your Google Cloud Project ID.') + + args = parser.parse_args() + + main(args.project_id) # [END all] diff --git a/bigquery/samples/list_datasets_projects.py b/bigquery/samples/list_datasets_projects.py index 0f97c668516..d8c22c73b79 100644 --- a/bigquery/samples/list_datasets_projects.py +++ b/bigquery/samples/list_datasets_projects.py @@ -35,7 +35,21 @@ application-default-credentials#howtheywork [2] https://cloud.google.com/sdk/ [3] https://console.developers.google.com -""" # NOQA + +For more information on the BigQuery API you can visit: + + https://developers.google.com/bigquery/docs/overview + +For more information on the BigQuery API Python library surface you +can visit: + + https://developers.google.com/resources/api-libraries/documentation/ + bigquery/v2/python/latest/ + +For information on the Python Client Library visit: + + https://developers.google.com/api-client-library/python/start/get_started +""" import argparse from pprint import pprint @@ -55,6 +69,7 @@ def list_datasets(service, project): except HTTPError as err: print('Error in list_datasets: %s' % err.content) + raise err # [END list_datasets] @@ -70,6 +85,7 @@ def list_projects(service): except HTTPError as err: print('Error in list_projects: %s' % err.content) + raise err # [END list_projects] @@ -81,19 +97,6 @@ def main(project_id): list_datasets(service, project_id) list_projects(service) - -# For more information on the BigQuery API you can visit: -# -# https://developers.google.com/bigquery/docs/overview -# -# For more information on the BigQuery API Python library surface you -# can visit: -# -# https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/python/latest/ -# -# For information on the Python Client Library visit: -# -# https://developers.google.com/api-client-library/python/start/get_started if __name__ == '__main__': parser = argparse.ArgumentParser( 
description='Lists BigQuery datasets and projects.') diff --git a/bigquery/samples/load_data_by_post.py b/bigquery/samples/load_data_by_post.py index 26b1e2236ff..08d2bd2bc3b 100644 --- a/bigquery/samples/load_data_by_post.py +++ b/bigquery/samples/load_data_by_post.py @@ -11,16 +11,17 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import argparse import json +import time -from bigquery.samples.utils import get_service, poll_job +from googleapiclient import discovery import httplib2 from oauth2client.client import GoogleCredentials -from six.moves import input # [START make_post] -def make_post(http, schema, data, projectId, datasetId, tableId): +def make_post(http, schema, data, project_id, dataset_id, table_id): """ Creates an http POST request for loading data into a bigquery table @@ -34,7 +35,7 @@ def make_post(http, schema, data, projectId, datasetId, tableId): Returns: an http.request object """ url = ('https://www.googleapis.com/upload/bigquery/v2/projects/' + - projectId + '/jobs') + project_id + '/jobs') # Create the body of the request, separated by a boundary of xxx resource = ('--xxx\n' + 'Content-Type: application/json; charset=UTF-8\n' + '\n' + @@ -45,9 +46,9 @@ def make_post(http, schema, data, projectId, datasetId, tableId): ' "fields": ' + str(schema) + '\n' + ' },\n' + ' "destinationTable": {\n' + - ' "projectId": "' + projectId + '",\n' + - ' "datasetId": "' + datasetId + '",\n' + - ' "tableId": "' + tableId + '"\n' + + ' "projectId": "' + project_id + '",\n' + + ' "datasetId": "' + dataset_id + '",\n' + + ' "tableId": "' + table_id + '"\n' + ' }\n' + ' }\n' + ' }\n' + @@ -70,39 +71,76 @@ def make_post(http, schema, data, projectId, datasetId, tableId): # [END make_post] +# [START poll_job] +def poll_job(bigquery, job): + """Waits for a job to complete.""" + + print('Waiting for job to finish...') + + request = bigquery.jobs().get( + projectId=job['jobReference']['projectId'], + jobId=job['jobReference']['jobId']) + + while True: + result = request.execute(num_retries=2) + + if result['status']['state'] == 'DONE': + if 'errorResult' in result['status']: + raise RuntimeError(result['status']['errorResult']) + print('Job complete.') + return + + time.sleep(1) +# [END poll_job] + + # [START main] -def main(): +def main(project_id, dataset_id, table_name, schema_path, data_path): credentials = GoogleCredentials.get_application_default() http = credentials.authorize(httplib2.Http()) - projectId = input('Enter the project ID: ') - datasetId = input('Enter a dataset ID: ') - tableId = input('Enter a table name to load the data to: ') - schema_path = input( - 'Enter the path to the schema file for the table: ') + bigquery = discovery.build('bigquery', 'v2', credentials=credentials) with open(schema_path, 'r') as schema_file: schema = schema_file.read() - data_path = input('Enter the path to the data file: ') - with open(data_path, 'r') as data_file: data = data_file.read() - resp, content = make_post(http, - schema, - data, - projectId, - datasetId, - tableId) + resp, content = make_post( + http, + schema, + data, + project_id, + dataset_id, + table_name) if resp.status == 200: - job_resource = json.loads(content) - service = get_service(credentials) - poll_job(service, **job_resource['jobReference']) + job = json.loads(content) + poll_job(bigquery, job) print("Success!") else: print("Http error code: {}".format(resp.status)) # [END main] if __name__ == '__main__': - main() + parser = argparse.ArgumentParser( + 
description='Loads data into BigQuery.') + parser.add_argument('project_id', help='Your Google Cloud project ID.') + parser.add_argument('dataset_id', help='A BigQuery dataset ID.') + parser.add_argument( + 'table_name', help='Name of the table to load data into.') + parser.add_argument( + 'schema_file', + help='Path to a schema file describing the table schema.') + parser.add_argument( + 'data_file', + help='Path to the data file.') + + args = parser.parse_args() + + main( + args.project_id, + args.dataset_id, + args.table_name, + args.schema_file, + args.data_file) diff --git a/bigquery/samples/load_data_from_csv.py b/bigquery/samples/load_data_from_csv.py index b8000785cbb..084eecca63d 100644 --- a/bigquery/samples/load_data_from_csv.py +++ b/bigquery/samples/load_data_from_csv.py @@ -11,21 +11,23 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import argparse import json +import time import uuid -from bigquery.samples.utils import get_service, poll_job -from six.moves import input +from googleapiclient import discovery +from oauth2client.client import GoogleCredentials # [START load_table] -def load_table(service, source_schema, source_csv, - projectId, datasetId, tableId, num_retries=5): +def load_table(bigquery, project_id, dataset_id, table_name, source_schema, + source_path, num_retries=5): """ Starts a job to load a bigquery table from CSV Args: - service: an initialized and authorized bigquery + bigquery: an initialized and authorized bigquery client google-api-client object source_schema: a valid bigquery schema, see https://cloud.google.com/bigquery/docs/reference/v2/tables @@ -40,70 +42,115 @@ def load_table(service, source_schema, source_csv, # don't accidentally duplicate query job_data = { 'jobReference': { - 'projectId': projectId, + 'projectId': project_id, 'job_id': str(uuid.uuid4()) }, 'configuration': { 'load': { - 'sourceUris': [source_csv], + 'sourceUris': [source_path], 'schema': { 'fields': source_schema }, 'destinationTable': { - 'projectId': projectId, - 'datasetId': datasetId, - 'tableId': tableId + 'projectId': project_id, + 'datasetId': dataset_id, + 'tableId': table_name } } } } - return service.jobs().insert( - projectId=projectId, + return bigquery.jobs().insert( + projectId=project_id, body=job_data).execute(num_retries=num_retries) # [END load_table] +# [START poll_job] +def poll_job(bigquery, job): + """Waits for a job to complete.""" + + print('Waiting for job to finish...') + + request = bigquery.jobs().get( + projectId=job['jobReference']['projectId'], + jobId=job['jobReference']['jobId']) + + while True: + result = request.execute(num_retries=2) + + if result['status']['state'] == 'DONE': + if 'errorResult' in result['status']: + raise RuntimeError(result['status']['errorResult']) + print('Job complete.') + return + + time.sleep(1) +# [END poll_job] + + # [START run] -def run(source_schema, source_csv, - projectId, datasetId, tableId, interval, num_retries): - service = get_service() - - job = load_table(service, source_schema, source_csv, - projectId, datasetId, tableId, num_retries) - - poll_job(service, - job['jobReference']['projectId'], - job['jobReference']['jobId'], - interval, - num_retries) +def main(project_id, dataset_id, table_name, schema_file, data_path, + poll_interval, num_retries): + # [START build_service] + # Grab the application's default credentials from the environment. 
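+    # The default credentials are resolved from the environment, e.g. the
+    # GOOGLE_APPLICATION_CREDENTIALS variable, the gcloud SDK's stored
+    # credentials, or the Compute Engine metadata server.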
+ credentials = GoogleCredentials.get_application_default() + + # Construct the service object for interacting with the BigQuery API. + bigquery = discovery.build('bigquery', 'v2', credentials=credentials) + # [END build_service] + + with open(schema_file, 'r') as f: + schema = json.load(f) + + job = load_table( + bigquery, + project_id, + dataset_id, + table_name, + schema, + data_path, + num_retries) + + poll_job(bigquery, job) # [END run] # [START main] -def main(): - projectId = input("Enter the project ID: ") - datasetId = input("Enter a dataset ID: ") - tableId = input("Enter a destination table name: ") - - schema_file_path = input( - "Enter the path to the table schema: ") - with open(schema_file_path, 'r') as schema_file: - schema = json.load(schema_file) - - data_file_path = input( - "Enter the Cloud Storage path for the CSV file: ") - num_retries = input( - "Enter number of times to retry in case of 500 error: ") - interval = input( - "Enter how often to poll the query for completion (seconds): ") - run(schema, - data_file_path, - projectId, - datasetId, - tableId, - interval, - num_retries) - - print("Job complete!") +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='Loads data into BigQuery from a CSV file in Google ' + 'Cloud Storage.') + parser.add_argument('project_id', help='Your Google Cloud project ID.') + parser.add_argument('dataset_id', help='A BigQuery dataset ID.') + parser.add_argument( + 'table_name', help='Name of the table to load data into.') + parser.add_argument( + 'schema_file', + help='Path to a schema file describing the table schema.') + parser.add_argument( + 'data_path', + help='Google Cloud Storage path to the CSV data, for example: ' + 'gs://mybucket/in.csv') + parser.add_argument( + '-p', '--poll_interval', + help='How often to poll the query for completion (seconds).', + type=int, + default=1) + parser.add_argument( + '-r', '--num_retries', + help='Number of times to retry in case of 500 error.', + type=int, + default=5) + + args = parser.parse_args() + + main( + args.project_id, + args.dataset_id, + args.table_name, + args.schema_file, + args.data_path, + args.poll_interval, + args.num_retries) # [END main] diff --git a/bigquery/samples/streaming.py b/bigquery/samples/streaming.py index 958efcf5f99..06c1cfca052 100644 --- a/bigquery/samples/streaming.py +++ b/bigquery/samples/streaming.py @@ -11,20 +11,18 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +import argparse import ast import json import uuid -from bigquery.samples.utils import get_service +from googleapiclient import discovery +from oauth2client.client import GoogleCredentials from six.moves import input # [START stream_row_to_bigquery] -def stream_row_to_bigquery(service, - project_id, - dataset_id, - table_id, - row, +def stream_row_to_bigquery(bigquery, project_id, dataset_id, table_name, row, num_retries=5): # Generate a unique row id so retries # don't accidentally duplicate insert @@ -32,45 +30,63 @@ def stream_row_to_bigquery(service, 'insertId': str(uuid.uuid4()), 'rows': [{'json': row}] } - return service.tabledata().insertAll( + return bigquery.tabledata().insertAll( projectId=project_id, datasetId=dataset_id, - tableId=table_id, + tableId=table_name, body=insert_all_data).execute(num_retries=num_retries) # [END stream_row_to_bigquery] # [START run] -def run(project_id, dataset_id, table_id, rows, num_retries): - service = get_service() - for row in rows: - response = stream_row_to_bigquery(service, - project_id, - dataset_id, - table_id, - row, - num_retries) - yield json.dumps(response) -# [END run] +def main(project_id, dataset_id, table_name, num_retries): + # [START build_service] + # Grab the application's default credentials from the environment. + credentials = GoogleCredentials.get_application_default() + + # Construct the service object for interacting with the BigQuery API. + bigquery = discovery.build('bigquery', 'v2', credentials=credentials) + # [END build_service] + + for row in get_rows(): + response = stream_row_to_bigquery( + bigquery, project_id, dataset_id, table_name, row, num_retries) + print(json.dumps(response)) -# [START main] def get_rows(): line = input("Enter a row (python dict) into the table: ") while line: yield ast.literal_eval(line) line = input("Enter another row into the table \n" + "[hit enter to stop]: ") +# [END run] + +# [START main] +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='Streams data into BigQuery from the command line.') + parser.add_argument('project_id', help='Your Google Cloud project ID.') + parser.add_argument('dataset_id', help='A BigQuery dataset ID.') + parser.add_argument( + 'table_name', help='Name of the table to load data into.') + parser.add_argument( + '-p', '--poll_interval', + help='How often to poll the query for completion (seconds).', + type=int, + default=1) + parser.add_argument( + '-r', '--num_retries', + help='Number of times to retry in case of 500 error.', + type=int, + default=5) -def main(): - project_id = input("Enter the project ID: ") - dataset_id = input("Enter a dataset ID: ") - table_id = input("Enter a table ID : ") - num_retries = int(input( - "Enter number of times to retry in case of 500 error: ")) + args = parser.parse_args() - for result in run(project_id, dataset_id, table_id, - get_rows(), num_retries): - print(result) + main( + args.project_id, + args.dataset_id, + args.table_name, + args.num_retries) # [END main] diff --git a/bigquery/samples/sync_query.py b/bigquery/samples/sync_query.py index 56be93ef262..5c08e2bc6cb 100644 --- a/bigquery/samples/sync_query.py +++ b/bigquery/samples/sync_query.py @@ -11,52 +11,80 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +import argparse import json -from bigquery.samples.utils import get_service, paging -from six.moves import input +from googleapiclient import discovery +from oauth2client.client import GoogleCredentials # [START sync_query] -def sync_query(service, project_id, query, timeout=10000, num_retries=5): +def sync_query(bigquery, project_id, query, timeout=10000, num_retries=5): query_data = { 'query': query, 'timeoutMs': timeout, } - return service.jobs().query( + return bigquery.jobs().query( projectId=project_id, body=query_data).execute(num_retries=num_retries) # [END sync_query] # [START run] -def run(project_id, query, timeout, num_retries): - service = get_service() - response = sync_query(service, - project_id, - query, - timeout, - num_retries) - - for page in paging(service, - service.jobs().getQueryResults, - num_retries=num_retries, - **response['jobReference']): - yield json.dumps(page['rows']) +def main(project_id, query, timeout, num_retries): + # [START build_service] + # Grab the application's default credentials from the environment. + credentials = GoogleCredentials.get_application_default() + + # Construct the service object for interacting with the BigQuery API. + bigquery = discovery.build('bigquery', 'v2', credentials=credentials) + # [END build_service] + + query_job = sync_query( + bigquery, + project_id, + query, + timeout, + num_retries) + + # Page through the result set and print all results. + page_token = None + while True: + page = bigquery.jobs().getQueryResults( + pageToken=page_token, + **query_job['jobReference']).execute(num_retries=2) + + print(json.dumps(page['rows'])) + + page_token = page.get('pageToken') + if not page_token: + break # [END run] # [START main] -def main(): - project_id = input("Enter the project ID: ") - query_string = input("Enter the Bigquery SQL Query: ") - timeout = input( - "Enter how long to wait for the query to complete in milliseconds" - "\n (if longer than 10 seconds, use an asynchronous query): ") - num_retries = int(input( - "Enter how many times to retry in case of server error")) - - for result in run(project_id, query_string, timeout, num_retries): - print(result) +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='Loads data into BigQuery.') + parser.add_argument('project_id', help='Your Google Cloud project ID.') + parser.add_argument('query', help='BigQuery SQL Query.') + parser.add_argument( + '-t', '--timeout', + help='Number seconds to wait for a result', + type=int, + default=30) + parser.add_argument( + '-r', '--num_retries', + help='Number of times to retry in case of 500 error.', + type=int, + default=5) + + args = parser.parse_args() + + main( + args.project_id, + args.query, + args.timeout, + args.num_retries) # [END main] diff --git a/bigquery/samples/utils.py b/bigquery/samples/utils.py deleted file mode 100644 index 8a2f9af9557..00000000000 --- a/bigquery/samples/utils.py +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright 2015, Google, Inc. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - - -# [START get_service] -def get_service(): - """returns an initialized and authorized bigquery client""" - - from googleapiclient.discovery import build - from oauth2client.client import GoogleCredentials - - credentials = GoogleCredentials.get_application_default() - - # Create and return the service object for v2 of the bigquery api. You can - # browse available apis and versions here: - # https://developers.google.com/api-client-library/python/apis/ - # - # The build() method also takes care of injecting the necessary scopes - # to access the given service, when using application default credentials. - return build('bigquery', 'v2', credentials=credentials) -# [END get_service] - - -# [START poll_job] -def poll_job(service, projectId, jobId, interval=5.0, num_retries=5): - """checks the status of a job every *interval* seconds""" - - import time - - job_get = service.jobs().get(projectId=projectId, jobId=jobId) - job_resource = job_get.execute(num_retries=num_retries) - - while not job_resource['status']['state'] == 'DONE': - print('Job is {}, waiting {} seconds...' - .format(job_resource['status']['state'], interval)) - time.sleep(float(interval)) - job_resource = job_get.execute(num_retries=num_retries) - - return job_resource -# [END poll_job] - - -# [START paging] -def paging(service, request_func, num_retries=5, **kwargs): - """pages though the results of an asynchronous job""" - - has_next = True - while has_next: - response = request_func(**kwargs).execute(num_retries=num_retries) - if 'pageToken' in response: - kwargs['pageToken'] = response['pageToken'] - else: - has_next = False - yield response -# [END paging] diff --git a/bigquery/tests/test_async_query.py b/bigquery/tests/test_async_query.py index 0792cb57072..8ea0abc293c 100644 --- a/bigquery/tests/test_async_query.py +++ b/bigquery/tests/test_async_query.py @@ -12,36 +12,23 @@ # limitations under the License. 
# import json -import os -import unittest - -from bigquery.samples.async_query import main, run +from bigquery.samples.async_query import main import tests class TestAsyncQuery(tests.CloudBaseTest): def test_async_query(self): - for result in run(self.constants['projectId'], - self.constants['query'], - False, - 5, - 5): - self.assertIsNotNone(json.loads(result)) - - -class TestAsyncRunner(tests.CloudBaseTest): - - def test_async_query_runner(self): - test_project_id = os.environ.get(tests.PROJECT_ID_ENV) - answers = [test_project_id, self.constants['query'], 'n', - '1', '1'] - - with tests.mock_input_answers( - answers, target='bigquery.samples.async_query.input'): - main() - - -if __name__ == '__main__': - unittest.main() + with tests.capture_stdout() as stdout: + main( + self.constants['projectId'], + self.constants['query'], + False, + 5, + 5) + + value = stdout.getvalue().strip().split('\n').pop() + + self.assertIsNotNone( + json.loads(value)) diff --git a/bigquery/tests/test_export_data_to_cloud_storage.py b/bigquery/tests/test_export_data_to_cloud_storage.py index d5872d87241..97501bea2ce 100644 --- a/bigquery/tests/test_export_data_to_cloud_storage.py +++ b/bigquery/tests/test_export_data_to_cloud_storage.py @@ -13,41 +13,38 @@ # """Tests for export_table_to_gcs.""" - -import unittest - -from bigquery.samples.export_data_to_cloud_storage import run +from bigquery.samples.export_data_to_cloud_storage import main from tests import CloudBaseTest class TestExportTableToGCS(CloudBaseTest): def test_export_table_csv(self): - run(self.constants['cloudStorageInputURI'], + main( + self.constants['cloudStorageOutputURI'], self.constants['projectId'], self.constants['datasetId'], self.constants['newTableId'], 5, - 5, + 1, export_format="CSV") def test_export_table_json(self): - run(self.constants['cloudStorageInputURI'], + main( + self.constants['cloudStorageOutputURI'], self.constants['projectId'], self.constants['datasetId'], self.constants['newTableId'], 5, - 5, + 1, export_format="NEWLINE_DELIMITED_JSON") def test_export_table_avro(self): - run(self.constants['cloudStorageInputURI'], + main( + self.constants['cloudStorageOutputURI'], self.constants['projectId'], self.constants['datasetId'], self.constants['newTableId'], 5, - 5, + 1, export_format="AVRO") - -if __name__ == '__main__': - unittest.main() diff --git a/bigquery/tests/test_getting_started.py b/bigquery/tests/test_getting_started.py index ee311173a3b..dbb4ff555a5 100644 --- a/bigquery/tests/test_getting_started.py +++ b/bigquery/tests/test_getting_started.py @@ -12,10 +12,8 @@ # limitations under the License. # import re -import unittest from bigquery.samples import getting_started - import tests @@ -23,10 +21,7 @@ class TestGettingStarted(tests.CloudBaseTest): def test_main(self): with tests.capture_stdout() as mock_stdout: getting_started.main(self.constants['projectId']) + stdout = mock_stdout.getvalue() self.assertRegexpMatches(stdout, re.compile( r'Query Results:.hamlet', re.DOTALL)) - - -if __name__ == '__main__': - unittest.main() diff --git a/bigquery/tests/test_list_datasets_projects.py b/bigquery/tests/test_list_datasets_projects.py index 1cb916e3187..7dfcf39821b 100644 --- a/bigquery/tests/test_list_datasets_projects.py +++ b/bigquery/tests/test_list_datasets_projects.py @@ -12,10 +12,8 @@ # limitations under the License. 
# import re -import unittest from bigquery.samples import list_datasets_projects - import tests @@ -24,12 +22,10 @@ class TestListDatasetsProjects(tests.CloudBaseTest): def test_main(self): with tests.capture_stdout() as mock_stdout: list_datasets_projects.main(self.constants['projectId']) + stdout = mock_stdout.getvalue() + self.assertRegexpMatches(stdout, re.compile( r'Project list:.*bigquery#projectList.*projects', re.DOTALL)) self.assertRegexpMatches(stdout, re.compile( r'Dataset list:.*datasets.*datasetId', re.DOTALL)) - - -if __name__ == '__main__': - unittest.main() diff --git a/bigquery/tests/test_load_data_from_csv.py b/bigquery/tests/test_load_data_from_csv.py index 53c8039b02c..a800c4be3e7 100644 --- a/bigquery/tests/test_load_data_from_csv.py +++ b/bigquery/tests/test_load_data_from_csv.py @@ -12,33 +12,19 @@ # limitations under the License. # """Tests for load_data_from_csv.""" - -import json import os -import unittest -from bigquery.samples.load_data_from_csv import run +from bigquery.samples.load_data_from_csv import main from tests import CloudBaseTest class TestLoadDataFromCSV(CloudBaseTest): - - def setUp(self): - super(TestLoadDataFromCSV, self).setUp() - with open( - os.path.join(self.resource_path, 'schema.json'), - 'r') as schema_file: - self.schema = json.load(schema_file) - def test_load_table(self): - run(self.schema, - self.constants['cloudStorageInputURI'], + main( self.constants['projectId'], self.constants['datasetId'], self.constants['newTableId'], - 5, + os.path.join(self.resource_path, 'schema.json'), + self.constants['cloudStorageInputURI'], + 1, 5) - - -if __name__ == '__main__': - unittest.main() diff --git a/bigquery/tests/test_streaming.py b/bigquery/tests/test_streaming.py index 791475881ac..c29efa06e63 100644 --- a/bigquery/tests/test_streaming.py +++ b/bigquery/tests/test_streaming.py @@ -14,29 +14,28 @@ """Tests for export_table_to_gcs.""" import json import os -import unittest -from bigquery.samples.streaming import run -from tests import CloudBaseTest +from bigquery.samples import streaming +from tests import capture_stdout, CloudBaseTest class TestStreaming(CloudBaseTest): def test_stream_row_to_bigquery(self): - with open( os.path.join(self.resource_path, 'streamrows.json'), 'r') as rows_file: rows = json.load(rows_file) - for result in run(self.constants['projectId'], - self.constants['datasetId'], - self.constants['newTableId'], - rows, - 5): - self.assertIsNotNone(json.loads(result)) + streaming.get_rows = lambda: rows + with capture_stdout() as stdout: + streaming.main( + self.constants['projectId'], + self.constants['datasetId'], + self.constants['newTableId'], + 5) -if __name__ == '__main__': - unittest.main() + results = stdout.getvalue().split('\n') + self.assertIsNotNone(json.loads(results[0])) diff --git a/bigquery/tests/test_sync_query.py b/bigquery/tests/test_sync_query.py index e9f0637ac6b..f5ed3eadf42 100644 --- a/bigquery/tests/test_sync_query.py +++ b/bigquery/tests/test_sync_query.py @@ -12,22 +12,20 @@ # limitations under the License. 
# import json -import unittest -from bigquery.samples.sync_query import run -from tests import CloudBaseTest +from bigquery.samples.sync_query import main +from tests import capture_stdout, CloudBaseTest class TestSyncQuery(CloudBaseTest): def test_sync_query(self): - for result in run(self.constants['projectId'], - self.constants['query'], - 5000, - 5): + with capture_stdout() as stdout: + main( + self.constants['projectId'], + self.constants['query'], + 30, + 5) - self.assertIsNotNone(json.loads(result)) - - -if __name__ == '__main__': - unittest.main() + result = stdout.getvalue().split('\n')[0] + self.assertIsNotNone(json.loads(result)) diff --git a/bigquery/requirements.txt b/requirements.txt similarity index 63% rename from bigquery/requirements.txt rename to requirements.txt index 3487fe93bac..396a8981627 100644 --- a/bigquery/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ -argparse==1.2.1 -google-api-python-client==1.3.2 -httplib2==0.9 -oauth2client==1.4.6 +argparse>=1.2.1 +google-api-python-client>=1.4.2 +httplib2>=0.9.1 +oauth2client>=1.5.1 py==1.4.26 pyasn1==0.1.7 pyasn1-modules==0.0.5 diff --git a/tests/__init__.py b/tests/__init__.py index 8c68e5c9bd7..84164816c8a 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -17,7 +17,6 @@ BUCKET_NAME_ENV, capture_stdout, CloudBaseTest, - mock_input_answers, PROJECT_ID_ENV, RESOURCE_PATH) @@ -27,7 +26,6 @@ 'BUCKET_NAME_ENV', 'capture_stdout', 'CloudBaseTest', - 'mock_input_answers', 'PROJECT_ID_ENV', 'RESOURCE_PATH' ] diff --git a/tests/utils.py b/tests/utils.py index 095eb66ae7c..a6aa6e69ebd 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -22,7 +22,6 @@ import tempfile import unittest -from mock import patch from nose.plugins.skip import SkipTest from six.moves import cStringIO @@ -39,28 +38,6 @@ os.path.abspath(os.path.dirname(__file__)), 'resources') -# TODO: This can be written as a much simpler context manager. -class mock_input_answers(object): - - def __init__(self, list_, target): - self.i = 0 - self.list_ = list_ - self.target = target - - def get_next_value(self, question): - ret = self.list_[self.i] - self.i += 1 - print('Responding to {} with {}'.format(question, ret)) - return u"{}".format(ret) - - def __enter__(self): - self.patch = patch(self.target, self.get_next_value) - self.patch.__enter__() - - def __exit__(self, exc_type, exc_value, traceback): - self.patch.__exit__(exc_type, exc_value, traceback) - - class CloudBaseTest(unittest.TestCase): def setUp(self):
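With the `input()` prompts removed, each sample's configuration now comes from `argparse`. As a sketch of one possible invocation of the export sample — the positional arguments and flags are the ones defined in this diff, while the project, dataset, table, and bucket values are placeholders — run it as a module:

```
$ python -m bigquery.samples.export_data_to_cloud_storage \
    [your-project-id] [dataset-id] [table-id] gs://[your-bucket]/output.csv \
    --num_retries 5 --poll_interval 1
```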