@@ -30,7 +30,7 @@ import scala.util.control.NonFatal
3030
3131import com .google .common .primitives .Longs
3232import org .apache .hadoop .conf .Configuration
33- import org .apache .hadoop .fs .{FileStatus , FileSystem , Path , PathFilter }
33+ import org .apache .hadoop .fs .{FileStatus , FileSystem , FSDataOutputStream , Path , PathFilter }
3434import org .apache .hadoop .mapred .JobConf
3535import org .apache .hadoop .security .{Credentials , UserGroupInformation }
3636import org .apache .hadoop .security .token .{Token , TokenIdentifier }
@@ -477,4 +477,42 @@ object SparkHadoopUtil {
477477 hadoopConf.set(key.substring(" spark.hadoop." .length), value)
478478 }
479479 }
480+
481+ // scalastyle:off
482+ /**
483+ * Create a file on the given file system, optionally making sure erasure coding is disabled.
484+ *
485+ * Disabling EC can be helpful as HDFS EC doesn't support hflush(), hsync(), or append().
486+ * https://hadoop.apache.org/docs/r3.0.0/hadoop-project-dist/hadoop-hdfs/HDFSErasureCoding.html#Limitations
487+ */
488+ def createFile (fs : FileSystem , path : Path , allowEC : Boolean ): FSDataOutputStream = {
489+ if (allowEC) {
490+ fs.create(path)
491+ } else {
492+ try {
493+ // Use reflection as this uses APIs only available in Hadoop 3
494+ val builderMethod = fs.getClass().getMethod(" createFile" , classOf [Path ])
495+ // the builder api does not resolve relative paths, nor does it create parent dirs, while
496+ // the old api does.
497+ if (! fs.mkdirs(path.getParent())) {
498+ throw new IOException (s " Failed to create parents of $path" )
499+ }
500+ val qualifiedPath = fs.makeQualified(path)
501+ val builder = builderMethod.invoke(fs, qualifiedPath)
502+ val builderCls = builder.getClass()
503+ // this may throw a NoSuchMethodException if the path is not on hdfs
504+ val replicateMethod = builderCls.getMethod(" replicate" )
505+ val buildMethod = builderCls.getMethod(" build" )
506+ val b2 = replicateMethod.invoke(builder)
507+ buildMethod.invoke(b2).asInstanceOf [FSDataOutputStream ]
508+ } catch {
509+ case _ : NoSuchMethodException =>
510+ // No createFile() method, we're using an older hdfs client, which doesn't give us control
511+ // over EC vs. replication. Older hdfs doesn't have EC anyway, so just create a file with
512+ // old apis.
513+ fs.create(path)
514+ }
515+ }
516+ }
517+
480518}
0 commit comments