Skip to content

Commit ae35a98

Browse files
feat: Initial support for biglake iceberg tables (#2409)
1 parent b925aa2 commit ae35a98

File tree

23 files changed

+753
-262
lines changed

23 files changed

+753
-262
lines changed

bigframes/core/array_value.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@
1717
import datetime
1818
import functools
1919
import typing
20-
from typing import Iterable, List, Mapping, Optional, Sequence, Tuple
20+
from typing import Iterable, List, Mapping, Optional, Sequence, Tuple, Union
2121

22-
import google.cloud.bigquery
2322
import pandas
2423
import pyarrow as pa
2524

@@ -91,7 +90,7 @@ def from_range(cls, start, end, step):
9190
@classmethod
9291
def from_table(
9392
cls,
94-
table: google.cloud.bigquery.Table,
93+
table: Union[bq_data.BiglakeIcebergTable, bq_data.GbqNativeTable],
9594
session: Session,
9695
*,
9796
columns: Optional[Sequence[str]] = None,
@@ -103,8 +102,6 @@ def from_table(
103102
):
104103
if offsets_col and primary_key:
105104
raise ValueError("must set at most one of 'offsets', 'primary_key'")
106-
# define data source only for needed columns, this makes row-hashing cheaper
107-
table_def = bq_data.GbqTable.from_table(table, columns=columns or ())
108105

109106
# create ordering from info
110107
ordering = None
@@ -115,7 +112,9 @@ def from_table(
115112
[ids.ColumnId(key_part) for key_part in primary_key]
116113
)
117114

118-
bf_schema = schemata.ArraySchema.from_bq_table(table, columns=columns)
115+
bf_schema = schemata.ArraySchema.from_bq_schema(
116+
table.physical_schema, columns=columns
117+
)
119118
# Scan all columns by default, we define this list as it can be pruned while preserving source_def
120119
scan_list = nodes.ScanList(
121120
tuple(
@@ -124,7 +123,7 @@ def from_table(
124123
)
125124
)
126125
source_def = bq_data.BigqueryDataSource(
127-
table=table_def,
126+
table=table,
128127
schema=bf_schema,
129128
at_time=at_time,
130129
sql_predicate=predicate,

bigframes/core/bq_data.py

Lines changed: 173 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,74 +22,214 @@
2222
import queue
2323
import threading
2424
import typing
25-
from typing import Any, Iterator, Optional, Sequence, Tuple
25+
from typing import Any, Iterator, List, Literal, Optional, Sequence, Tuple, Union
2626

2727
from google.cloud import bigquery_storage_v1
2828
import google.cloud.bigquery as bq
2929
import google.cloud.bigquery_storage_v1.types as bq_storage_types
3030
from google.protobuf import timestamp_pb2
3131
import pyarrow as pa
3232

33+
import bigframes.constants
3334
from bigframes.core import pyarrow_utils
3435
import bigframes.core.schema
3536

3637
if typing.TYPE_CHECKING:
3738
import bigframes.core.ordering as orderings
3839

3940

41+
def _resolve_standard_gcp_region(bq_region: str):
42+
"""
43+
Resolve bq regions to standardized
44+
"""
45+
if bq_region.casefold() == "us":
46+
return "us-central1"
47+
elif bq_region.casefold() == "eu":
48+
return "europe-west4"
49+
return bq_region
50+
51+
52+
def is_irc_table(table_id: str):
53+
"""
54+
Determines if a table id should be resolved through the iceberg rest catalog.
55+
"""
56+
return len(table_id.split(".")) == 4
57+
58+
59+
def is_compatible(
60+
data_region: Union[GcsRegion, BigQueryRegion], session_location: str
61+
) -> bool:
62+
# based on https://cloud.google.com/bigquery/docs/locations#storage-location-considerations
63+
if isinstance(data_region, BigQueryRegion):
64+
return data_region.name == session_location
65+
else:
66+
assert isinstance(data_region, GcsRegion)
67+
# TODO(b/463675088): Multi-regions don't yet support rest catalog tables
68+
if session_location in bigframes.constants.BIGQUERY_MULTIREGIONS:
69+
return False
70+
return _resolve_standard_gcp_region(session_location) in data_region.included
71+
72+
73+
def get_default_bq_region(data_region: Union[GcsRegion, BigQueryRegion]) -> str:
74+
if isinstance(data_region, BigQueryRegion):
75+
return data_region.name
76+
elif isinstance(data_region, GcsRegion):
77+
# should maybe try to track and prefer primary replica?
78+
return data_region.included[0]
79+
80+
81+
@dataclasses.dataclass(frozen=True)
82+
class BigQueryRegion:
83+
name: str
84+
85+
86+
@dataclasses.dataclass(frozen=True)
87+
class GcsRegion:
88+
# this is the name of gcs regions, which may be names for multi-regions, so shouldn't be compared with non-gcs locations
89+
storage_regions: tuple[str, ...]
90+
# this tracks all the included standard, specific regions (eg us-east1), and should be comparable to bq regions (except non-standard US, EU, omni regions)
91+
included: tuple[str, ...]
92+
93+
94+
# what is the line between metadata and core fields? Mostly metadata fields are optional or unreliable, but it's fuzzy
4095
@dataclasses.dataclass(frozen=True)
41-
class GbqTable:
96+
class TableMetadata:
97+
# this size metadata might be stale, don't use where strict correctness is needed
98+
location: Union[BigQueryRegion, GcsRegion]
99+
type: Literal["TABLE", "EXTERNAL", "VIEW", "MATERIALIZED_VIEW", "SNAPSHOT"]
100+
numBytes: Optional[int] = None
101+
numRows: Optional[int] = None
102+
created_time: Optional[datetime.datetime] = None
103+
modified_time: Optional[datetime.datetime] = None
104+
105+
106+
@dataclasses.dataclass(frozen=True)
107+
class GbqNativeTable:
42108
project_id: str = dataclasses.field()
43109
dataset_id: str = dataclasses.field()
44110
table_id: str = dataclasses.field()
45111
physical_schema: Tuple[bq.SchemaField, ...] = dataclasses.field()
46-
is_physically_stored: bool = dataclasses.field()
47-
cluster_cols: typing.Optional[Tuple[str, ...]]
112+
metadata: TableMetadata = dataclasses.field()
113+
partition_col: Optional[str] = None
114+
cluster_cols: typing.Optional[Tuple[str, ...]] = None
115+
primary_key: Optional[Tuple[str, ...]] = None
48116

49117
@staticmethod
50-
def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable:
118+
def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqNativeTable:
51119
# Subsetting fields with columns can reduce cost of row-hash default ordering
52120
if columns:
53121
schema = tuple(item for item in table.schema if item.name in columns)
54122
else:
55123
schema = tuple(table.schema)
56-
return GbqTable(
124+
125+
metadata = TableMetadata(
126+
numBytes=table.num_bytes,
127+
numRows=table.num_rows,
128+
location=BigQueryRegion(table.location), # type: ignore
129+
type=table.table_type or "TABLE", # type: ignore
130+
created_time=table.created,
131+
modified_time=table.modified,
132+
)
133+
partition_col = None
134+
if table.range_partitioning:
135+
partition_col = table.range_partitioning.field
136+
elif table.time_partitioning:
137+
partition_col = table.time_partitioning.field
138+
139+
return GbqNativeTable(
57140
project_id=table.project,
58141
dataset_id=table.dataset_id,
59142
table_id=table.table_id,
60143
physical_schema=schema,
61-
is_physically_stored=(table.table_type in ["TABLE", "MATERIALIZED_VIEW"]),
144+
partition_col=partition_col,
62145
cluster_cols=None
63-
if table.clustering_fields is None
146+
if (table.clustering_fields is None)
64147
else tuple(table.clustering_fields),
148+
primary_key=tuple(_get_primary_keys(table)),
149+
metadata=metadata,
65150
)
66151

67152
@staticmethod
68153
def from_ref_and_schema(
69154
table_ref: bq.TableReference,
70155
schema: Sequence[bq.SchemaField],
156+
location: str,
157+
table_type: Literal["TABLE"] = "TABLE",
71158
cluster_cols: Optional[Sequence[str]] = None,
72-
) -> GbqTable:
73-
return GbqTable(
159+
) -> GbqNativeTable:
160+
return GbqNativeTable(
74161
project_id=table_ref.project,
75162
dataset_id=table_ref.dataset_id,
76163
table_id=table_ref.table_id,
164+
metadata=TableMetadata(location=BigQueryRegion(location), type=table_type),
77165
physical_schema=tuple(schema),
78-
is_physically_stored=True,
79166
cluster_cols=tuple(cluster_cols) if cluster_cols else None,
80167
)
81168

169+
@property
170+
def is_physically_stored(self) -> bool:
171+
return self.metadata.type in ["TABLE", "MATERIALIZED_VIEW"]
172+
82173
def get_table_ref(self) -> bq.TableReference:
83174
return bq.TableReference(
84175
bq.DatasetReference(self.project_id, self.dataset_id), self.table_id
85176
)
86177

178+
def get_full_id(self, quoted: bool = False) -> str:
179+
if quoted:
180+
return f"`{self.project_id}`.`{self.dataset_id}`.`{self.table_id}`"
181+
return f"{self.project_id}.{self.dataset_id}.{self.table_id}"
182+
87183
@property
88184
@functools.cache
89185
def schema_by_id(self):
90186
return {col.name: col for col in self.physical_schema}
91187

92188

189+
@dataclasses.dataclass(frozen=True)
190+
class BiglakeIcebergTable:
191+
project_id: str = dataclasses.field()
192+
catalog_id: str = dataclasses.field()
193+
namespace_id: str = dataclasses.field()
194+
table_id: str = dataclasses.field()
195+
physical_schema: Tuple[bq.SchemaField, ...] = dataclasses.field()
196+
cluster_cols: typing.Optional[Tuple[str, ...]]
197+
metadata: TableMetadata
198+
199+
def get_full_id(self, quoted: bool = False) -> str:
200+
if quoted:
201+
return f"`{self.project_id}`.`{self.catalog_id}`.`{self.namespace_id}`.`{self.table_id}`"
202+
return (
203+
f"{self.project_id}.{self.catalog_id}.{self.namespace_id}.{self.table_id}"
204+
)
205+
206+
@property
207+
@functools.cache
208+
def schema_by_id(self):
209+
return {col.name: col for col in self.physical_schema}
210+
211+
@property
212+
def partition_col(self) -> Optional[str]:
213+
# TODO: Use iceberg partition metadata
214+
return None
215+
216+
@property
217+
def dataset_id(self) -> str:
218+
"""
219+
Not a true dataset, but serves as the dataset component of the identifier in SQL queries
220+
"""
221+
return f"{self.catalog_id}.{self.namespace_id}"
222+
223+
@property
224+
def primary_key(self) -> Optional[Tuple[str, ...]]:
225+
return None
226+
227+
def get_table_ref(self) -> bq.TableReference:
228+
return bq.TableReference(
229+
bq.DatasetReference(self.project_id, self.dataset_id), self.table_id
230+
)
231+
232+
93233
@dataclasses.dataclass(frozen=True)
94234
class BigqueryDataSource:
95235
"""
@@ -104,13 +244,13 @@ def __post_init__(self):
104244
self.schema.names
105245
)
106246

107-
table: GbqTable
247+
table: Union[GbqNativeTable, BiglakeIcebergTable]
108248
schema: bigframes.core.schema.ArraySchema
109249
at_time: typing.Optional[datetime.datetime] = None
110250
# Added for backwards compatibility, not validated
111251
sql_predicate: typing.Optional[str] = None
112252
ordering: typing.Optional[orderings.RowOrdering] = None
113-
# Optimization field
253+
# Optimization field, must be correct if set, don't put maybe-stale number here
114254
n_rows: Optional[int] = None
115255

116256

@@ -188,6 +328,8 @@ def get_arrow_batches(
188328
project_id: str,
189329
sample_rate: Optional[float] = None,
190330
) -> ReadResult:
331+
assert isinstance(data.table, GbqNativeTable)
332+
191333
table_mod_options = {}
192334
read_options_dict: dict[str, Any] = {"selected_fields": list(columns)}
193335

@@ -245,3 +387,21 @@ def process_batch(pa_batch):
245387
return ReadResult(
246388
batches, session.estimated_row_count, session.estimated_total_bytes_scanned
247389
)
390+
391+
392+
def _get_primary_keys(
393+
table: bq.Table,
394+
) -> List[str]:
395+
"""Get primary keys from table if they are set."""
396+
397+
primary_keys: List[str] = []
398+
if (
399+
(table_constraints := getattr(table, "table_constraints", None)) is not None
400+
and (primary_key := table_constraints.primary_key) is not None
401+
# This will be False for either None or empty list.
402+
# We want primary_keys = None if no primary keys are set.
403+
and (columns := primary_key.columns)
404+
):
405+
primary_keys = columns if columns is not None else []
406+
407+
return primary_keys

bigframes/core/compile/ibis_compiler/ibis_compiler.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,9 +215,7 @@ def _table_to_ibis(
215215
source: bq_data.BigqueryDataSource,
216216
scan_cols: typing.Sequence[str],
217217
) -> ibis_types.Table:
218-
full_table_name = (
219-
f"{source.table.project_id}.{source.table.dataset_id}.{source.table.table_id}"
220-
)
218+
full_table_name = source.table.get_full_id(quoted=False)
221219
# Physical schema might include unused columns, unsupported datatypes like JSON
222220
physical_schema = ibis_bigquery.BigQuerySchema.to_ibis(
223221
list(source.table.physical_schema)

bigframes/core/nodes.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -825,9 +825,7 @@ def variables_introduced(self) -> int:
825825

826826
@property
827827
def row_count(self) -> typing.Optional[int]:
828-
if self.source.sql_predicate is None and self.source.table.is_physically_stored:
829-
return self.source.n_rows
830-
return None
828+
return self.source.n_rows
831829

832830
@property
833831
def node_defined_ids(self) -> Tuple[identifiers.ColumnId, ...]:

bigframes/core/schema.py

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from dataclasses import dataclass
1818
import functools
1919
import typing
20-
from typing import Dict, List, Optional, Sequence
20+
from typing import Dict, Optional, Sequence
2121

2222
import google.cloud.bigquery
2323
import pyarrow
@@ -40,31 +40,16 @@ class ArraySchema:
4040
def __iter__(self):
4141
yield from self.items
4242

43-
@classmethod
44-
def from_bq_table(
45-
cls,
46-
table: google.cloud.bigquery.Table,
47-
column_type_overrides: Optional[
48-
typing.Dict[str, bigframes.dtypes.Dtype]
49-
] = None,
50-
columns: Optional[Sequence[str]] = None,
51-
):
52-
if not columns:
53-
fields = table.schema
54-
else:
55-
lookup = {field.name: field for field in table.schema}
56-
fields = [lookup[col] for col in columns]
57-
58-
return ArraySchema.from_bq_schema(
59-
fields, column_type_overrides=column_type_overrides
60-
)
61-
6243
@classmethod
6344
def from_bq_schema(
6445
cls,
65-
schema: List[google.cloud.bigquery.SchemaField],
46+
schema: Sequence[google.cloud.bigquery.SchemaField],
6647
column_type_overrides: Optional[Dict[str, bigframes.dtypes.Dtype]] = None,
48+
columns: Optional[Sequence[str]] = None,
6749
):
50+
if columns:
51+
lookup = {field.name: field for field in schema}
52+
schema = [lookup[col] for col in columns]
6853
if column_type_overrides is None:
6954
column_type_overrides = {}
7055
items = tuple(

0 commit comments

Comments
 (0)