File: TestInsertTable.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hudi

import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_NAME, MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieDuplicateKeyException
@@ -696,4 +697,47 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
}
}

test("Test Insert Into With Catalog Identifier for spark >= 3.2.0") {
Seq("hudi", "parquet").foreach { format =>
withTempDir { tmp =>
val tableName = s"spark_catalog.default.$generateTableName"
// Create a partitioned table
if (HoodieSparkUtils.gteqSpark3_2) {
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long,
| dt string
|) using $format
| tblproperties (primaryKey = 'id')
| partitioned by (dt)
| location '${tmp.getCanonicalPath}'
""".stripMargin)
// Insert into dynamic partition
spark.sql(
s"""
| insert into $tableName
| select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-01-05' as dt
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1", 10.0, 1000, "2021-01-05")
)
// Insert into static partition
spark.sql(
s"""
| insert into $tableName partition(dt = '2021-01-05')
| select 2 as id, 'a2' as name, 10 as price, 1000 as ts
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1", 10.0, 1000, "2021-01-05"),
Seq(2, "a2", 10.0, 1000, "2021-01-05")
)
}
}
}
}
}
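
Editor's note: the new test is guarded by `HoodieSparkUtils.gteqSpark3_2`, since the catalog-qualified (three-part) identifier `spark_catalog.default.<table>` is only exercised on Spark 3.2 and above. As a rough illustration of how such a version gate can be built on `org.apache.spark.SPARK_VERSION` — an assumption for clarity, not Hudi's actual implementation — one can compare the major/minor components numerically:

```scala
import org.apache.spark.SPARK_VERSION

// Illustrative sketch only -- not Hudi's actual HoodieSparkUtils code.
// Parses the major/minor components of SPARK_VERSION and compares them
// numerically, so a hypothetical "3.10" would correctly order after "3.2".
object SparkVersionGateSketch {
  def gteqSpark3_2: Boolean = {
    val parts = SPARK_VERSION.split("\\.")
    val major = parts(0).toInt
    // Tolerate suffixes such as "3.2-SNAPSHOT" by keeping leading digits only.
    val minorDigits = if (parts.length > 1) parts(1).takeWhile(_.isDigit) else ""
    val minor = if (minorDigits.nonEmpty) minorDigits.toInt else 0
    major > 3 || (major == 3 && minor >= 2)
  }
}
```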
File: BaseSpark3Adapter.scala
@@ -19,30 +19,31 @@ package org.apache.spark.sql.adapter

import org.apache.hudi.Spark3RowSerDe
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.spark3.internal.ReflectUtil
import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Like, Predicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{Row, SparkSession}

import scala.util.control.NonFatal

/**
* Base implementation of [[SparkAdapter]] for Spark 3.x branch
*/
abstract class BaseSpark3Adapter extends SparkAdapter with Logging {

override def createSparkRowSerDe(encoder: ExpressionEncoder[Row]): SparkRowSerDe = {
new Spark3RowSerDe(encoder)
@@ -115,7 +116,13 @@ abstract class BaseSpark3Adapter extends SparkAdapter {
unfoldSubqueryAliases(table) match {
case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
case relation: UnresolvedRelation =>
try {
isHoodieTable(toTableIdentifier(relation), spark)
} catch {
case NonFatal(e) =>
logWarning("Failed to determine whether the table is a hoodie table", e)
false
}
case DataSourceV2Relation(table: Table, _, _, _, _) => isHoodieTable(table.properties())
case _ => false
}
Review thread on the catch block above:

Contributor: Add a warning log to facilitate troubleshooting?

Contributor (author): done
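
Editor's note: the `NonFatal` guard added above is a standard Scala pattern — catch only non-fatal exceptions, log them, and fall back to a safe default, so a failed table lookup means "not a Hudi table" rather than a failed query. Below is a minimal self-contained sketch of the pattern; the object and method names are hypothetical, and an SLF4J logger stands in for Spark's internal `Logging` trait:

```scala
import org.slf4j.LoggerFactory
import scala.util.control.NonFatal

// Hypothetical sketch of the guard pattern used in BaseSpark3Adapter above.
object SafeTableCheckSketch {
  private val log = LoggerFactory.getLogger(getClass)

  // Runs the lookup and treats any non-fatal failure as "not a Hudi table".
  def safeIsHoodieTable(lookup: () => Boolean): Boolean =
    try lookup()
    catch {
      case NonFatal(e) =>
        // NonFatal deliberately excludes fatal errors (e.g. OutOfMemoryError,
        // InterruptedException), which continue to propagate.
        log.warn("Failed to determine whether the table is a hoodie table", e)
        false
    }
}
```

Catching `Throwable` here instead would also swallow JVM-fatal errors; `NonFatal` is the idiomatic middle ground.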