
Commit dce1326

BigQuery: Autofetch table schema on load if not provided (#9108)
* Autofetch table schema on load if not provided
* Avoid fetching table schema if WRITE_TRUNCATE job
* Skip dataframe columns list check. A similar check is already performed
  on the server, and server-side errors are preferred to client errors.
* Raise table NotFound in auto Pandas schema tests. A mock should raise
  this error instead of returning a table to trigger schema generation
  from Pandas dtypes.
* Use list_columns_and_indexes() for names list
1 parent: 27e7abd

3 files changed: 163 additions & 7 deletions
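
In practice, the change means a schema no longer needs to be set on the job config when appending a dataframe to an existing table. A minimal usage sketch of the new behavior (the project, dataset, and table names are hypothetical):

    import pandas
    from google.cloud import bigquery

    client = bigquery.Client()
    # Hypothetical destination table; replace with a real table id.
    table_ref = bigquery.TableReference.from_string("my-project.my_dataset.my_table")

    dataframe = pandas.DataFrame([{"id": 1, "age": 100}, {"id": 2, "age": 60}])

    # job_config.schema is not set, so the client now fetches the existing
    # table's schema via get_table() and keeps the fields that match the
    # dataframe's columns and named indexes.
    client.load_table_from_dataframe(dataframe, table_ref).result()

    # For WRITE_TRUNCATE jobs the existing schema does not matter, so the
    # get_table() round trip is skipped and the schema is generated from
    # the dataframe's dtypes.
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
    )
    client.load_table_from_dataframe(
        dataframe, table_ref, job_config=job_config
    ).result()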

File tree

bigquery/google/cloud/bigquery/_pandas_helpers.py
bigquery/google/cloud/bigquery/client.py
bigquery/tests/unit/test_client.py

bigquery/google/cloud/bigquery/_pandas_helpers.py

Lines changed: 1 addition & 1 deletion
@@ -210,7 +210,7 @@ def list_columns_and_indexes(dataframe):
     """Return all index and column names with dtypes.
 
     Returns:
-        Sequence[Tuple[dtype, str]]:
+        Sequence[Tuple[str, dtype]]:
             Returns a sorted list of indexes and column names with
             corresponding dtypes. If an index is missing a name or has the
             same name as a column, the index is omitted.
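
The corrected docstring matches what the helper actually returns: (name, dtype) pairs with the name first, and named indexes listed alongside columns. A quick illustrative sketch, not part of the diff (it pokes at the private _pandas_helpers module purely for demonstration):

    import pandas
    from google.cloud.bigquery import _pandas_helpers

    dataframe = pandas.DataFrame(
        {"int_col": [10, 20, 30], "float_col": [1.0, 2.0, 3.0]},
        index=pandas.Index(["one", "two", "three"], name="unique_name"),
    )

    # Each entry is (name, dtype) - name first, as the fixed docstring says.
    for name, dtype in _pandas_helpers.list_columns_and_indexes(dataframe):
        print(name, dtype)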

bigquery/google/cloud/bigquery/client.py

Lines changed: 21 additions & 0 deletions
@@ -1547,6 +1547,27 @@ def load_table_from_dataframe(
         if location is None:
             location = self.location
 
+        # If table schema is not provided, we try to fetch the existing table
+        # schema, and check if dataframe schema is compatible with it - except
+        # for WRITE_TRUNCATE jobs, the existing schema does not matter then.
+        if (
+            not job_config.schema
+            and job_config.write_disposition != job.WriteDisposition.WRITE_TRUNCATE
+        ):
+            try:
+                table = self.get_table(destination)
+            except google.api_core.exceptions.NotFound:
+                table = None
+            else:
+                columns_and_indexes = frozenset(
+                    name
+                    for name, _ in _pandas_helpers.list_columns_and_indexes(dataframe)
+                )
+                # schema fields not present in the dataframe are not needed
+                job_config.schema = [
+                    field for field in table.schema if field.name in columns_and_indexes
+                ]
+
         job_config.schema = _pandas_helpers.dataframe_to_bq_schema(
             dataframe, job_config.schema
         )
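
The filtering step is the subtle part of the new code path: only fields that actually appear in the dataframe, either as a column or as a named index, are kept, so columns that exist only server-side do not end up in the load schema. A standalone sketch of that filter with hypothetical field data:

    from google.cloud.bigquery.schema import SchemaField

    # Hypothetical schema fetched from the existing table via get_table().
    table_schema = [
        SchemaField("id", "INTEGER"),
        SchemaField("age", "INTEGER"),
        SchemaField("created_at", "TIMESTAMP"),  # not in the dataframe
    ]

    # Names of the dataframe's columns and named indexes, as yielded by
    # _pandas_helpers.list_columns_and_indexes().
    columns_and_indexes = frozenset({"id", "age"})

    # The same list comprehension as in load_table_from_dataframe().
    schema = [field for field in table_schema if field.name in columns_and_indexes]
    assert [field.name for field in schema] == ["id", "age"]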

bigquery/tests/unit/test_client.py

Lines changed: 141 additions & 6 deletions
@@ -20,6 +20,7 @@
 import gzip
 import io
 import json
+import operator
 import unittest
 import warnings
 

@@ -5279,15 +5280,23 @@ def test_load_table_from_file_bad_mode(self):
     def test_load_table_from_dataframe(self):
         from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
         from google.cloud.bigquery import job
+        from google.cloud.bigquery.schema import SchemaField
 
         client = self._make_client()
         records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}]
         dataframe = pandas.DataFrame(records)
 
+        get_table_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.get_table",
+            autospec=True,
+            return_value=mock.Mock(
+                schema=[SchemaField("id", "INTEGER"), SchemaField("age", "INTEGER")]
+            ),
+        )
         load_patch = mock.patch(
             "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
         )
-        with load_patch as load_table_from_file:
+        with load_patch as load_table_from_file, get_table_patch:
             client.load_table_from_dataframe(dataframe, self.TABLE_REF)
 
         load_table_from_file.assert_called_once_with(
@@ -5314,15 +5323,23 @@ def test_load_table_from_dataframe(self):
     def test_load_table_from_dataframe_w_client_location(self):
         from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
         from google.cloud.bigquery import job
+        from google.cloud.bigquery.schema import SchemaField
 
         client = self._make_client(location=self.LOCATION)
         records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}]
         dataframe = pandas.DataFrame(records)
 
+        get_table_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.get_table",
+            autospec=True,
+            return_value=mock.Mock(
+                schema=[SchemaField("id", "INTEGER"), SchemaField("age", "INTEGER")]
+            ),
+        )
         load_patch = mock.patch(
             "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
         )
-        with load_patch as load_table_from_file:
+        with load_patch as load_table_from_file, get_table_patch:
             client.load_table_from_dataframe(dataframe, self.TABLE_REF)
 
         load_table_from_file.assert_called_once_with(
@@ -5349,20 +5366,33 @@ def test_load_table_from_dataframe_w_client_location(self):
     def test_load_table_from_dataframe_w_custom_job_config(self):
         from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
         from google.cloud.bigquery import job
+        from google.cloud.bigquery.schema import SchemaField
 
         client = self._make_client()
         records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}]
         dataframe = pandas.DataFrame(records)
-        job_config = job.LoadJobConfig()
+        job_config = job.LoadJobConfig(
+            write_disposition=job.WriteDisposition.WRITE_TRUNCATE
+        )
 
+        get_table_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.get_table",
+            autospec=True,
+            return_value=mock.Mock(
+                schema=[SchemaField("id", "INTEGER"), SchemaField("age", "INTEGER")]
+            ),
+        )
         load_patch = mock.patch(
             "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
         )
-        with load_patch as load_table_from_file:
+        with load_patch as load_table_from_file, get_table_patch as get_table:
             client.load_table_from_dataframe(
                 dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION
             )
 
+        # no need to fetch and inspect table schema for WRITE_TRUNCATE jobs
+        assert not get_table.called
+
         load_table_from_file.assert_called_once_with(
             client,
             mock.ANY,

@@ -5378,6 +5408,7 @@ def test_load_table_from_dataframe_w_custom_job_config(self):
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
         assert sent_config.source_format == job.SourceFormat.PARQUET
+        assert sent_config.write_disposition == job.WriteDisposition.WRITE_TRUNCATE
 
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
@@ -5421,7 +5452,12 @@ def test_load_table_from_dataframe_w_automatic_schema(self):
             "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
         )
 
-        with load_patch as load_table_from_file:
+        get_table_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.get_table",
+            autospec=True,
+            side_effect=google.api_core.exceptions.NotFound("Table not found"),
+        )
+        with load_patch as load_table_from_file, get_table_patch:
             client.load_table_from_dataframe(
                 dataframe, self.TABLE_REF, location=self.LOCATION
             )
@@ -5449,6 +5485,100 @@ def test_load_table_from_dataframe_w_automatic_schema(self):
             SchemaField("ts_col", "TIMESTAMP"),
         )
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
+    def test_load_table_from_dataframe_w_index_and_auto_schema(self):
+        from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
+        from google.cloud.bigquery import job
+        from google.cloud.bigquery.schema import SchemaField
+
+        client = self._make_client()
+        df_data = collections.OrderedDict(
+            [("int_col", [10, 20, 30]), ("float_col", [1.0, 2.0, 3.0])]
+        )
+        dataframe = pandas.DataFrame(
+            df_data,
+            index=pandas.Index(name="unique_name", data=["one", "two", "three"]),
+        )
+
+        load_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
+        )
+
+        get_table_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.get_table",
+            autospec=True,
+            return_value=mock.Mock(
+                schema=[
+                    SchemaField("int_col", "INTEGER"),
+                    SchemaField("float_col", "FLOAT"),
+                    SchemaField("unique_name", "STRING"),
+                ]
+            ),
+        )
+        with load_patch as load_table_from_file, get_table_patch:
+            client.load_table_from_dataframe(
+                dataframe, self.TABLE_REF, location=self.LOCATION
+            )
+
+        load_table_from_file.assert_called_once_with(
+            client,
+            mock.ANY,
+            self.TABLE_REF,
+            num_retries=_DEFAULT_NUM_RETRIES,
+            rewind=True,
+            job_id=mock.ANY,
+            job_id_prefix=None,
+            location=self.LOCATION,
+            project=None,
+            job_config=mock.ANY,
+        )
+
+        sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
+        assert sent_config.source_format == job.SourceFormat.PARQUET
+
+        sent_schema = sorted(sent_config.schema, key=operator.attrgetter("name"))
+        expected_sent_schema = [
+            SchemaField("float_col", "FLOAT"),
+            SchemaField("int_col", "INTEGER"),
+            SchemaField("unique_name", "STRING"),
+        ]
+        assert sent_schema == expected_sent_schema
+
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
+    def test_load_table_from_dataframe_unknown_table(self):
+        from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
+
+        client = self._make_client()
+        records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}]
+        dataframe = pandas.DataFrame(records)
+
+        get_table_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.get_table",
+            autospec=True,
+            side_effect=google.api_core.exceptions.NotFound("Table not found"),
+        )
+        load_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
+        )
+        with load_patch as load_table_from_file, get_table_patch:
+            # there should be no error
+            client.load_table_from_dataframe(dataframe, self.TABLE_REF)
+
+        load_table_from_file.assert_called_once_with(
+            client,
+            mock.ANY,
+            self.TABLE_REF,
+            num_retries=_DEFAULT_NUM_RETRIES,
+            rewind=True,
+            job_id=mock.ANY,
+            job_id_prefix=None,
+            location=None,
+            project=None,
+            job_config=mock.ANY,
+        )
+
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_load_table_from_dataframe_struct_fields_error(self):
@@ -5741,6 +5871,11 @@ def test_load_table_from_dataframe_wo_pyarrow_custom_compression(self):
         records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}]
         dataframe = pandas.DataFrame(records)
 
+        get_table_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.get_table",
+            autospec=True,
+            side_effect=google.api_core.exceptions.NotFound("Table not found"),
+        )
         load_patch = mock.patch(
             "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
         )

@@ -5749,7 +5884,7 @@ def test_load_table_from_dataframe_wo_pyarrow_custom_compression(self):
             dataframe, "to_parquet", wraps=dataframe.to_parquet
        )
 
-        with load_patch, pyarrow_patch, to_parquet_patch as to_parquet_spy:
+        with load_patch, get_table_patch, pyarrow_patch, to_parquet_patch as to_parquet_spy:
             client.load_table_from_dataframe(
                 dataframe,
                 self.TABLE_REF,
