feat: add multi-path support for lance data paths #4765
Conversation
Codecov Report

❌ Patch coverage is …

```
@@            Coverage Diff             @@
##             main    #4765      +/-   ##
==========================================
+ Coverage   81.64%   81.69%   +0.04%
==========================================
  Files         333      333
  Lines      131594   132497     +903
  Branches   131594   132497     +903
==========================================
+ Hits       107444   108243     +799
- Misses      20550    20619      +69
- Partials     3600     3635      +35
```
jackye1995 left a comment:
This is an exciting feature! Added some initial thoughts.
test_multi_bucket_logging.py (Outdated)
Let's turn this into proper tests in Python and Rust.
Thanks @jackye1995 for the review! I will incorporate your suggestions and try to add tests in all current flows too!
posting some offline discussions here:
python/python/lance/dataset.py (Outdated)

```
    If both `commit_message` and `properties` are provided, `commit_message` will
    override any "lance.commit.message" key in `properties`.
initial_data_paths: list of str, optional
    *Experimental*. Data file base URIs for registering in the manifest
```
nit: no need to mark as experimental
python/python/lance/dataset.py (Outdated)

```
    Only used in CREATE mode for manifest registration.
    Example: ["s3://storage1/data", "s3://storage2/data"]
target_data_paths: list of str, optional
    *Experimental*. Target URI for writing data files (array with exactly one element).
```
nit: no need to mark as experimental
rust/lance/src/dataset.rs (Outdated)

```rust
}

/// Get the ObjectStore for a specific path based on base_id
pub(crate) fn get_object_store_for_path(&self, base_id: Option<&u32>) -> Result<Arc<lance_io::object_store::ObjectStore>> {
```
I guess this would be necessary for the multi-cloud use case, but apart from that, would the configuration within the same cloud be different? We cannot really pass different configurations for the same cloud, so I think we should cache object stores by the path scheme instead of by the path. This will reduce the number of object stores we cache per dataset, especially if you need to set a lot of bases.
Yes, we need this for each bucket since the object store has the bucket info in its constructor, but that could be changed, I guess. Else we write to the same bucket as the primary. The cache key here is the bucket URI, so there will just be one entry per bucket.
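For illustration, a minimal Python sketch of that keying scheme (the real code is Rust; `ObjectStore` and the cache here are hypothetical stand-ins):

```python
from urllib.parse import urlparse

class ObjectStore:
    """Hypothetical stand-in for the real (Rust) object store."""
    def __init__(self, bucket_uri: str):
        self.bucket_uri = bucket_uri  # bucket info is fixed at construction time

_stores: dict[str, ObjectStore] = {}

def object_store_for(path: str) -> ObjectStore:
    # The cache key is the bucket URI, so there is one entry per bucket.
    parsed = urlparse(path)
    bucket_uri = f"{parsed.scheme}://{parsed.netloc}"  # e.g. "s3://storage1"
    if bucket_uri not in _stores:
        _stores[bucket_uri] = ObjectStore(bucket_uri)
    return _stores[bucket_uri]
```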
The key thing here, I think, is: do you expect to pass different object store configs per data path? Today these will just be initialized with whatever credentials and settings are in the environment or that you pass in when opening the dataset. They won't really be different object stores from a configuration perspective. If you need to customize at that level, we probably need to find a way to pass that info along here.
Not for the current use case, but I guess in general it could be nice to have in the future.
Sounds good. But in general, we already have a cache in ObjectStoreRegistry; we should use that instead of creating another cache here. That cache already uses the base path as the cache key as well.
@jackye1995 we need the object store cache at the dataset level (like the primary object store stored here). This is used by the reader to fetch the correct object store (via get_object_store_for_path). Are you suggesting storing the ObjectStoreRegistry here?
Sorry, let me be clear: I mean that in the dataset builder, we should not just initialize these extra object stores. They are not always going to be used, and an object store is not a cheap struct to initialize. We should initialize them on demand when we need to use them.
```rust
/// Additional path URIs for multi-path support (cached in ObjectStoreRegistry)
pub(crate) extra_path_uris: Arc<HashMap<u32, String>>,
/// Store parameters used to create object stores (needed for extra paths)
pub(crate) store_params: ObjectStoreParams,
```

I will need to store these in the dataset so that we can create the object stores on demand.
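A rough Python analogue of the on-demand pattern being discussed (hypothetical names, mirroring the Rust fields above and reusing the `ObjectStore` stand-in from the earlier sketch):

```python
class MultiPathStores:
    """Hold only the URIs and params; build each object store lazily."""
    def __init__(self, extra_path_uris: dict[int, str], store_params: dict):
        self.extra_path_uris = extra_path_uris  # base_id -> base URI
        self.store_params = store_params        # kept so stores can be built later
        self._cache: dict[int, ObjectStore] = {}

    def get_object_store_for_path(self, base_id: int) -> ObjectStore:
        # Nothing is constructed until a base is actually read from or written to.
        if base_id not in self._cache:
            self._cache[base_id] = ObjectStore(self.extra_path_uris[base_id])
        return self._cache[base_id]
```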
protos/transaction.proto (Outdated)

```proto
// Key-value pairs to merge with existing config.
map<string, string> config_upsert_values = 4;
// Additional path URIs for data file distribution in multi-path layouts
repeated string initial_data_paths = 5;
```
Regardless of what we do in the Python API, at the transaction level we should have a list of BasePath, not just string data paths. In BasePath we have important fields like is_dataset_root, as well as a user-provided name for the path.
OK, found the context! I think there is a difference between the two cases: reserving fragment IDs does not add content to the manifest, right? It's a field-update operation. The transactional issue here would leave behind unused paths (which might even already exist). Even if we have multi-statement transactions, I think we cannot guarantee people use them correctly, and that might result in a bloated manifest file for a long-lived dataset. Maybe we could add cleaning of unrelated base_paths to the cleanup procedure. Or make this a table config, like cleaning unrelated paths over the latest n snapshots. And of course this would be optional in case users want to manage it manually. This could also guard the file deletion/compaction cases. What do you think? @jackye1995 @dacort
Can you elaborate a bit more? I did not get the concern here. I think it is important for users to assign a logical name to the path for deduping purposes, so that if you try to add paths with the same name, it fails.
python/python/lance/dataset.py (Outdated)

```
    and can be retrieved using read_transaction().
    If both `commit_message` and `properties` are provided, `commit_message` will
    override any "lance.commit.message" key in `properties`.
initial_data_paths: list of str, optional
```
I was thinking about this last night; I feel there is a tension between a simple user experience and flexibility/correctness here:
In the most precise form, what a user should define is the full base path information, something like:
```python
write_dataset(data, dataset, mode="create", initial_bases=[
    {"name": "b1", "path": "s3://a/b1", "is_dataset_root": "true"},
    {"name": "b2", "path": "s3://a/b2", "is_dataset_root": "false"},
])

# target existing bases
write_dataset(data2, dataset, mode="append", target_bases=[
    "b1",
    "b2"
])

# target new bases
write_dataset(data2, dataset, mode="append", target_bases=[
    {"name": "b3", "path": "s3://a/b3", "is_dataset_root": "true"},
])
```

But I also understand this is quite cumbersome to write, compared to directly writing initial_data_paths and target_data_paths. One big concern I have around that is that people write paths slightly differently all the time (e.g. s3://a/b1 vs s3://a/b1/). It is hard to know whether they mean the same path or actually mean different ones.
We also have a similar problem about how expressive we should be in the Overwrite transaction, as I commented above.
So here is my latest thinking: at the user interface level, instead of having a dedicated field only for create mode, we fully separate the concept of the new bases to register from the target bases to use. Regardless of mode, users always register new bases in the field new_bases and specify which to use in target_bases. Here is the updated example:
```python
write_dataset(data, dataset, mode="create", new_bases=[
    {"name": "b1", "path": "s3://a/b1", "is_dataset_root": "true"},
    {"name": "b2", "path": "s3://a/b2", "is_dataset_root": "false"},
],
target_bases=["b1", "b2"])

# target existing bases
write_dataset(data2, dataset, mode="append", target_bases=[
    "b1",
    "b2"
])

# target new bases
write_dataset(data2, dataset, mode="append", new_bases=[
    {"name": "b3", "path": "s3://a/b3", "is_dataset_root": "true"},
], target_bases=["b3"])
```

And at the transaction level, the new_bases directly translate to the new bases we should add in the transaction model, for example in Overwrite:
```proto
// Create or overwrite the entire dataset.
message Overwrite {
  // The new fragments
  //
  // Fragment IDs are not yet assigned.
  repeated DataFragment fragments = 1;
  // The new schema
  repeated lance.file.Field schema = 2;
  // Schema metadata.
  map<string, bytes> schema_metadata = 3;
  // Key-value pairs to merge with existing config.
  map<string, string> config_upsert_values = 4;
  // base paths for the whole dataset
  repeated BasePath initial_bases = 5;
}
```

@majin1102 @jaystarshot what do we think about this proposal?
I think we can define the API for the overwrite behavior once we have concrete use cases. For us, just having a separate API is fine, but there could be other use cases.
Agree with the proposal of putting them together in one operation.
I was wondering whether it is really necessary to register unrelated base paths in the write interface. If we make adding a base path idempotent, I think we could just:

```python
write_dataset(data2, dataset, mode="append", bases=[{"path": "s3://a/b3"}])
```

This means name is none (if we don't force there to be one) and is_dataset_root takes its default. Or we could use:

```python
write_dataset(data2, dataset, mode="append", bases=[{"name": "b3", "path": "s3://a/b3", "is_dataset_root": "false"}])
```

At runtime we check whether the base path exists and make sure it is added before committing the data.
rust/lance/src/dataset.rs (Outdated)

```rust
).await?;

&bucket_uri,
&ObjectStoreParams::default(),
```
The default params likely won't work when storage params are specified (gcs/oci).
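For context, storage options today are supplied once when opening the dataset, e.g.:

```python
import lance

# Credentials and settings are given once for the dataset; the concern above
# is that extra bases built with ObjectStoreParams::default() would not see them.
ds = lance.dataset(
    "gs://bucket/dataset",
    storage_options={"service_account": "path/to/credentials.json"},
)
```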
Sorry for the late reply. I was thinking about the scenario of writing files to a base that has not been added yet; that should potentially be an ACID operation. But on second thought, it is reasonable to split this into two operations, one to add the base and one to write the data, with a small limitation for the scenario where people want to dynamically manage bases according to data. Two cases in my mind:

For the second scenario, I assumed we have to add the base before committing the data. Then we might encounter the case where the data commit fails but the base was added. For this scenario we could expect a commit retry, but without assurance, and the base could be left there forever (if we use a UUID, it is just a garbage hole). I think multi-base management could be a complex issue, and somehow I think it is related to the scenario of partitions (like this multi-bucket case). Or let's say it might provide a foundation for what partitions aim at. Nowadays when we talk about partitions we usually think of partition specs and so on in Iceberg, but if we look back to Hive, partitions are just managed paths. I was wondering if we could reuse this multi-base ability in a partition solution. What do you think of this? @dacort @jackye1995 To be clear, I'm on the side of not designing multi-base as heavy as partitions. But if we consider what they have in common, we could draw lessons from those APIs and consider how the upper layers could evolve towards a partition-oriented solution based on the multi-base architecture in the future.

I didn't get this. I think the path itself is unique and could be used for deduping purposes? If we force people to use a name as the identifier, I might just derive it from the path, which would be a little unnecessary.
jaystarshot left a comment:
Thanks a lot, looks great
The lance.write_dataset() function already supports writing to multiple storage buckets via the target_bases parameter (#4765). However, write_fragments() did not expose this capability, even though the underlying Rust implementations (_write_fragments, _write_fragments_transaction) already supported it.
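A sketch of what the write_fragments pass-through would presumably look like (the target_bases keyword here mirrors write_dataset and is an assumption; the exact signature may differ):

```python
import pyarrow as pa
from lance.fragment import write_fragments

data = pa.table({"id": [1, 2, 3]})

# Hypothetical: fragments land in the registered base named "b1"
# instead of the primary dataset path.
fragments = write_fragments(data, "s3://primary/dataset", target_bases=["b1"])
```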
Fixes #4702
Adds multi-path support for Lance datasets. A sample API is shown below.
How to Use It
Creating a multi-path dataset:
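The original snippet is not captured above; a sketch following the new_bases/target_bases shape from the review thread (parameter names may differ from the merged API):

```python
import lance
import pyarrow as pa

data = pa.table({"id": [1, 2, 3], "value": ["a", "b", "c"]})

# Register two bases at creation time and spread the initial data across them.
lance.write_dataset(
    data,
    "s3://primary/dataset",
    mode="create",
    new_bases=[
        {"name": "b1", "path": "s3://storage1/data", "is_dataset_root": "false"},
        {"name": "b2", "path": "s3://storage2/data", "is_dataset_root": "false"},
    ],
    target_bases=["b1", "b2"],
)
```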
Appending to a different location:
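Again a sketch under the same assumptions:

```python
# Append new rows, directing the new data files at an already-registered base.
more = pa.table({"id": [4, 5], "value": ["d", "e"]})
lance.write_dataset(more, "s3://primary/dataset", mode="append", target_bases=["b2"])
```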
Overwriting with completely new paths:
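And, under the same assumptions:

```python
# Overwrite the dataset while registering and targeting a brand-new base.
fresh = pa.table({"id": [10], "value": ["z"]})
lance.write_dataset(
    fresh,
    "s3://primary/dataset",
    mode="overwrite",
    new_bases=[{"name": "b3", "path": "s3://storage3/data", "is_dataset_root": "false"}],
    target_bases=["b3"],
)
```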
Main Changes
WIP and unclear