Skip to content

Commit 33334e2

Browse files
rdblue authored and Marcelo Vanzin committed
[SPARK-26873][SQL] Use a consistent timestamp to build Hadoop Job IDs.
## What changes were proposed in this pull request? Updates FileFormatWriter to create a consistent Hadoop Job ID for a write. ## How was this patch tested? Existing tests for regressions. Closes apache#23777 from rdblue/SPARK-26873-fix-file-format-writer-job-ids. Authored-by: Ryan Blue <blue@apache.org> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
1 parent 2228ee5 commit 33334e2

1 file changed

Lines changed: 4 additions & 1 deletion

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,12 +162,14 @@ object FileFormatWriter extends Logging {
162162
rdd
163163
}
164164

165+
val jobIdInstant = new Date().getTime
165166
val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
166167
sparkSession.sparkContext.runJob(
167168
rddWithNonEmptyPartitions,
168169
(taskContext: TaskContext, iter: Iterator[InternalRow]) => {
169170
executeTask(
170171
description = description,
172+
jobIdInstant = jobIdInstant,
171173
sparkStageId = taskContext.stageId(),
172174
sparkPartitionId = taskContext.partitionId(),
173175
sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
@@ -200,13 +202,14 @@ object FileFormatWriter extends Logging {
200202
/** Writes data out in a single Spark task. */
201203
private def executeTask(
202204
description: WriteJobDescription,
205+
jobIdInstant: Long,
203206
sparkStageId: Int,
204207
sparkPartitionId: Int,
205208
sparkAttemptNumber: Int,
206209
committer: FileCommitProtocol,
207210
iterator: Iterator[InternalRow]): WriteTaskResult = {
208211

209-
val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId)
212+
val jobId = SparkHadoopWriterUtils.createJobID(new Date(jobIdInstant), sparkStageId)
210213
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
211214
val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
212215

0 commit comments

Comments (0)