diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java
index 6a994b49d3a..42b3e60a758 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java
@@ -21,6 +21,8 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
 import org.apache.spark.shuffle.IndexShuffleBlockResolver;
 import org.apache.spark.shuffle.api.SingleSpillShuffleMapOutputWriter;
 import org.apache.spark.util.Utils;
@@ -49,8 +51,12 @@ public class LocalDiskSingleSpillMapOutputWriter
     // The map spill file already has the proper format, and it contains all of the partition data.
     // So just transfer it directly to the destination without any merging.
     File outputFile = blockResolver.getDataFile(shuffleId, mapId);
+    File tempFile = Utils.tempFileWith(outputFile);
     Files.move(mapSpillFile.toPath(), tempFile.toPath());
+    if (TaskContext$.MODULE$.get().isOuterTablePart1()) {
+      TaskContext$.MODULE$.get().setShuffleFile(new File[]{outputFile, tempFile});
+    }
     blockResolver
       .writeMetadataFileAndCommit(shuffleId, mapId, partitionLengths, checksums, tempFile);
   }
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index a660bccd2e6..12864a9f225 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -295,6 +295,21 @@ private class ShuffleStatus(
    * remove outputs which are served by an external shuffle server (if one exists).
    */
   def removeOutputsByFilter(f: BlockManagerId => Boolean): Unit = withWriteLock {
+    /*
+    for (mapIndex <- mapStatuses.indices) {
+      val currentMapStatus = mapStatuses(mapIndex)
+      if (currentMapStatus != null && f(currentMapStatus.location)) {
+        _numAvailableMapOutputs -= 1
+        mapIdToMapIndex.remove(currentMapStatus.mapId)
+        mapStatusesDeleted(mapIndex) = currentMapStatus
+        mapStatuses(mapIndex) = null
+        invalidateSerializedMapOutputStatusCache()
+      }
+    } */
+  }
+
+  def removeOutputsByFilterX(f: BlockManagerId => Boolean): Unit = withWriteLock {
+
     for (mapIndex <- mapStatuses.indices) {
       val currentMapStatus = mapStatuses(mapIndex)
       if (currentMapStatus != null && f(currentMapStatus.location)) {
@@ -857,7 +872,7 @@ private[spark] class MapOutputTrackerMaster(
   def unregisterAllMapAndMergeOutput(shuffleId: Int): Unit = {
     shuffleStatuses.get(shuffleId) match {
       case Some(shuffleStatus) =>
-        shuffleStatus.removeOutputsByFilter(x => true)
+        shuffleStatus.removeOutputsByFilterX(x => true)
         shuffleStatus.removeMergeResultsByFilter(x => true)
         shuffleStatus.removeShuffleMergerLocations()
         incrementEpoch()
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 30d772bd62d..0d552715d96 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -3266,7 +3266,7 @@ object SparkContext extends Logging {
     import SparkMasterRegex._
     // When running locally, don't try to re-execute tasks on failure.
-    val MAX_LOCAL_TASK_FAILURES = 1
+    val MAX_LOCAL_TASK_FAILURES = 4
 
   // Ensure that default executor's resources satisfies one or more tasks requirement.
   // This function is for cluster managers that don't set the executor cores config, for
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 5384fd86a8f..81fdcac0fd7 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -17,20 +17,33 @@
 package org.apache.spark
 
-import java.io.Closeable
+import java.io.{Closeable, File}
 import java.util.Properties
-
 import org.apache.spark.annotation.{DeveloperApi, Evolving, Since}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.resource.ResourceInformation
-import org.apache.spark.scheduler.Task
+import org.apache.spark.scheduler.{ResultTask, Task}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.{AccumulatorV2, TaskCompletionListener, TaskFailureListener}
 
 object TaskContext {
+
+  private var shuffleFileToDelete: Option[Array[File]] = None
+  private var failResult = false
+  var resultTaskCount: Int = 0
+
+  def setFailResult(): Unit = {
+    this.failResult = true
+  }
+
+  def unsetFailResult(): Unit = {
+    this.failResult = false
+  }
+
+  def getFailResult: Boolean = this.failResult
   /**
   * Return the currently active TaskContext. This can be called inside of
   * user functions to access contextual information about running tasks.
@@ -71,6 +84,19 @@ object TaskContext {
     new TaskContextImpl(0, 0, 0, 0, 0, 1, null, new Properties, null, TaskMetrics.empty, 1)
   }
+
+  def deleteShuffleFile(): Unit = {
+    import scala.sys.process._
+    val delete = this.shuffleFileToDelete.nonEmpty
+    this.shuffleFileToDelete.foreach(files => files.foreach { f =>
+      assert(Process(Seq("rm", "-rf", f.getPath)).! == 0)
+    })
+    if (delete) {
+      // Run through a shell: Process(Seq(...)) does not expand globs by itself.
+      assert(Process(Seq("bash", "-c", "rm -rf /private/tmp/bug/blockmgr*/**/temp_shuffle*")).! == 0)
+    }
+    this.shuffleFileToDelete = None
+  }
 }
@@ -82,6 +108,8 @@ object TaskContext {
  * }}}
  */
 abstract class TaskContext extends Serializable {
+
+  private var shuffleFile: Option[File] = None
 // Note: TaskContext must NOT define a get method. Otherwise it will prevent the Scala compiler
 // from generating a static get method (based on the companion object's get method).
@@ -89,6 +117,20 @@ abstract class TaskContext extends Serializable {
 
   // Note: getters in this class are defined with parentheses to maintain backward compatibility.
 
+
+  def setShuffleFile(files: Array[File]): Unit = {
+    TaskContext.shuffleFileToDelete = Some(files)
+    // throw new RuntimeException("test")
+  }
+
+  private var isOuterTable: Boolean = false
+
+  def setIsOuterTablePart1(): Unit = {
+    isOuterTable = true
+  }
+
+  def isOuterTablePart1: Boolean = this.isOuterTable
   /**
   * Returns true if the task has completed.
   */
@@ -168,7 +210,19 @@ abstract class TaskContext extends Serializable {
       // context too within run(). If that's the case, kill the thread before it starts executing
       // the actual task.
       killTaskIfInterrupted()
-      task.runTask(this)
+      // Fault injection: before the first ResultTask for partition 1 runs,
+      // delete the shuffle files recorded via setShuffleFile() so the shuffle
+      // read in this stage fails with a FetchFailedException.
+      task match {
+        case r: ResultTask[_, _] => TaskContext.synchronized {
+          TaskContext.resultTaskCount += 1
+          if (r.partitionId == 1) {
+            TaskContext.deleteShuffleFile()
+          }
+        }
+        case _ => TaskContext.resultTaskCount = 0
+      }
+      task.runTask(this)
     } catch {
       case e: Throwable =>
         // Catch all errors; run task failure and completion callbacks, and rethrow the exception.
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 5dda7afc3eb..0d20f4459ab 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1700,7 +1700,7 @@ package object config {
         "map-side aggregation and there are at most this many reduce partitions")
       .version("1.1.1")
       .intConf
-      .createWithDefault(200)
+      .createWithDefault(1)
 
   private[spark] val SHUFFLE_MANAGER =
     ConfigBuilder("spark.shuffle.manager")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 05bcafdb14d..6a7a6b7d50b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -103,6 +103,13 @@ private[spark] class ShuffleMapTask(
     } else {
       context.taskAttemptId()
     }
+
+    // Tag the map task scanning partition 1 of the "outer" table, so its shuffle output is recorded.
+    if (TaskContext.getFailResult && partition.toString.contains("outer") &&
+      partitionId == 1) {
+      TaskContext.get().setIsOuterTablePart1()
+      TaskContext.unsetFailResult()
+    }
     dep.shuffleWriterProcessor.write(
       rdd.iterator(partition, context),
       dep,
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/Encoders.scala b/sql/api/src/main/scala/org/apache/spark/sql/Encoders.scala
index 4957d76af9a..feb3b0ae5e3 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/Encoders.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/Encoders.scala
@@ -261,7 +261,7 @@ object Encoders {
     TransformingEncoder(classTag[T], BinaryEncoder, provider)
   }
 
-  private[sql] def tupleEncoder[T](encoders: Encoder[_]*): Encoder[T] = {
+  def tupleEncoder[T](encoders: Encoder[_]*): Encoder[T] = {
     ProductEncoder.tuple(encoders.map(agnosticEncoderFor(_))).asInstanceOf[Encoder[T]]
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 6e19a1d6bbc..5cc7ff41f8a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -316,7 +316,7 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
   * Returns an expression that will produce a valid partition ID(i.e. non-negative and is less
   * than numPartitions) based on hashing expressions.
   */
-  def partitionIdExpression: Expression = Pmod(new Murmur3Hash(expressions), Literal(numPartitions))
+  def partitionIdExpression: Expression = Pmod(expressions.head, Literal(numPartitions))
 
   override protected def withNewChildrenInternal(
     newChildren: IndexedSeq[Expression]): HashPartitioning = copy(expressions = newChildren)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7b560002ede..fce1a67d826 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -815,7 +815,7 @@ object SQLConf {
         s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'), to avoid too many small tasks.")
       .version("3.0.0")
       .booleanConf
-      .createWithDefault(true)
+      .createWithDefault(false)
 
   val COALESCE_PARTITIONS_PARALLELISM_FIRST =
     buildConf("spark.sql.adaptive.coalescePartitions.parallelismFirst")
@@ -1741,7 +1741,7 @@ object SQLConf {
         "side. This could help to eliminate unnecessary shuffles")
       .version("3.4.0")
      .booleanConf
-      .createWithDefault(true)
+      .createWithDefault(false)
 
   val V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED =
     buildConf("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 226debc9764..86d03fcd4dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -618,12 +618,15 @@ case class FileSourceScanExec(
   // Note that some vals referring the file-based relation are lazy intentionally
   // so that this plan can be canonicalized on executor side too. See SPARK-23731.
   override lazy val supportsColumnar: Boolean = {
-    val conf = relation.sparkSession.sessionState.conf
+    /* val conf = relation.sparkSession.sessionState.conf
     // Only output columnar if there is WSCG to read it.
     val requiredWholeStageCodegenSettings =
       conf.wholeStageEnabled && !WholeStageCodegenExec.isTooManyFields(conf, schema)
     requiredWholeStageCodegenSettings &&
       relation.fileFormat.supportBatch(relation.sparkSession, schema)
+
+    */
+    false
   }
 
   private lazy val needsUnsafeRowConversion: Boolean = {
@@ -668,6 +671,7 @@ case class FileSourceScanExec(
         toUnsafe.initialize(index)
         iter.map { row =>
           numOutputRows += 1
+          // println(s"row iter ${row.getString(1)}")
           toUnsafe(row)
         }
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
index 50af845c37c..fa05bcea7c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
@@ -51,6 +51,9 @@ case class FilePartition(index: Int, files: Array[PartitionedFile])
       case (host, numBytes) => host
     }.toArray
   }
+
+  override def toString(): String = s"partition index = $index. files =" +
files =" + + s" ${files.map(f => f.filePath.toString).mkString}" } object FilePartition extends Logging { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala index b8348cefe7c..0d461da269c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala @@ -66,6 +66,7 @@ trait SharedSparkSessionBase protected def sparkConf = { val conf = new SparkConf() + .setMaster("local[2]") .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName) .set(UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true) .set(SQLConf.CODEGEN_FALLBACK.key, "false") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala index 91c6ac6f96e..980cb1155f9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.internal.{SessionState, SessionStateBuilder, SQLConf * A special `SparkSession` prepared for testing. */ private[spark] class TestSparkSession(sc: SparkContext) extends SparkSession(sc) { self => - def this(sparkConf: SparkConf, maxLocalTaskFailures: Int = 1, numCores: Int = 2) = { + def this(sparkConf: SparkConf, maxLocalTaskFailures: Int = 4, numCores: Int = 2) = { this(new SparkContext(s"local[$numCores,$maxLocalTaskFailures]", "test-sql-context", sparkConf.set("spark.sql.testkey", "true"))) }