@@ -32,6 +32,7 @@ import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
3232import org .apache .hadoop .mapred .TextInputFormat
3333import org .apache .hadoop .mapreduce .lib .input .{TextInputFormat => NewTextInputFormat }
3434import org .json4s .{DefaultFormats , Extraction }
35+ import org .junit .Assert .{assertEquals , assertFalse }
3536import org .scalatest .concurrent .Eventually
3637import org .scalatest .matchers .must .Matchers ._
3738
@@ -1237,6 +1238,53 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
12371238 }
12381239 }
12391240 }
1241+
1242+ test(" SPARK-XXX: Fill missing S3A magic committer configs if needed" ) {
1243+ val c1 = new SparkConf ().setAppName(" s3a-test" ).setMaster(" local" )
1244+ sc = new SparkContext (c1)
1245+ assertFalse(sc.getConf.contains(" spark.hadoop.fs.s3a.committer.name" ))
1246+
1247+ resetSparkContext()
1248+ val c2 = c1.clone.set(" spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled" , " false" )
1249+ sc = new SparkContext (c2)
1250+ assertFalse(sc.getConf.contains(" spark.hadoop.fs.s3a.committer.name" ))
1251+
1252+ resetSparkContext()
1253+ val c3 = c1.clone.set(" spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled" , " true" )
1254+ sc = new SparkContext (c3)
1255+ Seq (
1256+ " spark.hadoop.fs.s3a.committer.magic.enabled" -> " true" ,
1257+ " spark.hadoop.fs.s3a.committer.name" -> " magic" ,
1258+ " spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a" ->
1259+ " org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory" ,
1260+ " spark.sql.parquet.output.committer.class" ->
1261+ " org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter" ,
1262+ " spark.sql.sources.commitProtocolClass" ->
1263+ " org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
1264+ ).foreach { case (k, v) =>
1265+ assertEquals(v, sc.getConf.get(k))
1266+ }
1267+
1268+ // Respect a user configuration
1269+ resetSparkContext()
1270+ val c4 = c1.clone
1271+ .set(" spark.hadoop.fs.s3a.committer.magic.enabled" , " false" )
1272+ .set(" spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled" , " true" )
1273+ sc = new SparkContext (c4)
1274+ Seq (
1275+ " spark.hadoop.fs.s3a.committer.magic.enabled" -> " false" ,
1276+ " spark.hadoop.fs.s3a.committer.name" -> null ,
1277+ " spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a" -> null ,
1278+ " spark.sql.parquet.output.committer.class" -> null ,
1279+ " spark.sql.sources.commitProtocolClass" -> null
1280+ ).foreach { case (k, v) =>
1281+ if (v == null ) {
1282+ assertFalse(sc.getConf.contains(k))
1283+ } else {
1284+ assertEquals(v, sc.getConf.get(k))
1285+ }
1286+ }
1287+ }
12401288}
12411289
12421290object SparkContextSuite {
0 commit comments