Skip to content

Commit bf18e26

Browse files
author
Alexey Kudinkin
committed
Pushed resolveHoodieTable down from the common base adapter to the Spark >= 3.2 adapters
1 parent e5d2a9f commit bf18e26

6 files changed

Lines changed: 32 additions & 22 deletions

File tree

hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ trait SparkAdapter extends Serializable {
100100

101101
def resolveHoodieTable(plan: LogicalPlan): Option[CatalogTable] = {
102102
EliminateSubqueryAliases(plan) match {
103-
// TODO add HoodieLogicalRelation
104103
case LogicalRelation(_, _, Some(table), _) if isHoodieTable(table) => Some(table)
105104
case _ => None
106105
}

hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,5 +75,4 @@ object HoodieSpark3CatalystPlanUtils extends SparkAdapterSupport {
7575
case _ => None
7676
}
7777
}
78-
79-
}
78+
}

hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,14 @@ package org.apache.spark.sql.adapter
2020
import org.apache.hudi.Spark3RowSerDe
2121
import org.apache.hudi.client.utils.SparkRowSerDe
2222
import org.apache.spark.internal.Logging
23+
import org.apache.spark.sql.SparkSession
2324
import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
24-
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
25-
import org.apache.spark.sql.catalyst.catalog.CatalogTable
2625
import org.apache.spark.sql.catalyst.encoders.RowEncoder
2726
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Predicate}
28-
import org.apache.spark.sql.catalyst.plans.logical.{HoodieLogicalRelation, LogicalPlan}
29-
import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
3027
import org.apache.spark.sql.execution.datasources._
31-
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
3228
import org.apache.spark.sql.hudi.SparkAdapter
3329
import org.apache.spark.sql.internal.SQLConf
3430
import org.apache.spark.sql.types.StructType
35-
import org.apache.spark.sql.{HoodieCatalystPlansUtils, HoodieSpark3CatalystPlanUtils, SparkSession}
3631
import org.apache.spark.storage.StorageLevel
3732
import org.apache.spark.storage.StorageLevel._
3833

@@ -62,15 +57,6 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
6257
FilePartition.getFilePartitions(sparkSession, partitionedFiles, maxSplitBytes)
6358
}
6459

65-
override def resolveHoodieTable(plan: LogicalPlan): Option[CatalogTable] = {
66-
EliminateSubqueryAliases(plan) match {
67-
case HoodieLogicalRelation(LogicalRelation(_, _, Some(table), _)) => Some(table)
68-
case LogicalRelation(_, _, Some(table), _) if isHoodieTable(table) => Some(table)
69-
case DataSourceV2Relation(v2Table: V2TableWithV1Fallback, _, _, _, _) if isHoodieTable(v2Table.v1Table) => Some(v2Table.v1Table)
70-
case _ => None
71-
}
72-
}
73-
7460
override def createInterpretedPredicate(e: Expression): InterpretedPredicate = {
7561
Predicate.createInterpreted(e)
7662
}

hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,16 @@ import org.apache.hudi.Spark32HoodieFileScanRDD
2222
import org.apache.spark.sql._
2323
import org.apache.spark.sql.avro._
2424
import org.apache.spark.sql.catalyst.InternalRow
25+
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
26+
import org.apache.spark.sql.catalyst.catalog.CatalogTable
2527
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
2628
import org.apache.spark.sql.catalyst.parser.ParserInterface
27-
import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable}
29+
import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable, HoodieLogicalRelation, LogicalPlan}
2830
import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
31+
import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
2932
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark32HoodieParquetFileFormat}
30-
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
33+
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
34+
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, LogicalRelation, PartitionedFile}
3135
import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_2ExtendedSqlParser}
3236
import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType}
3337

@@ -45,6 +49,15 @@ class Spark3_2Adapter extends BaseSpark3Adapter {
4549

4650
override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark32CatalystExpressionUtils
4751

52+
override def resolveHoodieTable(plan: LogicalPlan): Option[CatalogTable] = {
53+
EliminateSubqueryAliases(plan) match {
54+
case HoodieLogicalRelation(LogicalRelation(_, _, Some(table), _)) => Some(table)
55+
case LogicalRelation(_, _, Some(table), _) if isHoodieTable(table) => Some(table)
56+
case DataSourceV2Relation(v2Table: V2TableWithV1Fallback, _, _, _, _) if isHoodieTable(v2Table.v1Table) => Some(v2Table.v1Table)
57+
case _ => None
58+
}
59+
}
60+
4861
override def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer =
4962
new HoodieSpark3_2AvroSerializer(rootCatalystType, rootAvroType, nullable)
5063

hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
2121
import org.apache.hudi.{DataSourceReadOptions, DefaultSource, SparkAdapterSupport}
2222
import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.MatchResolvedTable
2323
import org.apache.spark.sql.catalyst.TableIdentifier
24-
import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedPartitionSpec}
24+
import org.apache.spark.sql.catalyst.analysis.UnresolvedPartitionSpec
2525
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
2626
import org.apache.spark.sql.catalyst.plans.logical._
2727
import org.apache.spark.sql.catalyst.rules.Rule

hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,13 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
2424
import org.apache.spark.sql.catalyst.parser.ParserInterface
2525
import org.apache.spark.sql.catalyst.plans.logical._
2626
import org.apache.spark.sql.catalyst.InternalRow
27+
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
28+
import org.apache.spark.sql.catalyst.catalog.CatalogTable
2729
import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
28-
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
30+
import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
31+
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, LogicalRelation, PartitionedFile}
2932
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark33HoodieParquetFileFormat}
33+
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
3034
import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_3ExtendedSqlParser}
3135
import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType}
3236
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSpark33CatalystExpressionUtils, HoodieSpark33CatalystPlanUtils, SparkSession}
@@ -45,6 +49,15 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
4549

4650
override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark33CatalystExpressionUtils
4751

52+
override def resolveHoodieTable(plan: LogicalPlan): Option[CatalogTable] = {
53+
EliminateSubqueryAliases(plan) match {
54+
case HoodieLogicalRelation(LogicalRelation(_, _, Some(table), _)) => Some(table)
55+
case LogicalRelation(_, _, Some(table), _) if isHoodieTable(table) => Some(table)
56+
case DataSourceV2Relation(v2Table: V2TableWithV1Fallback, _, _, _, _) if isHoodieTable(v2Table.v1Table) => Some(v2Table.v1Table)
57+
case _ => None
58+
}
59+
}
60+
4861
override def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer =
4962
new HoodieSpark3_3AvroSerializer(rootCatalystType, rootAvroType, nullable)
5063

0 commit comments

Comments (0)