Skip to content

Commit 7916d72

Browse files
committed
https://github.com/apache/spark/pull/29085#discussion_r458692902
1 parent f5ec656 commit 7916d72

2 files changed

Lines changed: 95 additions & 39 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 4 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog}
4343
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
4444
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
4545
import org.apache.spark.sql.internal.SQLConf
46-
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
4746
import org.apache.spark.sql.types._
4847
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
4948
import org.apache.spark.util.random.RandomSampler
@@ -755,19 +754,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
755754
outRowFormat: RowFormatContext,
756755
recordReader: Token,
757756
schemaLess: Boolean): ScriptInputOutputSchema = {
758-
if (recordWriter != null || recordReader != null) {
759-
// TODO: what does this message mean?
760-
throw new ParseException(
761-
"Unsupported operation: Used defined record reader/writer classes.", ctx)
762-
}
763-
764757
// Decode and input/output format.
765758
type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
766759

767-
def format(
768-
fmt: RowFormatContext,
769-
configKey: String,
770-
defaultConfigValue: String): Format = fmt match {
760+
def format(fmt: RowFormatContext): Format = fmt match {
771761
case c: RowFormatDelimitedContext =>
772762
// TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
773763
// expects a seq of pairs in which the old parsers' token names are used as keys.
@@ -785,43 +775,18 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
785775

786776
(entries, None, Seq.empty, None)
787777

788-
case c: RowFormatContext if !conf.getConf(CATALOG_IMPLEMENTATION).equals("hive") =>
789-
throw new ParseException("TRANSFORM with serde is only supported in hive mode", ctx)
790-
791778
case c: RowFormatSerdeContext =>
792-
// Use a serde format.
793-
val CatalogStorageFormat(None, None, None, Some(name), _, props) = visitRowFormatSerde(c)
794-
795-
// SPARK-10310: Special cases LazySimpleSerDe
796-
val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
797-
Option(conf.getConfString(configKey, defaultConfigValue))
798-
} else {
799-
None
800-
}
801-
(Seq.empty, Option(name), props.toSeq, recordHandler)
802-
803-
case null if conf.getConf(CATALOG_IMPLEMENTATION).equals("hive") =>
804-
// Use default (serde) format.
805-
val name = conf.getConfString("hive.script.serde",
806-
"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
807-
val props = Seq("field.delim" -> "\t")
808-
val recordHandler = Option(conf.getConfString(configKey, defaultConfigValue))
809-
(Nil, Option(name), props, recordHandler)
779+
throw new ParseException("TRANSFORM with serde is only supported in hive mode", ctx)
810780

811781
// SPARK-32106: When there is no definition about format, we return empty result
812782
// to use a built-in default Serde in SparkScriptTransformationExec.
813783
case null =>
814784
(Nil, None, Seq.empty, None)
815785
}
816786

817-
val (inFormat, inSerdeClass, inSerdeProps, reader) =
818-
format(
819-
inRowFormat, "hive.script.recordreader", "org.apache.hadoop.hive.ql.exec.TextRecordReader")
787+
val (inFormat, inSerdeClass, inSerdeProps, reader) = format(inRowFormat)
820788

821-
val (outFormat, outSerdeClass, outSerdeProps, writer) =
822-
format(
823-
outRowFormat, "hive.script.recordwriter",
824-
"org.apache.hadoop.hive.ql.exec.TextRecordWriter")
789+
val (outFormat, outSerdeClass, outSerdeProps, writer) = format(outRowFormat)
825790

826791
ScriptInputOutputSchema(
827792
inFormat, outFormat,

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
3535
import org.apache.spark.sql.execution.command._
3636
import org.apache.spark.sql.execution.datasources._
3737
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
38+
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
3839
import org.apache.spark.sql.types.StructType
3940

4041
/**
@@ -663,6 +664,96 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
663664
}
664665
}
665666

667+
/**
668+
* Create a hive serde [[ScriptInputOutputSchema]].
669+
*/
670+
override protected def withScriptIOSchema(
671+
ctx: ParserRuleContext,
672+
inRowFormat: RowFormatContext,
673+
recordWriter: Token,
674+
outRowFormat: RowFormatContext,
675+
recordReader: Token,
676+
schemaLess: Boolean): ScriptInputOutputSchema = {
677+
if (recordWriter != null || recordReader != null) {
678+
// TODO: what does this message mean?
679+
throw new ParseException(
680+
"Unsupported operation: Used defined record reader/writer classes.", ctx)
681+
}
682+
683+
if (!conf.getConf(CATALOG_IMPLEMENTATION).equals("hive")) {
684+
super.withScriptIOSchema(
685+
ctx,
686+
inRowFormat,
687+
recordWriter,
688+
outRowFormat,
689+
recordReader,
690+
schemaLess)
691+
} else {
692+
693+
// Decode and input/output format.
694+
type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
695+
696+
def format(
697+
fmt: RowFormatContext,
698+
configKey: String,
699+
defaultConfigValue: String): Format = fmt match {
700+
case c: RowFormatDelimitedContext =>
701+
// TODO we should use visitRowFormatDelimited function here. However HiveScriptIOSchema
702+
// expects a seq of pairs in which the old parsers' token names are used as keys.
703+
// Transforming the result of visitRowFormatDelimited would be quite a bit messier than
704+
// retrieving the key value pairs ourselves.
705+
def entry(key: String, value: Token): Seq[(String, String)] = {
706+
Option(value).map(t => key -> t.getText).toSeq
707+
}
708+
709+
val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++
710+
entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++
711+
entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++
712+
entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++
713+
entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs)
714+
715+
(entries, None, Seq.empty, None)
716+
717+
case c: RowFormatSerdeContext =>
718+
// Use a serde format.
719+
val CatalogStorageFormat(None, None, None, Some(name), _, props) = visitRowFormatSerde(c)
720+
721+
// SPARK-10310: Special cases LazySimpleSerDe
722+
val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
723+
Option(conf.getConfString(configKey, defaultConfigValue))
724+
} else {
725+
None
726+
}
727+
(Seq.empty, Option(name), props.toSeq, recordHandler)
728+
729+
case null =>
730+
// Use default (serde) format.
731+
val name = conf.getConfString("hive.script.serde",
732+
"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
733+
val props = Seq("field.delim" -> "\t")
734+
val recordHandler = Option(conf.getConfString(configKey, defaultConfigValue))
735+
(Nil, Option(name), props, recordHandler)
736+
}
737+
738+
val (inFormat, inSerdeClass, inSerdeProps, reader) =
739+
format(
740+
inRowFormat, "hive.script.recordreader",
741+
"org.apache.hadoop.hive.ql.exec.TextRecordReader")
742+
743+
val (outFormat, outSerdeClass, outSerdeProps, writer) =
744+
format(
745+
outRowFormat, "hive.script.recordwriter",
746+
"org.apache.hadoop.hive.ql.exec.TextRecordWriter")
747+
748+
ScriptInputOutputSchema(
749+
inFormat, outFormat,
750+
inSerdeClass, outSerdeClass,
751+
inSerdeProps, outSerdeProps,
752+
reader, writer,
753+
schemaLess)
754+
}
755+
}
756+
666757
/**
667758
* Create a clause for DISTRIBUTE BY.
668759
*/

0 commit comments

Comments
 (0)