[HUDI-4186] Support Hudi with Spark 3.3.0 #5943
Conversation
Upgrade Parquet version to 1.12.3.
Force-pushed c6f80a4 to 95c9cde.
Force-pushed b99eb59 to e523893.
@CTTY is this still WIP?
@alexeykudinkin @XuQianJin-Stars @YannByron This PR is going to add support for Spark 3.3. In the long term, how should we maintain the support matrix for Spark in Hudi? Do we deprecate support for older Spark versions as we add new ones? cc @xushiyan @vinothchandar
| "Partition column types may not be specified in Create Table As Select (CTAS)", | ||
| ctx) | ||
|
|
||
| // CreateTable / CreateTableAsSelect was migrated to v2 in Spark 3.3.0 |
Change made according to https://issues.apache.org/jira/browse/SPARK-36850
Also see SPARK-36902
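For context, a minimal sketch of what this migration means for plan matching (illustrative only, not Hudi's actual code; it assumes Spark 3.3 catalyst on the classpath): CREATE TABLE and CTAS now parse into the v2 CreateTable / CreateTableAsSelect logical nodes, so analysis rules written against the old v1 statement nodes need updating.

```scala
import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, CreateTableAsSelect, LogicalPlan}

// Illustrative only: from Spark 3.3 on, CREATE TABLE [AS SELECT] parses into
// these v2 logical nodes instead of the old v1 statement nodes.
def isCreateOrCtas(plan: LogicalPlan): Boolean = plan match {
  case _: CreateTable | _: CreateTableAsSelect => true
  case _                                       => false
}
```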
)

new HoodieFileScanRDD(sparkSession, baseFileReader, fileSplits)
// TODO: which schema to use here?
Changes made according to https://issues.apache.org/jira/browse/SPARK-37273
- MessageType parquetSchema = new ParquetUtils().readSchema(context.getHadoopConf().get(), filePath);
+ Configuration hadoopConf = context.getHadoopConf().get();
+ MessageType parquetSchema = new ParquetUtils().readSchema(hadoopConf, filePath);
Change made according to SPARK-36935 (ParquetSchemaConverter change).
case IdentityTransform(FieldReference(Seq(col))) =>
  identityCols += col

case BucketTransform(numBuckets, Seq(FieldReference(Seq(col))), _) =>
SPARK-37627 separated SortedBucketTransform from BucketTransform.
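To make the shape change concrete, here is a hedged sketch of the Spark 3.3 match (object name and package are assumptions; these transform case classes are internal to Spark, which is why Hudi keeps such code under org.apache.spark.sql packages):

```scala
package org.apache.spark.sql.hudi.sketch

import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform}

object TransformMatchSketch {
  // Collect partition columns from identity and bucket transforms. After
  // SPARK-37627, BucketTransform carries a third field for sorted columns
  // (split out into SortedBucketTransform), hence the extra `_` binder.
  def partitionColumns(transforms: Seq[Transform]): Seq[String] =
    transforms.collect {
      case IdentityTransform(FieldReference(Seq(col)))          => col
      case BucketTransform(_, Seq(FieldReference(Seq(col))), _) => col
    }
}
```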
  // It's critical for this rules to follow in this order, so that DataSource V2 to V1 fallback
  // is performed prior to other rules being evaluated
- rules ++= Seq(dataSourceV2ToV1Fallback, spark3Analysis, spark3ResolveReferences, spark32ResolveAlterTableCommands)
+ rules ++= Seq(dataSourceV2ToV1Fallback, spark3Analysis, spark3ResolveReferences, resolveAlterTableCommands)
SPARK-38939 DropColumns syntax change
override def parseDataType(sqlText: String): DataType = delegate.parseDataType(sqlText)

// SPARK-37266 Added parseQuery to ParserInterface in Spark 3.3.0
SPARK-37266 ParserInterface change
Removed the [WIP] tag to unblock Azure CI. This PR is still a work in progress.
@yihua this is a good question. IMO we should avoid breaking things unless we absolutely have to, and we should maintain compatibility as long as it makes sense from the standpoint of the resources invested in maintaining it. In this case, I'd say we should not break any existing compatibility (Spark 2.4, 3.1, 3.2, 3.3) but instead, say, declare that 3.1 is in maintenance (EOL) mode and that new features are not guaranteed to work there in future releases. Thoughts?
Most Hive sync CI tests are failing. I saw another PR working on this: #6110
Force-pushed af12dbd to 4eff788.
yihua left a comment:
@CTTY I see a lot of classes for Spark 3.3 support, e.g., Spark33DataSourceUtils, are just copied from existing Spark 3.2 support classes in Hudi. Are they safe? Should we update them based on corresponding Spark 3.3 classes?
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-auth</artifactId>
</dependency>
Good find. So can we now re-enable the Spark 3.2 quickstart test in GitHub Actions? Check out bot.yml.
It's not easy to decide.
This reverts commit 9584223.
- if (deleteTable.condition.isDefined) {
-   df = df.filter(Column(deleteTable.condition.get))
- }
+ // SPARK-38626 DeleteFromTable.condition is changed from Option[Expression] to Expression in Spark 3.3
nit: the comment can go into the Spark adapter implementation and is not necessary here.
This can be addressed in a separate PR.
alexeykudinkin left a comment:
Left a few minor comments; we can take those in a follow-up PR.
- val resolvedCondition = condition.map(resolveExpressionFrom(table)(_))
- // Return the resolved DeleteTable
- DeleteFromTable(table, resolvedCondition)
+ val resolveExpression = resolveExpressionFrom(table, None)_
I'd suggest we keep the syntax as it was (with parentheses).
  new Spark2HoodieFileScanRDD(sparkSession, readFunction, filePartitions)
}

override def resolveDeleteFromTable(deleteFromTable: Command,
Given that we have extractCondition, we can get rid of resolveDeleteFromTable.
We can't simply reuse the method; resolveDeleteFromTable has different logic that involves resolveExpressionFrom.
I don't see why we can't:
- We get rid of the method completely
- We use extractCondition to extract the condition and then do everything else (resolution, etc.) in the caller (see the sketch below)
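A hedged sketch of that shape (all names here are placeholders for illustration, not Hudi's actual API): the per-version adapter only extracts the condition, while resolution and rebuilding of the command happen in shared caller code.

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Illustrative shape of the proposed refactor: version-specific behavior is
// passed in as functions, so no per-version resolveDeleteFromTable is needed.
def resolveDelete(
    deleteFromTable: LogicalPlan,
    table: LogicalPlan,
    extractCondition: LogicalPlan => Option[Expression],       // per-version shim
    resolve: (LogicalPlan, Expression) => Expression,          // shared resolver
    rebuild: (LogicalPlan, Option[Expression]) => LogicalPlan  // per-version ctor
): LogicalPlan = {
  val resolvedCondition = extractCondition(deleteFromTable).map(resolve(table, _))
  rebuild(table, resolvedCondition)
}
```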
/* SPARK-37266 Added parseQuery to ParserInterface in Spark 3.3.0. This is a patch to prevent
   hackers from tampering text with persistent view, it won't be called in older Spark
   Don't mark this as override for backward compatibility
   Can't use sparkExtendedParser directly here due to the same reason */
Sorry, but I can't understand the Javadoc: can you please elaborate on why this is here?
- What exactly are we trying to prevent from happening?
- What backward compatibility (BWC) are we referring to?
parseQuery is a new method on Spark's ParserInterface trait. There would be a compile issue if we called this method from any class that's shared across different Spark versions, because the older ParserInterface doesn't have this method.
For the same reason, we can't mark this method with override: parseQuery doesn't exist in older Spark.
As discussed on Slack: instead of doing the parsing in SparkAdapter, let's create an ExtendedParserInterface where we can place this new parseQuery method; it could then be used across Hudi's code base (similar to how HoodieCatalystExpressionUtils is set up). A rough sketch follows.
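A minimal sketch of that idea (the trait name is assumed for illustration):

```scala
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hudi-owned parser abstraction: declares parseQuery itself, so shared code
// can call it on every Spark version. On Spark 3.3+ this lines up with the
// method SPARK-37266 added to ParserInterface; on older versions it simply
// adds the method. It is declared without `override` so the trait compiles
// against every Spark version.
trait HoodieExtendedParserInterface extends ParserInterface {
  def parseQuery(sqlText: String): LogicalPlan
}
```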
override def getQueryParserFromExtendedSqlParser(session: SparkSession, delegate: ParserInterface,
                                                 sqlText: String): LogicalPlan = {
  new HoodieSpark3_3ExtendedSqlParser(session, delegate).parseQuery(sqlText)
This is not a query parser; this is an already-parsed query.
   hackers from tampering text with persistent view, it won't be called in older Spark
   Don't mark this as override for backward compatibility
   Can't use sparkExtendedParser directly here due to the same reason */
def parseQuery(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
Why are we doing double-parsing?
I reused the code flow from the parsePlan method in the same class. Calling parse might not be needed here; good point.
  DeleteFromTable(deleteFromTableCommand.table, resolvedCondition)
}

override def extractCondition(deleteFromTable: Command): Expression = {
Let's also return Option instead of null, along the lines of the sketch below.
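For instance, a sketch against the Spark 3.3 plan shape (illustrative, not the merged code; the Option(...) wrapping normalizes a possible null):

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}

// Spark 3.3 flavor: condition is a plain Expression (SPARK-38626), so wrap
// it in Option(...) instead of potentially returning null to callers.
def extractDeleteCondition(plan: LogicalPlan): Option[Expression] = plan match {
  case DeleteFromTable(_, condition) => Option(condition)
  case _                             => None
}
```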
@CTTY please add the JIRAs in the description so that they're more easily discoverable.
Updated follow-up JIRAs.
 * Extract condition in [[DeleteFromTable]]
 * SPARK-38626 condition is no longer Option in Spark 3.3
 */
def extractCondition(deleteFromTable: Command): Expression
nit: rename to extractDeleteCondition?
yihua left a comment:
LGTM. @CTTY you should address the minor comments in a separate PR.
Created this JIRA to track it: https://issues.apache.org/jira/browse/HUDI-4491
Created this umbrella JIRA and linked existing follow-up JIRAs to it: https://issues.apache.org/jira/browse/HUDI-4492
  <spark3.version>3.3.0</spark3.version>
  <spark.version>${spark3.version}</spark.version>
- <sparkbundle.version>3</sparkbundle.version>
+ <sparkbundle.version>3.3</sparkbundle.version>
Shouldn't sparkbundle.version still be 3 for this profile? After building the code with the -Dspark3 option, I see the bundle named hudi-spark3.3-bundle-* instead of hudi-spark3-bundle-*. Is that expected?
  <scala.version>${scala12.version}</scala.version>
  <scala.binary.version>2.12</scala.binary.version>
- <hudi.spark.module>hudi-spark3</hudi.spark.module>
+ <hudi.spark.module>hudi-spark3.3.x</hudi.spark.module>
Same here. Should we keep it hudi-spark3?
What is the purpose of the pull request
Support Spark 3.3.0