From a40e28d89e5dd70ace9842e25ad7745efaf490e9 Mon Sep 17 00:00:00 2001
From: Ivan Sadikov
Date: Mon, 17 Oct 2022 16:33:49 +1300
Subject: [PATCH 1/5] fix issue

---
 .../spark/internal/config/package.scala      |  2 +-
 .../execution/HiveSerDeReadWriteSuite.scala  | 38 +++++++++++++++++++
 2 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 64801712c5fbd..4c00b13e9b5ec 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1111,7 +1111,7 @@ package object config {
       .doc("When true, HadoopRDD/NewHadoopRDD will not create partitions for empty input splits.")
       .version("2.3.0")
       .booleanConf
-      .createWithDefault(true)
+      .createWithDefault(false)
 
   private[spark] val SECRET_REDACTION_PATTERN =
     ConfigBuilder("spark.redaction.regex")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
index 3de2489f8deff..047acff7c02f9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.hive.execution
 
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
 import java.sql.{Date, Timestamp}
 
 import org.apache.spark.sql.{QueryTest, Row}
@@ -218,4 +221,39 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS
       checkAnswer(spark.table("t1"), Seq(Row(Array("SPARK-34512", "HIVE-24797"))))
     }
   }
+
+  test("SPARK-40815: Read SymlinkTextInputFormat") {
+    withTable("t") {
+      withTempDir { root =>
+        val dataPath = new File(root, "data")
+        val symlinkPath = new File(root, "symlink")
+
+        spark.range(10).selectExpr("cast(id as string) as value")
+          .repartition(4).write.text(dataPath.getAbsolutePath)
+
+        // Generate symlink manifest file.
+        val files = dataPath.listFiles().filter(_.getName.endsWith(".txt"))
+        assert(files.length > 0)
+
+        symlinkPath.mkdir()
+        Files.write(
+          new File(symlinkPath, "symlink.txt").toPath,
+          files.mkString("\n").getBytes(StandardCharsets.UTF_8)
+        )
+
+        sql(s"""
+          CREATE TABLE t (id bigint)
+          STORED AS
+          INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
+          OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+          LOCATION '${symlinkPath.getAbsolutePath}';
+        """)
+
+        checkAnswer(
+          sql("SELECT id FROM t ORDER BY id ASC"),
+          (0 until 10).map(Row(_))
+        )
+      }
+    }
+  }
 }
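Reviewer note, not part of the patch: the new test above doubles as a local reproduction of SPARK-40815. A minimal sketch distilled from it, assuming a Hive-enabled spark-shell started with --conf spark.hadoopRDD.ignoreEmptySplits=true; the paths and table name are illustrative.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

// Write a handful of small text files that will back the symlink table.
val root = new File("/tmp/spark-40815")
val dataPath = new File(root, "data")
val symlinkPath = new File(root, "symlink")

spark.range(10).selectExpr("cast(id as string) as value")
  .repartition(4).write.text(dataPath.getAbsolutePath)

// A "symlink" manifest is just a text file listing the target data files, one path per line.
val files = dataPath.listFiles().filter(_.getName.endsWith(".txt"))
symlinkPath.mkdirs()
Files.write(
  new File(symlinkPath, "symlink.txt").toPath,
  files.mkString("\n").getBytes(StandardCharsets.UTF_8))

spark.sql(
  s"""CREATE TABLE t (id bigint)
     |STORED AS
     |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
     |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
     |LOCATION '${symlinkPath.getAbsolutePath}'""".stripMargin)

// Before the fix, this can return fewer than 10 rows: each SymlinkTextInputSplit reports
// start = 0 and length = 0, so HadoopRDD drops it as an "empty" split.
spark.sql("SELECT count(*) FROM t").show()
```

Flipping the default of spark.hadoopRDD.ignoreEmptySplits, as this first commit does, avoids the data loss but also disables the empty-split optimization everywhere, which is presumably why the follow-up commits revert it and fix the input format instead.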
From 23c9f0f42ff42b406b8c807cb3449c94ed83e599 Mon Sep 17 00:00:00 2001
From: Ivan Sadikov
Date: Thu, 20 Oct 2022 19:16:52 +1300
Subject: [PATCH 2/5] use DelegateSymlinkTextInputFormat

---
 .../spark/internal/config/package.scala       |   2 +-
 .../apache/spark/sql/internal/SQLConf.scala   |  12 ++
 .../ql/io/DelegateSymlinkTextInputFormat.java | 111 ++++++++++++++++++
 .../hive/execution/HiveTableScanExec.scala    |  18 ++-
 .../execution/HiveSerDeReadWriteSuite.scala   |  11 +-
 5 files changed, 151 insertions(+), 3 deletions(-)
 create mode 100644 sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 4c00b13e9b5ec..64801712c5fbd 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1111,7 +1111,7 @@ package object config {
       .doc("When true, HadoopRDD/NewHadoopRDD will not create partitions for empty input splits.")
       .version("2.3.0")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)
 
   private[spark] val SECRET_REDACTION_PATTERN =
     ConfigBuilder("spark.redaction.regex")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 2f96209222b2f..701960d1ee4a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1383,6 +1383,15 @@
     .booleanConf
     .createWithDefault(true)
 
+  val USE_DELEGATE_FOR_SYMLINK_TEXT_INPUT_FORMAT =
+    buildConf("spark.sql.hive.useDelegateForSymlinkTextInputFormat")
+      .internal()
+      .doc("When true, SymlinkTextInputFormat is replaced with a similar delegate class during " +
+        "table scan in order to fix the issue of empty splits")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val PARTITION_COLUMN_TYPE_INFERENCE =
     buildConf("spark.sql.sources.partitionColumnTypeInference.enabled")
       .doc("When true, automatically infer the data types for partitioned columns.")
@@ -4278,6 +4287,9 @@ class SQLConf extends Serializable with Logging {
 
   def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
 
+  def useDelegateForSymlinkTextInputFormat: Boolean =
+    getConf(USE_DELEGATE_FOR_SYMLINK_TEXT_INPUT_FORMAT)
+
   def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
 
   def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
diff --git a/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java b/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java
new file mode 100644
index 0000000000000..af096e3d44e9d
--- /dev/null
+++ b/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Delegate for SymlinkTextInputFormat, created to address SPARK-40815.
+ * Fixes an issue where SymlinkTextInputFormat returns empty splits which could result in
+ * a correctness issue when "spark.hadoopRDD.ignoreEmptySplits" is enabled.
+ *
+ * In this class, we update the split start and length to match the target file input thus fixing
+ * the issue.
+ */
+@SuppressWarnings("deprecation")
+public class DelegateSymlinkTextInputFormat extends SymlinkTextInputFormat {
+
+  public static class DelegateSymlinkTextInputSplit extends FileSplit {
+    private final SymlinkTextInputSplit split;
+
+    public DelegateSymlinkTextInputSplit() {
+      super((Path) null, 0, 0, (String[]) null);
+      split = new SymlinkTextInputSplit();
+    }
+
+    public DelegateSymlinkTextInputSplit(Path symlinkPath, SymlinkTextInputSplit split) throws IOException {
+      // It is fine to set start and length to the target file split because
+      // SymlinkTextInputFormat maintains 1-1 mapping between SymlinkTextInputSplit and FileSplit.
+      super(symlinkPath,
+        split.getTargetSplit().getStart(),
+        split.getTargetSplit().getLength(),
+        split.getTargetSplit().getLocations());
+      this.split = split;
+    }
+
+    /**
+     * Returns delegate input split.
+     */
+    private SymlinkTextInputSplit getSplit() {
+      return split;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      super.write(out);
+      split.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      super.readFields(in);
+      split.readFields(in);
+    }
+  }
+
+  @Override
+  public RecordReader<LongWritable, Text> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    InputSplit targetSplit = ((DelegateSymlinkTextInputSplit) split).getSplit();
+    return super.getRecordReader(targetSplit, job, reporter);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits)
+      throws IOException {
+    InputSplit[] splits = super.getSplits(job, numSplits);
+    for (int i = 0; i < splits.length; i++) {
+      SymlinkTextInputSplit split = (SymlinkTextInputSplit) splits[i];
+      splits[i] = new DelegateSymlinkTextInputSplit(split.getPath(), split);
+    }
+    return splits;
+  }
+
+  @Override
+  public void configure(JobConf job) {
+    super.configure(job);
+  }
+
+  @Override
+  public ContentSummary getContentSummary(Path p, JobConf job)
+      throws IOException {
+    return super.getContentSummary(p, job);
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 05dd3ba6f5567..bf3be09633968 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql.hive.execution
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.ql.io.{DelegateSymlinkTextInputFormat, SymlinkTextInputFormat}
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.mapred.InputFormat
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
@@ -89,7 +91,7 @@ case class HiveTableScanExec(
   @transient private lazy val hiveQlTable = HiveClientImpl.toHiveTable(relation.tableMeta)
 
   @transient private lazy val tableDesc = new TableDesc(
-    hiveQlTable.getInputFormatClass,
+    getInputFormat(hiveQlTable.getInputFormatClass, conf),
     hiveQlTable.getOutputFormatClass,
     hiveQlTable.getMetadata)
 
@@ -231,6 +233,20 @@ case class HiveTableScanExec(
     predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral))
   }
 
+  // Optionally returns a delegate input format based on the provided input format class.
+  // This is currently used to replace SymlinkTextInputFormat with DelegateSymlinkTextInputFormat
+  // in order to fix SPARK-40815.
+  private def getInputFormat(
+      inputFormatClass: Class[_ <: InputFormat[_, _]],
+      conf: SQLConf): Class[_ <: InputFormat[_, _]] = {
+    if (inputFormatClass == classOf[SymlinkTextInputFormat] &&
+        conf != null && conf.useDelegateForSymlinkTextInputFormat) {
+      classOf[DelegateSymlinkTextInputFormat]
+    } else {
+      inputFormatClass
+    }
+  }
+
   override def doCanonicalize(): HiveTableScanExec = {
     val input: AttributeSeq = relation.output
     HiveTableScanExec(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
index 047acff7c02f9..9945d4280c0d1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
@@ -25,7 +25,7 @@ import java.sql.{Date, Timestamp}
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
+import org.apache.spark.sql.internal.SQLConf.{ORC_IMPLEMENTATION, USE_DELEGATE_FOR_SYMLINK_TEXT_INPUT_FORMAT}
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.tags.SlowHiveTest
 
@@ -253,6 +253,15 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS
           sql("SELECT id FROM t ORDER BY id ASC"),
           (0 until 10).map(Row(_))
         )
+
+        // Verify that with the flag disabled, we use the original SymlinkTextInputFormat
+        // which has the empty splits issue and therefore the result should be empty.
+        withSQLConf(USE_DELEGATE_FOR_SYMLINK_TEXT_INPUT_FORMAT.key -> "false") {
+          checkAnswer(
+            sql("SELECT id FROM t ORDER BY id ASC"),
+            Seq.empty[Row]
+          )
+        }
       }
     }
   }
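Reviewer note, not part of the patch: the new spark.sql.hive.useDelegateForSymlinkTextInputFormat flag is an internal escape hatch rather than a user-facing feature. A hypothetical session sketch (table t as in the reproduction above) showing how the old behaviour can be restored for comparison:

```scala
// Default: SymlinkTextInputFormat is silently swapped for DelegateSymlinkTextInputFormat
// during the Hive table scan, so the symlink table returns all rows.
spark.sql("SELECT count(*) FROM t").show()

// Escape hatch: fall back to the original input format (re-introduces the empty-split
// problem when spark.hadoopRDD.ignoreEmptySplits is true).
spark.sql("SET spark.sql.hive.useDelegateForSymlinkTextInputFormat=false")
spark.sql("SELECT count(*) FROM t").show()

// Turn the delegate back on.
spark.sql("SET spark.sql.hive.useDelegateForSymlinkTextInputFormat=true")
```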
From 2ff2c5e8ac3fe72aa98bf238252d1cdb814268dd Mon Sep 17 00:00:00 2001
From: Ivan Sadikov
Date: Wed, 26 Oct 2022 14:44:58 +1300
Subject: [PATCH 3/5] address comments

---
 .../apache/spark/sql/internal/SQLConf.scala   | 12 -----
 .../ql/io/DelegateSymlinkTextInputFormat.java | 41 +++++++++------
 .../org/apache/spark/sql/hive/HiveUtils.scala |  9 ++++
 .../hive/execution/HiveTableScanExec.scala    |  2 +-
 .../execution/HiveSerDeReadWriteSuite.scala   | 50 +++++++++++++++++--
 5 files changed, 82 insertions(+), 32 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 701960d1ee4a0..2f96209222b2f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1383,15 +1383,6 @@
     .booleanConf
     .createWithDefault(true)
 
-  val USE_DELEGATE_FOR_SYMLINK_TEXT_INPUT_FORMAT =
-    buildConf("spark.sql.hive.useDelegateForSymlinkTextInputFormat")
-      .internal()
-      .doc("When true, SymlinkTextInputFormat is replaced with a similar delegate class during " +
-        "table scan in order to fix the issue of empty splits")
-      .version("3.4.0")
-      .booleanConf
-      .createWithDefault(true)
-
   val PARTITION_COLUMN_TYPE_INFERENCE =
     buildConf("spark.sql.sources.partitionColumnTypeInference.enabled")
       .doc("When true, automatically infer the data types for partitioned columns.")
@@ -4287,9 +4278,6 @@ class SQLConf extends Serializable with Logging {
 
   def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
 
-  def useDelegateForSymlinkTextInputFormat: Boolean =
-    getConf(USE_DELEGATE_FOR_SYMLINK_TEXT_INPUT_FORMAT)
-
   def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
 
   def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
diff --git a/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java b/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java
index af096e3d44e9d..14dded4ce40f5 100644
--- a/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java
+++ b/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java
@@ -43,40 +43,53 @@
 public class DelegateSymlinkTextInputFormat extends SymlinkTextInputFormat {
 
   public static class DelegateSymlinkTextInputSplit extends FileSplit {
-    private final SymlinkTextInputSplit split;
+    private Path targetPath; // Path to the actual data file, not the symlink file.
 
+    // Used for deserialisation.
     public DelegateSymlinkTextInputSplit() {
       super((Path) null, 0, 0, (String[]) null);
-      split = new SymlinkTextInputSplit();
+      targetPath = null;
     }
 
-    public DelegateSymlinkTextInputSplit(Path symlinkPath, SymlinkTextInputSplit split) throws IOException {
+    public DelegateSymlinkTextInputSplit(SymlinkTextInputSplit split) throws IOException {
       // It is fine to set start and length to the target file split because
       // SymlinkTextInputFormat maintains 1-1 mapping between SymlinkTextInputSplit and FileSplit.
-      super(symlinkPath,
+      super(split.getPath(),
         split.getTargetSplit().getStart(),
         split.getTargetSplit().getLength(),
         split.getTargetSplit().getLocations());
-      this.split = split;
+      this.targetPath = split.getTargetSplit().getPath();
     }
 
     /**
-     * Returns delegate input split.
+     * Returns target path.
+     * Visible for testing.
      */
-    private SymlinkTextInputSplit getSplit() {
-      return split;
+    public Path getTargetPath() {
+      return targetPath;
+    }
+
+    /**
+     * Reconstructs the delegate input split.
+     */
+    private SymlinkTextInputSplit getSplit() throws IOException {
+      return new SymlinkTextInputSplit(
+        getPath(),
+        new FileSplit(targetPath, getStart(), getLength(), getLocations())
+      );
     }
 
     @Override
     public void write(DataOutput out) throws IOException {
       super.write(out);
-      split.write(out);
+      Text.writeString(out, (this.targetPath != null) ? this.targetPath.toString() : "");
     }
 
     @Override
     public void readFields(DataInput in) throws IOException {
       super.readFields(in);
-      split.readFields(in);
+      String target = Text.readString(in);
+      this.targetPath = (!target.isEmpty()) ? new Path(target) : null;
     }
   }
 
@@ -88,12 +101,11 @@ public RecordReader<LongWritable, Text> getRecordReader(
   }
 
   @Override
-  public InputSplit[] getSplits(JobConf job, int numSplits)
-      throws IOException {
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     InputSplit[] splits = super.getSplits(job, numSplits);
     for (int i = 0; i < splits.length; i++) {
       SymlinkTextInputSplit split = (SymlinkTextInputSplit) splits[i];
-      splits[i] = new DelegateSymlinkTextInputSplit(split.getPath(), split);
+      splits[i] = new DelegateSymlinkTextInputSplit(split);
     }
     return splits;
   }
@@ -104,8 +116,7 @@ public void configure(JobConf job) {
   }
 
   @Override
-  public ContentSummary getContentSummary(Path p, JobConf job)
-      throws IOException {
+  public ContentSummary getContentSummary(Path p, JobConf job) throws IOException {
     return super.getContentSummary(p, job);
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 143bcff95f88f..e00b22abc68d5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -198,6 +198,15 @@ private[spark] object HiveUtils extends Logging {
     .booleanConf
     .createWithDefault(true)
 
+  val USE_DELEGATE_FOR_SYMLINK_TEXT_INPUT_FORMAT =
+    buildConf("spark.sql.hive.useDelegateForSymlinkTextInputFormat")
+      .internal()
+      .doc("When true, SymlinkTextInputFormat is replaced with a similar delegate class during " +
+        "table scan in order to fix the issue of empty splits")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(true)
+
   /**
    * The version of the hive client that will be used to communicate with the metastore. Note that
    * this does not necessarily need to be the same version of Hive that is used internally by
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index bf3be09633968..63e7d28c42ad9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -240,7 +240,7 @@ case class HiveTableScanExec(
       inputFormatClass: Class[_ <: InputFormat[_, _]],
       conf: SQLConf): Class[_ <: InputFormat[_, _]] = {
     if (inputFormatClass == classOf[SymlinkTextInputFormat] &&
-        conf != null && conf.useDelegateForSymlinkTextInputFormat) {
+        conf != null && conf.getConf(HiveUtils.USE_DELEGATE_FOR_SYMLINK_TEXT_INPUT_FORMAT)) {
       classOf[DelegateSymlinkTextInputFormat]
     } else {
       inputFormatClass
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
index 9945d4280c0d1..6c509297c1ab9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
@@ -17,15 +17,19 @@
 
 package org.apache.spark.sql.hive.execution
 
-import java.io.File
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File}
 import java.nio.charset.StandardCharsets
 import java.nio.file.Files
 import java.sql.{Date, Timestamp}
 
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.ql.io.{DelegateSymlinkTextInputFormat, SymlinkTextInputFormat}
+import org.apache.hadoop.mapred.FileSplit
+import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
 import org.apache.spark.sql.{QueryTest, Row}
-import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
+import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET, USE_DELEGATE_FOR_SYMLINK_TEXT_INPUT_FORMAT}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf.{ORC_IMPLEMENTATION, USE_DELEGATE_FOR_SYMLINK_TEXT_INPUT_FORMAT}
+import org.apache.spark.sql.internal.SQLConf.{ORC_IMPLEMENTATION}
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.tags.SlowHiveTest
 
@@ -222,6 +226,41 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS
     }
   }
 
+  test("SPARK-40815: DelegateSymlinkTextInputFormat serialization") {
+    def assertSerDe(split: DelegateSymlinkTextInputFormat.DelegateSymlinkTextInputSplit): Unit = {
+      val buf = new ByteArrayOutputStream()
+      val out = new DataOutputStream(buf)
+      try {
+        split.write(out)
+      } finally {
+        out.close()
+      }
+
+      val res = new DelegateSymlinkTextInputFormat.DelegateSymlinkTextInputSplit()
+      val in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()))
+      try {
+        res.readFields(in)
+      } finally {
+        in.close()
+      }
+
+      assert(split.getPath == res.getPath)
+      assert(split.getStart == res.getStart)
+      assert(split.getLength == res.getLength)
+      assert(split.getLocations.toSeq == res.getLocations.toSeq)
+      assert(split.getTargetPath == res.getTargetPath)
+    }
+
+    assertSerDe(
+      new DelegateSymlinkTextInputFormat.DelegateSymlinkTextInputSplit(
+        new SymlinkTextInputFormat.SymlinkTextInputSplit(
+          new Path("file:/tmp/symlink"),
+          new FileSplit(new Path("file:/tmp/file"), 1L, 2L, Array[String]())
+        )
+      )
+    )
+  }
+
   test("SPARK-40815: Read SymlinkTextInputFormat") {
     withTable("t") {
       withTempDir { root =>
@@ -256,7 +295,10 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS
 
         // Verify that with the flag disabled, we use the original SymlinkTextInputFormat
        // which has the empty splits issue and therefore the result should be empty.
-        withSQLConf(USE_DELEGATE_FOR_SYMLINK_TEXT_INPUT_FORMAT.key -> "false") {
+        withSQLConf(
+          HADOOP_RDD_IGNORE_EMPTY_SPLITS.key -> "true",
+          USE_DELEGATE_FOR_SYMLINK_TEXT_INPUT_FORMAT.key -> "false") {
+
           checkAnswer(
             sql("SELECT id FROM t ORDER BY id ASC"),
             Seq.empty[Row]
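Reviewer note, not part of the patch: because input splits are serialized and shipped to executors, the reworked DelegateSymlinkTextInputSplit now writes the target path explicitly instead of nesting a SymlinkTextInputSplit. A standalone round-trip sketch mirroring the new unit test above; the paths are illustrative:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.{DelegateSymlinkTextInputFormat, SymlinkTextInputFormat}
import org.apache.hadoop.mapred.FileSplit

// Wrap a symlink split whose target is file:/tmp/file, offset 1, length 2.
val original = new DelegateSymlinkTextInputFormat.DelegateSymlinkTextInputSplit(
  new SymlinkTextInputFormat.SymlinkTextInputSplit(
    new Path("file:/tmp/symlink"),
    new FileSplit(new Path("file:/tmp/file"), 1L, 2L, Array.empty[String])))

// Serialize through the Hadoop Writable contract...
val buf = new ByteArrayOutputStream()
val out = new DataOutputStream(buf)
original.write(out)
out.close()

// ...and rebuild an equivalent split on the "executor" side.
val restored = new DelegateSymlinkTextInputFormat.DelegateSymlinkTextInputSplit()
val in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray))
restored.readFields(in)
in.close()

assert(restored.getPath == original.getPath)
assert(restored.getStart == 1L && restored.getLength == 2L)
assert(restored.getTargetPath == original.getTargetPath)
```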
From 4c0d250a3b74ed9eeeccfd24ea64ff46ff8f9b34 Mon Sep 17 00:00:00 2001
From: Ivan Sadikov
Date: Thu, 27 Oct 2022 18:14:46 +1300
Subject: [PATCH 4/5] trigger ci

From b65a2c283c42f9f1b659ea3cd49de6446d60181c Mon Sep 17 00:00:00 2001
From: Ivan Sadikov
Date: Tue, 1 Nov 2022 10:39:41 +1300
Subject: [PATCH 5/5] update javadoc and address comments

---
 .../hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java b/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java
index 14dded4ce40f5..25420f74f2f2f 100644
--- a/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java
+++ b/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java
@@ -35,11 +35,10 @@
  * Delegate for SymlinkTextInputFormat, created to address SPARK-40815.
  * Fixes an issue where SymlinkTextInputFormat returns empty splits which could result in
  * a correctness issue when "spark.hadoopRDD.ignoreEmptySplits" is enabled.
- *
+ * <p>
  * In this class, we update the split start and length to match the target file input thus fixing
  * the issue.
  */
-@SuppressWarnings("deprecation")
 public class DelegateSymlinkTextInputFormat extends SymlinkTextInputFormat {
 
   public static class DelegateSymlinkTextInputSplit extends FileSplit {
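Reviewer note, not part of the patch: the end state of the series is that spark.hadoopRDD.ignoreEmptySplits keeps its default of true and the fix lives entirely in the Hive scan path. A final sanity sketch against the symlink table built earlier (names illustrative):

```scala
// With the delegate enabled (the default), every row written to the data files
// comes back even though empty input splits are ignored.
val ids = spark.sql("SELECT id FROM t ORDER BY id").collect().map(_.getLong(0)).toSeq
assert(ids == (0L until 10L))
```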