130 commits
a701045
`isHoodieTable` > `resolvesToHoodieTable`; Make sure predicate only m…
Aug 10, 2022
3db882d
Cleaned up unnecessary `HoodieCatalystPlanUtils` overrides
Aug 10, 2022
ef71639
Added `resolveHoodieTable` fetching `CatalogTable` from the resolved …
Aug 10, 2022
54fffad
Fixed `TimeTravelRelation` implementation to become `LeafNode` instea…
Aug 10, 2022
05f007d
Converted `TimeTravelRelation` to `UnaryNode`;
Aug 10, 2022
ac038fe
Tidying up
Aug 10, 2022
aee5282
Fixing compilation
Aug 11, 2022
92384b0
Extracted `TimeTravelRelation` resolution rule to `HoodieSpark3Analysis`
Aug 11, 2022
961639f
Fixing compilation for Spark 3.1
Aug 11, 2022
7011c38
(Temporarily) Disabling test
Aug 17, 2022
80e5ddc
Fixing `MergeIntoTable` statement to make sure source/target outputs …
Aug 17, 2022
7b53b8e
Make sure Hudi rules only applicable to resolved constructs
Aug 17, 2022
a2e2499
Fixed `MergeIntoHoodieTableCommand` to reference proper table identif…
Aug 17, 2022
dbbae0c
Fixing tests
Aug 18, 2022
9185242
Rebased Hudi `*Index` commands to bear resolved `CatalogTable` instea…
Aug 18, 2022
6f78b9f
Rebased `Delete`/`Update` commands onto using already resolved tables
Aug 18, 2022
767a01a
Fixing invalid `MERGE INTO` statements in tests
Aug 18, 2022
9cbf64e
Cleaning up dead code
Aug 18, 2022
2089c9f
Extracted `HoodieExtendedParserInterface` to consolidate all extensio…
Aug 19, 2022
e30adb3
Cleaned up Spark 3 specific rules
Aug 19, 2022
d6de88d
Cleaned up `InsertIntoStatement` matching;
Aug 19, 2022
62c50af
Fixed fallback from V2 to V1 for `InsertIntoStatement`;
Aug 19, 2022
7f9ac9d
Avoid unnecessary `DataFrame` de-referencing
Aug 19, 2022
0cb1b70
Fixing test queries
Aug 19, 2022
363f96d
Fixing compilation
Aug 22, 2022
ba3d189
Added `AvroSchemaEvolutionUtils.reconcileFieldNamesCasing`;
Aug 25, 2022
0b6fe49
Make sure `HoodieSparkSqlWriter` reconciles Schema's field-name casin…
Aug 25, 2022
2654d09
Make sure `InternalSchema` can handle case-sensitive use-cases
Aug 25, 2022
77e252d
Revisited `HoodieSparkUtils.createRdd` seq to appropriately handle th…
Aug 26, 2022
8c516be
Clean up `MergeIntoHoodieTableCommand`
Aug 26, 2022
4889e93
Cleaned up `HoodieAnalysis`
Aug 26, 2022
f7f1e1c
Revisited `MergeIntoHoodieTableCommand` impl to address plentiful issues
Aug 27, 2022
0929792
Fixed invalid assignment expressions binding;
Aug 30, 2022
3fcdaef
Fixing invalid equality check
Aug 30, 2022
ffe554e
Fixed resolving pre-combine field attribute to fail w/ proper error d…
Aug 30, 2022
7ff04f2
Fixed source dataset reshaping seq to always override primary/pre-com…
Aug 30, 2022
75a753f
Restricted permitted merge-condition expressions to simple attribute-…
Aug 30, 2022
44c928d
Fixed Cast pattern-matching to accommodate for Spark >= 3.2 appropria…
Aug 30, 2022
95e8ec0
Cleaned up Deleting actions to reuse same code as Updating/Inserting …
Aug 30, 2022
b57b4e3
Added validation of the schema to-be-expected by `ExpressionPayload` …
Aug 30, 2022
6d88450
Cleaned up duplicated methods
Aug 30, 2022
4a3ccc0
Make schema validations to be gated by config;
Aug 30, 2022
089d5a2
Fixed `MergeIntoHoodieTableCommand` to properly bind expressions agai…
Aug 31, 2022
7a942bf
Fixed tests
Aug 31, 2022
5fe692a
Fixed handling of deleting actions
Aug 31, 2022
acf96eb
Tidying up
Aug 31, 2022
aa038be
Gate validation t/h standalone internal config;
Aug 31, 2022
8f163ef
Abstracted meta-field's `Metadata` instantiation w/in `SparkAdapter`
Aug 31, 2022
10dfd2e
After rebase cleanup
Aug 31, 2022
99d38d3
Abstracting `HoodieLogicalRelation` in `hudi-spark3-common`
Aug 31, 2022
026cd53
Moved `createCatalystMetadataForMetaField` from `HoodieCatalystPlansU…
Aug 31, 2022
b77adc5
Bifurcated `HoodieCatalystPlanUtils` for all Spark 3.x versions;
Aug 31, 2022
24849f5
Fixing compilation
Aug 31, 2022
616225e
Bubbled `resolveHoodieTable` down to Spark >= 3.2 adapters
Aug 31, 2022
677e7c0
Tidying up
Aug 31, 2022
132d384
`HoodieSpark3Analysis` > `HoodieSpark32PlusAnalysis`
Sep 2, 2022
a3c1310
Extracted rules resolving/folding `HoodieLogicalRelation` into `Hoodi…
Sep 2, 2022
c2f361f
Missing license;
Sep 2, 2022
1061025
Fixing init seq
Sep 2, 2022
83c8a66
Disable rules incompatible w/ Spark 3.1
Sep 2, 2022
23648bb
Missing `resolveHoodieTable` override properly handling `HoodieLogica…
Sep 2, 2022
cc0a2a3
Fixing error messages for Spark 3.1
Sep 2, 2022
6e6c6f8
Exempt transforms run w/in analysis phase
Sep 2, 2022
15c40a4
Fixed folding of the `HoodieLogicalRelation`s to properly restore ful…
Sep 10, 2022
79ceac6
Fixing invalid cloning seq
Sep 13, 2022
429601d
Fixing compilation
Nov 28, 2022
57a104d
Addressing rebase artifacts
Nov 28, 2022
db15014
Fixing compilation
Nov 28, 2022
23a102a
Rebasing `ExpressionPayload` to rely on `HoodieAvroSerializer`
Nov 28, 2022
381f811
Fixing compilation (again)
Nov 28, 2022
09dfa3d
Restored missing config overrides
Nov 29, 2022
8819cde
Fixed missing source table's Avro schema being appended to write-opts
Nov 29, 2022
086d6fe
Simplified MIT handling to make sure it provides for consistent beha…
Nov 29, 2022
5ed6b07
Fixed meta-fields filtering for Spark < 3.2;
Nov 29, 2022
2dd79a0
Fixing compilation
Dec 15, 2022
8b654f6
Cleaning up `attributeEquals` utility
Dec 21, 2022
ada0a7c
Allow partial assignments for updating clauses
Dec 21, 2022
82b17d6
Fixing tests
Dec 21, 2022
cd32d87
Reverting rebase artifacts
Dec 21, 2022
dd8e96a
Fixing more tests
Dec 21, 2022
3e8a182
Revisited canonicalization seq in `HoodieSparkSqlWriter` to also reco…
Dec 21, 2022
836c1cd
Tidying up utils
Dec 22, 2022
37c1d41
Handle superfluous casting appropriately in MIT
Dec 22, 2022
e2ee5e8
Tidying up
Dec 22, 2022
7f73e36
Revisited source dataset reshaping to make sure casing of primary-key…
Dec 22, 2022
cf7ae54
Reference target table attributes instead of ones from the condition
Dec 22, 2022
1e9defe
Reverting unnecessary changes
Dec 22, 2022
eb24027
Tidying up
Dec 22, 2022
b162d3f
Fixing compilation
Dec 22, 2022
e295c2d
Set "spark.testing" property
Dec 22, 2022
970f9ab
Fixed MIT-specific configs to not be overridable by any other configu…
Dec 23, 2022
cd71162
Fixing tests
Dec 23, 2022
96c6b92
Fixing tests (one more time)
Dec 23, 2022
b234436
Fixing presence check for the case-insensitive case
Dec 23, 2022
c06f9f8
Re-enabling tests
Dec 23, 2022
76cc5ff
Enable adapting resolution rules for Spark 2.x (previously Spark 3.x …
Dec 23, 2022
db4fa52
Merged Hudi's implementation resolution rules into `ResolveImplementa…
Dec 23, 2022
f41a134
Ported Spark SQL Merge Into statement resolution logic (from Spark 3.1)
Dec 23, 2022
3f68beb
Fixed tests error messages for Spark 2.x
Dec 23, 2022
23a9dc8
Fixed Spark 2 `ResolveReferences` rule to use appropriate resolver
Dec 23, 2022
4ec4095
Fixing compilation
Dec 23, 2022
721da89
Tidying up
Dec 23, 2022
cbdd8bd
Tidying up
Dec 23, 2022
0376b21
Tidying up
Dec 24, 2022
3cd25b0
Reverting merging of rules resolving Hudi implementations
Dec 23, 2022
8a39dbd
Fixed Spark 2 resolution of Merge Into statement containing `update *…
Dec 24, 2022
69f677a
Added `HoodieSparkClientTestBase` properly injecting `HoodieSparkSess…
Dec 24, 2022
28daede
Simplified handling of the meta-fields to avoid `HoodieLogicalRelatio…
Dec 24, 2022
dd336b8
Cleaned up `HoodieLogicalRelationAdapter`
Dec 24, 2022
6576d53
Broke out `ResolveImplementationsEarly` from `ResolveImplementations`
Dec 24, 2022
bb24e01
Tidying up
Dec 24, 2022
75b3829
Make sure meta-fields are stripped out from both receiving/providing …
Dec 24, 2022
b32ab6b
Cleaned up `DeleteFromTableCommand`
Dec 24, 2022
2bba8f8
Cleaned up `UpdateTableCommand`
Dec 24, 2022
838e1ab
Fixed `resolveHoodieTable` to be able to resolve t/h projections and …
Dec 24, 2022
b4f8a58
Fixing MIT and UT to project out meta-field attributes instead of eli…
Dec 24, 2022
29f5b25
Fixed IIS to project out meta-fields from both receiving/providing en…
Dec 24, 2022
5d8f0da
Missing casting in `UpdateTable`
Dec 24, 2022
961c9ae
Cleaning up unnecessary meta-fields manipulations
Dec 24, 2022
7b3f3f3
Tidying up
Dec 24, 2022
5294286
Fixed `AdaptIngestionTargetLogicalRelations` to apply only to resolve…
Jan 6, 2023
d7b38db
Fixing tests
Jan 6, 2023
ebca9c4
Fixed Spark 2.x resolution logic for MIT to always manually resolve t…
Jan 6, 2023
4e4dba0
Revert "Fixed Spark 2.x resolution logic for MIT to always manually r…
Jan 6, 2023
04d1820
Avoiding hijacking of the resolution of the `MergeIntoTable` by defau…
Jan 6, 2023
d369b7a
Removing outdated test
Jan 6, 2023
3692878
Make sure MIT target and source tables could be processed independent…
Jan 6, 2023
edfcc04
Revisited `AdaptIngestionTargetLogicalRelations` to
Jan 7, 2023
43feb05
Fixing `TimeTravelRelation` to keep it as non-resolved node (to avoid…
Jan 7, 2023
a3f8cab
Fixed Hudi TVF to be properly resolved for Spark >= 3.2
Jan 7, 2023
File: AvroConversionUtils.scala
@@ -18,9 +18,8 @@

package org.apache.hudi

import org.apache.avro.Schema.Type
import org.apache.avro.generic.GenericRecord
import org.apache.avro.{AvroRuntimeException, JsonProperties, Schema}
import org.apache.avro.{JsonProperties, Schema}
import org.apache.hudi.HoodieSparkUtils.sparkAdapter
import org.apache.hudi.avro.AvroSchemaUtils
import org.apache.spark.rdd.RDD
@@ -29,31 +28,9 @@ import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
import org.apache.spark.sql.{Dataset, Row, SparkSession}

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

object AvroConversionUtils {

/**
* Check the nullability of the input Avro type and resolve it when it is nullable. The first
* return value is a [[Boolean]] indicating if the input Avro type is nullable. The second
* return value is either provided Avro type if it's not nullable, or its resolved non-nullable part
* in case it is
*/
def resolveAvroTypeNullability(avroType: Schema): (Boolean, Schema) = {
if (avroType.getType == Type.UNION) {
val fields = avroType.getTypes.asScala
val actualType = fields.filter(_.getType != Type.NULL)
if (fields.length != 2 || actualType.length != 1) {
throw new AvroRuntimeException(
s"Unsupported Avro UNION type $avroType: Only UNION of a null type and a non-null " +
"type is supported")
}
(true, actualType.head)
} else {
(false, avroType)
}
}

/**
* Creates converter to transform Avro payload into Spark's Catalyst one
*
@@ -104,7 +81,7 @@ object AvroConversionUtils {
recordNamespace: String): Row => GenericRecord = {
val serde = sparkAdapter.createSparkRowSerDe(sourceSqlType)
val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(sourceSqlType, structName, recordNamespace)
val (nullable, _) = resolveAvroTypeNullability(avroSchema)
val nullable = AvroSchemaUtils.resolveNullableSchema(avroSchema) != avroSchema

val converter = AvroConversionUtils.createInternalRowToAvroConverter(sourceSqlType, avroSchema, nullable)

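For context, the removed helper returned a (nullable, resolvedType) pair, while the new check derives nullability from AvroSchemaUtils.resolveNullableSchema: that method returns the non-null branch of a nullable UNION, so its result differs from its input exactly when the schema is nullable. A minimal sketch of the new check (the helper name isNullableSchema is hypothetical):

    import org.apache.avro.{Schema, SchemaBuilder}
    import org.apache.hudi.avro.AvroSchemaUtils

    // Hypothetical helper mirroring the replacement check above
    def isNullableSchema(schema: Schema): Boolean =
      AvroSchemaUtils.resolveNullableSchema(schema) != schema

    val nullableStr: Schema = SchemaBuilder.unionOf().nullType().and().stringType().endUnion()
    val plainStr: Schema = Schema.create(Schema.Type.STRING)

    assert(isNullableSchema(nullableStr))  // UNION(null, string) resolves to string
    assert(!isNullableSchema(plainStr))    // non-union schema resolves to itself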
File: HoodieSparkUtils.scala
@@ -21,7 +21,7 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.SPARK_VERSION
@@ -84,7 +84,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport {
// making Spark deserialize its internal representation [[InternalRow]] into [[Row]] for subsequent conversion
// (and back)
val sameSchema = writerAvroSchema.equals(readerAvroSchema)
val (nullable, _) = AvroConversionUtils.resolveAvroTypeNullability(writerAvroSchema)
val nullable = AvroSchemaUtils.resolveNullableSchema(writerAvroSchema) != writerAvroSchema

// NOTE: We have to serialize Avro schema, and then subsequently parse it on the executor node, since Spark
// serializer is not able to digest it
File: JFunction.scala
@@ -27,6 +27,8 @@ import scala.language.implicitConversions
*/
object JFunction {

def scalaFunction1Noop[T]: T => Unit = _ => {}

////////////////////////////////////////////////////////////
// From Java to Scala
////////////////////////////////////////////////////////////
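The new scalaFunction1Noop is a reusable do-nothing Function1, convenient as a default for optional callbacks. A hedged usage sketch (processRecords and its signature are illustrative, not a Hudi API):

    import org.apache.hudi.util.JFunction  // package assumed from Hudi's util layout

    // Illustrative API taking an optional per-element callback
    def processRecords[T](records: Seq[T], onRecord: T => Unit = JFunction.scalaFunction1Noop[T]): Unit =
      records.foreach(onRecord)

    processRecords(Seq(1, 2, 3))                         // callback defaults to the no-op
    processRecords(Seq(1, 2, 3), (i: Int) => println(i)) // explicit callback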
File: HoodieCatalystExpressionUtils.scala
@@ -17,17 +17,19 @@

package org.apache.spark.sql

import org.apache.hudi.SparkAdapterSupport.sparkAdapter
import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction}
import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateMutableProjection, GenerateUnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Like, Literal, MutableProjection, SubqueryExpression, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeEq, AttributeReference, Cast, Expression, Like, Literal, MutableProjection, SubqueryExpression, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import scala.annotation.tailrec

trait HoodieCatalystExpressionUtils {

// TODO scala-doc
def matchCast(expr: Expression): Option[(Expression, DataType, Option[String])]

/**
* Matches an expression iff
*
@@ -59,7 +61,7 @@ trait HoodieCatalystExpressionUtils {
def unapplyCastExpression(expr: Expression): Option[(Expression, DataType, Option[String], Boolean)]
}

object HoodieCatalystExpressionUtils {
object HoodieCatalystExpressionUtils extends SparkAdapterSupport {

/**
* Convenience extractor allowing to untuple [[Cast]] across Spark versions
@@ -69,6 +71,12 @@
sparkAdapter.getCatalystExpressionUtils.unapplyCastExpression(expr)
}

/**
* Leverages [[AttributeEquals]] predicate on 2 provided [[Attribute]]s
*/
def attributeEquals(one: Attribute, other: Attribute): Boolean =
new AttributeEq(one).equals(new AttributeEq(other))

/**
* Generates instance of [[UnsafeProjection]] projecting row of one [[StructType]] into another [[StructType]]
*
File: HoodieCatalystPlansUtils.scala
@@ -17,11 +17,10 @@

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.internal.SQLConf

trait HoodieCatalystPlansUtils {
@@ -48,47 +47,19 @@
*/
def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan

/**
* Convert a AliasIdentifier to TableIdentifier.
*/
def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier

/**
* Convert a UnresolvedRelation to TableIdentifier.
*/
def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier

/**
* Create Join logical plan.
*/
def createJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType): Join

/**
* Test if the logical plan is a Insert Into LogicalPlan.
*/
def isInsertInto(plan: LogicalPlan): Boolean

/**
* Get the member of the Insert Into LogicalPlan.
* Decomposes [[InsertIntoStatement]] into its arguments, accommodating API
* changes in Spark 3.3
*/
def getInsertIntoChildren(plan: LogicalPlan):
Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]
def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]

/**
* if the logical plan is a TimeTravelRelation LogicalPlan.
*/
def isRelationTimeTravel(plan: LogicalPlan): Boolean

/**
* Get the member of the TimeTravelRelation LogicalPlan.
*/
def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])]

/**
* Create a Insert Into LogicalPlan.
*/
def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]],
query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan
// TODO scala-docs
def rebaseInsertIntoStatement(iis: LogicalPlan, targetTable: LogicalPlan, query: LogicalPlan): LogicalPlan

/**
* Test if the logical plan is a Repair Table LogicalPlan.
@@ -98,6 +69,5 @@
/**
* Get the member of the Repair Table LogicalPlan.
*/
def getRepairTableChildren(plan: LogicalPlan):
Option[(TableIdentifier, Boolean, Boolean, String)]
def getRepairTableChildren(plan: LogicalPlan): Option[(TableIdentifier, Boolean, Boolean, String)]
}
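The old check-then-get pair (isInsertInto / getInsertIntoChildren) collapses into a single Option-returning decomposition, which composes directly with pattern matching at call-sites. A hedged usage sketch (names are illustrative):

    import org.apache.spark.sql.HoodieCatalystPlansUtils
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Illustrative call-site: pull the query out of an INSERT INTO, if the plan is one
    def queryOfInsert(utils: HoodieCatalystPlansUtils, plan: LogicalPlan): Option[LogicalPlan] =
      utils.unapplyInsertIntoStatement(plan).map {
        // (table, partitionSpec, query, overwrite, ifPartitionNotExists)
        case (_, _, query, _, _) => query
      }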
File: AttributeEq.scala (new file)
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions

/**
* This class primarily serves as a proxy for [[AttributeEquals]], which is inaccessible outside
* the current package
*/
class AttributeEq(attr: Attribute) extends AttributeEquals(attr) {}
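Catalyst's AttributeEquals compares attributes by exprId rather than by name, but the class is package-private to org.apache.spark.sql.catalyst.expressions; the AttributeEq subclass above simply re-exports it so HoodieCatalystExpressionUtils.attributeEquals can rely on it. A hedged sketch of the semantics this buys (assuming AttributeEquals' exprId-based equality):

    import org.apache.spark.sql.HoodieCatalystExpressionUtils.attributeEquals
    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types.IntegerType

    val id = AttributeReference("id", IntegerType)()
    val renamed = id.withName("ID")                          // same exprId, different name
    val lookalike = AttributeReference("id", IntegerType)()  // same name, fresh exprId

    assert(attributeEquals(id, renamed))    // identity follows the exprId...
    assert(!attributeEquals(id, lookalike)) // ...not the attribute name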
File: SparkAdapter.scala
@@ -23,19 +23,18 @@ import org.apache.hadoop.fs.Path
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.parser.HoodieExtendedParserInterface
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.types.{DataType, Metadata, StructType}
import org.apache.spark.sql.{HoodieCatalogUtils, HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, Row, SQLContext, SparkSession, SparkSessionExtensions}
import org.apache.spark.storage.StorageLevel

@@ -51,6 +50,12 @@ trait SparkAdapter extends Serializable {
*/
def isColumnarBatchRow(r: InternalRow): Boolean

/**
* Creates Catalyst [[Metadata]] for Hudi's meta-fields (designating these w/
* [[METADATA_COL_ATTR_KEY]] when available, i.e. in Spark >= 3.2)
*/
def createCatalystMetadataForMetaField: Metadata

/**
* Inject table-valued functions to SparkSessionExtensions
*/
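One plausible shape for a Spark 3.2+ override of the new createCatalystMetadataForMetaField, sketched under the assumption that Spark's METADATA_COL_ATTR_KEY is the "__metadata_col" boolean marker (older Sparks would return Metadata.empty):

    import org.apache.spark.sql.types.{Metadata, MetadataBuilder}

    // Hedged sketch of a Spark 3.2+ adapter override, not the PR's actual code
    def createCatalystMetadataForMetaField: Metadata =
      new MetadataBuilder()
        .putBoolean("__metadata_col", true)  // assumed value of METADATA_COL_ATTR_KEY
        .build()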
@@ -99,30 +104,31 @@ trait SparkAdapter extends Serializable {
/**
* Create the hoodie's extended spark sql parser.
*/
def createExtendedSparkParser: Option[(SparkSession, ParserInterface) => ParserInterface] = None
def createExtendedSparkParser(spark: SparkSession, delegate: ParserInterface): HoodieExtendedParserInterface

/**
* Create the SparkParsePartitionUtil.
*/
def getSparkParsePartitionUtil: SparkParsePartitionUtil

/**
* ParserInterface#parseMultipartIdentifier is supported since spark3, for spark2 this should not be called.
*/
def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String]

/**
* Combine [[PartitionedFile]] to [[FilePartition]] according to `maxSplitBytes`.
*/
def getFilePartitions(sparkSession: SparkSession, partitionedFiles: Seq[PartitionedFile],
maxSplitBytes: Long): Seq[FilePartition]

def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
unfoldSubqueryAliases(table) match {
case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
case relation: UnresolvedRelation =>
isHoodieTable(getCatalystPlanUtils.toTableIdentifier(relation), spark)
case _=> false
/**
* Checks whether [[LogicalPlan]] refers to a Hudi table and, if so, extracts the
* corresponding [[CatalogTable]]
*/
def resolveHoodieTable(plan: LogicalPlan): Option[CatalogTable] = {
EliminateSubqueryAliases(plan) match {
// First, we need to weed out unresolved plans
case plan if !plan.resolved => None
// NOTE: When resolving a Hudi table we allow [[Filter]]s and [[Project]]s to be applied
// on top of it
case PhysicalOperation(_, _, LogicalRelation(_, _, Some(table), _)) if isHoodieTable(table) => Some(table)
case _ => None
}
}
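Compared to the old boolean isHoodieTable, resolveHoodieTable hands callers the resolved CatalogTable directly, and the PhysicalOperation match lets a Hudi table still be recognized under Filter/Project wrappers. A hedged usage sketch (the rule-side helper is illustrative, not Hudi's actual rule code):

    import org.apache.hudi.SparkAdapterSupport
    import org.apache.spark.sql.catalyst.catalog.CatalogTable
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    object ExampleRuleHelper extends SparkAdapterSupport {
      // Gate a plan rewrite on the target resolving to a Hudi table
      def whenHoodieTable(plan: LogicalPlan)(rewrite: CatalogTable => LogicalPlan): LogicalPlan =
        sparkAdapter.resolveHoodieTable(plan) match {
          case Some(catalogTable) => rewrite(catalogTable) // resolved Hudi table
          case None => plan // unresolved plans and non-Hudi tables fall through unchanged
        }
    }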

Expand All @@ -139,15 +145,6 @@ trait SparkAdapter extends Serializable {
isHoodieTable(table)
}

protected def unfoldSubqueryAliases(plan: LogicalPlan): LogicalPlan = {
plan match {
case SubqueryAlias(_, relation: LogicalPlan) =>
unfoldSubqueryAliases(relation)
case other =>
other
}
}

/**
* Create instance of [[ParquetFileFormat]]
*/
@@ -179,28 +176,12 @@
readDataSchema: StructType,
metadataColumns: Seq[AttributeReference] = Seq.empty): FileScanRDD

/**
* Resolve [[DeleteFromTable]]
* SPARK-38626 condition is no longer Option in Spark 3.3
*/
def resolveDeleteFromTable(deleteFromTable: Command,
resolveExpression: Expression => Expression): LogicalPlan

/**
* Extract condition in [[DeleteFromTable]]
* SPARK-38626 condition is no longer Option in Spark 3.3
*/
def extractDeleteCondition(deleteFromTable: Command): Expression

/**
* Get parseQuery from ExtendedSqlParser, only for Spark 3.3+
*/
def getQueryParserFromExtendedSqlParser(session: SparkSession, delegate: ParserInterface,
sqlText: String): LogicalPlan = {
// unsupported by default
throw new UnsupportedOperationException(s"Unsupported parseQuery method in Spark earlier than Spark 3.3.0")
}

/**
* Converts instance of [[StorageLevel]] to a corresponding string
*/
File: HoodieExtendedParserInterface.scala (new file)
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.parser

import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/**
* This trait helps us bridge the compatibility gap of [[ParserInterface]] b/w different
* Spark versions
*/
trait HoodieExtendedParserInterface extends ParserInterface {

def parseQuery(sqlText: String): LogicalPlan = {
throw new UnsupportedOperationException(s"Unsupported, parseQuery is implemented in Spark >= 3.3.0")
}

def parseMultipartIdentifier(sqlText: String): Seq[String] = {
throw new UnsupportedOperationException(s"Unsupported, parseMultipartIdentifier is implemented in Spark >= 3.0.0")
}

}
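The throwing defaults let shared code program against one interface while each version-specific parser overrides only what its Spark supports (e.g. parseQuery exists on ParserInterface only from Spark 3.3). A hedged usage sketch:

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.parser.HoodieExtendedParserInterface

    // Version-agnostic call-site: compiles everywhere, throws at runtime
    // on Spark versions whose adapter does not override parseQuery
    def parseQueryWith(parser: HoodieExtendedParserInterface, sqlText: String): LogicalPlan =
      parser.parseQuery(sqlText)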