27 changes: 27 additions & 0 deletions sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -93,6 +93,33 @@ private[hive] class SparkHiveHadoopWriter(
null)
}

def open(dynamicPartPath: String) {
val numfmt = NumberFormat.getInstance()
Contributor:
NumberFormat.getInstance() is not thread-safe. We can use a thread-local variable to hold this object, similar to Cast.threadLocalDateFormat
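
A minimal sketch of that idea, assuming the same pattern as Cast.threadLocalDateFormat (the threadLocalNumberFormat name is hypothetical, not part of this patch):

// Sketch: one NumberFormat per thread, pre-configured like the instance in open().
private val threadLocalNumberFormat = new ThreadLocal[NumberFormat] {
  override def initialValue(): NumberFormat = {
    val fmt = NumberFormat.getInstance()
    fmt.setMinimumIntegerDigits(5)
    fmt.setGroupingUsed(false)
    fmt
  }
}

// open() would then call threadLocalNumberFormat.get().format(splitID) instead of
// creating and configuring a new instance on every call.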

Contributor:
Just realized this function is a variant of the original open() method in the same file. This looks like a bug in the master branch.

Another issue is that SparkHadoopWriter resides in the core project, which is an indirect dependency of sql/hive, so logically open(dynamicPartPath: String) doesn't belong here.

Contributor:
Oh, it is actually SparkHiveHadoopWriter in sql/hive. Seems we need to rename this file.

numfmt.setMinimumIntegerDigits(5)
numfmt.setGroupingUsed(false)

val extension = Utilities.getFileExtension(
conf.value,
fileSinkConf.getCompressed,
getOutputFormat())

val outputName = "part-" + numfmt.format(splitID) + extension
val outputPath: Path = FileOutputFormat.getOutputPath(conf.value)
if (outputPath == null) {
throw new IOException("Undefined job output-path")
}
val workPath = new Path(outputPath, dynamicPartPath.substring(1))//remove "/"
Contributor:
Add spaces around //.

val path = new Path(workPath, outputName)
getOutputCommitter().setupTask(getTaskContext())
writer = HiveFileFormatUtils.getHiveRecordWriter(
conf.value,
fileSinkConf.getTableInfo,
conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
fileSinkConf,
path,
null)
Contributor:
Maybe Reporter.NULL instead of null.
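
That is, the last argument of the getHiveRecordWriter call above would presumably become Reporter.NULL (the no-op reporter from org.apache.hadoop.mapred) instead of a bare null:

// Sketch of the suggested change only; otherwise identical to the call above.
writer = HiveFileFormatUtils.getHiveRecordWriter(
  conf.value,
  fileSinkConf.getTableInfo,
  conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
  fileSinkConf,
  path,
  Reporter.NULL)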

}

def write(value: Writable) {
if (writer != null) {
writer.write(value)
@@ -822,11 +822,6 @@ private[hive] object HiveQl {
cleanIdentifier(key.toLowerCase) -> None
}.toMap).getOrElse(Map.empty)

if (partitionKeys.values.exists(p => p.isEmpty)) {
throw new NotImplementedError(s"Do not support INSERT INTO/OVERWRITE with" +
s"dynamic partitioning.")
}

InsertIntoTable(UnresolvedRelation(db, tableName, None), partitionKeys, query, overwrite)

case a: ASTNode =>
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharOb
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}

import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.{SerializableWritable, SparkException, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
@@ -159,6 +159,28 @@ case class InsertIntoHiveTable(
writer.commitJob()
}

def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int) :String = {
dynamicPartNum2 match {
Contributor:
An if would be more appropriate than a match here.

case 0 =>""
Contributor:
Please add a space before the quotes.

case i => {
Contributor:
This pair of braces are redundant.

val colsNum = tableInfo.getProperties.getProperty("columns").split("\\,").length
val partColStr = tableInfo.getProperties.getProperty("partition_columns")
val partCols = partColStr.split("/")
var buf = new StringBuffer()
Contributor:
I think we can simplify lines 169 to 179 to:

partCols
  .takeRight(dynamicPartNum2)
  .zip(row.takeRight(dynamicPartNum2))
  .map { case (c, v) => s"/$c=$v" }
  .mkString

Also, I'm not sure whether toString is adequate for building the partition directory name...
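
Combining this with the earlier suggestion to use an if instead of a match, the whole helper could presumably collapse to the sketch below (illustrative only, and it assumes the dynamic partition values are always the trailing elements of row):

def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int): String = {
  if (dynamicPartNum2 == 0) {
    ""
  } else {
    // Build "/col=value" segments from the trailing partition columns and row values.
    val partCols = tableInfo.getProperties.getProperty("partition_columns").split("/")
    partCols
      .takeRight(dynamicPartNum2)
      .zip(row.takeRight(dynamicPartNum2))
      .map { case (c, v) => s"/$c=$v" }
      .mkString
  }
}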

if (partCols.length == dynamicPartNum2) {
for (j <- 0 until partCols.length) {
buf.append("/").append(partCols(j)).append("=").append(row(j + row.length - colsNum))
}
} else {
for (j <- 0 until dynamicPartNum2) {
buf.append("/").append(partCols(j + partCols.length - dynamicPartNum2)).append("=").append(row(j + colsNum))
}
}
buf.toString
}
}
}

override def execute() = result

/**
@@ -178,6 +200,12 @@ case class InsertIntoHiveTable(
val tableLocation = table.hiveQlTable.getDataLocation
val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
var dynamicPartNum = 0
var dynamicPartPath = "";
val partitionSpec = partition.map {
case (key, Some(value)) => key -> value
case (key, None) => { dynamicPartNum += 1; key -> "" }// Should not reach here right now.
Contributor:
Remove the comment and put dynamicPartNum += 1 and key -> "" on two separate lines (without braces).
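
That is, roughly:

case (key, None) =>
  dynamicPartNum += 1
  key -> ""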

}
val rdd = childRdd.mapPartitions { iter =>
val serializer = newSerializer(fileSinkConf.getTableInfo)
val standardOI = ObjectInspectorUtils
@@ -191,7 +219,10 @@ case class InsertIntoHiveTable(
val outputData = new Array[Any](fieldOIs.length)
iter.map { row =>
var i = 0
while (i < row.length) {
while (i < fieldOIs.length) {
if (fieldOIs.length < row.length && row.length - fieldOIs.length == dynamicPartNum) {
Contributor:
The first comparison is redundant.

Contributor:
I think that instead of an if, we should add an assertion here to make sure row.length - fieldOIs.length == dynamicPartNum.

Contributor:
Another thing is that we only need to calculate dynamicPartPath once per row. Just move this block before the while.
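
For example (a sketch only, assuming the map body still ends by serializing outputData as in the original code; the follow-up comment below proposes an even cleaner split into separate loops):

iter.map { row =>
  // Compute the partition directory once per row, before the field-copy loop.
  if (dynamicPartNum > 0) {
    dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, dynamicPartNum)
  }
  var i = 0
  while (i < fieldOIs.length) {
    // Casts Strings to HiveVarchars when necessary.
    outputData(i) = wrap(row(i), fieldOIs(i))
    i += 1
  }
  serializer.serialize(outputData, standardOI)
}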

Contributor:
Sorry, too many comments for a single line :)

Just realized this line is on the critical path, so please forget about the assertion I mentioned above. To avoid unnecessary computations and branches, I'd suggest something like:

if (dynamicPartNum > 0) {
  iter.map {
    // With DP
  }
} else {
  iter.map {
    // Without DP
  }
}

dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, dynamicPartNum)
}
// Casts Strings to HiveVarchars when necessary.
outputData(i) = wrap(row(i), fieldOIs(i))
i += 1
@@ -204,12 +235,81 @@ case class InsertIntoHiveTable(
// ORC stores compression information in table properties. While, there are other formats
// (e.g. RCFile) that rely on hadoop configurations to store compression information.
Contributor:
Move the above comments before line 306.

val jobConf = new JobConf(sc.hiveconf)
saveAsHiveFile(
rdd,
outputClass,
fileSinkConf,
jobConf,
sc.hiveconf.getBoolean("hive.exec.compress.output", false))
val jobConfSer = new SerializableWritable(jobConf)
if (dynamicPartNum>0) {
Contributor:
Please add spaces around >.

Contributor:
Spaces around >.

if (outputClass == null) {
Contributor:
Can we generalize saveAsHiveFile instead of duplicating its code?

throw new SparkException("Output value class not set")
}
jobConfSer.value.setOutputValueClass(outputClass)
if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
throw new SparkException("Output format class not set")
}
// Doesn't work in Scala 2.9 due to what may be a generics bug
// TODO: Should we uncomment this for Scala 2.10?
// conf.setOutputFormat(outputFormatClass)
jobConfSer.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
if (sc.hiveconf.getBoolean("hive.exec.compress.output", false)) {
// Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
// and "mapred.output.compression.type" have no impact on ORC because it uses table properties
// to store compression information.
jobConfSer.value.set("mapred.output.compress", "true")
fileSinkConf.setCompressed(true)
fileSinkConf.setCompressCodec(jobConfSer.value.get("mapred.output.compression.codec"))
fileSinkConf.setCompressType(jobConfSer.value.get("mapred.output.compression.type"))
}
jobConfSer.value.setOutputCommitter(classOf[FileOutputCommitter])

FileOutputFormat.setOutputPath(
jobConfSer.value,
SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, jobConfSer.value))

var writerMap = new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
def writeToFile2(context: TaskContext, iter: Iterator[Writable]) {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
val serializer = newSerializer(fileSinkConf.getTableInfo)
var count = 0
var writer2:SparkHiveHadoopWriter = null
Contributor:
Space after :

while(iter.hasNext) {
Contributor:
Space after (

val record = iter.next();
Contributor:
Remove the trailing ;

val location = fileSinkConf.getDirName
val partLocation = location + dynamicPartPath
writer2=writerMap.get(dynamicPartPath) match {
Contributor:
Spaces around =.

case Some(writer)=> writer
Contributor:
Space after )

case None => {
Contributor:
We can replace the writerMap.get(...) match { ... case None => ... } structure here with writerMap.getOrElse(..., { ... }).
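
Since writerMap is a mutable.HashMap, one way to write that (a sketch; getOrElseUpdate also caches the newly created writer, which is what the case None branch does today) is:

writer2 = writerMap.getOrElseUpdate(dynamicPartPath, {
  // Lazily create, set up, and open a writer for this partition path.
  val tempWriter = new SparkHiveHadoopWriter(
    jobConfSer.value,
    new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
  tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
  tempWriter.open(dynamicPartPath)
  tempWriter
})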

val tempWriter = new SparkHiveHadoopWriter(jobConfSer.value, new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
tempWriter.open(dynamicPartPath);
writerMap += (dynamicPartPath -> tempWriter)
tempWriter
}
}
count += 1
writer2.write(record)
}
for((k,v) <- writerMap) {
Contributor:
Space before (

v.close()
v.commit()
}
}

sc.sparkContext.runJob(rdd, writeToFile2 _)

for((k,v) <- writerMap) {
Contributor:
Space before (

v.commitJob()
}
writerMap.clear()
//writer.commitJob()
Contributor:
Remove this comment.


Contributor:
Remove this newline.

It seems that we duplicated saveAsHiveFile here and added a new version of writeToFile together with some other minor modifications to enable dynamic partitioning. I think we can move writeToFile2 (also, please rename it to something like writeWithDynamicPartitions) into saveAsHiveFile without hurting performance, as long as we keep the critical path of writing without dynamic partitioning clean.
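
A rough sketch of the shape this refactoring could take (the extra dynamicPartNum parameter and the writeWithDynamicPartitions name are assumptions, not part of the patch; the shared setup and the two write closures are elided):

def saveAsHiveFile(
    rdd: RDD[Writable],
    valueClass: Class[_],
    fileSinkConf: FileSinkDesc,
    conf: JobConf,
    isCompressed: Boolean,
    dynamicPartNum: Int = 0) {
  // ... shared output-format, compression, and committer setup ...
  if (dynamicPartNum > 0) {
    // One writer per dynamic partition directory.
    sc.sparkContext.runJob(rdd, writeWithDynamicPartitions _)
  } else {
    // Existing single-writer path stays untouched.
    sc.sparkContext.runJob(rdd, writeToFile _)
  }
}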

} else {
saveAsHiveFile(
rdd,
outputClass,
fileSinkConf,
jobConf,
sc.hiveconf.getBoolean("hive.exec.compress.output", false))
}

// TODO: Handle dynamic partitioning.
Contributor:
Remove this comment since we are adding support for dynamic partitioning.

val outputPath = FileOutputFormat.getOutputPath(jobConf)
@@ -220,25 +320,33 @@ case class InsertIntoHiveTable(
// holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
val holdDDLTime = false
if (partition.nonEmpty) {
val partitionSpec = partition.map {
case (key, Some(value)) => key -> value
case (key, None) => key -> "" // Should not reach here right now.
}
val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
db.validatePartitionNameCharacters(partVals)
// inheritTableSpecs is set to true. It should be set to false for a IMPORT query
// which is currently considered as a Hive native command.
val inheritTableSpecs = true
// TODO: Correctly set isSkewedStoreAsSubdir.
val isSkewedStoreAsSubdir = false
db.loadPartition(
outputPath,
qualifiedTableName,
partitionSpec,
overwrite,
holdDDLTime,
inheritTableSpecs,
isSkewedStoreAsSubdir)
if (dynamicPartNum>0) {
Contributor:
Please add spaces around >.

db.loadDynamicPartitions(
outputPath,
qualifiedTableName,
partitionSpec,
overwrite,
dynamicPartNum/*dpCtx.getNumDPCols()*/,
Contributor:
Remove the comment.

holdDDLTime,
isSkewedStoreAsSubdir
)
} else {
db.loadPartition(
outputPath,
qualifiedTableName,
partitionSpec,
overwrite,
holdDDLTime,
inheritTableSpecs,
isSkewedStoreAsSubdir)
}
} else {
db.loadTable(
outputPath,