
Commit 53ac850

mgaido91 authored and jerryshao committed
[SPARK-24948][SHS][BACKPORT-2.2] Delegate check access permissions to the file system
## What changes were proposed in this pull request?

In `SparkHadoopUtil.checkAccessPermission`, only the basic owner/group/other permissions are considered when deciding whether a user can access a file. This check is incomplete: it ignores ACLs and any other policies the file system may apply internally, so it can wrongly report that a user cannot access a file even though they actually can. This PR delegates the accessibility check to the file system itself, so that the correct result is returned. A caching layer is added for performance reasons.

## How was this patch tested?

Added a unit test.

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #22022 from mgaido91/SPARK-24948_2.2.
1 parent a5624c7 commit 53ac850

4 files changed: 74 additions and 137 deletions
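The essence of the change, in miniature: instead of re-deriving permissions from a `FileStatus` on the client, simply attempt the read, let the file system enforce whatever policies it has (POSIX bits, ACLs, external authorizers), and cache denials so they are not retried on every scan. The sketch below is illustrative only and is not code from this commit; `AccessCheckingReader` and `readIfAccessible` are invented names.

```scala
import java.io.ByteArrayOutputStream
import java.util.concurrent.ConcurrentHashMap

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils
import org.apache.hadoop.security.AccessControlException

// Illustrative only: delegate the access decision to the file system by
// attempting the read, and cache denials so they are not retried.
class AccessCheckingReader(fs: FileSystem) {

  // Files that recently failed with AccessControlException.
  // Key: file name; value: time the denial was observed, in ms.
  private val denied = new ConcurrentHashMap[String, Long]

  def readIfAccessible(path: Path): Option[Array[Byte]] = {
    if (denied.containsKey(path.getName)) {
      None // cached "permission denied": skip without touching the FS
    } else {
      try {
        val in = fs.open(path)
        try {
          // The open/read is where ACLs and other server-side policies
          // are actually enforced; no client-side bit checking needed.
          val out = new ByteArrayOutputStream()
          IOUtils.copyBytes(in, out, 4096, false)
          Some(out.toByteArray)
        } finally {
          in.close()
        }
      } catch {
        case _: AccessControlException =>
          denied.put(path.getName, System.currentTimeMillis())
          None
      }
    }
  }
}
```

`FsHistoryProvider` applies the same pattern in the diffs below, keyed by file name and expired on the cleaner schedule.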


core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 0 additions & 22 deletions
@@ -29,7 +29,6 @@ import scala.util.control.NonFatal
 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
-import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -379,27 +378,6 @@ class SparkHadoopUtil extends Logging {
     buffer.toString
   }

-  private[spark] def checkAccessPermission(status: FileStatus, mode: FsAction): Boolean = {
-    val perm = status.getPermission
-    val ugi = UserGroupInformation.getCurrentUser
-
-    if (ugi.getShortUserName == status.getOwner) {
-      if (perm.getUserAction.implies(mode)) {
-        return true
-      }
-    } else if (ugi.getGroupNames.contains(status.getGroup)) {
-      if (perm.getGroupAction.implies(mode)) {
-        return true
-      }
-    } else if (perm.getOtherAction.implies(mode)) {
-      return true
-    }
-
-    logDebug(s"Permission denied: user=${ugi.getShortUserName}, " +
-      s"path=${status.getPath}:${status.getOwner}:${status.getGroup}" +
-      s"${if (status.isDirectory) "d" else "-"}$perm")
-    false
-  }
 }

 object SparkHadoopUtil {
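For reference, the deleted `checkAccessPermission` consulted only the owner/group/other bits carried by the `FileStatus`, which is exactly why HDFS ACL entries were invisible to it. If a standalone client-side probe were still wanted, Hadoop (2.6 and later, to the best of my knowledge) exposes `FileSystem#access`, which asks the file system itself to evaluate the request. A rough sketch, not part of this commit:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.security.AccessControlException

// Probe read access by asking the file system itself, so ACLs and
// pluggable authorizers are honored, unlike a client-side bit check.
def canRead(fs: FileSystem, path: Path): Boolean = {
  try {
    fs.access(path, FsAction.READ) // throws AccessControlException on denial
    true
  } catch {
    case _: AccessControlException => false
  }
}
```

The commit instead just opens the file and catches the exception, which avoids an extra round trip and sidesteps the check-then-open race.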

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 32 additions & 5 deletions
@@ -19,16 +19,16 @@ package org.apache.spark.deploy.history

 import java.io.{FileNotFoundException, IOException, OutputStream}
 import java.util.UUID
-import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, Executors, ExecutorService, Future, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}

+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.xml.Node

 import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
 import org.apache.hadoop.security.AccessControlException
@@ -105,7 +105,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     "; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)

   private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
-  private val fs = new Path(logDir).getFileSystem(hadoopConf)
+  // Visible for testing
+  private[history] val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf)

   // Used by check event thread and clean log thread.
   // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
@@ -129,6 +130,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

   private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)

+  private val blacklist = new ConcurrentHashMap[String, Long]
+
+  // Visible for testing
+  private[history] def isBlacklisted(path: Path): Boolean = {
+    blacklist.containsKey(path.getName)
+  }
+
+  private def blacklist(path: Path): Unit = {
+    blacklist.put(path.getName, clock.getTimeMillis())
+  }
+
+  /**
+   * Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
+   */
+  private def clearBlacklist(expireTimeInSeconds: Long): Unit = {
+    val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
+    blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
+  }
+
   /**
    * Return a runnable that performs the given operation on the event logs.
    * This operation is expected to be executed periodically.
@@ -326,7 +346,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       // the end-user.
       !entry.getPath().getName().startsWith(".") &&
         prevFileSize < entry.getLen() &&
-        SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+        !isBlacklisted(entry.getPath)
     }
     .flatMap { entry => Some(entry) }
     .sortWith { case (entry1, entry2) =>
@@ -481,6 +501,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }

     } catch {
+      case e: AccessControlException =>
+        // We don't have read permissions on the log file
+        logWarning(s"Unable to read log ${fileStatus.getPath}", e.getCause)
+        blacklist(fileStatus.getPath)
+        None
       case e: Exception =>
         logError(
           s"Exception encountered when attempting to load application log ${fileStatus.getPath}",
@@ -587,6 +612,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     } catch {
       case t: Exception => logError("Exception in cleaning logs", t)
     }
+    // Clean the blacklist from the expired entries.
+    clearBlacklist(CLEAN_INTERVAL_S)
   }

   /**
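A note on the expiry logic above: `clearBlacklist(CLEAN_INTERVAL_S)` runs at the end of each cleaning pass, so a blacklisted file becomes eligible for reading again once the interval has elapsed, e.g. after an operator fixes its permissions. Below is a toy demonstration of the same `retain`-based expiry, assuming Scala 2.11/2.12 (`retain` became `filterInPlace` in 2.13); the `now` variable is a fake clock standing in for Spark's `Clock`:

```scala
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

object BlacklistExpiryDemo extends App {
  var now = 0L // fake clock, standing in for Clock.getTimeMillis()
  val blacklist = new ConcurrentHashMap[String, Long]

  // Drops entries older than `expireTimeInSeconds`, mirroring clearBlacklist.
  def clearBlacklist(expireTimeInSeconds: Long): Unit = {
    val expiredThreshold = now - expireTimeInSeconds * 1000
    blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
  }

  blacklist.put("accessDenied", now)             // file failed with AccessControlException
  now += 24 * 60 * 60 * 1000 + 1                 // a bit more than one day passes
  clearBlacklist(24 * 60 * 60)                   // cleaner runs with a one-day expiry
  assert(!blacklist.containsKey("accessDenied")) // eligible for reading again
}
```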

core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala

Lines changed: 0 additions & 97 deletions
This file was deleted.

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 42 additions & 13 deletions
@@ -27,11 +27,13 @@ import scala.concurrent.duration._
 import scala.language.postfixOps

 import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.security.AccessControlException
 import org.json4s.jackson.JsonMethods._
-import org.mockito.Matchers.any
-import org.mockito.Mockito.{mock, spy, verify}
+import org.mockito.ArgumentMatcher
+import org.mockito.Matchers.{any, argThat}
+import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
@@ -135,14 +137,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     // setReadable(...) does not work on Windows. Please refer JDK-6728842.
     assume(!Utils.isWindows)

-    class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) {
-      var mergeApplicationListingCall = 0
-      override protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
-        super.mergeApplicationListing(fileStatus)
-        mergeApplicationListingCall += 1
-      }
-    }
-    val provider = new TestFsHistoryProvider
+    val provider = new FsHistoryProvider(createTestConf())

     val logFile1 = newLogFile("new1", None, inProgress = false)
     writeFile(logFile1, true, None,
@@ -159,8 +154,6 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     updateAndCheck(provider) { list =>
       list.size should be (1)
     }
-
-    provider.mergeApplicationListingCall should be (1)
   }

   test("history file is renamed from inprogress to completed") {
@@ -583,6 +576,42 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     }
   }

+  test("SPARK-24948: blacklist files we don't have read permission on") {
+    val clock = new ManualClock(1533132471)
+    val provider = new FsHistoryProvider(createTestConf(), clock)
+    val accessDenied = newLogFile("accessDenied", None, inProgress = false)
+    writeFile(accessDenied, true, None,
+      SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None))
+    val accessGranted = newLogFile("accessGranted", None, inProgress = false)
+    writeFile(accessGranted, true, None,
+      SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 1L, "test", None),
+      SparkListenerApplicationEnd(5L))
+    val mockedFs = spy(provider.fs)
+    doThrow(new AccessControlException("Cannot read accessDenied file")).when(mockedFs).open(
+      argThat(new ArgumentMatcher[Path]() {
+        override def matches(path: Any): Boolean = {
+          path.asInstanceOf[Path].getName.toLowerCase == "accessdenied"
+        }
+      }))
+    val mockedProvider = spy(provider)
+    when(mockedProvider.fs).thenReturn(mockedFs)
+    updateAndCheck(mockedProvider) { list =>
+      list.size should be(1)
+    }
+    writeFile(accessDenied, true, None,
+      SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None),
+      SparkListenerApplicationEnd(5L))
+    // Doing 2 times in order to check the blacklist filter too
+    updateAndCheck(mockedProvider) { list =>
+      list.size should be(1)
+    }
+    val accessDeniedPath = new Path(accessDenied.getPath)
+    assert(mockedProvider.isBlacklisted(accessDeniedPath))
+    clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d
+    mockedProvider.cleanLogs()
+    assert(!mockedProvider.isBlacklisted(accessDeniedPath))
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example: