
Commit 480d20a

Broadcast configuration in hiveWriterContainers (WIP hack)
1 parent f90dc94 commit 480d20a
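
The core idea: JobConf is a Hadoop Writable but not java.io.Serializable, so instead of passing it through the writer containers' constructors (and re-serializing it into every task), it is wrapped in SerializableWritable and shipped to executors as a broadcast variable. A minimal sketch of that pattern outside the Hive classes touched here (object and method names are illustrative, not from this commit):

import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SerializableWritable, SparkContext}

object BroadcastJobConfSketch {
  def run(sc: SparkContext, jobConf: JobConf): Unit = {
    // Driver side: JobConf is a Hadoop Writable but not java.io.Serializable, so it is
    // wrapped in SerializableWritable and broadcast once per job instead of being
    // re-serialized into every task closure.
    val broadcastedConf = sc.broadcast(new SerializableWritable[JobConf](jobConf))

    sc.parallelize(1 to 4).foreachPartition { _ =>
      // Executor side: unwrap the broadcast and make a task-local copy, so per-task
      // settings (e.g. mapred.task.id) do not mutate the shared, cached broadcast value.
      val localConf = new JobConf(broadcastedConf.value.value)
      localConf.set("mapred.task.is.map", "true")
    }
  }
}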

3 files changed

Lines changed: 49 additions & 33 deletions

core/src/main/scala/org/apache/spark/SerializableWritable.scala

Lines changed: 1 addition & 1 deletion
@@ -43,4 +43,4 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
     ow.readFields(in)
     t = ow.get().asInstanceOf[T]
   }
-}
+}

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 4 additions & 2 deletions
@@ -174,12 +174,14 @@ case class InsertIntoHiveTable(

     val jobConf = new JobConf(sc.hiveconf)
     val jobConfSer = new SerializableWritable(jobConf)
+    val broadcastedConf = sc.sparkContext.broadcast(new SerializableWritable[JobConf](jobConf))

     val writerContainer = if (numDynamicPartitions > 0) {
       val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
-      new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames)
+      new SparkHiveDynamicPartitionWriterContainer(
+        broadcastedConf, fileSinkConf, dynamicPartColNames)
     } else {
-      new SparkHiveWriterContainer(jobConf, fileSinkConf)
+      new SparkHiveWriterContainer(broadcastedConf, fileSinkConf)
     }

     saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)

sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala

Lines changed: 44 additions & 30 deletions
@@ -31,9 +31,10 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred._
 import org.apache.hadoop.hive.common.FileUtils

+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.sql.Row
-import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
+import org.apache.spark.{SerializableWritable, Logging, SparkHadoopWriter}
 import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive.HiveShim._
@@ -44,7 +45,7 @@ import org.apache.spark.sql.types._
  * It is based on [[SparkHadoopWriter]].
  */
 private[hive] class SparkHiveWriterContainer(
-    @transient jobConf: JobConf,
+    jobConf: Broadcast[SerializableWritable[JobConf]],
     fileSinkConf: FileSinkDesc)
   extends Logging
   with SparkHadoopMapRedUtil
@@ -56,22 +57,28 @@ private[hive] class SparkHiveWriterContainer(
   // handler settings can be set to jobConf
   if (tableDesc != null) {
     PlanUtils.configureOutputJobPropertiesForStorageHandler(tableDesc)
-    Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf)
+    Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf.value.value)
   }
-  protected val conf = new SerializableWritable(jobConf)
+  @transient var conf: JobConf = jobConf.value.value

   private var jobID = 0
   private var splitID = 0
   private var attemptID = 0
-  private var jID: SerializableWritable[JobID] = null
-  private var taID: SerializableWritable[TaskAttemptID] = null
+
+  @transient private var jID: JobID = null
+  @transient private var taID: TaskAttemptID = null
+  private var jIDString: String = null
+  private var taskIDString: String = null
+  private var taskAttemptIDString: String = null

   @transient private var writer: FileSinkOperator.RecordWriter = null
-  @transient protected lazy val committer = conf.value.getOutputCommitter
-  @transient protected lazy val jobContext = newJobContext(conf.value, jID.value)
-  @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
+  @transient protected lazy val committer = conf.getOutputCommitter
+  /** Only used on driver side */
+  @transient protected lazy val jobContext = newJobContext(conf, jID)
+  /** Only used on executor side */
+  @transient private lazy val taskContext = newTaskAttemptContext(conf, taID)
   @transient private lazy val outputFormat =
-    conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
+    conf.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]

   def driverSideSetup() {
     setIDs(0, 0, 0)
@@ -80,6 +87,7 @@ private[hive] class SparkHiveWriterContainer(
   }

   def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) {
+    conf = new JobConf(jobConf.value.value)
     setIDs(jobId, splitId, attemptId)
     setConfParams()
     committer.setupTask(taskContext)
@@ -90,7 +98,7 @@ private[hive] class SparkHiveWriterContainer(
     val numberFormat = NumberFormat.getInstance()
     numberFormat.setMinimumIntegerDigits(5)
     numberFormat.setGroupingUsed(false)
-    val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat)
+    val extension = Utilities.getFileExtension(conf, fileSinkConf.getCompressed, outputFormat)
     "part-" + numberFormat.format(splitID) + extension
   }

@@ -110,11 +118,11 @@ private[hive] class SparkHiveWriterContainer(
     // NOTE this method is executed at the executor side.
     // For Hive tables without partitions or with only static partitions, only 1 writer is needed.
     writer = HiveFileFormatUtils.getHiveRecordWriter(
-      conf.value,
+      conf,
       fileSinkConf.getTableInfo,
-      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
+      conf.getOutputValueClass.asInstanceOf[Class[Writable]],
       fileSinkConf,
-      FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
+      FileOutputFormat.getTaskOutputPath(conf, getOutputName),
       Reporter.NULL)
   }

@@ -127,17 +135,23 @@ private[hive] class SparkHiveWriterContainer(
     splitID = splitId
     attemptID = attemptId

-    jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId))
-    taID = new SerializableWritable[TaskAttemptID](
-      new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
+    // Note: SparkHadoopWriter.createJobID may be locale-dependent because it doesn't pass a locale
+    // to the date format; we should fix this so that its result is locale-independent in case
+    // different cluster nodes have different locales (e.g. driver and executor may be different
+    // types of machines with different configurations).
+    jID = SparkHadoopWriter.createJobID(now, jobId)
+    taID = new TaskAttemptID(new TaskID(jID, true, splitID), attemptID)
+    jIDString = jID.toString
+    taskAttemptIDString = taID.toString
+    taskIDString = taID.getTaskID.toString
   }

   private def setConfParams() {
-    conf.value.set("mapred.job.id", jID.value.toString)
-    conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
-    conf.value.set("mapred.task.id", taID.value.toString)
-    conf.value.setBoolean("mapred.task.is.map", true)
-    conf.value.setInt("mapred.task.partition", splitID)
+    conf.set("mapred.job.id", jIDString)
+    conf.set("mapred.tip.id", taskIDString)
+    conf.set("mapred.task.id", taskAttemptIDString)
+    conf.setBoolean("mapred.task.is.map", true)
+    conf.setInt("mapred.task.partition", splitID)
   }
 }

@@ -160,14 +174,14 @@ private[spark] object SparkHiveDynamicPartitionWriterContainer {
 }

 private[spark] class SparkHiveDynamicPartitionWriterContainer(
-    @transient jobConf: JobConf,
+    jobConf: Broadcast[SerializableWritable[JobConf]],
     fileSinkConf: FileSinkDesc,
     dynamicPartColNames: Array[String])
   extends SparkHiveWriterContainer(jobConf, fileSinkConf) {

   import SparkHiveDynamicPartitionWriterContainer._

-  private val defaultPartName = jobConf.get(
+  private val defaultPartName = jobConf.value.value.get(
     ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)

   @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
@@ -191,10 +205,10 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
     // Better solution is to add a step similar to what Hive FileSinkOperator.jobCloseOp does:
     // calling something like Utilities.mvFileToFinalPath to cleanup the output directory and then
     // load it with loadDynamicPartitions/loadPartition/loadTable.
-    val oldMarker = jobConf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
-    jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
+    val oldMarker = jobConf.value.value.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
+    jobConf.value.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
     super.commitJob()
-    jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
+    jobConf.value.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
   }

   override def getLocalFileWriter(row: Row, schema: StructType): FileSinkOperator.RecordWriter = {
@@ -229,16 +243,16 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
     newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)

     val path = {
-      val outputPath = FileOutputFormat.getOutputPath(conf.value)
+      val outputPath = FileOutputFormat.getOutputPath(conf)
       assert(outputPath != null, "Undefined job output-path")
       val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
       new Path(workPath, getOutputName)
     }

     HiveFileFormatUtils.getHiveRecordWriter(
-      conf.value,
+      conf,
       fileSinkConf.getTableInfo,
-      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
+      conf.getOutputValueClass.asInstanceOf[Class[Writable]],
       newFileSinkDesc,
       path,
       Reporter.NULL)
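
The new comment in setIDs() flags that SparkHadoopWriter.createJobID builds its SimpleDateFormat without an explicit locale, so the generated jobtracker ID can differ between driver and executors whose machines use different locales. A rough sketch of the locale-independent variant that comment asks for, assuming the usual date-stamp identifier (the exact format string used by createJobID, and the object name, are assumptions for illustration):

import java.text.SimpleDateFormat
import java.util.{Date, Locale}
import org.apache.hadoop.mapred.{JobID, TaskAttemptID, TaskID}

object LocaleSafeJobID {
  def createJobID(time: Date, id: Int): JobID = {
    // Pin the locale so every node formats the date-based jobtracker ID identically.
    val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
    new JobID(formatter.format(time), id)
  }

  def main(args: Array[String]): Unit = {
    // Mirrors setIDs(): derive the task and attempt IDs from the job ID.
    val jID = createJobID(new Date(), 0)
    val taID = new TaskAttemptID(new TaskID(jID, true, 0), 0)
    println(s"${jID.toString} / ${taID.getTaskID.toString} / ${taID.toString}")
  }
}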
