From 9efc14086bbfc0014ed6d5cac8603af93b2dd723 Mon Sep 17 00:00:00 2001
From: sachin aggarwal
Date: Thu, 11 Aug 2016 14:44:09 +0530
Subject: [PATCH 1/5] [SPARK-13979][Spark Core] Killed executor is respawned
 without AWS keys in standalone Spark cluster

---
 .../scala/org/apache/spark/deploy/SparkHadoopUtil.scala   | 8 ++++++++
 .../sql/execution/datasources/DataSourceStrategy.scala    | 8 ++++++--
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 59e90564b351..5bad795b9731 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -107,6 +107,14 @@ class SparkHadoopUtil extends Logging {
       if (key.startsWith("spark.hadoop.")) {
         hadoopConf.set(key.substring("spark.hadoop.".length), value)
       }
+      // Copy any "fs.swift2d.foo=bar" properties into conf as "fs.swift2d.foo=bar"
+      else if (key.startsWith("fs.swift2d")) {
+        hadoopConf.set(key, value)
+      }
+      // Copy any "fs.s3x.foo=bar" properties into conf as "fs.s3x.foo=bar"
+      else if (key.startsWith("fs.s3")) {
+        hadoopConf.set(key, value)
+      }
     }
     val bufferSize = conf.get("spark.buffer.size", "65536")
     hadoopConf.set("io.file.buffer.size", bufferSize)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 3741a9cb32fd..5d13f873c6b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -121,7 +121,9 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) =>
       // See buildPartitionedTableScan for the reason that we need to create a shard
       // broadcast HadoopConf.
-      val sharedHadoopConf = SparkHadoopUtil.get.conf
+      // fix added for SPARK-13979
+      // val sharedHadoopConf = SparkHadoopUtil.get.conf
+      val sharedHadoopConf = t.sqlContext.sparkContext.hadoopConfiguration
       val confBroadcast =
         t.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
       pruneFilterProject(
@@ -156,7 +158,9 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {

     // Because we are creating one RDD per partition, we need to have a shared HadoopConf.
     // Otherwise, the cost of broadcasting HadoopConf in every RDD will be high.
-    val sharedHadoopConf = SparkHadoopUtil.get.conf
+    // fix added for SPARK-13979
+    // val sharedHadoopConf = SparkHadoopUtil.get.conf
+    val sharedHadoopConf = relation.sqlContext.sparkContext.hadoopConfiguration
     val confBroadcast =
       relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
     val partitionColumnNames = partitionColumns.fieldNames.toSet
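For context, the configuration path this patch protects can be exercised from user code. Below is a minimal sketch (not part of the patch): the "spark.hadoop." prefix handling and the fs.s3a.* key names are standard Spark/Hadoop properties, while the object name, app name, and credential values are illustrative placeholders.

import org.apache.spark.{SparkConf, SparkContext}

object S3CredentialsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("spark-13979-sketch")
      // Keys prefixed with "spark.hadoop." are copied into the Hadoop
      // Configuration with the prefix stripped, e.g. "fs.s3a.access.key".
      .set("spark.hadoop.fs.s3a.access.key", "<ACCESS_KEY>")
      .set("spark.hadoop.fs.s3a.secret.key", "<SECRET_KEY>")
    val sc = new SparkContext(conf)
    // With the patch, DataSourceStrategy broadcasts sc.hadoopConfiguration,
    // which carries these fs.s3a.* keys, instead of SparkHadoopUtil.get.conf,
    // which a respawned executor rebuilds without them.
    println(sc.hadoopConfiguration.get("fs.s3a.access.key"))
    sc.stop()
  }
}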
From 89e7c97a30e4cd38d2bc92912f950092c8e1864a Mon Sep 17 00:00:00 2001
From: sachin aggarwal
Date: Tue, 16 Aug 2016 11:42:25 +0530
Subject: [PATCH 2/5] Corrected the position of a comment

---
 .../main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 5bad795b9731..72ee6cdee44f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -102,8 +102,8 @@ class SparkHadoopUtil extends Logging {
       hadoopConf.set("fs.s3n.awsSecretAccessKey", accessKey)
       hadoopConf.set("fs.s3a.secret.key", accessKey)
     }
-    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
     conf.getAll.foreach { case (key, value) =>
+      // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
       if (key.startsWith("spark.hadoop.")) {
         hadoopConf.set(key.substring("spark.hadoop.".length), value)
       }

From ab6ab9b810082991fb43820b9c8ad662afc8e39a Mon Sep 17 00:00:00 2001
From: sachin aggarwal
Date: Sun, 21 Aug 2016 10:05:41 +0530
Subject: [PATCH 3/5] Update SparkHadoopUtil.scala

Updated for hadoop-openstack package properties.

---
 .../scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 72ee6cdee44f..6d8aaaa8fe9e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -107,11 +107,11 @@ class SparkHadoopUtil extends Logging {
       if (key.startsWith("spark.hadoop.")) {
         hadoopConf.set(key.substring("spark.hadoop.".length), value)
       }
-      // Copy any "fs.swift2d.foo=bar" properties into conf as "fs.swift2d.foo=bar"
-      else if (key.startsWith("fs.swift2d")) {
+      // Copy any "fs.swift2d.foo=bar" or "fs.swift.foo=bar" properties into conf
+      else if (key.startsWith("fs.swift")) {
         hadoopConf.set(key, value)
       }
-      // Copy any "fs.s3x.foo=bar" properties into conf as "fs.s3x.foo=bar"
+      // Copy any "fs.s3.foo=bar" or "fs.s3a.foo=bar" or "fs.s3n.foo=bar" properties into conf
       else if (key.startsWith("fs.s3")) {
         hadoopConf.set(key, value)
       }
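A self-contained sketch of the copy rule as it stands after patch 3. The object and method names here are illustrative, not Spark's; the swift property name follows the hadoop-openstack "fs.swift.service.<name>.*" convention and the endpoint values are placeholders.

import org.apache.hadoop.conf.Configuration

object ConfCopySketch {
  // Mirrors the loop in SparkHadoopUtil after patch 3: "spark.hadoop.foo" is
  // copied with the prefix stripped; bare "fs.swift*" and "fs.s3*" keys are
  // copied through unchanged.
  def copyInto(entries: Seq[(String, String)], hadoopConf: Configuration): Unit = {
    entries.foreach { case (key, value) =>
      if (key.startsWith("spark.hadoop.")) {
        hadoopConf.set(key.substring("spark.hadoop.".length), value)
      } else if (key.startsWith("fs.swift") || key.startsWith("fs.s3")) {
        hadoopConf.set(key, value)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val hc = new Configuration(false)
    copyInto(Seq(
      "spark.hadoop.fs.s3a.endpoint" -> "s3.example.com", // stored as "fs.s3a.endpoint"
      "fs.swift.service.demo.auth.url" -> "https://auth.example.com/v2.0/tokens"
    ), hc)
    println(hc.get("fs.s3a.endpoint"))                // s3.example.com
    println(hc.get("fs.swift.service.demo.auth.url"))
  }
}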
From 9765dc9f85bb145cf22df4c5a05a6de6f7d88f61 Mon Sep 17 00:00:00 2001
From: sachin aggarwal
Date: Sun, 21 Aug 2016 10:06:46 +0530
Subject: [PATCH 4/5] Update SparkHadoopUtil.scala

Added a comment with the JIRA number.

---
 .../src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 6d8aaaa8fe9e..07f34287759d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -107,6 +107,7 @@ class SparkHadoopUtil extends Logging {
       if (key.startsWith("spark.hadoop.")) {
         hadoopConf.set(key.substring("spark.hadoop.".length), value)
       }
+      // fix added for SPARK-13979
       // Copy any "fs.swift2d.foo=bar" or "fs.swift.foo=bar" properties into conf
       else if (key.startsWith("fs.swift")) {
         hadoopConf.set(key, value)

From dee4e6d996ed506602c5b703816f0b948975322e Mon Sep 17 00:00:00 2001
From: sachin aggarwal
Date: Sun, 26 Feb 2017 00:22:02 +0530
Subject: [PATCH 5/5] Added fs.* prefix check instead of filesystem-specific
 checks

---
 .../scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 07f34287759d..cd125bf0ca8d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -108,12 +108,8 @@ class SparkHadoopUtil extends Logging {
         hadoopConf.set(key.substring("spark.hadoop.".length), value)
       }
       // fix added for SPARK-13979
-      // Copy any "fs.swift2d.foo=bar" or "fs.swift.foo=bar" properties into conf
-      else if (key.startsWith("fs.swift")) {
-        hadoopConf.set(key, value)
-      }
-      // Copy any "fs.s3.foo=bar" or "fs.s3a.foo=bar" or "fs.s3n.foo=bar" properties into conf
-      else if (key.startsWith("fs.s3")) {
+      // Copy any "fs.*=bar" properties into conf; this covers almost all filesystems
+      else if (key.startsWith("fs.")) {
         hadoopConf.set(key, value)
       }
     }
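The final patch collapses the two filesystem-specific branches into a single prefix check. A minimal sketch of the resulting rule, under the same assumptions as the sketch above (illustrative names, not the actual Spark class):

import org.apache.hadoop.conf.Configuration

object FsPrefixSketch {
  def copyInto(entries: Seq[(String, String)], hadoopConf: Configuration): Unit = {
    entries.foreach { case (key, value) =>
      if (key.startsWith("spark.hadoop.")) {
        hadoopConf.set(key.substring("spark.hadoop.".length), value)
      } else if (key.startsWith("fs.")) {
        // One check now covers fs.s3, fs.s3a, fs.s3n, fs.swift, and any other
        // Hadoop filesystem scheme, so new connectors need no new branch.
        hadoopConf.set(key, value)
      }
    }
  }
}

The trade-off is breadth: every SparkConf key beginning with "fs." is now forwarded, whether or not a matching filesystem is on the classpath, which is harmless for Hadoop's Configuration but removes the per-filesystem allow-list.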