40 changes: 26 additions & 14 deletions tests/system/test_pandas.py
@@ -21,6 +21,7 @@
 import io
 import operator
 
+import google.api_core.retry
 import pkg_resources
 import pytest
 import pytz
@@ -41,6 +42,10 @@
 PANDAS_INT64_VERSION = pkg_resources.parse_version("1.0.0")
 
 
+class MissingDataError(Exception):
+    pass
+
+
 def test_load_table_from_dataframe_w_automatic_schema(bigquery_client, dataset_id):
     """Test that a DataFrame with dtypes that map well to BigQuery types
     can be uploaded without specifying a schema.
@@ -666,27 +671,34 @@ def test_insert_rows_from_dataframe(bigquery_client, dataset_id):
     )
     for errors in chunk_errors:
         assert not errors
 
-    # Use query to fetch rows instead of listing directly from the table so
-    # that we get values from the streaming buffer.
-    rows = list(
-        bigquery_client.query(
-            "SELECT * FROM `{}.{}.{}`".format(
-                table.project, table.dataset_id, table.table_id
-            )
-        )
-    )
-
-    sorted_rows = sorted(rows, key=operator.attrgetter("int_col"))
-    row_tuples = [r.values() for r in sorted_rows]
     expected = [
         # Pandas often represents NULL values as NaN. Convert to None for
         # easier comparison.
         tuple(None if col != col else col for col in data_row)
         for data_row in dataframe.itertuples(index=False)
     ]
-
-    assert len(row_tuples) == len(expected)
+    # Use query to fetch rows instead of listing directly from the table so
+    # that we get values from the streaming buffer "within a few seconds".
+    # https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability
+    @google.api_core.retry.Retry(

[Contributor Author, commenting on this line: By default, retries with a deadline of 2 minutes (https://googleapis.dev/python/google-api-core/latest/retry.html#google.api_core.retry.Retry). This should be well within the "few seconds" documented.]

+        predicate=google.api_core.retry.if_exception_type(MissingDataError)
+    )
+    def get_rows():
+        rows = list(
+            bigquery_client.query(
+                "SELECT * FROM `{}.{}.{}`".format(
+                    table.project, table.dataset_id, table.table_id
+                )
+            )
+        )
+        if len(rows) != len(expected):
+            raise MissingDataError()
+        return rows
+
+    rows = get_rows()
+    sorted_rows = sorted(rows, key=operator.attrgetter("int_col"))
+    row_tuples = [r.values() for r in sorted_rows]

     for row, expected_row in zip(row_tuples, expected):
         assert (
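Two asides on the hunk above. First, the `col != col` test in the expected list relies on NaN being the only common value that compares unequal to itself. A standalone illustration, not from the PR:

import math

nan = float("nan")
assert nan != nan        # NaN is unequal to itself
assert math.isnan(nan)   # the explicit spelling of the same check
# Hence `None if col != col else col` maps NaN to None and leaves
# every other value untouched.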
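Second, the review note points out that Retry defaults to a 2-minute deadline. Here is a minimal sketch of the same poll-until-present pattern with the backoff made explicit; the function name fetch_rows_or_raise and all timing values are illustrative assumptions, not part of this PR:

import google.api_core.retry


class MissingDataError(Exception):
    """Raised while the streaming buffer has not yet surfaced all rows."""


# Same predicate-based pattern as the PR, but with explicit backoff
# settings instead of the default 2-minute deadline. Numbers are examples.
@google.api_core.retry.Retry(
    predicate=google.api_core.retry.if_exception_type(MissingDataError),
    initial=1.0,  # seconds to wait before the first retry
    multiplier=2.0,  # exponential backoff factor between attempts
    maximum=10.0,  # cap on any single wait
    deadline=30.0,  # give up entirely after 30 seconds
)
def fetch_rows_or_raise(bigquery_client, table, expected_count):
    # Query instead of listing rows so results include the streaming buffer.
    rows = list(
        bigquery_client.query(
            "SELECT * FROM `{}.{}.{}`".format(
                table.project, table.dataset_id, table.table_id
            )
        )
    )
    if len(rows) != expected_count:
        # Signal "not there yet" so the Retry wrapper polls again.
        raise MissingDataError()
    return rows

Because the predicate only matches MissingDataError, genuine query failures are raised immediately rather than retried.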