Skip to content

Commit e53dc1c

Browse files
committed
push more things down to rust layer
1 parent 25f5b8a commit e53dc1c

14 files changed

Lines changed: 560 additions & 406 deletions

File tree

protos/transaction.proto

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -68,8 +68,9 @@ message Transaction {
6868
map<string, bytes> schema_metadata = 3;
6969
// Key-value pairs to merge with existing config.
7070
map<string, string> config_upsert_values = 4;
71-
// Additional base paths for data file distribution in multi-path layouts
72-
repeated BasePath initial_data_paths = 5;
71+
// Additional base paths for data file distribution in multi-path layouts.
72+
// Each BasePath must have a properly assigned ID (non-zero).
73+
repeated BasePath initial_bases = 5;
7374
}
7475

7576
// Add or replace a new secondary index.

python/python/lance/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,6 @@
1111
from . import log
1212
from .blob import BlobColumn, BlobFile
1313
from .dataset import (
14-
DatasetBasePathConfig,
1514
DataStatistics,
1615
FieldStatistics,
1716
Index,
@@ -27,6 +26,7 @@
2726
)
2827
from .fragment import FragmentMetadata, LanceFragment
2928
from .lance import (
29+
DatasetBasePath,
3030
FFILanceTableProvider,
3131
ScanStatistics,
3232
bytes_read_counter,
@@ -48,7 +48,7 @@
4848
__all__ = [
4949
"BlobColumn",
5050
"BlobFile",
51-
"DatasetBasePathConfig",
51+
"DatasetBasePath",
5252
"DataStatistics",
5353
"FieldStatistics",
5454
"FragmentMetadata",

python/python/lance/dataset.py

Lines changed: 52 additions & 114 deletions
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,7 @@
5252
CleanupStats,
5353
Compaction,
5454
CompactionMetrics,
55+
DatasetBasePath,
5556
LanceSchema,
5657
ScanStatistics,
5758
_Dataset,
@@ -3550,39 +3551,6 @@ class AutoCleanupConfig(TypedDict):
35503551
older_than_seconds: int
35513552

35523553

3553-
@dataclass
3554-
class DatasetBasePathConfig:
3555-
"""Configuration for a dataset base path.
3556-
3557-
Used to specify additional storage locations for dataset data files.
3558-
This enables multi-bucket/multi-path dataset configurations.
3559-
3560-
Parameters
3561-
----------
3562-
name : str
3563-
A unique name identifier for this base path. Used to reference
3564-
this base in subsequent operations.
3565-
path : str
3566-
The full URI of the base path (e.g., "s3://bucket/path", "/mnt/storage/data").
3567-
This is the root location where data will be stored.
3568-
is_dataset_root : bool, default False
3569-
Whether this path is a dataset root directory (with subdirectories like
3570-
'data/', '_deletions/') or a direct file directory. Default is False,
3571-
meaning data will be written directly to the path/data directory without
3572-
additional subdirectories for deletions, etc.
3573-
3574-
Examples
3575-
--------
3576-
>>> base1 = DatasetBasePathConfig(name="primary", path="s3://bucket1/dataset")
3577-
>>> base2 = DatasetBasePathConfig(name="archive", path="s3://bucket2/dataset",
3578-
... is_dataset_root=True)
3579-
"""
3580-
3581-
name: str
3582-
path: str
3583-
is_dataset_root: bool = False
3584-
3585-
35863554
# LanceOperation is a namespace for operations that can be applied to a dataset.
35873555
class LanceOperation:
35883556
@staticmethod
@@ -5098,8 +5066,8 @@ def write_dataset(
50985066
# Dataset doesn't exist yet, which is fine for append mode
50995067
pass
51005068

5101-
# Resolve target_bases into new_bases and target_paths
5102-
new_bases, target_paths = _resolve_target_bases(
5069+
# Resolve target_bases into new_bases and target_base_ids
5070+
new_bases, target_base_ids = _resolve_target_bases(
51035071
target_bases, existing_dataset, mode
51045072
)
51055073

@@ -5116,7 +5084,7 @@ def write_dataset(
51165084
"auto_cleanup_options": auto_cleanup_options,
51175085
"transaction_properties": merged_properties,
51185086
"new_bases": new_bases,
5119-
"target_paths": target_paths,
5087+
"target_bases": target_base_ids,
51205088
}
51215089

51225090
if commit_lock:
@@ -5238,24 +5206,19 @@ def _merge_message_to_properties(
52385206
return merged_properties
52395207

52405208

5241-
def _normalize_path(path: str) -> str:
5242-
"""Normalize a path URI by removing trailing slashes."""
5243-
return path.rstrip("/")
5244-
5245-
52465209
def _resolve_target_bases(
5247-
target_bases: Optional[List[Union[str, DatasetBasePathConfig]]],
5210+
target_bases: Optional[List[Union[str, DatasetBasePath]]],
52485211
existing_dataset: Optional["LanceDataset"],
52495212
mode: str,
5250-
) -> Tuple[Optional[List[Dict[str, Any]]], Optional[List[str]]]:
5213+
) -> Tuple[Optional[List[Dict[str, Any]]], Optional[List[int]]]:
52515214
"""
5252-
Parse target_bases into (new_bases, target_paths).
5215+
Parse target_bases into (new_bases, target_base_ids).
52535216
52545217
Parameters
52555218
----------
5256-
target_bases : List[Union[str, DatasetBasePathConfig]], optional
5219+
target_bases : List[Union[str, DatasetBasePath]], optional
52575220
List of base specifications. Can be:
5258-
- DatasetBasePathConfig objects (new bases to register)
5221+
- DatasetBasePath objects (new bases to register)
52595222
- Strings (references to existing bases by name or path)
52605223
existing_dataset : LanceDataset, optional
52615224
The existing dataset (for append/overwrite modes)
@@ -5266,12 +5229,12 @@ def _resolve_target_bases(
52665229
-------
52675230
new_bases : List[Dict[str, Any]], optional
52685231
List of new base configurations to register. Each dict contains:
5232+
- id: int (assigned sequentially starting from 1)
52695233
- name: str
52705234
- path: str
52715235
- is_dataset_root: bool
5272-
IDs will be assigned later in Rust.
5273-
target_paths : List[str], optional
5274-
List of path URIs where data should be written.
5236+
target_base_ids : List[int], optional
5237+
List of base IDs where data should be written.
52755238
52765239
Raises
52775240
------
@@ -5282,115 +5245,90 @@ def _resolve_target_bases(
52825245
return None, None
52835246

52845247
new_bases = []
5285-
target_paths = []
5248+
target_base_ids = []
5249+
next_base_id = 1 # Start assigning IDs from 1
52865250

52875251
# Get existing bases from manifest if available
52885252
existing_bases_by_name = {}
52895253
existing_bases_by_path = {}
52905254
if existing_dataset is not None:
5291-
try:
5292-
base_paths_dict = existing_dataset._ds.base_paths()
5293-
for base_path_dict in base_paths_dict.values():
5294-
# Convert dict to a simple object for easier access
5295-
class BasePath:
5296-
def __init__(self, d):
5297-
self.id = d["id"]
5298-
self.name = d["name"]
5299-
self.path = d["path"]
5300-
self.is_dataset_root = d["is_dataset_root"]
5301-
5302-
base_path = BasePath(base_path_dict)
5303-
normalized_path = _normalize_path(base_path.path)
5304-
existing_bases_by_path[normalized_path] = base_path
5305-
if base_path.name:
5306-
existing_bases_by_name[base_path.name] = base_path
5307-
except Exception:
5308-
# If we can't load base paths, assume no existing bases
5309-
pass
5255+
base_paths_dict = existing_dataset._ds.base_paths()
5256+
for base_path in base_paths_dict.values():
5257+
existing_bases_by_path[base_path.path] = base_path
5258+
if base_path.name:
5259+
existing_bases_by_name[base_path.name] = base_path
53105260

5311-
# Track new bases being added in this operation (by name and path)
5261+
# Track new bases being added in this operation (by name, path, and ID)
53125262
new_bases_by_name = {}
53135263
new_bases_by_path = {}
53145264

53155265
for item in target_bases:
5316-
if isinstance(item, DatasetBasePathConfig):
5317-
# Validate that new bases can only be registered in CREATE or OVERWRITE mode
5318-
if mode == "append":
5266+
if isinstance(item, DatasetBasePath):
5267+
# Validate that new bases can only be registered in CREATE mode
5268+
if mode != "create":
53195269
raise ValueError(
5320-
"Cannot register new bases in APPEND mode. "
5321-
"New bases can only be registered in CREATE or OVERWRITE "
5322-
"mode. For APPEND mode, reference existing bases by name "
5323-
"or path as strings."
5270+
f"Cannot register new bases in {mode.upper()} mode. "
5271+
"New bases can only be registered in CREATE mode. "
5272+
f"For {mode.upper()} mode, reference existing bases by name "
5273+
"or path as strings. OVERWRITE mode always inherits existing base_paths."
53245274
)
53255275

5326-
# New base definition
5276+
# Assign ID to this base
5277+
base_id = next_base_id
5278+
next_base_id += 1
5279+
5280+
# New base definition with assigned ID
53275281
base_dict = {
5282+
"id": base_id,
53285283
"name": item.name,
53295284
"path": item.path,
53305285
"is_dataset_root": item.is_dataset_root,
53315286
}
53325287
new_bases.append(base_dict)
5333-
# Don't add to target_paths - DatasetBasePathConfig is just registering
5288+
# Don't add to target_base_ids - DatasetBasePath is just registering
53345289
# a base, not indicating where to write. Only string references
53355290
# specify write targets.
53365291

53375292
# Track for resolution within this same operation
5338-
new_bases_by_name[item.name] = item
5339-
new_bases_by_path[_normalize_path(item.path)] = item
5293+
# Store the base with its assigned ID
5294+
temp_base = type('obj', (object,), {
5295+
'id': base_id,
5296+
'name': item.name,
5297+
'path': item.path,
5298+
'is_dataset_root': item.is_dataset_root
5299+
})()
5300+
new_bases_by_name[item.name] = temp_base
5301+
new_bases_by_path[item.path] = temp_base
53405302

53415303
elif isinstance(item, str):
53425304
# String reference - resolve by name first, then by path
53435305
resolved = False
53445306

53455307
# 1. Try to match by name in newly defined bases
53465308
if item in new_bases_by_name:
5347-
target_paths.append(new_bases_by_name[item].path)
5309+
target_base_ids.append(new_bases_by_name[item].id)
53485310
resolved = True
53495311
# 2. Try to match by name in existing bases
53505312
elif item in existing_bases_by_name:
5351-
target_paths.append(existing_bases_by_name[item].path)
5313+
target_base_ids.append(existing_bases_by_name[item].id)
53525314
resolved = True
53535315
# 3. Try to match by path in newly defined bases
5354-
elif _normalize_path(item) in new_bases_by_path:
5355-
target_paths.append(item)
5316+
elif item in new_bases_by_path:
5317+
target_base_ids.append(new_bases_by_path[item].id)
53565318
resolved = True
53575319
# 4. Try to match by path in existing bases
5358-
elif _normalize_path(item) in existing_bases_by_path:
5359-
target_paths.append(item)
5320+
elif item in existing_bases_by_path:
5321+
target_base_ids.append(existing_bases_by_path[item].id)
53605322
resolved = True
53615323

53625324
if not resolved:
5363-
# Build helpful error message
5364-
available_names = list(existing_bases_by_name.keys()) + list(
5365-
new_bases_by_name.keys()
5366-
)
5367-
available_paths = list(existing_bases_by_path.keys()) + list(
5368-
new_bases_by_path.keys()
5369-
)
5370-
5371-
error_msg = f"Base '{item}' not found. "
5372-
if available_names or available_paths:
5373-
error_msg += "Available bases:\n"
5374-
for name in available_names:
5375-
base = existing_bases_by_name.get(
5376-
name
5377-
) or new_bases_by_name.get(name)
5378-
error_msg += f" - {name}: {base.path}\n"
5379-
for path in available_paths:
5380-
if path not in [
5381-
b.path for b in existing_bases_by_name.values()
5382-
] and path not in [b.path for b in new_bases_by_name.values()]:
5383-
error_msg += f" - {path}\n"
5384-
else:
5385-
error_msg += "No bases are currently registered."
5386-
5387-
raise ValueError(error_msg)
5325+
raise ValueError(f"Base '{item}' not found.")
53885326
else:
53895327
raise TypeError(
5390-
f"target_bases must contain str or DatasetBasePathConfig, got {type(item)}"
5328+
f"target_bases must contain str or DatasetBasePath, got {type(item)}"
53915329
)
53925330

5393-
return (new_bases if new_bases else None, target_paths if target_paths else None)
5331+
return (new_bases if new_bases else None, target_base_ids if target_base_ids else None)
53945332

53955333

53965334
class VectorIndexReader:

0 commit comments

Comments (0)