
Conversation

@alexeykudinkin (Contributor)

What is the purpose of the pull request

This PR rebases the Data Skipping flow from relying on a bespoke Column Stats Index implementation to instead leverage the MT Column Stats Index.

Brief change log

  • Added HoodieDatasetUtils
  • Rebased HoodieFileIndex to use MT instead of the bespoke CS Index
  • Fixed tests
  • Cleaned up

Verify this pull request

This pull request is already covered by existing tests.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@xiarixiaoyao xiarixiaoyao self-assigned this Mar 4, 2022
@xiarixiaoyao (Contributor):

@alexeykudinkin great work!

@xiarixiaoyao xiarixiaoyao removed their assignment Mar 4, 2022
@yihua yihua self-assigned this Mar 4, 2022
case t: Throwable =>
  logError("Failed to read col-stats index; skipping", t)
  None

if (!isDataSkippingEnabled() || !fs.exists(new Path(metadataTablePath)) || queryFilters.isEmpty) {
Member:

fs.exists is going to happen on every call if data skipping is enabled. This will hurt perf, as we observed in Presto. We should try to avoid it. I think we should just assume that the metadata table exists and error out if it doesn't.

Contributor Author:

Fair enough, we can replace it with a config check of whether MT is enabled.
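A minimal sketch of that config-based check (the options map and the default value shown are assumptions about the surrounding wiring, not the PR's final code; "hoodie.metadata.enable" is the standard Hudi metadata-table flag):

  // Gate data skipping on MT being enabled in config instead of probing the FS
  val isMetadataTableEnabled =
    options.getOrElse("hoodie.metadata.enable", "false").toBoolean

  if (!isDataSkippingEnabled() || !isMetadataTableEnabled || queryFilters.isEmpty) {
    Option.empty[Set[String]]   // bail out: every file stays a candidate
  } else {
    // ... proceed to read the MT Column Stats Index
  }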

Contributor Author:

@codope on second thought -- there could still be a case when MT is enabled but not bootstrapped yet, so we can't equate MT being enabled in config with its presence in FS. Frankly, I don't see a way around fs.exists in some shape or form -- if not here, it would happen within Spark's Data Source.

Member:

We can use the table config to determine which MT partitions are available for reading. Can you please track this in a JIRA?
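A rough sketch of that table-config based check; readCompletedMetadataPartitions is a hypothetical helper standing in for whichever table-config entry ends up listing the fully built MT partitions (the exact property/API is not defined in this PR):

  def isColumnStatsIndexAvailable(metaClient: HoodieTableMetaClient): Boolean = {
    // Hypothetical: read the set of completed metadata partitions from the table config
    val completedPartitions: Set[String] = readCompletedMetadataPartitions(metaClient.getTableConfig)
    completedPartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath)
  }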

val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)

// Persist DF to avoid re-computing column statistics unraveling
withPersistence(colStatsDF) {
Member:

+1 for persistence
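For context, a minimal sketch of what a helper like withPersistence could look like (the PR's actual helper may differ): cache the DataFrame for the duration of the block so the transposition doesn't re-trigger the column stats scan.

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.storage.StorageLevel

  def withPersistence[T](df: DataFrame)(block: => T): T = {
    df.persist(StorageLevel.MEMORY_AND_DISK)   // cache the column stats DF
    try {
      block                                    // run the transposition / pruning
    } finally {
      df.unpersist()                           // release the cache afterwards
    }
  }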

} else {
  logError("Failed to lookup candidate files in Z-index", e)
}
Member:

Maybe change "Z-index" to "column stats index" in this err msg as well.

]
},
{
"doc": "Column name for which this column statistics applies",
Member:

We'll have to revisit this change when we tackle schema evolution. Can you please track it in a JIRA?

Contributor:

Adding to @codope's point, does this break the read/write of metadata records in the existing metadata table if users enabled it in older releases, e.g., 0.10.0 and 0.10.1?

Contributor Author:

Good call. Data Skipping won't be functional w/o this column, so we will have to call out that folks will need to flush and rebuild their MT if they want to use it with Data Skipping.

Contributor:

In this case, we need an upgrade step if column stats index is enabled in metadata table, and this should be somewhat automatic. @nsivabalan @vinothchandar wdyt?

Member:

Why would the write break? Addition of a field is a valid schema evolution that we support, right?
For reads, maybe we just handle this gracefully: if this field is not present in the metadata table, then fall back to the usual query path (w/o data skipping).
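A minimal sketch of that graceful fallback; the field constant follows the names used elsewhere in this PR, while checking it against colStatsDF.columns is an assumption about how the selected frame is shaped:

  // If the column-name field is absent (index written by an older release),
  // return None so no pruning is attempted and the usual listing is used
  if (!colStatsDF.columns.contains(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)) {
    Option.empty[Set[String]]
  } else {
    // ... continue with the MT-based data skipping path
  }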

@yihua (Contributor), Mar 15, 2022:

Not saying it would break, and based on the context provided it should be supported by schema evolution, so I'm good with it.

@yihua (Contributor) left a comment:

The general approach looks OK. I have a couple of doubts about how the existing column stats are treated and about the new DAG using metadata table column stats for data skipping.


// Simply traverse the directory structure upwards until the .hoodie folder is found
Path current = partitionPath;
while (current != null) {
  if (hasTableMetadataFolder(fs, current)) {
Contributor:

One caveat is that this may incur more than one fs.exists() call. Is this only used for initialization (which is fine), e.g., getting the table path from config, and not for core read/write logic per data file?

Contributor Author:

Correct. This is only used in the discovery phase.
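For illustration, a Scala sketch of the discovery traversal the Java snippet above performs (the fs.exists probe against a ".hoodie" child folder is an assumption standing in for hasTableMetadataFolder):

  import org.apache.hadoop.fs.{FileSystem, Path}

  def findTableBasePath(fs: FileSystem, partitionPath: Path): Option[Path] = {
    var current: Path = partitionPath
    while (current != null) {
      if (fs.exists(new Path(current, ".hoodie"))) {   // table metadata folder found
        return Some(current)                           // this is the table's base path
      }
      current = current.getParent                      // keep walking up the directory tree
    }
    None                                               // not under a Hudi table
  }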

val indexPath = metaClient.getColumnStatsIndexPath

private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
  val fs = metaClient.getFs
  val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath)
Contributor:

Is the plan to deprecate existing column stats under .hoodie/.colstatsindex and remove all usage of it in 0.11? If not, should we have two modes where metadata col stats index is used when metadata table is enabled, and .hoodie/.colstatsindex is used if metadata table is disabled?

Contributor:

I also need clarification on how .hoodie/.colstatsindex is generated. Does that come from clustering, or is it also updated per write?

@alexeykudinkin (Contributor Author), Mar 8, 2022:

Yeah, the current bespoke implementation will be removed in a follow-up. It's currently updated only after clustering completes.

Contributor:

When you say "the current bespoke implementation will be removed in a follow-up", is this before or after the 0.11.0 release? I think we still need to keep .hoodie/.colstatsindex and the data skipping logic based on it, and there should be a flag to choose how data skipping is done, between that and MT col stats. Because if a user doesn't choose to enable MT col stats in 0.11.0 and there is no data skipping logic based on .hoodie/.colstatsindex, data skipping cannot be done unless the user goes back to 0.10.x. The old logic can be removed one release after 0.11.0.

Contributor Author:

Discussed offline: the bespoke implementation of the Col Stats Index will be removed in 0.11.

Comment on lines +212 to +217
// Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
val metadataTableDF = spark.read.format("org.apache.hudi")
  .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")

// TODO filter on (column, partition) prefix
val colStatsDF = metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
  .select(requiredMetadataIndexColumns.map(col): _*)
Contributor:

Should the logic of fetching column stats into a DF be incorporated into BaseTableMetadata, as there is already another API, getColumnStats()? In this way, it may also be possible to make the logic here metadata-table agnostic, and instead rely on BaseTableMetadata/HoodieTableMetadata to decide which source (.hoodie/.colstatsindex on the FS vs the metadata table) to use.

Contributor Author:

I'm gonna start with the latter: I don't think we're planning to support both of these, since the bespoke ColStats index was purely a stop-gap solution until we got the primary MT index.

Having said that, I don't really see this being used commonly enough for us to promote it into the HoodieTableMetadata API: keep in mind that this table format is very Data Skipping specific, and I don't think it is very useful outside of that.

Contributor:

Got it, this is fine for now. I'm thinking from the perspective of whether this can be reused for the index on the write path.

// query at hand might only be referencing a handful of those. As such, we collect all the
// column references from the filtering expressions, and only transpose records corresponding to the
// columns referenced in those
val transposedColStatsDF =
Contributor:

I guess the purpose of the transposing here is to adapt to the expected input of the existing data skipping APIs?

Contributor Author:

Correct. It also makes the code much simpler (otherwise you need to do a row intersection for every column, which makes the code more involved).
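To illustrate the transposition with a toy example (values and the per-column column names are made up; the actual names come from the getMin/getMax/getNumNulls helpers), the Column Stats partition stores one row per (file, column):

  fileName | columnName | minValue | maxValue | nullCount
  f1       | a          | 1        | 10       | 0
  f1       | b          | "x"      | "z"      | 2

while the transposed frame the pruning expressions run against has one row per file, with the stats of each referenced column pivoted into dedicated columns:

  fileName | a_minValue | a_maxValue | a_numNulls | b_minValue | b_maxValue | b_numNulls
  f1       | 1          | 10         | 0          | "x"        | "z"        | 2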

.reduceLeft((left, right) =>
  left.join(right, usingColumn = HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME))
Contributor:

This may not scale well with a large number of columns from predicates, as DF joins are expensive, even considering caching. I'm wondering if a different DAG should be written for metadata table col stats, i.e., one row of col stats per file + column. Conceptually, I think such joining can be avoided when pruning the files.

@alexeykudinkin (Contributor Author), Mar 8, 2022:

Keep in mind that here we only join the columns that the table is clustered by, so this is likely bounded by a number on the order of 10. So, frankly, I don't think this will be a bottleneck, unless we're talking about gargantuan tables (with 10s of M files).

Contributor Author:

Conceptually, we actually can't avoid joining, for the following reason: ultimately, to validate whether a file will be accepted or not, we have to AND the rows of all the individual columns (i.e., all of the columns have to satisfy their respective filters), which implicitly requires a join by the filename (one way or the other).

Contributor:

Based on the code, I thought all columns from the predicates are going to trigger joining (i.e., n columns -> n-1 joins), not just the clustering columns, since the column stats index in the metadata table can contain all columns from the schema.

There are cases where the table is fat (1k to 10k+ columns, see this blog) and the queries can have more than 10 predicates at Uber and ByteDance. ByteDance has PB-level tables which can easily have 10s of M files in a few partitions. I worry that the joining can take a hit at this kind of scale.

I understand that some kind of "joining" is needed here, but the Spark table join in the current scheme expands the table after each join and adds additional col stats columns. If, for each per-column DF produced by the snippet below, we apply the filter first and generate a boolean for each file, then the next step only needs to AND those booleans, which does not require expanding columns or an additional cached table, reducing memory pressure and possible shuffling. That would be much less costly than a Spark table/DF join.

queryReferencedColumns.map(colName =>
  colStatsDF.filter(col(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).equalTo(colName))
    .select(targetColStatsIndexColumns.map(col): _*)
    .withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, getNumNullsColumnNameFor(colName))
    .withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, getMinColumnNameFor(colName))
    .withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, getMaxColumnNameFor(colName))
)
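As a rough sketch of the alternative DAG proposed above (not the PR's code): evaluate each column's predicate directly against that column's stats rows, producing a boolean per file, then AND the booleans by grouping on the file name rather than joining wide frames. Here perColumnPredicate is a hypothetical function mapping a referenced column to a min/max/nullCount filter expression, and the sketch glosses over files that lack a stats row for some column (which should conservatively stay candidates):

  import org.apache.spark.sql.functions.{col, min}

  val candidateFilesDF = queryReferencedColumns
    .map { colName =>
      colStatsDF
        .filter(col(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).equalTo(colName))
        .select(
          col(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME),
          perColumnPredicate(colName).as("matches"))          // hypothetical per-column filter
    }
    .reduceLeft(_ union _)                                    // stack the per-column frames
    .groupBy(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
    .agg(min(col("matches")).as("allMatch"))                  // AND across columns == min over booleans
    .filter(col("allMatch"))                                  // keep only files satisfying every predicate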

Contributor Author:

> Based on the code, I thought all columns from the predicates are going to trigger joining (i.e., n columns -> n-1 joins), not just the clustering columns, since the column stats index in the metadata table can contain all columns from the schema.

Correct, but even in the current setup we only join the M columns which are directly referenced in the predicates. So even for fat tables with 1000s of columns, this is unlikely to be a problem, since M << N practically at all times.

> I understand that some kind of "joining" is needed here, but the Spark table join in the current scheme expands the table after each join and adds additional col stats columns. If, for each per-column DF produced by the snippet below, we apply the filter first and generate a boolean for each file, then the next step only needs to AND those booleans, which does not require expanding columns or an additional cached table, reducing memory pressure and possible shuffling. That would be much less costly than a Spark table/DF join.

I understand your point. Such slicing, however, would a) require essentially revisiting the whole flow, and b) blend index reshaping with the actual querying, and I think we'd be optimizing prematurely at this point. We can certainly fine-tune this flow, but I would much rather focus on its correctness right now and follow up on performance tuning after proper testing/profiling is done. WDYT?

Contributor:

That makes sense to me. As synced offline, the optimization of the joining flow will be a follow-up, not in this PR. We still need a good understanding of the percentage of time spent in the joining stage vs the overall query planning/execution time for different table sizes (small and medium to start with), to check whether this is really the bottleneck, before actually optimizing it.

Contributor Author:

Created HUDI-3611

@alexeykudinkin force-pushed the ak/mtmod-idx-1 branch 2 times, most recently from 14366ca to 97c07be on March 11, 2022 at 22:56
@alexeykudinkin (Contributor Author):

@hudi-bot run azure

@yihua (Contributor) left a comment:

LGTM

@apache apache deleted a comment from hudi-bot Mar 12, 2022
@alexeykudinkin (Contributor Author):

@hudi-bot run azure

@hudi-bot (Collaborator):

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build
