
rdblue commented Mar 25, 2020

What changes were proposed in this pull request?

This adds support for metadata columns to DataSourceV2. If a source implements SupportsMetadataColumns, it must also implement SupportsPushDownRequiredColumns to support projecting those columns.

The analyzer is updated to resolve metadata columns from LogicalPlan.metadataOutput, and this adds a rule that will add metadata columns to the output of DataSourceV2Relation if any are used.
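
As a rough illustration, here is a minimal sketch of a source-side implementation, assuming the interface shape described above (a metadataColumns() method on the table returning MetadataColumn descriptors with a name, type, and comment); the table and column names are made up for the example:

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, Table, TableCapability}
import org.apache.spark.sql.types.{DataType, StringType, StructType}

// Hypothetical table exposing a single `_partition` metadata column.
class PartitionedDemoTable extends Table with SupportsMetadataColumns {
  override def name: String = "demo"

  override def schema: StructType =
    new StructType().add("id", "long").add("data", "string")

  override def capabilities: util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_READ)

  private object PartitionColumn extends MetadataColumn {
    override def name: String = "_partition"
    override def dataType: DataType = StringType
    override def comment: String = "Partition key used to store the row"
  }

  // Declaring the column is not enough on its own: the scan must also produce it
  // when Spark requests it via SupportsPushDownRequiredColumns.
  override def metadataColumns(): Array[MetadataColumn] = Array(PartitionColumn)
}
```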

Why are the changes needed?

This is the solution discussed for exposing additional data in the Kafka source. It is also needed for a generic MERGE INTO plan.

Does this PR introduce any user-facing change?

Yes. Users can project additional columns from sources that implement the new API. This also updates DescribeTableExec to show metadata columns.

How was this patch tested?

Will include new unit tests.

rdblue force-pushed the add-dsv2-metadata-columns branch from 36c0267 to 0a5c7ca on March 25, 2020 22:38
rdblue commented Mar 25, 2020

FYI @HeartSaVioR and @brkyvz.

@dongjoon-hyun

cc @dbtsai

SparkQA commented Mar 26, 2020

Test build #120378 has finished for PR 28027 at commit 0a5c7ca.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • implicit class MetadataColumnsHelper(metadata: Array[MetadataColumn])

@HeartSaVioR

It would be nice to have a TODO list of what's missing, given that the PR is marked as WIP; it would help reviewers avoid misunderstanding or confusion. It would also help to determine whether the PR is ready to be reviewed or not.

rdblue commented Mar 27, 2020

@HeartSaVioR, this is currently missing tests.

We have this deployed in our internal Spark build and it works fine. In particular, metadata columns are only projected if they are purposely referenced. We haven't observed any problems with them being added by * expansion and we haven't had any issues with the changes to the analyzer.
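
For example (a usage sketch assuming an active SparkSession named `spark` and a hypothetical table `testcat.t` backed by a source that exposes a `_partition` metadata column, as in the sketch earlier):

```scala
// * expansion does not pick up metadata columns: only the data columns come back.
spark.sql("SELECT * FROM testcat.t").show()

// The metadata column is included only when it is referenced explicitly.
spark.sql("SELECT id, data, _partition FROM testcat.t").show()
```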

github-actions bot commented Jul 6, 2020

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions bot added the Stale label Jul 6, 2020
github-actions bot closed this Jul 7, 2020
rdblue removed the Stale label Oct 6, 2020
holdenk reopened this Oct 6, 2020
holdenk commented Oct 6, 2020

Would it help @rdblue if someone submitted a PR to your PR adding some tests?

SparkQA commented Oct 6, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34082/

rdblue commented Oct 6, 2020

@holdenk, I can add some tests myself, but I appreciate the offer. I mainly haven't been prioritizing this, but if you can help review, I'll write up the tests.

brkyvz commented Oct 6, 2020

@rdblue I'd be happy to help review as well. I think a lot of nice things can be built correctly on top of this interface, e.g. input_file_name(), object-level metadata, etc.

SparkQA commented Oct 6, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34082/

rdblue commented Oct 6, 2020

Thanks, @brkyvz! It would be great to get your opinion as well.

SparkQA commented Oct 7, 2020

Test build #129475 has finished for PR 28027 at commit 0a5c7ca.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • implicit class MetadataColumnsHelper(metadata: Array[MetadataColumn])

rdblue force-pushed the add-dsv2-metadata-columns branch from 0a5c7ca to 14f8c3e on October 14, 2020 20:34
rdblue changed the title from "[SPARK-31255][SQL] Add SupportsMetadataColumns to DSv2 (WIP)" to "[SPARK-31255][SQL] Add SupportsMetadataColumns to DSv2" on Oct 14, 2020
rdblue commented Oct 14, 2020

@holdenk, @brkyvz, I updated this and added tests. Please have a look; I think it is ready for review now.

SparkQA commented Oct 15, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34377/

SparkQA commented Oct 15, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34377/

SparkQA commented Oct 15, 2020

Test build #129771 has finished for PR 28027 at commit 14f8c3e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

holdenk left a comment

The tests look good; small question on whether I understood a bit of the code correctly.

brkyvz left a comment

I think this is a fantastic interface. I'm a bit worried about how we expose it. Let me know what you think!

Comment on lines +884 to +893
override def metadataOutput: Seq[Attribute] = {
  val qualifierList = identifier.qualifier :+ alias
  child.metadataOutput.map(_.withQualifier(qualifierList))
}
Contributor

why is this differentiation needed? Won't the metadata columns be part of output?

Contributor Author

They are eventually part of the output, but they can't be at first because * expansion uses all of output. If we added them immediately, we would get metadata columns in a select *.

Instead, we add the metadata columns to this and then update column resolution to look up columns here. The result is that we can resolve everything just like normal, including *, but the columns are missing from output. Then the new analyzer rule adds the columns to the output if they are resolved, but missing. Since the parent node is already resolved, we know that this is safe and happens after * expansion.
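
A rough, simplified sketch of that rule's shape (illustrative only; the object name, the exact matching logic, and the cast to AttributeReference are not copied from the patch, and it assumes DataSourceV2Relation carries the new metadataOutput):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

// Sketch: once a parent node is resolved, any metadata attributes it references
// that are missing from its input are appended to the relation's output below it.
object AddMetadataColumnsSketch extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    // node.resolved implies * expansion has already happened, so widening output is safe
    case node if node.children.nonEmpty && node.resolved && node.missingInput.nonEmpty =>
      node.transformUp {
        case rel: DataSourceV2Relation =>
          val missing = rel.metadataOutput.filter(node.missingInput.contains)
          if (missing.isEmpty) {
            rel
          } else {
            // metadataOutput entries are plain attribute references in this sketch
            rel.copy(output = rel.output ++ missing.map(_.asInstanceOf[AttributeReference]))
          }
      }
  }
}
```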

Comment on lines 53 to 55
val attrs = hasMeta.metadataColumns
val outputNames = outputSet.map(_.name).toSet
attrs.filterNot(col => outputNames.contains(col.name)).toAttributes
Contributor

don't you need to resolve column names case-insensitively (when matching by column name)?
What if there's a match between a data column and a metadata column?

I believe we'll have to come up with UDFs to access these metadata columns. input_file_name() could be one such UDF

Contributor

Another option is that metadata columns can be part of a struct, and you can select fields out of those metadata columns. A get_source_metadata() UDF can be a catch all, and you can use it like:
get_source_metadata("file_size") or get_source_metadata("row_id") and things like that. What do you think?

Contributor Author

You're right that this should respect the case sensitivity setting. I'll update it.

I'd rather not use a struct for metadata columns. I think that would make them harder to produce and would also introduce an unnecessary row copy in a lot of cases. For example, if I run select id, metadata._file, then the source will produce something like struct(id=1, metadata=struct(_file='path.parquet')), which would then be copied into the expected struct(id=1, _file='path.parquet').

I also thought that we would like to get away from using a function that behaves differently based on its context, like input_file_name. It is better to project _file, right?
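
On the case sensitivity point, the conflict filtering could consult the analyzer's resolver rather than comparing raw names; a sketch under that assumption (the object, method name, and placement are hypothetical, not the exact change made in this PR):

```scala
import org.apache.spark.sql.catalyst.analysis.Resolver

object MetadataColumnConflicts {
  // Keep a metadata column only if no data column resolves to the same name,
  // so spark.sql.caseSensitive is respected via the passed-in resolver.
  def nonConflictingMetadataColumns(
      dataColumnNames: Seq[String],
      metadataColumnNames: Seq[String],
      resolver: Resolver): Seq[String] = {
    metadataColumnNames.filterNot(meta => dataColumnNames.exists(resolver(_, meta)))
  }
}
```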

Contributor

what if we don't have the structs, but just have to use this UDF to access a metadata column?
So you can do:

select id, source_metadata(file)

This would avoid name collisions and explicitly tell you which references need to be resolved as metadata columns. You can also return better error messages, knowing whether the column you tried to resolve was a metadata column or a data column.

I believe other systems like Oracle don't use a function for pseudocolumns, so I'm also fine with this approach.

Contributor Author

I've been thinking about this suggestion. I think there are two ways we might implement the function-style approach that I'll respond to separately.

First, we could push the function to the data source so that metadata columns are no longer hidden columns. When requesting a projection, Spark would also pass an Expression (the public one) and that would be something like metadata_column("_file") or input_file(). This would require a different interface, but would also be more flexible (metadata tables could return things like row_count()). The trade-off is that the rules are a lot more complicated. The optimizer is needed to push expressions down to the leaves so that these functions can be pushed to the source. But, non-deterministic expressions can prevent that pushdown, which could lead to some cases where the function works and some where it doesn't because there is no way to run the function without pushing it to the data source. And, the function would need to appear like it is resolved when it actually isn't to make it to the optimizer. Also, I think we discussed this option in the v2 syncs and came to the conclusion that it would be simpler to project columns.

Second, we could use a function to signal that a column is a metadata column only, and keep the interface to sources the same. So the way to project _file is via metadata_column("_file"), and Spark will add _file to the output of the relation (and request it through the DSv2 API) if it finds a metadata column with that name. That is a nice way to avoid the current way that columns are exposed through metadataColumns in the Spark plan. But it also has the drawbacks of the first option: converting the function call into an attribute reference is much more complicated.

The current implementation works well. We've been running this code in production for months now, so I would prefer to move forward with this approach.

bart-samwel

I think the column approach has the following benefits over the function approach:

  • You can scope the location of the calls (you can't reference the metadata unless you're directly SELECTing from the table, so no calls in places where it doesn't make sense),

  • You can call them immediately from a join, e.g. SELECT t1._input_file_name, t2._input_file_name FROM Table1 t1 JOIN Table2 t2. The function syntax doesn't allow this, unless you make it something "special" like input_file_name(t1).

  • You can easily push down the projection using existing mechanisms to turn off the overhead of generating the columns. The semantics of such pushdown with functions is iffy at best.

  • If you convert this to the physical level directly, a column is more compatible with physical transformations e.g. things that add readahead and buffering. The current approach like input_file_name() fundamentally makes buffering and readahead hard. You'd have to convert it to columns under the covers anyway.

However, the column approach has name conflicts. This gets problematic if people actually select these columns without assigning new aliases and then write the result to files, and then try to read those back. That means that either DataFrames don't round trip with 100% fidelity via files (which may break assumptions of libraries that people have built), or you can't access metadata columns on files that have columns with these names, which breaks libraries that assume that these features work. If I'd write table._some_metadata_column in an auditing-related library, I'd like to be sure that I get what I asked for, and that this cannot be spoofed.

How about using special syntax like input_file_name(t1), or metadata(t1, file_name), where t1 must be a table (data source, not subquery/view) in the FROM clause, optional if there's only one table. So it's not a real function, it's just function-call-like syntax. That can be converted into a special type of column reference under the covers, even at parsing time, giving it all the advantages of columns from an implementation perspective. The column reference could use a reserved name that is not possible to write otherwise. When used in the SELECT list, we wouldn't assign that column name as the SELECT list alias, so it wouldn't show up in files! You still get an error at analysis time if you call it from a place where there's no table, so there's no incorrect use like with normal nondeterministic functions. This has the advantage that you can't have name conflicts. If we would add input_file_name(t1) and input_file_name() as such special functions with special syntax, then we'd have backward compatibility with existing queries, and we could migrate that mechanism to the new mechanism. And to avoid having to add extra functions for everything, we'd add metadata(...) for all other metadata columns.

bart-samwel

An alternative would be to make the column names use a recognizable pattern that we can forbid as column names. E.g. Snowflake uses metadata$foo for these columns, which is a lot more distinguishable than just a leading underscore. Hence, we could forbid using these as normal identifiers (basically check for a metadata$ prefix) to prevent the round-tripping-via-file issues, but other than that the accesses would still look like normal column references.

Contributor Author

@bart-samwel, I like your last suggestion, but with a slight change: I think sources should reject column names that conflict with their metadata columns. That way, a source can choose a format that makes sense (e.g., __offset) and allow everything else. All Spark has to do is to define what the behavior will be when there is a conflict, and recommend that sources reject column names that are also exposed as metadata columns.

I think using a special syntax requires significantly more changes, so I don't think that's a good option. Specific replies are below.

> If I'd write table._some_metadata_column in an auditing-related library, I'd like to be sure that I get what I asked for, and that this cannot be spoofed.

I think this is a fair argument, but I expect it would very rarely be a problem that you wanted a metadata column and accidentally projected a data column.

> How about using special syntax

This is what I considered above, "we could use a function to signal that a column is a metadata column only, and keep the interface to sources the same . . ." I think the challenge here is carrying the information through analysis and optimization. This approach either has many of the same complications as using a function (if kept as an expression), or we would need to introduce a new attribute type for metadata attrs that would hopefully work in rules just like normal attributes in existing rules. We can't lose the fact that the attribute references metadata, so we can't replace a metadata attribute reference with an attribute reference.

I think the option most likely to not introduce more problems is to use a regular attribute for these columns, but that eliminates the benefit of a special syntax to track the fact that they are metadata column references. This doesn't seem like a good option to me, if all we want to do is guarantee that metadata columns don't conflict with data columns.

The special syntax is also more awkward to use. One of the use cases for this feature is to expose non-schema fields like Kafka topic, offset, and partition. I think it is much more natural to select _offset than metadata_column("offset"). And if these are exposed as columns, we can add them to DESCRIBE EXTENDED, like I've done in this PR.

> An alternative would be to make the column names use a recognizable pattern that we can forbid as column names.

I like this suggestion, but I'd leave rejecting column names to sources as I said above. Another option is to make this available, but not enforce it. We could add a check that data columns are not of the form metadata$.*, but not require metadata column names using the pattern. That way there is a way to guarantee names don't conflict, but if a source chooses to use a name that may conflict it still works fine.
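
A trivial sketch of what such an optional, prefix-based check could look like (the object and method names here are hypothetical and this is not part of the patch):

```scala
// Hypothetical check: reject data columns that use a reserved metadata prefix,
// while leaving metadata column naming entirely up to the source.
object ReservedMetadataNames {
  val ReservedPrefix = "metadata$"

  def validateDataColumnNames(names: Seq[String]): Unit = {
    val reserved = names.filter(_.startsWith(ReservedPrefix))
    require(reserved.isEmpty,
      s"Data columns may not start with the reserved prefix '$ReservedPrefix': " +
        reserved.mkString(", "))
  }
}
```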

bart-samwel

> @bart-samwel, I like your last suggestion, but with a slight change: I think sources should reject column names that conflict with their metadata columns. That way, a source can choose a format that makes sense (e.g., __offset) and allow everything else. All Spark has to do is to define what the behavior will be when there is a conflict, and recommend that sources reject column names that are also exposed as metadata columns.

They probably should reject such column names, yes. Especially at write time -- you don't want to be in a situation where you wrote a file and can then never read it back. But if you did that, how would you deal with it? Could you never read it with Spark? One alternative is to just hide the column if it's in the file. You wouldn't be able to access it, but at least that would satisfy the "auditing library requirement" that the metadata fields always do the expected thing. Another alternative would be to expose the column under a new name. But what if that new name is also there? It gets very annoying very quickly. So maybe just hiding is the best policy.

> I think using a special syntax requires significantly more changes, so I don't think that's a good option. Specific replies are below.

> If I'd write table._some_metadata_column in an auditing-related library, I'd like to be sure that I get what I asked for, and that this cannot be spoofed.

> I think this is a fair argument, but I expect it would very rarely be a problem that you wanted a metadata column and accidentally projected a data column.

It'll be rare, but if I have an auditing library that e.g. inserts a special execution node on top of a scan to track exactly what was read, then it would be great if people can't spoof that by adding a column with a particular special name to their underlying files. If the internal metadata column names take precedence over the columns in the file, then that problem is not there. (FWIW, I usually try to apply the principle of "name resolution must be environment independent", i.e., if a query defines something locally, e.g. an alias, then something in the environment (such as a table's schema) shouldn't be able to make that aspect of the query change meaning, because then things will break randomly as a result of schema changes. This fits in the same category.)

> How about using special syntax

> This is what I considered above, "we could use a function to signal that a column is a metadata column only, and keep the interface to sources the same . . ." I think the challenge here is carrying the information through analysis and optimization. This approach either has many of the same complications as using a function (if kept as an expression), or we would need to introduce a new attribute type for metadata attrs that would hopefully work in rules just like normal attributes in existing rules. We can't lose the fact that the attribute references metadata, so we can't replace a metadata attribute reference with an attribute reference.

Yeah, I was thinking about the special AttributeReference type. But more in the way of "use a very special naming convention that you can't type" -- maybe because we'd reject it in the parser if you'd try to explicitly write an identifier like that. We'd have to do something to prevent that column name from showing up in output files etc. though, so it's not entirely foolproof.

> I think the option most likely to not introduce more problems is to use a regular attribute for these columns, but that eliminates the benefit of a special syntax to track the fact that they are metadata column references. This doesn't seem like a good option to me, if all we want to do is guarantee that metadata columns don't conflict with data columns.

> The special syntax is also more awkward to use. One of the use cases for this feature is to expose non-schema fields like Kafka topic, offset, and partition. I think it is much more natural to select _offset than metadata_column("offset"). And if these are exposed as columns, we can add them to DESCRIBE EXTENDED, like I've done in this PR.

That aspect of columns is certainly nice.

> An alternative would be to make the column names use a recognizable pattern that we can forbid as column names.

> I like this suggestion, but I'd leave rejecting column names to sources as I said above. Another option is to make this available, but not enforce it. We could add a check that data columns are not of the form metadata$.*, but not require metadata column names using the pattern. That way there is a way to guarantee names don't conflict, but if a source chooses to use a name that may conflict it still works fine.

So we could set the example by using this pattern in data sources that we control, but other data sources could choose otherwise. I guess that works. I'm personally inclined to be a bit more prescriptive about these things, basically because if something is a best practice but there's no reviewer for the next data source that knows about this best practice, then the best practice won't be followed. And before you know it, there's a data source out in the wild that you can't migrate to use the best practice because there's already usage out there. Maybe it's already enough if all the existing data sources consistently use the pattern -- then at least it's "cargo cult proof".

Contributor Author

> If the internal metadata column names take precedence over the columns in the file, then that problem is not there.

To be clear, I think that data columns should always take precedence over metadata, so that you can always read what was written. You just may not be able to load the optional metadata columns if they conflict. And that conflict should be handled by the source by rejecting columns at write time.

For your source, you'd have the guarantee that metadata$.* columns do not conflict to satisfy your requirement, but another source may use a different convention.

brkyvz commented Oct 27, 2020

cc @jose-torres

SparkQA commented Nov 4, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35178/

SparkQA commented Nov 4, 2020

Test build #130577 has finished for PR 28027 at commit d661246.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

rdblue force-pushed the add-dsv2-metadata-columns branch from d661246 to 9a4dfb1 on November 4, 2020 17:07
rdblue commented Nov 4, 2020

Rebased to fix the conflict in DataSourceV2SQLSuite.

SparkQA commented Nov 4, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35212/

SparkQA commented Nov 4, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35212/

SparkQA commented Nov 4, 2020

Test build #130611 has finished for PR 28027 at commit 9a4dfb1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

brkyvz left a comment

LGTM. If you can add the final documentation on the behavior during collisions, that would be great.

Comment on lines +11 to +12
* row.
* <p>
Contributor

Should we talk about the behavior of reserving names for metadata columns or the behavior that will happen during name collisions here (data columns will be selected over metadata)?

Contributor Author

Good idea. I'll add that and rebase to fix the conflicts. Thanks!

Contributor Author

I added this:

> If a table column and a metadata column have the same name, the metadata column will never be requested and is ignored. It is recommended that Table implementations reject data column names that conflict with metadata column names.

rdblue force-pushed the add-dsv2-metadata-columns branch from 9a4dfb1 to 64997f2 on November 18, 2020 00:40
rdblue force-pushed the add-dsv2-metadata-columns branch from 64997f2 to 6e72f4d on November 18, 2020 00:41
SparkQA commented Nov 18, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35842/

SparkQA commented Nov 18, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35842/

SparkQA commented Nov 18, 2020

Test build #131238 has finished for PR 28027 at commit 6e72f4d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

rdblue commented Nov 18, 2020

@holdenk, @brkyvz, @bart-samwel, is everyone okay with merging this? I think we have agreement, but I just want to make sure. Thanks!

brkyvz commented Nov 18, 2020

I looked at this PR a couple of hours ago and it had failing tests. Now that it has passed all builders, I'm going to merge this. Thank you, @rdblue!

asfgit closed this in 1df69f7 on Nov 18, 2020
rdblue commented Nov 18, 2020

Thanks, @brkyvz! Great to have this in.

@@ -0,0 +1,58 @@
package org.apache.spark.sql.connector.catalog;
Member

Hmm, @brkyvz @rdblue, don't we require a license header for new files?

HeartSaVioR commented Nov 19, 2020

Hmm, that's weird that RAT missed this.

Contributor Author

I'll open a new PR to add the license headers.

Contributor Author

Fixed in #30415

Member

It was due to an incorrect exclusion rule. I made a PR.

dongjoon-hyun pushed a commit that referenced this pull request Nov 19, 2020
### What changes were proposed in this pull request?

Add missing license headers for new files added in #28027.

### Why are the changes needed?

To fix licenses.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This is a purely non-functional change.

Closes #30415 from rdblue/license-headers.

Authored-by: Ryan Blue <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun commented Nov 23, 2020

Hi, All.
This seems to break the Scala 2.13 unit tests:

```
$ dev/change-scala-version.sh 2.13
$ build/sbt "sql/testOnly *.DataSourceV2SQLSuite" -Pscala-2.13
...
[info] - SPARK-31255: Project a metadata column *** FAILED *** (96 milliseconds)
[info] - SPARK-31255: Projects data column when metadata column has the same name *** FAILED *** (77 milliseconds)
```

dongjoon-hyun commented Nov 23, 2020

The newly added test case seems to depend on the Scala version.

I filed https://issues.apache.org/jira/browse/SPARK-33524.

@dongjoon-hyun

cc @srowen


cloud-fan added a commit that referenced this pull request Feb 5, 2021
### What changes were proposed in this pull request?

This is a follow-up of #28027

#28027 added a DS v2 API that allows data sources to produce metadata/hidden columns that can only be seen when they are explicitly selected. The way we integrate this API into Spark is:
1. The v2 relation gets normal output and metadata output from the data source, and the metadata output is excluded from the plan output by default.
2. Column resolution can resolve `UnresolvedAttribute` with metadata columns, even if the child plan doesn't output metadata columns.
3. An analyzer rule searches the query plan, trying to find a node that has missing inputs. If such a node is found, transform the sub-plan of this node and update the v2 relation to include the metadata output.

The analyzer rule in step 3 brings a perf regression for queries that do not read v2 tables at all. This rule will calculate `QueryPlan.inputSet` (which builds an `AttributeSet` from the outputs of all children) and `QueryPlan.missingInput` (which does a set exclusion and creates a new `AttributeSet`) for every plan node in the query plan. In our benchmark, the TPCDS query compilation time increases by more than 10%.

This PR proposes a simple way to improve it: we add a special metadata entry to the metadata attribute, which allows us to quickly check if a plan needs to add metadata columns: we just check all the references of this plan, and see if the attribute contains the special metadata entry, instead of calculating `QueryPlan.missingInput`.

This PR also fixes one bug: we should not change the final output schema of the plan, if we only use metadata columns in operators like filter, sort, etc.
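
To illustrate the approach that follow-up describes, here is a rough sketch; the metadata key name and the helper names are assumptions for this example, not the actual code in #31440:

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.MetadataBuilder

object MetadataColumnTagging {
  // Assumed key name for illustration; the real constant may be named differently.
  val MetadataColKey = "__metadata_col"

  // Tag an attribute as a metadata column when the relation creates it.
  def tagAsMetadataColumn(attr: Attribute): Attribute =
    attr.withMetadata(
      new MetadataBuilder().withMetadata(attr.metadata).putBoolean(MetadataColKey, true).build())

  // Cheap check for the analyzer rule: scan the plan's references for the tag
  // instead of computing missingInput for every node.
  def referencesMetadataColumns(plan: LogicalPlan): Boolean =
    plan.references.exists(_.metadata.contains(MetadataColKey))
}
```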

### Why are the changes needed?

Fix perf regression in SQL query compilation, and fix a bug.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Run `org.apache.spark.sql.TPCDSQuerySuite`. Before this PR, `AddMetadataColumns` was among the top 4 rules ranked by running time:
```
=== Metrics of Analyzer/Optimizer Rules ===
Total number of runs: 407641
Total time: 47.257239779 seconds

Rule                                  Effective Time / Total Time                     Effective Runs / Total Runs

OptimizeSubqueries                      4157690003 / 8485444626                         49 / 2778
Analyzer$ResolveAggregateFunctions      1238968711 / 3369351761                         49 / 2141
ColumnPruning                           660038236 / 2924755292                          338 / 6391
Analyzer$AddMetadataColumns             0 / 2918352992                                  0 / 2151
```
after this PR:
```
Analyzer$AddMetadataColumns             0 / 122885629                                   0 / 2151
```
This rule is now 20 times faster and its cost is negligible relative to the total compilation time.

This PR also adds new tests to verify the bug fix.

Closes #31440 from cloud-fan/metadata-col.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan added a commit that referenced this pull request Feb 5, 2021

Same change as the commit above, cherry picked from commit 989eb68.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>