Merged. Changes shown are from 1 commit (of 22 commits in the pull request).
owlbot.py (4 changes: 0 additions & 4 deletions)
@@ -29,16 +29,12 @@
 # ----------------------------------------------------------------------------

 extras = ["tqdm"]
-extras_by_python = {
-    "3.9": ["tqdm", "db-dtypes"],
-}
 templated_files = common.py_library(
     unit_test_python_versions=["3.7", "3.8", "3.9", "3.10"],
     system_test_python_versions=["3.7", "3.8", "3.9", "3.10"],
     cov_level=86,
     unit_test_extras=extras,
     system_test_extras=extras,
-    system_test_extras_by_python=extras_by_python,
     intersphinx_dependencies={
         "pandas": "https://pandas.pydata.org/pandas-docs/stable/",
         "pydata-google-auth": "https://pydata-google-auth.readthedocs.io/en/latest/",
pandas_gbq/load.py (44 changes: 44 additions & 0 deletions)
@@ -4,9 +4,11 @@

 """Helper methods for loading data into BigQuery"""

+import decimal
 import io
 from typing import Any, Callable, Dict, List, Optional

+import db_dtypes
 import pandas
 import pyarrow.lib
 from google.cloud import bigquery
@@ -56,6 +58,47 @@ def split_dataframe(dataframe, chunksize=None):
     yield remaining_rows, chunk


+def cast_dataframe_for_parquet(
+    dataframe: pandas.DataFrame, schema: Optional[Dict[str, Any]],
+) -> pandas.DataFrame:
+    """Cast columns to needed dtype when writing parquet files.
+
+    See: https://github.com/googleapis/python-bigquery-pandas/issues/421
+    """
+    columns = schema.get("fields", [])
+    for column in columns:
+        # Schema can be a superset of the columns in the dataframe, so ignore
+        # columns that aren't present.
+        column_name = column.get("name")
+        if column_name not in dataframe.columns:
+            continue
+
+        # Skip array columns.
+        if column.get("mode", "NULLABLE").upper() not in {"REQUIRED", "NULLABLE"}:
+            continue
+
+        column_type = column.get("type", "").upper()
+        if (
+            column_type == "DATE"
+            and dataframe[column_name].dtype != db_dtypes.DateDtype()
+        ):
+            # Construct converted column manually, because I can't use
+            # .astype() with DateDtype. With .astype(), I get the error:
+            #
+            # TypeError: Cannot interpret '<db_dtypes.DateDtype ...>' as a data type
+            cast_column = pandas.Series(
+                dataframe[column_name], dtype=db_dtypes.DateDtype()
+            )
+        elif column_type in {"NUMERIC", "DECIMAL", "BIGNUMERIC", "BIGDECIMAL"}:
+            cast_column = dataframe[column_name].map(decimal.Decimal)
+        else:
+            cast_column = None
+
+        if cast_column is not None:
+            dataframe = dataframe.assign(**{column_name: cast_column})
+    return dataframe
+
+
 def load_parquet(
     client: bigquery.Client,
     dataframe: pandas.DataFrame,
@@ -70,6 +113,7 @@ def load_parquet(
     if schema is not None:
         schema = pandas_gbq.schema.remove_policy_tags(schema)
        job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema)
+        dataframe = cast_dataframe_for_parquet(dataframe, schema)

     try:
         client.load_table_from_dataframe(
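
Note: a minimal sketch of how the new helper behaves, assuming this branch of pandas-gbq plus db-dtypes are installed; the frame and schema below are illustrative, not taken from the test suite.

    import pandas

    from pandas_gbq.load import cast_dataframe_for_parquet

    # Strings headed for a DATE column and floats headed for a NUMERIC column
    # start out as object / float64 dtypes.
    df = pandas.DataFrame({"date_col": ["2021-12-12"], "num_col": [1.25]})
    schema = {
        "fields": [
            {"name": "date_col", "type": "DATE"},
            {"name": "num_col", "type": "NUMERIC"},
        ]
    }

    cast = cast_dataframe_for_parquet(df, schema)
    print(cast["date_col"].dtype)         # dbdate (db_dtypes.DateDtype)
    print(type(cast["num_col"].iloc[0]))  # <class 'decimal.Decimal'>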
setup.py (4 changes: 2 additions & 2 deletions)
@@ -23,8 +23,9 @@
 release_status = "Development Status :: 4 - Beta"
 dependencies = [
     "setuptools",
+    "db-dtypes >=0.3.0,<2.0.0",
     "numpy>=1.16.6",
-    "pandas>=0.23.2",
+    "pandas>=0.24.2",
     "pyarrow >=3.0.0, <7.0dev",
     "pydata-google-auth",
     "google-auth",
@@ -35,7 +36,6 @@
 ]
 extras = {
     "tqdm": "tqdm>=4.23.0",
-    "db-dtypes": "db-dtypes >=0.3.0,<2.0.0",
 }

 # Setup boilerplate below this line.
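
Note: with db-dtypes promoted from an optional extra to a core dependency, importing it is always safe, and the "dbdate" extension dtype it registers with pandas is always available to the parquet load path. A minimal sketch (values illustrative):

    import db_dtypes  # noqa: F401  (importing registers the "dbdate" dtype)
    import pandas

    dates = pandas.Series(["2021-04-17", "1999-12-31"], dtype="dbdate")
    print(dates.dtype)  # dbdate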
testing/constraints-3.7.txt (3 changes: 2 additions & 1 deletion)
@@ -5,12 +5,13 @@
 #
 # e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev",
 # Then this file should have foo==1.14.0
+db-dtypes==0.3.0
 google-auth==1.4.1
 google-auth-oauthlib==0.0.1
 google-cloud-bigquery==1.11.1
 google-cloud-bigquery-storage==1.1.0
 numpy==1.16.6
-pandas==0.23.2
+pandas==0.24.2
 pyarrow==3.0.0
 pydata-google-auth==0.1.2
 tqdm==4.23.0
tests/system/test_to_gbq.py (48 changes: 43 additions & 5 deletions)
@@ -2,6 +2,8 @@
 # Use of this source code is governed by a BSD-style
 # license that can be found in the LICENSE file.

+import datetime
+import decimal
 import functools
 import random

@@ -118,53 +120,89 @@ def test_series_round_trip(
     (
         pandas.DataFrame(
             {
+                "row_num": [0, 1, 2],
                 "date_col": pandas.Series(
                     ["2021-04-17", "1999-12-31", "2038-01-19"], dtype="datetime64[ns]",
                 ),
             }
         ),
+        None,
         [{"name": "date_col", "type": "DATE"}],
         True,
     ),
+    # Loading a DATE column should work for string objects. See:
+    # https://github.com/googleapis/python-bigquery-pandas/issues/421
+    (
+        pandas.DataFrame(
+            {"row_num": [123], "date_col": ["2021-12-12"]},
+            columns=["row_num", "date_col"],
+        ),
+        pandas.DataFrame(
+            {"row_num": [123], "date_col": [datetime.date(2021, 12, 12)]},
+            columns=["row_num", "date_col"],
+        ),
+        [{"name": "row_num", "type": "INTEGER"}, {"name": "date_col", "type": "DATE"}],
+        False,
+    ),
+    # Loading a NUMERIC column should work for floating point objects. See:
+    # https://github.com/googleapis/python-bigquery-pandas/issues/421
+    (
+        pandas.DataFrame(
+            {"row_num": [123], "num_col": [1.25]}, columns=["row_num", "num_col"],
+        ),
+        pandas.DataFrame(
+            {"row_num": [123], "num_col": [decimal.Decimal("1.25")]},
+            columns=["row_num", "num_col"],
+        ),
+        [
+            {"name": "row_num", "type": "INTEGER"},
+            {"name": "num_col", "type": "NUMERIC"},
+        ],
+        False,
+    ),
 ]

 if db_dtypes is not None:
     DATAFRAME_ROUND_TRIPS.append(
         (
             pandas.DataFrame(
                 {
+                    "row_num": [0, 1, 2],
                     "date_col": pandas.Series(
                         ["2021-04-17", "1999-12-31", "2038-01-19"], dtype="dbdate",
                     ),
                 }
             ),
+            None,
             [{"name": "date_col", "type": "DATE"}],
             False,
         )
     )

+
 @pytest.mark.parametrize(
-    ["input_df", "table_schema", "skip_csv"], DATAFRAME_ROUND_TRIPS
+    ["input_df", "expected_df", "table_schema", "skip_csv"], DATAFRAME_ROUND_TRIPS
 )
 def test_dataframe_round_trip_with_table_schema(
     method_under_test,
     random_dataset_id,
     bigquery_client,
     input_df,
+    expected_df,
     table_schema,
     api_method,
     skip_csv,
 ):
     if api_method == "load_csv" and skip_csv:
         pytest.skip("Loading with CSV not supported.")
+    if expected_df is None:
+        expected_df = input_df
     table_id = f"{random_dataset_id}.round_trip_w_schema_{random.randrange(1_000_000)}"
-    input_df["row_num"] = input_df.index
-    input_df.sort_values("row_num", inplace=True)
     method_under_test(
         input_df, table_id, table_schema=table_schema, api_method=api_method
     )
     round_trip = bigquery_client.list_rows(table_id).to_dataframe(
-        dtypes=dict(zip(input_df.columns, input_df.dtypes))
+        dtypes=dict(zip(expected_df.columns, expected_df.dtypes))
     )
     round_trip.sort_values("row_num", inplace=True)
-    pandas.testing.assert_frame_equal(input_df, round_trip)
+    pandas.testing.assert_frame_equal(expected_df, round_trip)
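
Note: a hedged end-to-end sketch of the behavior these tests pin down; the project and table IDs are placeholders and credentials are assumed to be configured. Plain strings and floats should now round-trip into DATE and NUMERIC columns via the parquet path:

    import pandas

    import pandas_gbq

    df = pandas.DataFrame(
        {
            "row_num": [0],
            "date_col": ["2021-12-12"],  # plain string, cast to DATE on load
            "num_col": [1.25],  # plain float, cast to NUMERIC on load
        }
    )
    pandas_gbq.to_gbq(
        df,
        "my_dataset.round_trip_demo",  # hypothetical destination table
        project_id="my-project",  # hypothetical project
        table_schema=[
            {"name": "row_num", "type": "INTEGER"},
            {"name": "date_col", "type": "DATE"},
            {"name": "num_col", "type": "NUMERIC"},
        ],
        api_method="load_parquet",
    )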