From 17ec02f05958e2ceb201ea63df347505f5ff287c Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 30 Oct 2019 16:24:01 -0700 Subject: [PATCH 1/6] feat(bigquery): add `create_bqstorage_client` param to `to_dataframe` and `to_arrow` When the `create_bqstorage_client` parameter is set to `True`, the BigQuery client constructs a BigQuery Storage API client for you. This removes the need for boilerplate code to manually construct both clients explitly with the same credentials. Does this make the `bqstorage_client` parameter unnecessary? In most cases, yes, but there are a few cases where we'll want to continue using it. * When partner tools use `to_dataframe`, they should continue to use `bqstorage_client` so that they can set the correct amended user-agent strings. * When a developer needs to override the default API endpoint for the BQ Storage API, they'll need to manually supply a `bqstorage_client`. --- bigquery/google/cloud/bigquery/client.py | 13 ++++ bigquery/google/cloud/bigquery/job.py | 40 +++++++++++- bigquery/google/cloud/bigquery/table.py | 61 +++++++++++++++++-- bigquery/samples/download_public_data.py | 33 ++++++++++ .../samples/download_public_data_sandbox.py | 34 +++++++++++ .../tests/test_download_public_data.py | 24 ++++++++ .../test_download_public_data_sandbox.py | 24 ++++++++ bigquery/tests/unit/test_client.py | 25 ++++++++ bigquery/tests/unit/test_table.py | 58 ++++++++++++++++++ 9 files changed, 304 insertions(+), 8 deletions(-) create mode 100644 bigquery/samples/download_public_data.py create mode 100644 bigquery/samples/download_public_data_sandbox.py create mode 100644 bigquery/samples/tests/test_download_public_data.py create mode 100644 bigquery/samples/tests/test_download_public_data_sandbox.py diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index e6eaf5fcb3ba..f3b7aab40789 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -353,6 +353,19 @@ def dataset(self, dataset_id, project=None): return DatasetReference(project, dataset_id) + def _create_bqstorage_client(self): + """Create a BigQuery Storage API client using this client's credentials. + + Returns: + google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient: + A BigQuery Storage API client. + """ + from google.cloud import bigquery_storage_v1beta1 + + return bigquery_storage_v1beta1.BigQueryStorageClient( + credentials=self._credentials + ) + def create_dataset(self, dataset, exists_ok=False, retry=DEFAULT_RETRY): """API call: create the dataset via a POST request. diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index f0312b0d4219..d4d1b88073c6 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -3152,7 +3152,12 @@ def result( # If changing the signature of this method, make sure to apply the same # changes to table.RowIterator.to_arrow() - def to_arrow(self, progress_bar_type=None, bqstorage_client=None): + def to_arrow( + self, + progress_bar_type=None, + bqstorage_client=None, + create_bqstorage_client=False, + ): """[Beta] Create a class:`pyarrow.Table` by loading all pages of a table or query. @@ -3185,6 +3190,16 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None): Reading from a specific partition or snapshot is not currently supported by this method. + create_bqstorage_client (bool): + **Beta Feature** Optional. If ``True``, create a BigQuery + Storage API client using the default API settings. 
The + BigQuery Storage API is a faster way to fetch rows from + BigQuery. See the ``bqstorage_client`` parameter for more + information. + + This argument does nothing if ``bqstorage_client`` is supplied. + + ..versionadded:: 1.22.0 Returns: pyarrow.Table @@ -3199,12 +3214,20 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None): ..versionadded:: 1.17.0 """ return self.result().to_arrow( - progress_bar_type=progress_bar_type, bqstorage_client=bqstorage_client + progress_bar_type=progress_bar_type, + bqstorage_client=bqstorage_client, + create_bqstorage_client=create_bqstorage_client, ) # If changing the signature of this method, make sure to apply the same # changes to table.RowIterator.to_dataframe() - def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None): + def to_dataframe( + self, + bqstorage_client=None, + dtypes=None, + progress_bar_type=None, + create_bqstorage_client=False, + ): """Return a pandas DataFrame from a QueryJob Args: @@ -3237,6 +3260,16 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non for details. ..versionadded:: 1.11.0 + create_bqstorage_client (bool): + **Beta Feature** Optional. If ``True``, create a BigQuery + Storage API client using the default API settings. The + BigQuery Storage API is a faster way to fetch rows from + BigQuery. See the ``bqstorage_client`` parameter for more + information. + + This argument does nothing if ``bqstorage_client`` is supplied. + + ..versionadded:: 1.22.0 Returns: A :class:`~pandas.DataFrame` populated with row data and column @@ -3250,6 +3283,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non bqstorage_client=bqstorage_client, dtypes=dtypes, progress_bar_type=progress_bar_type, + create_bqstorage_client=create_bqstorage_client, ) def __iter__(self): diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 77cb67bfd0fe..ea04bdb47e18 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1456,7 +1456,12 @@ def _to_arrow_iterable(self, bqstorage_client=None): # If changing the signature of this method, make sure to apply the same # changes to job.QueryJob.to_arrow() - def to_arrow(self, progress_bar_type=None, bqstorage_client=None): + def to_arrow( + self, + progress_bar_type=None, + bqstorage_client=None, + create_bqstorage_client=False, + ): """[Beta] Create a class:`pyarrow.Table` by loading all pages of a table or query. @@ -1489,6 +1494,16 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None): Reading from a specific partition or snapshot is not currently supported by this method. + create_bqstorage_client (bool): + **Beta Feature** Optional. If ``True``, create a BigQuery + Storage API client using the default API settings. The + BigQuery Storage API is a faster way to fetch rows from + BigQuery. See the ``bqstorage_client`` parameter for more + information. + + This argument does nothing if ``bqstorage_client`` is supplied. 
+ + ..versionadded:: 1.22.0 Returns: pyarrow.Table @@ -1504,6 +1519,9 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None): if pyarrow is None: raise ValueError(_NO_PYARROW_ERROR) + if not bqstorage_client and create_bqstorage_client: + bqstorage_client = self.client._create_bqstorage_client() + progress_bar = self._get_progress_bar(progress_bar_type) record_batches = [] @@ -1558,14 +1576,20 @@ def _to_dataframe_iterable(self, bqstorage_client=None, dtypes=None): # If changing the signature of this method, make sure to apply the same # changes to job.QueryJob.to_dataframe() - def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None): + def to_dataframe( + self, + bqstorage_client=None, + dtypes=None, + progress_bar_type=None, + create_bqstorage_client=False, + ): """Create a pandas DataFrame by loading all pages of a query. Args: bqstorage_client (google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient): **Beta Feature** Optional. A BigQuery Storage API client. If supplied, use the faster BigQuery Storage API to fetch rows - from BigQuery. This API is a billable API. + from BigQuery. This method requires the ``pyarrow`` and ``google-cloud-bigquery-storage`` libraries. @@ -1602,6 +1626,16 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non progress bar as a graphical dialog box. ..versionadded:: 1.11.0 + create_bqstorage_client (bool): + **Beta Feature** Optional. If ``True``, create a BigQuery + Storage API client using the default API settings. The + BigQuery Storage API is a faster way to fetch rows from + BigQuery. See the ``bqstorage_client`` parameter for more + information. + + This argument does nothing if ``bqstorage_client`` is supplied. + + ..versionadded:: 1.22.0 Returns: pandas.DataFrame: @@ -1621,6 +1655,9 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non if dtypes is None: dtypes = {} + if not bqstorage_client and create_bqstorage_client: + bqstorage_client = self.client._create_bqstorage_client() + if bqstorage_client and self.max_results is not None: warnings.warn( "Cannot use bqstorage_client if max_results is set, " @@ -1667,11 +1704,18 @@ class _EmptyRowIterator(object): pages = () total_rows = 0 - def to_arrow(self, progress_bar_type=None): + def to_arrow( + self, + progress_bar_type=None, + bqstorage_client=None, + create_bqstorage_client=False, + ): """[Beta] Create an empty class:`pyarrow.Table`. Args: progress_bar_type (Optional[str]): Ignored. Added for compatibility with RowIterator. + bqstorage_client (Any): Ignored. Added for compatibility with RowIterator. + create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator. Returns: pyarrow.Table: An empty :class:`pyarrow.Table`. @@ -1680,13 +1724,20 @@ def to_arrow(self, progress_bar_type=None): raise ValueError(_NO_PYARROW_ERROR) return pyarrow.Table.from_arrays(()) - def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None): + def to_dataframe( + self, + bqstorage_client=None, + dtypes=None, + progress_bar_type=None, + create_bqstorage_client=False, + ): """Create an empty dataframe. Args: bqstorage_client (Any): Ignored. Added for compatibility with RowIterator. dtypes (Any): Ignored. Added for compatibility with RowIterator. progress_bar_type (Any): Ignored. Added for compatibility with RowIterator. + create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator. Returns: pandas.DataFrame: An empty :class:`~pandas.DataFrame`. 
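
For context while reading the rest of the patch, the opt-in usage this change enables looks roughly like the sketch below. It is illustrative only and not part of the diff; it mirrors the `download_public_data.py` sample added later in this patch, and the table ID is the public dataset used there.

    from google.cloud import bigquery

    client = bigquery.Client()

    # Opt in: with create_bqstorage_client=True the BigQuery client builds a
    # BigQuery Storage API client internally from its own credentials, so no
    # separate BigQueryStorageClient boilerplate is needed.
    table_id = "bigquery-public-data.usa_names.usa_1910_current"
    dataframe = client.list_rows(table_id).to_dataframe(
        create_bqstorage_client=True
    )
    print(dataframe.info())
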
diff --git a/bigquery/samples/download_public_data.py b/bigquery/samples/download_public_data.py new file mode 100644 index 000000000000..815d140fc6f1 --- /dev/null +++ b/bigquery/samples/download_public_data.py @@ -0,0 +1,33 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def download_public_data(client): + + # [START bigquery_pandas_public_data] + # TODO(developer): Import the client library. + # from google.cloud import bigquery + + # TODO(developer): Construct a BigQuery client object. + # client = bigquery.Client() + + # TODO(developer): Set table_id to the fully-qualified table ID in standard + # SQL format, including the project ID and dataset ID. + table_id = "bigquery-public-data.usa_names.usa_1910_current" + + # Use the BigQuery Storage API to speed-up downloads of large tables. + dataframe = client.list_rows(table_id).to_dataframe(create_bqstorage_client=True) + + print(dataframe.info()) + # [END bigquery_pandas_public_data] diff --git a/bigquery/samples/download_public_data_sandbox.py b/bigquery/samples/download_public_data_sandbox.py new file mode 100644 index 000000000000..edb1466e4bd7 --- /dev/null +++ b/bigquery/samples/download_public_data_sandbox.py @@ -0,0 +1,34 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def download_public_data_sandbox(client): + + # [START bigquery_pandas_public_data_sandbox] + # TODO(developer): Import the client library. + # from google.cloud import bigquery + + # TODO(developer): Construct a BigQuery client object. + # client = bigquery.Client() + + # `SELECT *` is an anti-pattern in BigQuery because it is cheaper and + # faster to use the BigQuery Storage API directly, but BigQuery Sandbox + # users can only use the BigQuery Storage API to download query results. + query_string = "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_current`" + + # Use the BigQuery Storage API to speed-up downloads of large tables. 
+ dataframe = client.query(query_string).to_dataframe(create_bqstorage_client=True) + + print(dataframe.info()) + # [END bigquery_pandas_public_data_sandbox] diff --git a/bigquery/samples/tests/test_download_public_data.py b/bigquery/samples/tests/test_download_public_data.py new file mode 100644 index 000000000000..4d47d488e2eb --- /dev/null +++ b/bigquery/samples/tests/test_download_public_data.py @@ -0,0 +1,24 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .. import download_public_data + + +def test_download_public_data(capsys, client): + + download_public_data.download_public_data(client) + out, _ = capsys.readouterr() + assert "year" in out + assert "gender" in out + assert "name" in out diff --git a/bigquery/samples/tests/test_download_public_data_sandbox.py b/bigquery/samples/tests/test_download_public_data_sandbox.py new file mode 100644 index 000000000000..ca2eed96ac02 --- /dev/null +++ b/bigquery/samples/tests/test_download_public_data_sandbox.py @@ -0,0 +1,24 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .. 
import download_public_data_sandbox + + +def test_download_public_data_sandbox(capsys, client): + + download_public_data_sandbox.download_public_data_sandbox(client) + out, _ = capsys.readouterr() + assert "year" in out + assert "gender" in out + assert "name" in out diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index e661c86970db..c4cdb7fdfd2f 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -49,6 +49,11 @@ import google.cloud._helpers from google.cloud import bigquery_v2 from google.cloud.bigquery.dataset import DatasetReference + +try: + from google.cloud import bigquery_storage_v1beta1 +except (ImportError, AttributeError): # pragma: NO COVER + bigquery_storage_v1beta1 = None from tests.unit.helpers import make_connection @@ -535,6 +540,26 @@ def test_get_dataset(self): ) self.assertEqual(dataset.dataset_id, self.DS_ID) + @unittest.skipIf( + bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + ) + def test_create_bqstorage_client(self): + mock_client = mock.create_autospec( + bigquery_storage_v1beta1.BigQueryStorageClient + ) + mock_client_instance = object() + mock_client.return_value = mock_client_instance + creds = _make_credentials() + client = self._make_one(project=self.PROJECT, credentials=creds) + + with mock.patch( + "google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient", mock_client + ): + bqstorage_client = client._create_bqstorage_client() + + self.assertIs(bqstorage_client, mock_client_instance) + mock_client.assert_called_once_with(credentials=creds) + def test_create_dataset_minimal(self): from google.cloud.bigquery.dataset import Dataset diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index 97a7b4ae745e..676e59adee22 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -1882,6 +1882,35 @@ def test_to_arrow_w_bqstorage(self): total_rows = expected_num_rows * total_pages self.assertEqual(actual_tbl.num_rows, total_rows) + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + @unittest.skipIf( + bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + ) + def test_to_arrow_w_bqstorage_creates_client(self): + from google.cloud.bigquery import schema + from google.cloud.bigquery import table as mut + + mock_client = _mock_client() + bqstorage_client = mock.create_autospec( + bigquery_storage_v1beta1.BigQueryStorageClient + ) + mock_client._create_bqstorage_client.return_value = bqstorage_client + session = bigquery_storage_v1beta1.types.ReadSession() + bqstorage_client.create_read_session.return_value = session + row_iterator = mut.RowIterator( + mock_client, + None, # api_request: ignored + None, # path: ignored + [ + schema.SchemaField("colA", "STRING"), + schema.SchemaField("colC", "STRING"), + schema.SchemaField("colB", "STRING"), + ], + table=mut.TableReference.from_string("proj.dset.tbl"), + ) + row_iterator.to_arrow(create_bqstorage_client=True) + mock_client._create_bqstorage_client.assert_called_once() + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") @unittest.skipIf( bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" @@ -2292,6 +2321,35 @@ def test_to_dataframe_max_results_w_bqstorage_warning(self): ] self.assertEqual(len(matches), 1, msg="User warning was not emitted.") + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf( + bigquery_storage_v1beta1 is None, "Requires 
`google-cloud-bigquery-storage`" + ) + def test_to_dataframe_w_bqstorage_creates_client(self): + from google.cloud.bigquery import schema + from google.cloud.bigquery import table as mut + + mock_client = _mock_client() + bqstorage_client = mock.create_autospec( + bigquery_storage_v1beta1.BigQueryStorageClient + ) + mock_client._create_bqstorage_client.return_value = bqstorage_client + session = bigquery_storage_v1beta1.types.ReadSession() + bqstorage_client.create_read_session.return_value = session + row_iterator = mut.RowIterator( + mock_client, + None, # api_request: ignored + None, # path: ignored + [ + schema.SchemaField("colA", "STRING"), + schema.SchemaField("colC", "STRING"), + schema.SchemaField("colB", "STRING"), + ], + table=mut.TableReference.from_string("proj.dset.tbl"), + ) + row_iterator.to_dataframe(create_bqstorage_client=True) + mock_client._create_bqstorage_client.assert_called_once() + @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" From 1303d57fddca3011bf8dc6bee554ff8adf70a4e1 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 4 Nov 2019 14:09:14 -0800 Subject: [PATCH 2/6] test for BQ Storage API usage in samples tests. --- .../samples/tests/test_download_public_data.py | 12 +++++++++++- .../tests/test_download_public_data_sandbox.py | 14 ++++++++++++-- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/bigquery/samples/tests/test_download_public_data.py b/bigquery/samples/tests/test_download_public_data.py index 4d47d488e2eb..8ee0e6a68c17 100644 --- a/bigquery/samples/tests/test_download_public_data.py +++ b/bigquery/samples/tests/test_download_public_data.py @@ -12,13 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from .. import download_public_data -def test_download_public_data(capsys, client): +def test_download_public_data(caplog, capsys, client): + # Enable debug-level logging to verify the BigQuery Storage API is used. + caplog.set_level(logging.DEBUG) download_public_data.download_public_data(client) out, _ = capsys.readouterr() assert "year" in out assert "gender" in out assert "name" in out + + assert any( + "Started reading table 'bigquery-public-data.usa_names.usa_1910_current' with BQ Storage API session" + in message + for message in caplog.messages + ) diff --git a/bigquery/samples/tests/test_download_public_data_sandbox.py b/bigquery/samples/tests/test_download_public_data_sandbox.py index ca2eed96ac02..74dadc1db3fb 100644 --- a/bigquery/samples/tests/test_download_public_data_sandbox.py +++ b/bigquery/samples/tests/test_download_public_data_sandbox.py @@ -12,13 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from .. import download_public_data_sandbox -def test_download_public_data_sandbox(capsys, client): +def test_download_public_data_sandbox(caplog, capsys, client): + # Enable debug-level logging to verify the BigQuery Storage API is used. + caplog.set_level(logging.DEBUG) download_public_data_sandbox.download_public_data_sandbox(client) - out, _ = capsys.readouterr() + out, err = capsys.readouterr() assert "year" in out assert "gender" in out assert "name" in out + + assert any( + # An anonymous table is used because this sample reads from query results. 
+ ("Started reading table" in message and "BQ Storage API session" in message) + for message in caplog.messages + ) From bf00d45cb49e9e191d80f60b4783bb899c1a447f Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 16 Dec 2019 12:37:58 -0800 Subject: [PATCH 3/6] fix: close bqstorage client if created by to_dataframe/to_arrow --- bigquery/google/cloud/bigquery/table.py | 77 +++++++++++++++---------- bigquery/tests/unit/test_table.py | 61 ++++++++++++++++++++ 2 files changed, 106 insertions(+), 32 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index ea04bdb47e18..929df3817010 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1519,25 +1519,31 @@ def to_arrow( if pyarrow is None: raise ValueError(_NO_PYARROW_ERROR) + owns_bqstorage_client = False if not bqstorage_client and create_bqstorage_client: + owns_bqstorage_client = True bqstorage_client = self.client._create_bqstorage_client() - progress_bar = self._get_progress_bar(progress_bar_type) + try: + progress_bar = self._get_progress_bar(progress_bar_type) - record_batches = [] - for record_batch in self._to_arrow_iterable(bqstorage_client=bqstorage_client): - record_batches.append(record_batch) + record_batches = [] + for record_batch in self._to_arrow_iterable(bqstorage_client=bqstorage_client): + record_batches.append(record_batch) - if progress_bar is not None: - # In some cases, the number of total rows is not populated - # until the first page of rows is fetched. Update the - # progress bar's total to keep an accurate count. - progress_bar.total = progress_bar.total or self.total_rows - progress_bar.update(record_batch.num_rows) + if progress_bar is not None: + # In some cases, the number of total rows is not populated + # until the first page of rows is fetched. Update the + # progress bar's total to keep an accurate count. + progress_bar.total = progress_bar.total or self.total_rows + progress_bar.update(record_batch.num_rows) - if progress_bar is not None: - # Indicate that the download has finished. - progress_bar.close() + if progress_bar is not None: + # Indicate that the download has finished. 
+ progress_bar.close() + finally: + if owns_bqstorage_client: + bqstorage_client.transport.channel.close() if record_batches: return pyarrow.Table.from_batches(record_batches) @@ -1655,35 +1661,42 @@ def to_dataframe( if dtypes is None: dtypes = {} - if not bqstorage_client and create_bqstorage_client: - bqstorage_client = self.client._create_bqstorage_client() - - if bqstorage_client and self.max_results is not None: + if (bqstorage_client or create_bqstorage_client) and self.max_results is not None: warnings.warn( "Cannot use bqstorage_client if max_results is set, " "reverting to fetching data with the tabledata.list endpoint.", stacklevel=2, ) + create_bqstorage_client = False bqstorage_client = None - progress_bar = self._get_progress_bar(progress_bar_type) + owns_bqstorage_client = False + if not bqstorage_client and create_bqstorage_client: + owns_bqstorage_client = True + bqstorage_client = self.client._create_bqstorage_client() - frames = [] - for frame in self._to_dataframe_iterable( - bqstorage_client=bqstorage_client, dtypes=dtypes - ): - frames.append(frame) + try: + progress_bar = self._get_progress_bar(progress_bar_type) + + frames = [] + for frame in self._to_dataframe_iterable( + bqstorage_client=bqstorage_client, dtypes=dtypes + ): + frames.append(frame) + + if progress_bar is not None: + # In some cases, the number of total rows is not populated + # until the first page of rows is fetched. Update the + # progress bar's total to keep an accurate count. + progress_bar.total = progress_bar.total or self.total_rows + progress_bar.update(len(frame)) if progress_bar is not None: - # In some cases, the number of total rows is not populated - # until the first page of rows is fetched. Update the - # progress bar's total to keep an accurate count. - progress_bar.total = progress_bar.total or self.total_rows - progress_bar.update(len(frame)) - - if progress_bar is not None: - # Indicate that the download has finished. - progress_bar.close() + # Indicate that the download has finished. + progress_bar.close() + finally: + if owns_bqstorage_client: + bqstorage_client.transport.channel.close() # Avoid concatting an empty list. if not frames: diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index 676e59adee22..1043df45f9a3 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -26,8 +26,12 @@ try: from google.cloud import bigquery_storage_v1beta1 + from google.cloud.bigquery_storage_v1beta1.gapic.transports import ( + big_query_storage_grpc_transport, + ) except ImportError: # pragma: NO COVER bigquery_storage_v1beta1 = None + big_query_storage_grpc_transport = None try: import pandas @@ -1817,6 +1821,9 @@ def test_to_arrow_w_bqstorage(self): bqstorage_client = mock.create_autospec( bigquery_storage_v1beta1.BigQueryStorageClient ) + bqstorage_client.transport = mock.create_autospec( + big_query_storage_grpc_transport.BigQueryStorageGrpcTransport + ) streams = [ # Use two streams we want to check frames are read from each stream. {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}, @@ -1882,6 +1889,9 @@ def test_to_arrow_w_bqstorage(self): total_rows = expected_num_rows * total_pages self.assertEqual(actual_tbl.num_rows, total_rows) + # Don't close the client if it was passed in. 
+ bqstorage_client.transport.channel.close.assert_not_called() + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") @unittest.skipIf( bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" @@ -1894,6 +1904,9 @@ def test_to_arrow_w_bqstorage_creates_client(self): bqstorage_client = mock.create_autospec( bigquery_storage_v1beta1.BigQueryStorageClient ) + bqstorage_client.transport = mock.create_autospec( + big_query_storage_grpc_transport.BigQueryStorageGrpcTransport + ) mock_client._create_bqstorage_client.return_value = bqstorage_client session = bigquery_storage_v1beta1.types.ReadSession() bqstorage_client.create_read_session.return_value = session @@ -1910,6 +1923,7 @@ def test_to_arrow_w_bqstorage_creates_client(self): ) row_iterator.to_arrow(create_bqstorage_client=True) mock_client._create_bqstorage_client.assert_called_once() + bqstorage_client.transport.channel.close.assert_called_once() @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") @unittest.skipIf( @@ -2321,6 +2335,43 @@ def test_to_dataframe_max_results_w_bqstorage_warning(self): ] self.assertEqual(len(matches), 1, msg="User warning was not emitted.") + @unittest.skipIf(pandas is None, "Requires `pandas`") + def test_to_dataframe_max_results_w_create_bqstorage_warning(self): + from google.cloud.bigquery.schema import SchemaField + + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ] + rows = [ + {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + ] + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + mock_client = _mock_client() + + row_iterator = self._make_one( + client=mock_client, + api_request=api_request, + path=path, + schema=schema, + max_results=42, + ) + + with warnings.catch_warnings(record=True) as warned: + row_iterator.to_dataframe(create_bqstorage_client=True) + + matches = [ + warning + for warning in warned + if warning.category is UserWarning + and "cannot use bqstorage_client" in str(warning).lower() + and "tabledata.list" in str(warning) + ] + self.assertEqual(len(matches), 1, msg="User warning was not emitted.") + mock_client._create_bqstorage_client.assert_not_called() + @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" @@ -2333,6 +2384,9 @@ def test_to_dataframe_w_bqstorage_creates_client(self): bqstorage_client = mock.create_autospec( bigquery_storage_v1beta1.BigQueryStorageClient ) + bqstorage_client.transport = mock.create_autospec( + big_query_storage_grpc_transport.BigQueryStorageGrpcTransport + ) mock_client._create_bqstorage_client.return_value = bqstorage_client session = bigquery_storage_v1beta1.types.ReadSession() bqstorage_client.create_read_session.return_value = session @@ -2349,6 +2403,7 @@ def test_to_dataframe_w_bqstorage_creates_client(self): ) row_iterator.to_dataframe(create_bqstorage_client=True) mock_client._create_bqstorage_client.assert_called_once() + bqstorage_client.transport.channel.close.assert_called_once() @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( @@ -2485,6 +2540,9 @@ def test_to_dataframe_w_bqstorage_nonempty(self): bqstorage_client = mock.create_autospec( bigquery_storage_v1beta1.BigQueryStorageClient ) + bqstorage_client.transport = mock.create_autospec( + big_query_storage_grpc_transport.BigQueryStorageGrpcTransport + ) streams = [ # Use two streams we want to check frames are 
read from each stream. {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}, @@ -2539,6 +2597,9 @@ def test_to_dataframe_w_bqstorage_nonempty(self): total_rows = len(page_items) * total_pages self.assertEqual(len(got.index), total_rows) + # Don't close the client if it was passed in. + bqstorage_client.transport.channel.close.assert_not_called() + @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" From 0ea9e31982193d0fb1eb0a2b192c88cb53c0e4a9 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 16 Dec 2019 14:51:15 -0800 Subject: [PATCH 4/6] chore: blacken --- bigquery/google/cloud/bigquery/table.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 929df3817010..7e1071d796fd 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1528,7 +1528,9 @@ def to_arrow( progress_bar = self._get_progress_bar(progress_bar_type) record_batches = [] - for record_batch in self._to_arrow_iterable(bqstorage_client=bqstorage_client): + for record_batch in self._to_arrow_iterable( + bqstorage_client=bqstorage_client + ): record_batches.append(record_batch) if progress_bar is not None: @@ -1661,7 +1663,9 @@ def to_dataframe( if dtypes is None: dtypes = {} - if (bqstorage_client or create_bqstorage_client) and self.max_results is not None: + if ( + bqstorage_client or create_bqstorage_client + ) and self.max_results is not None: warnings.warn( "Cannot use bqstorage_client if max_results is set, " "reverting to fetching data with the tabledata.list endpoint.", From e9e70224cd02e5a11c10b702c759390e6a282a95 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 16 Dec 2019 15:47:27 -0800 Subject: [PATCH 5/6] doc: update versionadded --- bigquery/google/cloud/bigquery/table.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 7e1071d796fd..a71acf8ecc8a 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1503,7 +1503,7 @@ def to_arrow( This argument does nothing if ``bqstorage_client`` is supplied. - ..versionadded:: 1.22.0 + ..versionadded:: 1.24.0 Returns: pyarrow.Table @@ -1643,7 +1643,7 @@ def to_dataframe( This argument does nothing if ``bqstorage_client`` is supplied. - ..versionadded:: 1.22.0 + ..versionadded:: 1.24.0 Returns: pandas.DataFrame: From fb58ecf0394056037c1ae4dad23afff8221d098e Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 16 Dec 2019 15:50:00 -0800 Subject: [PATCH 6/6] doc: update versionadded --- bigquery/google/cloud/bigquery/job.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index d4d1b88073c6..19e4aaf185e4 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -3199,7 +3199,7 @@ def to_arrow( This argument does nothing if ``bqstorage_client`` is supplied. - ..versionadded:: 1.22.0 + ..versionadded:: 1.24.0 Returns: pyarrow.Table @@ -3269,7 +3269,7 @@ def to_dataframe( This argument does nothing if ``bqstorage_client`` is supplied. - ..versionadded:: 1.22.0 + ..versionadded:: 1.24.0 Returns: A :class:`~pandas.DataFrame` populated with row data and column