-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[HUDI-3225] [RFC-45] for async metadata indexing #4640
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,376 @@ | ||
| <!-- | ||
| Licensed to the Apache Software Foundation (ASF) under one or more | ||
| contributor license agreements. See the NOTICE file distributed with | ||
| this work for additional information regarding copyright ownership. | ||
| The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| (the "License"); you may not use this file except in compliance with | ||
| the License. You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| --> | ||
|
|
||
| # RFC-45: Asynchronous Metadata Indexing | ||
|
|
||
| ## Proposers | ||
|
|
||
| - @codope | ||
| - @manojpec | ||
|
|
||
| ## Approvers | ||
|
|
||
| - @nsivabalan | ||
| - @vinothchandar | ||
|
|
||
| ## Status | ||
|
|
||
| JIRA: [HUDI-2488](https://issues.apache.org/jira/browse/HUDI-2488) | ||
|
|
||
| ## Abstract | ||
|
|
||
| Metadata indexing (aka metadata bootstrapping) is the process of creation of one | ||
| or more metadata-based indexes, e.g. data partitions to files index, that is | ||
| stored in Hudi metadata table. Currently, the metadata table (referred as MDT | ||
| hereafter) supports single partition which is created synchronously with the | ||
| corresponding data table, i.e. commits are first applied to metadata table | ||
| followed by data table. Our goal for MDT is to support multiple partitions to | ||
| boost the performance of existing index and records lookup. However, the | ||
| synchronous manner of metadata indexing is not very scalable as we add more | ||
| partitions to the MDT because the regular writers (writing to the data table) | ||
| have to wait until the MDT commit completes. In this RFC, we propose a design to | ||
| support asynchronous metadata indexing. | ||
|
|
||
| ## Background | ||
|
|
||
| We can read more about the MDT design | ||
| in [RFC-15](https://cwiki.apache.org/confluence/display/HUDI/RFC+-+15%3A+HUDI+File+Listing+Improvements) | ||
| . Here is a quick summary of the current state (Hudi v0.10.1). MDT is an | ||
| internal Merge-on-Read (MOR) table that has a single partition called `files` | ||
| which stores the data partitions to files index that is used in file listing. | ||
| MDT is co-located with the data table (inside `.hoodie/metadata` directory under | ||
| the basepath). In order to handle multi-writer scenario, users configure lock | ||
| provider and only one writer can access MDT in read-write mode. Hence, any write | ||
| to MDT is guarded by the data table lock. This ensures only one write is | ||
| committed to MDT at any point in time and thus guarantees serializability. | ||
| However, locking overhead adversely affects the write throughput and will reach | ||
| its scalability limits as we add more partitions to the MDT. | ||
|
|
||
| ## Goals | ||
|
|
||
| - Support indexing one or more partitions in MDT while regular writers and table | ||
| services (such as cleaning or compaction) are in progress. | ||
| - Locking to be as lightweight as possible. | ||
| - Keep required config changes to a minimum to simplify deployment / upgrade in | ||
| production. | ||
| - Do not require specific ordering of how writers and table service pipelines | ||
| need to be upgraded / restarted. | ||
| - If an external long-running process is being used to initialize the index, the | ||
| process should be made idempotent so it can handle errors from previous runs. | ||
| - To re-initialize the index, make it as simple as running the external | ||
| initialization process again without having to change configs. | ||
|
|
||
| ## Implementation | ||
|
|
||
| ### High Level Design | ||
|
|
||
| #### A new Hudi action: INDEXING | ||
|
|
||
| We introduce a new action `index` which will denote the index building process, | ||
| the mechanics of which is as follows: | ||
|
|
||
| 1. From an external process, users can issue a CREATE INDEX or run a job to | ||
| trigger indexing for an existing table. | ||
| 1. This will schedule INDEXING action and add | ||
| a `<instant_time>.index.requested` to the timeline, which contains the | ||
| indexing plan. Index scheduling will also initialize the filegroup for | ||
| the partitions for which indexing is planned. The creation of filegroups | ||
| will be done within a lock. | ||
| 2. From here on, the index building process will continue to build an index | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be reflected by choosing the index timestamp as t? Table service operations on the metadata table usually take in the timestamp of the last op with a suffix - 001 for compaction, 002 for clean etc. So it may be good to have this as t001. index.requested.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes we can do that and can avoid little serde cost. It can also ease debugging. However, i should point out that index action will be written on the data timeline as it will be known to the user. |
||
| up to instant time `t`, where `t` is the latest completed instant time on | ||
| the timeline without any | ||
| "holes" i.e. no pending async operations prior to it. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not necessarily async. it could be regular writes too. in case of multi-writers, there could be a failed commit waiting to be rolled back. |
||
| 3. The indexing process will write these out as base files within the | ||
| corresponding metadata partition. A metadata partition cannot be used if | ||
| there is any pending indexing action against it. As and when indexing is | ||
| completed for a partition, then table config (`hoodie.properties`) will | ||
| be updated to indicate that partition is available for reads or | ||
| synchronous updates. Hudi table config will be the source of truth for | ||
| the current state of metadata index. | ||
|
|
||
| 2. Any inflight writers (i.e. with instant time `t'` > `t`) will check for any | ||
| new indexing request on the timeline prior to preparing to commit. | ||
| 1. Such writers will proceed to additionally add log entries corresponding | ||
| to each such indexing request into the metadata partition. | ||
| 2. There is always a TOCTOU issue here, where the inflight writer may not | ||
| see an indexing request that was just added and proceed to commit without | ||
| that. We will correct this during indexing action completion. In the | ||
| average case, this may not happen and the design has liveness. | ||
|
|
||
| 3. When the indexing process is about to complete (i.e. indexing upto | ||
| instant `t` is done but before completing indexing commit), it will check for | ||
| all completed commit instants after `t` to ensure each of them added entries | ||
| per its indexing plan, otherwise simply abort after a configurable timeout. | ||
| Let's call this the **indexing catchup**. So, the indexer will not only write | ||
| base files but also ensure that log entries due to instants after `t` are in | ||
| the same filegroup i.e. no new filegroup is initialized by writers while | ||
| indexing is in progress. | ||
| 1. The corner case here would be that the indexing catchup does not factor | ||
| in the inflight writer just about to commit. But given indexing would | ||
| take some finite amount of time to go from requested to completion (or we | ||
| can add some, configurable artificial delays here say 60 seconds), an | ||
| inflight writer, that is just about to commit concurrently, has a very | ||
| high chance of seeing the indexing plan and aborting itself. | ||
|
|
||
| We can just introduce a lock for adding events to the timeline and these races | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in a single write mode, users may not have configured any lock service and we don't enforce one as of today. something to keep in mind.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have to clearly document these, along with other operations that cannot be performed without lock provider configured. As safety, should the indexer always error out if there there is no lock provider configured?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should error out. I tried to think of a way without taking any lock but we need this minimal locking. We should call it out in documentation.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 for requiring locking. Having wrong or missing data from the MDT is very difficult to debug in the long run and can have serious data quality issues. Also, anyone having enough scale to be requiring asyc indexing should be able to choose one of the many locking options available. |
||
| would vanish completely, still providing great scalability and asynchrony for | ||
| these processes. The indexer will error out if there is no lock provider | ||
| configured. | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see details on when exactly regular writers will start to make synchronous updates. Also, when exactly the callers can start using the new index that got built out? whats the source of truth. we can rely on timeline completed instant for the index, but after archival? also loading timeline everytime might be costly as well.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Once the indexing action completed, any MDT partition that is currently not being indexed, are considered ready for use
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added some more details. Table config will be the source of truth. |
||
| #### Multi-writer scenario | ||
|
|
||
|  | ||
|
|
||
| Let us walkthrough a concrete mutli-writer scenario to understand the above | ||
| indexing mechanism. In this scenario, let instant `t0` be the last completed | ||
| instant on the timeline. Suppose user triggered index building from an external | ||
| process at `t3`. This will create `t3.index.requested` file with the indexing | ||
| plan. The plan contains the metadata partitions that need to be created and the | ||
| last completed instant, e.g. | ||
|
|
||
| ``` | ||
| [ | ||
| {MetadataPartitionType.FILES.partitionPath(), t0}, | ||
| {MetadataPartitionType.BLOOM_FILTER.partitionPath(), t0}, | ||
| {MetadataPartitionType.COLUMN_STATS.partitionPath(), t0} | ||
| ] | ||
| ``` | ||
|
|
||
| Further, suppose there were two inflight writers Writer1 and Writer2 (with | ||
| inflight instants `t1` and `t2` respectively) while the indexing was requested | ||
| or inflight. In this case, the writers will check for pending index action and | ||
| find a pending instant `t3`. Now, if the metadata index creation is pending, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the attached image, I see locks. Would be good to cover whats the critical section for which we acquire lock here for entire design in general.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good point, i'll update. basically, we need lock when:
|
||
| which means indexer has already intialized a filegroup, then each writer will | ||
| create log files in the same filegroup for the metadata index update. This will | ||
| happen within the existing data table lock. | ||
|
|
||
| The indexer runs in a loop until the metadata for data upto `t0` plus the data | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would like to understand the loop here. I thought we will just go for one round and then timeout. will sync up f2f.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you're right..i'll word it better..what i meant is run until timeout. |
||
| written due to `t1` and `t2` has been indexed, or the indexing timed out. | ||
| Whether indexing timed out or not, table config would be updated with any MDT | ||
| partition(s) for which indexing was complete till `t2`. In case of timeout | ||
| indexer will abort. At this point, user can trigger the index process again, | ||
| however, this time indexer will check for available partitions in table config | ||
| and skip those partitions. This design ensures that the regular writers do not | ||
| fail due to indexing. | ||
|
|
||
| ### Low Level Design | ||
|
|
||
| #### Schedule Indexing | ||
|
|
||
| The scheduling initializes the file groups for metadata partitions in a lock. It | ||
| does not update any table config. | ||
|
|
||
| ``` | ||
| 1 Run pre-scheduling validation (valid index requested, lock provider configured, idempotent checks) | ||
| 2 Begin transaction | ||
| 2.a Get the base instant | ||
| 2.b Start initializing file groups for each partition | ||
| 2.c Create index plan and save indexing.requested instant to the timeline | ||
| 3 End transaction | ||
| ``` | ||
|
|
||
| If there is failure in any of the above steps, then we abort gracefully i.e. | ||
| delete the metadata partition if it was initialized. | ||
|
|
||
| #### Run Indexing | ||
|
|
||
| This is a separate executor, which reads the plan and builds the index. | ||
|
|
||
| ``` | ||
| 1 Run pre-indexing checks (lock provider configured, indexing.requested exists, idempotent checks) | ||
| 2 Read the indexing plan and if any of the requested partition is inflight or already completed then error out and return early | ||
| 3 Transition indexing.requested to inflight | ||
| 4 Build metadata partitions | ||
| 4.a Build the base file in the metadata partition to index upto instant as per the plan | ||
| 4.b Update inflight partitions config in hoodie.properties | ||
| 5 Determine the catchup start instant based on write and non-write timeline | ||
| 6 Start indexing catchup in a separate thread (that can be interrupted upon timeout) | ||
| 6.a For each instant to catchup | ||
| 6.a.i if instant is completed and has corresponding deltacommit in metadata timeline then continue | ||
| 6.a.ii if instant is inflight, then reload active timeline periodically until completed or timed out | ||
| 6.a.iii update metadata table, if needed, within a lock | ||
| 7 Build indexing commit metadata with the partition info and caught upto instant | ||
| 8 Begin transaction | ||
| 8.a update completed metadata partitions in table config | ||
| 8.b save indexing commit metadata to the timeline transition indexing.inflight to completed. | ||
| 9 End transaction | ||
| ``` | ||
|
|
||
| If there is failure in any of the above steps, then we abort gracefully i.e. | ||
| delete the metadata partition if it exists and revert the table config updates. | ||
|
|
||
| #### Configs | ||
|
|
||
| ``` | ||
| # enable metadata | ||
| hoodie.metadata.enable=true | ||
| # enable asynchronous metadata indexing | ||
| hoodie.metadata.index.async=true | ||
| # enable column stats index | ||
| hoodie.metadata.index.column.stats.enable=true | ||
| # set indexing catchup timeout | ||
| hoodie.metadata.index.check.timeout.seconds=60 | ||
| # set OCC concurrency mode | ||
| hoodie.write.concurrency.mode=optimistic_concurrency_control | ||
| # set lock provider | ||
| hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider | ||
| ``` | ||
|
|
||
| #### Table upgrade/downgrade | ||
|
|
||
| While upgrading from a previous version to the current version, if metadata is | ||
| enabled and `files` partition exists then completed partitions in | ||
| hoodie.paroperties will be updated to `files` partition. While downgrading to a | ||
| previous version, if metadata table exists then it is deleted because metadata | ||
| table in current version has a schema that is not forward compatible. | ||
|
|
||
| ### Error Handling | ||
|
|
||
| **Case 1: Writer fails while indexer is inflight** | ||
|
|
||
| This means index update due to writer did not complete. Indexer continues to | ||
| build the index ignoring the failed instant due to writer. The next update by | ||
| the writer will trigger a rollback of the failed instant, which will also | ||
| rollback incomplete updates in metadata table. | ||
|
|
||
| **Case 2: Indexer fails while writer is inflight** | ||
|
|
||
| Writer will commit adding log entries to the metadata partition. However, table | ||
| config will indicate that partition is not ready to use. When indexer is | ||
| re-triggered, it will check the plan and table config to figure out which MDT | ||
| partitions to index and start indexing for those partitions. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When the indexer starts the next time, it will choose a different instant time. Hence, the older log blocks written are no longer valid. So I think each time the indexer starts (either the first time or after a failure), it should clean out the older file groups and create new ones (with newer instant time).
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, that's the plan. But, it will start from scratch only for the partitions that were partially indexed i.e. partitions for which table config was not updated in the last indexing. Table config update always happens at the end of indexing for a partition. We don't want to start all over again for all the partitions. So, let's say at some If this sounds right, I can update this example in the RFC. |
||
|
|
||
| **Case 3: Race conditions** | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is another race condition possible:
In essence:
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point! Intialization fo filegroups happen when index is scheduled. While scheduling we can take a lock. I'll update in RFC. |
||
|
|
||
| a) Writer went inflight just after an indexing request was added but indexer has | ||
| not yet started executing. | ||
|
|
||
| In this case, writer will continue to log updates in metadata partition. At the | ||
| time of execution, indexer will see there are already some log files and ensure | ||
| that the indexing catchup passes. | ||
|
|
||
| b) Inflight writer about to commit, but indexing completed just before that. | ||
|
|
||
| Ideally, the indexing catchup in the indexer should have failed. But this could | ||
| happen in the following sequence of events: | ||
|
|
||
| 1. No pending data commit. Indexing check passed, indexing commit not | ||
| completed (table config yet to be updated). | ||
| 2. Writer went inflight knowing that MDT partition is not ready for use. | ||
| 3. Indexing commit done, table config updated. | ||
|
|
||
| In this case, the writer will continue to write log files under the latest base | ||
| filegroup in the MDT partition. Even though the indexer missed the updates due | ||
| to writer, there is no "index loss" as such i.e. metadata due to writer is still | ||
| updated in the MDT partition. Async compaction on the MDT will eventually merge | ||
| the updates into another base file. | ||
|
|
||
| Or, we can introduce a lock for adding events to the metadata timeline. | ||
|
|
||
| c) Inflight writer about to commit but index is still being scheduled | ||
|
|
||
| Consider the following scenario: | ||
|
|
||
| 1. Writer is in inflight mode. | ||
| 2. Indexer is starting and creating the file-groups. Suppose there are 100 | ||
| file-groups to be created. | ||
| 3. Writer just finished and tries to write log blocks - it only sees a subset of | ||
| file-groups created yet (as the above step 2 above has not completed yet). | ||
| This will cause writer to incorrectly write updated to lesser number of | ||
| shards. | ||
|
|
||
| In this case, we ensure that scheduling for metadata index always happens within | ||
| a lock. Since the initialization of filegroups happen at the time of scheduling, | ||
| indexer will hold the lock until all the filegroups are created. | ||
|
|
||
| **Case 4: Async table services** | ||
|
|
||
| The metadata partition cannot be used if there is any pending index action | ||
| against it. So, async compaction/cleaning/clustering will ignore the metadata | ||
| partition for which indexing is inflight. | ||
|
|
||
| **Case 5: Data timeline with holes** | ||
|
|
||
| Let's say the data timeline when indexer is started looks | ||
| like: `C1, C2,.... C5 (inflight), C6, C7, C8`, where `C1` is a commit at | ||
| instant `1`. In this case the latest completed instant without any hole is `C4`. | ||
| So, indexer will continue to index upto `C4`. Instants `C5-C8` will go through | ||
| the indexing catchup. If `C5` does not complete before the timeout, then indexer | ||
| will abort. The indexer will run through the same process again when | ||
| re-triggered. | ||
|
|
||
| The above example contained only write commits however the indexer will consider | ||
| non-write commits (such as clean/restore/rollback) as well. Let's take such an | ||
| example: | ||
|
|
||
| | DC | DC | DC | CLEAN | DC | DC | COMPACT | DC | INDEXING | DC | | ||
| | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | | ||
| | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | | ||
| | C | C | C | I | C | C | R | C | R | I | | ||
|
|
||
| Here, DC indicates a deltacommit, second row is the instant time, and the last | ||
| row is whether the action is completed (C), inflight (I) or requested(R). In | ||
| this case, the base instant upto which there are no holes in write timeline | ||
| is `DC6`. The indexer will also check the earliest pending instant in non-write | ||
| timeline before this base instant, which is `CLEAN4`. While the indexing is done | ||
| upto base instant, the remaining instants (CLEAN4, COMPACT7, DC8) are checked | ||
| during indexing catchup whether they logged updated to corresponding filegroup | ||
| as per the index plan. Note that during catchup, indexer won't move beyond | ||
| unless the instants to catch up actually get into completed state. For instance, | ||
| if the CLEAN4 was inflight till the configured timeout, then indexer will abort. | ||
|
|
||
| ## Summary of key proposals | ||
|
|
||
| - New INDEXING action on data timeline. | ||
| - Async indexer to handle state change for the new action. | ||
| - Concept of "indexing catchup" to reconcile instants that went inflight after | ||
| indexer started. | ||
| - Table config to be the source of truth for inflight and completed MDT | ||
| partitions. | ||
| - Indexer will error out if lock provider not configured. | ||
|
|
||
| ## Rollout/Adoption Plan | ||
|
|
||
| - What impact (if any) will there be on existing users? | ||
|
|
||
| There can be two kinds of existing users: | ||
|
|
||
| a) Enabling metadata for the first time: There should not be any impact on such | ||
| users. When they enable metadata, they can trigger indexing process. b) Metadata | ||
| already enabled: Such users already have metadata table with at least one | ||
| partition. If they trigger indexing process, then the indexer should take into | ||
| account the existing metadata and ignore instants upto which MDT is in sync with | ||
| the data table. | ||
|
|
||
| - If we are changing behavior how will we phase out the older behavior? | ||
|
|
||
| The changes will be backward-compatible and if the async indexing is diabled | ||
| then the existing behavior of MDT creation and updates will be used. | ||
|
|
||
| - If we need special migration tools, describe them here. | ||
|
|
||
| Not required. | ||
|
|
||
| - When will we remove the existing behavior | ||
|
|
||
| Not required | ||
|
|
||
| ## Test Plan | ||
|
|
||
| - Extensive unit tests to cover all scenarios including conflicts and | ||
| error-handling. | ||
| - Run a long-running test on EMR cluster with async indexing enabled. | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure how the metadata indexing solves the multi-writer problem for MDT. Strictly speaking we just need table service scheduling on MDT by guarded by the lock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Metadata table is unique in the respect that each write to MDT will involve multiple partitions to be updated together in a transaction. So I do not see a truly parallel commit to MDT possible.