Skip to content

Commit 9da8ad4

Browse files
authored
[SPK-186] Vault error messages if fails a request (apache#160)
* Vault error messages if is failing * Changelog and solved compilations in mesos * Mesos fixes * Code style * Uncomment tests * Trying timeout * Removed todo * Go Jenkins * From future to try * Fixed futures to try * More specific messages * Revert "More specific messages" This reverts commit 3b4304a0efca1ea2f837070cd40369085c3ce741. * Uncommented tests
1 parent 0115a1c commit 9da8ad4

12 files changed

Lines changed: 416 additions & 180 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
* Possibility of specifying the name of the certificate within a directory with multiple certificates
99
* Add support to multiples CAs
1010
* Added performance tests PNF
11+
* Add messages for errors in Vault
1112

1213
## 2.2.0.5 (upcoming)
1314

core/src/main/scala/org/apache/spark/broadcast/TorrentSecretBroadcast.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import java.util.zip.Adler32
2424
import scala.collection.JavaConverters._
2525
import scala.reflect.ClassTag
2626
import scala.util.Random
27-
2827
import org.apache.spark._
2928
import org.apache.spark.internal.Logging
3029
import org.apache.spark.io.CompressionCodec
@@ -92,7 +91,7 @@ private[spark] class TorrentSecretBroadcast(secretVaultPath: String,
9291
private var checksums: Array[Int] = _
9392

9493
override protected def getValue() = {
95-
VaultHelper.retrieveSecret(secretVaultPath, idJson)
94+
VaultHelper.retrieveSecret(secretVaultPath, idJson).get
9695
}
9796

9897
private def calcChecksum(block: ByteBuffer): Int = {

core/src/main/scala/org/apache/spark/security/ConfigSecurity.scala

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import scala.util.{Failure, Success, Try}
2121
import org.apache.spark.internal.Logging
2222
import org.apache.spark.util.Utils
2323

24+
2425
object ConfigSecurity extends Logging {
2526

2627
val secretsFolder: String = sys.env.get("SPARK_DRIVER_SECRET_FOLDER") match {
@@ -39,21 +40,34 @@ object ConfigSecurity extends Logging {
3940
logInfo("Obtaining vault token using VAULT_TOKEN")
4041
sys.env.get("VAULT_TOKEN")
4142
} else if (sys.env.get("VAULT_TEMP_TOKEN").isDefined) {
43+
4244
logInfo("Obtaining vault token using VAULT_TEMP_TOKEN")
43-
scala.util.Try {
44-
VaultHelper.getRealToken(sys.env.get("VAULT_TEMP_TOKEN"))
45-
} match {
45+
val token = VaultHelper.getRealToken(sys.env.get("VAULT_TEMP_TOKEN"))
46+
47+
token match {
4648
case Success(token) => Option(token)
4749
case Failure(e) =>
4850
logWarning("An error ocurred while trying to obtain" +
4951
" Application Token from a temporal token", e)
5052
None
5153
}
54+
5255
} else if (sys.env.get("VAULT_ROLE_ID").isDefined && sys.env.get("VAULT_SECRET_ID").isDefined) {
53-
logInfo("Obtaining vault token using ROLE_ID and SECRET_ID")
54-
Option(VaultHelper.getTokenFromAppRole(
56+
57+
logInfo("Obtaining vault token using ROLE_ID and SECRET_ID")
58+
val tokenRole = VaultHelper.getTokenFromAppRole(
5559
sys.env("VAULT_ROLE_ID"),
56-
sys.env("VAULT_SECRET_ID")))
60+
sys.env("VAULT_SECRET_ID")
61+
)
62+
63+
tokenRole match {
64+
case Success(token) => Option(token)
65+
case Failure(e) =>
66+
logWarning("An error ocurred while trying to obtain" +
67+
" Application Token from a ROLE_ID and SECRET_ID", e)
68+
None
69+
}
70+
5771
} else {
5872
logInfo("No Vault token variables provided. Skipping Vault token retrieving")
5973
None

core/src/main/scala/org/apache/spark/security/DBConfig.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616
*/
1717
package org.apache.spark.security
1818

19+
1920
object DBConfig {
2021
def prepareEnvironment(options: Map[String, String]): Map[String, String] = {
2122
options.filter(_._1.endsWith("DB_USER_VAULT_PATH")).flatMap{case (_, path) =>
22-
val (pass, user) = VaultHelper.getPassPrincipalFromVault(path)
23+
24+
val (pass, user) = VaultHelper.getPassPrincipalFromVault(path).get
25+
2326
Seq(("spark.db.enable", "true"), ("spark.db.user", user), ("spark.db.pass", pass))
2427
}
2528
}

core/src/main/scala/org/apache/spark/security/HTTPHelper.scala

Lines changed: 40 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.security
2020
import java.io.{BufferedReader, File, InputStreamReader}
2121
import java.security.cert.X509Certificate
2222

23-
import scala.annotation.tailrec
23+
import scala.util.Try
2424
import scala.util.parsing.json.JSON
2525

2626
import org.apache.http.client.HttpClient
@@ -31,6 +31,10 @@ import org.apache.http.ssl.{SSLContextBuilder, TrustStrategy}
3131

3232
import org.apache.spark.internal.Logging
3333

34+
35+
36+
37+
3438
object HTTPHelper extends Logging{
3539

3640
lazy val clientNaive: HttpClient = {
@@ -54,49 +58,53 @@ object HTTPHelper extends Logging{
5458
def executePost(requestUrl: String,
5559
parentField: String,
5660
headers: Option[Seq[(String, String)]],
57-
entity: Option[String] = None): Map[String, Any] = {
61+
entity: Option[String] = None): Try[Map[String, Any]] = {
5862
val post = new HttpPost(requestUrl)
5963

6064
getContentFromResponse(post, parentField, headers, entity)
6165
}
6266
def executeGet(requestUrl: String,
6367
parentField: String,
64-
headers: Option[Seq[(String, String)]]): Map[String, Any] = {
68+
headers: Option[Seq[(String, String)]]): Try[Map[String, Any]] = {
6569
val get = new HttpGet(requestUrl)
6670
getContentFromResponse(get, parentField, headers)
6771
}
6872

6973
private def getContentFromResponse(uriRequest: HttpUriRequest,
7074
parentField: String,
7175
headers: Option[Seq[(String, String)]],
72-
entities: Option[String] = None): Map[String, Any] = {
73-
74-
headers.map(head => head.foreach { case (head, value) => uriRequest.addHeader(head, value) })
75-
76-
entities.map(entity => uriRequest.asInstanceOf[HttpPost].setEntity(new StringEntity(entity)))
77-
78-
val client = secureClient match {
79-
case Some(secureClient) =>
80-
logInfo(s"Using secure client")
81-
secureClient
82-
case _ => logInfo(s"Using non secure client")
83-
clientNaive
84-
}
85-
86-
val response = client.execute(uriRequest)
87-
88-
val rd = new BufferedReader(
89-
new InputStreamReader(response.getEntity().getContent()))
90-
91-
val json = JSON.parseFull(Stream.continually(rd.readLine()).takeWhile(_ != null).mkString).
92-
get.asInstanceOf[Map[String, Any]]
93-
logTrace(s"getFrom Vault ${json.mkString("\n")}")
94-
if(response.getStatusLine.getStatusCode != 200) {
95-
val errors = json("errors").asInstanceOf[List[String]].mkString("\n")
96-
throw new RuntimeException(errors)
97-
}
98-
else {
99-
json(parentField).asInstanceOf[Map[String, Any]]
100-
}
76+
entities: Option[String] = None): Try[Map[String, Any]] =
77+
Try {
78+
headers.foreach(
79+
head => head.foreach { case (head, value) => uriRequest.addHeader(head, value) }
80+
)
81+
82+
entities.foreach(
83+
entity => uriRequest.asInstanceOf[HttpPost].setEntity(new StringEntity(entity))
84+
)
85+
86+
val client = secureClient match {
87+
case Some(secureClient) =>
88+
logInfo(s"Using secure client")
89+
secureClient
90+
case _ => logInfo(s"Using non secure client")
91+
clientNaive
92+
}
93+
94+
val response = client.execute(uriRequest)
95+
96+
val rd = new BufferedReader(
97+
new InputStreamReader(response.getEntity().getContent()))
98+
99+
val json = JSON.parseFull(Stream.continually(rd.readLine()).takeWhile(_ != null).mkString).
100+
get.asInstanceOf[Map[String, Any]]
101+
logTrace(s"getFrom Vault ${json.mkString("\n")}")
102+
if (response.getStatusLine.getStatusCode != 200) {
103+
val errors = json("errors").asInstanceOf[List[String]].mkString("\n")
104+
throw new RuntimeException(errors)
105+
}
106+
else {
107+
json(parentField).asInstanceOf[Map[String, Any]]
108+
}
101109
}
102110
}

core/src/main/scala/org/apache/spark/security/KerberosConfig.scala

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,23 @@ import javax.xml.bind.DatatypeConverter
2222

2323
import org.apache.spark.internal.Logging
2424

25+
2526
object KerberosConfig extends Logging{
2627

2728
def prepareEnviroment(options: Map[String, String]): Map[String, String] = {
28-
val kerberosVaultPath = options.get("KERBEROS_VAULT_PATH")
29-
if(kerberosVaultPath.isDefined) {
29+
30+
options.get("KERBEROS_VAULT_PATH") map { kerberosVaultPath =>
31+
3032
val (keytab64, principal) =
31-
VaultHelper.getKeytabPrincipalFromVault(kerberosVaultPath.get)
33+
VaultHelper.getKeytabPrincipalFromVault(kerberosVaultPath).get
34+
3235
val keytabPath = getKeytabPrincipal(keytab64, principal)
3336
Map("principal" -> principal, "keytabPath" -> keytabPath)
34-
} else {
37+
38+
} getOrElse {
3539
logInfo(s"tying to get ssl secrets from vault for Kerberos but not found vault path," +
3640
s" skipping")
37-
Map[String, String]()
41+
Map.empty
3842
}
3943
}
4044

core/src/main/scala/org/apache/spark/security/MesosConfig.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,12 @@ package org.apache.spark.security
1919
object MesosConfig {
2020
def prepareEnvironment(options: Map[String, String]): Map[String, String] = {
2121
options.filter(_._1.endsWith("MESOS_VAULT_PATH")).flatMap{case (_, path) =>
22-
val (pass, user) = VaultHelper.getPassPrincipalFromVault(path)
22+
23+
val (pass, user) =
24+
VaultHelper.getPassPrincipalFromVault(path).get
25+
2326
Seq(("spark.mesos.principal", user), ("spark.mesos.secret", pass))
27+
2428
}
2529
}
2630
}

core/src/main/scala/org/apache/spark/security/SSLConfig.scala

Lines changed: 66 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -30,46 +30,89 @@ import sun.security.util.DerInputStream
3030

3131
import org.apache.spark.internal.Logging
3232

33+
34+
3335
object SSLConfig extends Logging {
3436

3537
val sslTypeDataStore = "DATASTORE"
3638

39+
private val sparkSSLPrefix = "spark.ssl."
40+
3741
def prepareEnvironment(sslType: String,
3842
options: Map[String, String]): Map[String, String] = {
3943

40-
val sparkSSLPrefix = "spark.ssl."
4144

42-
val trustStore = VaultHelper.getAllCas
43-
val trustPass = VaultHelper.getCAPass
44-
val trustStorePath = generateTrustStore(sslType, trustStore, trustPass)
45+
val trustStoreOptions = generateTruststoreOptions(sslType)
46+
val keyStoreOptions = generateKeystoreOptions(sslType, options)
47+
48+
val vaultKeyPassPath = options(s"${sslType}_VAULT_KEY_PASS_PATH")
49+
50+
val keyPass =
51+
VaultHelper.getCertPassForAppFromVault(vaultKeyPassPath).get
52+
53+
val keyPassOptions = Map(
54+
s"$sparkSSLPrefix${sslType.toLowerCase}.keyPassword" -> keyPass
55+
)
56+
57+
val certFilesPath =
58+
Map(s"$sparkSSLPrefix${sslType.toLowerCase}.certPem.path" ->
59+
s"${ConfigSecurity.secretsFolder}/cert.crt",
60+
s"$sparkSSLPrefix${sslType.toLowerCase}.keyPKCS8.path" ->
61+
s"${ConfigSecurity.secretsFolder}/key.pkcs8",
62+
s"$sparkSSLPrefix${sslType.toLowerCase}.caPem.path" ->
63+
s"${ConfigSecurity.secretsFolder}/ca.crt")
64+
65+
trustStoreOptions ++ keyStoreOptions ++ keyPassOptions ++ certFilesPath
66+
}
67+
68+
private def generateTruststoreOptions(sslType: String): Map[String, String] = {
69+
70+
val getTrustStoreAndPass = for {
71+
72+
trustStore <- VaultHelper.getAllCas
73+
trustPass <- VaultHelper.getCAPass
74+
75+
} yield {
76+
77+
generatePemFile(trustStore, "ca.crt")
78+
79+
(generateTrustStore(sslType, trustStore, trustPass), trustPass)
80+
}
81+
82+
val (trustStorePath, trustPass) = getTrustStoreAndPass.get
4583

4684
logInfo(s"Setting SSL values for $sslType")
4785

48-
val trustStoreOptions =
86+
4987
Map(s"$sparkSSLPrefix${sslType.toLowerCase}.enabled" -> "true",
5088
s"$sparkSSLPrefix${sslType.toLowerCase}.trustStore" -> trustStorePath,
5189
s"$sparkSSLPrefix${sslType.toLowerCase}.trustStorePassword" -> trustPass,
5290
s"$sparkSSLPrefix${sslType.toLowerCase}.security.protocol" -> "SSL")
5391

54-
val vaultKeystorePath = options.get(s"${sslType}_VAULT_CERT_PATH")
92+
}
5593

56-
val vaultKeystorePassPath = options.get(s"${sslType}_VAULT_CERT_PASS_PATH")
57-
val certName = options.get(s"${sslType}_CERTIFICATE_NAME")
94+
private def generateKeystoreOptions(
95+
sslType: String,
96+
options: Map[String, String]
97+
): Map[String, String] = {
5898

59-
val keyStoreOptions = if (vaultKeystorePath.isDefined
60-
&& vaultKeystorePassPath.isDefined) {
61-
62-
val (key, certs) = certName match {
63-
case Some(cert) => VaultHelper.getCertKeyForAppFromVault(
64-
vaultKeystorePath.get, certName.get)
65-
case None => VaultHelper.getCertKeyForAppFromVault(vaultKeystorePath.get)
66-
}
99+
val keystoreOptions = for {
100+
101+
vaultKeystorePath <- options.get(s"${sslType}_VAULT_CERT_PATH")
102+
vaultKeystorePassPath <- options.get(s"${sslType}_VAULT_CERT_PASS_PATH")
103+
104+
} yield {
105+
106+
val certName = options.get(s"${sslType}_CERTIFICATE_NAME")
107+
108+
val (key, certs) =
109+
VaultHelper.getCertKeyForAppFromVault(vaultKeystorePath, certName).get
67110

68111
pemToDer(key)
69112
generatePemFile(certs, "cert.crt")
70-
generatePemFile(trustStore, "ca.crt")
71113

72-
val pass = VaultHelper.getCertPassForAppFromVault( vaultKeystorePassPath.get)
114+
val pass =
115+
VaultHelper.getCertPassForAppFromVault( vaultKeystorePassPath).get
73116

74117
val keyStorePath = generateKeyStore(sslType, certs, key, pass)
75118

@@ -78,27 +121,14 @@ object SSLConfig extends Logging {
78121
s"$sparkSSLPrefix${sslType.toLowerCase}.protocol" -> "TLSv1.2",
79122
s"$sparkSSLPrefix${sslType.toLowerCase}.needClientAuth" -> "true"
80123
)
81-
82-
} else {
83-
logInfo(s"trying to get ssl secrets from vault for ${sslType.toLowerCase} keyStore" +
84-
s" but not found pass and cert vault paths, exiting")
85-
Map[String, String]()
86124
}
87125

88-
val vaultKeyPassPath = options.get(s"${sslType}_VAULT_KEY_PASS_PATH")
89-
90-
val keyPass = Map(s"$sparkSSLPrefix${sslType.toLowerCase}.keyPassword"
91-
-> VaultHelper.getCertPassForAppFromVault(vaultKeyPassPath.get))
92-
93-
val certFilesPath =
94-
Map(s"$sparkSSLPrefix${sslType.toLowerCase}.certPem.path" ->
95-
s"${ConfigSecurity.secretsFolder}/cert.crt",
96-
s"$sparkSSLPrefix${sslType.toLowerCase}.keyPKCS8.path" ->
97-
s"${ConfigSecurity.secretsFolder}/key.pkcs8",
98-
s"$sparkSSLPrefix${sslType.toLowerCase}.caPem.path" ->
99-
s"${ConfigSecurity.secretsFolder}/ca.crt")
126+
keystoreOptions.getOrElse {
127+
logInfo(s"trying to get ssl secrets from vault for ${sslType.toLowerCase} keyStore" +
128+
s" but not found pass and cert vault paths, exiting")
129+
Map.empty
130+
}
100131

101-
trustStoreOptions ++ keyStoreOptions ++ keyPass ++ certFilesPath
102132
}
103133

104134
def generateTrustStore(sslType: String, cas: String, password: String): String = {

0 commit comments

Comments
 (0)