@@ -88,8 +88,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
if (eventLoop == null) return // scheduler has already been stopped
logDebug("Stopping JobScheduler")

// First, stop receiving
receiverTracker.stop(processAllReceivedData)
if (receiverTracker != null) {
// First, stop receiving
receiverTracker.stop(processAllReceivedData)
Contributor Author

NPE thrown when the streaming context is stopped before recovery is complete.

}

// Second, stop generating jobs. If it has to process all received data,
// then this will wait for all the processing through JobScheduler to be over.
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.util

import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentSkipListSet
Contributor Author

NoteToSelf: Remove

import java.util.{Iterator => JIterator}

import scala.collection.JavaConverters._
@@ -126,11 +127,11 @@ private[streaming] class FileBasedWriteAheadLog(
val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))

logFilesToRead.iterator.map { file =>
logFilesToRead.par.map { file =>
Contributor

This is an expensive operation: .par runs an O(n) pass to build a parallel copy of the collection (on top of the cost of the copy itself). Do we really need it? I am not entirely sure the copying adds a whole lot of value, considering that this array is not going to be very large. Also note the additional cost of spinning up threads (if the context does not already have them running).

Contributor Author

@harishreedharan I ran several benchmarks on this. In a setting with a 30-minute window operation, which corresponds to ~9000 files on S3 with closeFileAfterWrite = true, the current code takes about 15 minutes to recover. With this very simple addition, recovery time drops to 1.5 minutes (on a driver with 4 CPUs, probably with hyper-threading).

Contributor

Which thread pool / execution context does this par operation run on? You must not use the default system execution context.
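For reference, a minimal sketch of how the parallel traversal could be pinned to a dedicated pool instead of the JVM-wide default that .par uses (assuming Scala 2.11-era parallel collections; the pool name and size are illustrative, not part of this patch):

import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

// Dedicated pool for WAL recovery reads; the size is an assumption, tune to the driver's cores.
val walRecoveryPool = new ForkJoinPool(4)

val parallelFiles = logFilesToRead.par
// Route the parallel map onto the dedicated pool rather than the global ForkJoinPool.
parallelFiles.tasksupport = new ForkJoinTaskSupport(walRecoveryPool)

val readers = parallelFiles.map { file =>
  logDebug(s"Creating log reader with $file")
  val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
  CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _)
}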

logDebug(s"Creating log reader with $file")
val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _)
}.flatten.asJava
}.flatten.toIterator.asJava
}

/**
@@ -149,27 +150,26 @@
val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")

def deleteFiles() {
oldLogFiles.foreach { logInfo =>
try {
val path = new Path(logInfo.path)
val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
fs.delete(path, true)
synchronized { pastLogs -= logInfo }
logDebug(s"Cleared log file $logInfo")
} catch {
case ex: Exception =>
logWarning(s"Error clearing write ahead log file $logInfo", ex)
}
synchronized { pastLogs --= oldLogFiles }
Contributor Author

I have a test to make sure that even if a delete is not successful, recovery is robust and the file is deleted once the next cleanup request is sent.

Contributor

You can merge these two synchronized blocks here and move the print after them (see the sketch below).
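One way to read this suggestion, as a rough sketch using the names from the surrounding code (not necessarily what the final patch does): filter and remove the expired entries from pastLogs under a single lock, then log the result afterwards.

val oldLogFiles = synchronized {
  val expiredLogs = pastLogs.filter { _.endTime < threshTime }
  pastLogs --= expiredLogs
  expiredLogs
}
logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
  s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")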

def deleteFile(walInfo: LogInfo): Unit = {
Contributor

nit: why rename this to walInfo?

Contributor

nit: empty line missing.

Contributor Author

logInfo is Spark's logging method; keeping that as the parameter name would shadow it, hence the rename to walInfo.

try {
val path = new Path(walInfo.path)
val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
fs.delete(path, true)
logDebug(s"Cleared log file $walInfo")
} catch {
case ex: Exception =>
logWarning(s"Error clearing write ahead log file $walInfo", ex)
}
logInfo(s"Cleared log files in $logDirectory older than $threshTime")
}
if (!executionContext.isShutdown) {
val f = Future { deleteFiles() }
if (waitForCompletion) {
import scala.concurrent.duration._
Await.ready(f, 1 second)
oldLogFiles.foreach { logInfo =>
Contributor

nit: When waitForCompletion is true, this whole deletion is done one file at a time.
It might be better to compose all of the futures with Future.sequence into a single Future and wait on that; then the deletions will run in parallel.
Not a real problem, since waitForCompletion is true only in tests. Do this only if there is other critical feedback to address.
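A rough sketch of the Future.sequence variant, reusing deleteFile and the class's executionContext from this patch (scala.concurrent.Await and Future are already used by the surrounding code):

if (!executionContext.isShutdown) {
  implicit val ec = executionContext
  // Kick off every deletion in parallel on the dedicated execution context.
  val deletions = oldLogFiles.map { logInfo => Future { deleteFile(logInfo) } }
  if (waitForCompletion) {
    import scala.concurrent.duration._
    // Wait once on the composed future instead of waiting file by file.
    Await.ready(Future.sequence(deletions), 1 second)
  }
}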

if (!executionContext.isShutdown) {
val f = Future { deleteFile(logInfo) }
Contributor

Again, this should not use the default execution context. Please create a dedicated execution context for this.

Contributor Author

The execution context was defined implicitly in the class definition. I made it non-implicit for better readability.
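A minimal sketch of a dedicated, explicitly passed (non-implicit) execution context; the pool size and the wiring shown here are assumptions for illustration, not necessarily what this patch does:

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// A dedicated single-threaded context for WAL cleanup, kept non-implicit so each
// Future call states explicitly where it runs.
val executionContext = ExecutionContext.fromExecutorService(
  Executors.newSingleThreadExecutor())

// At the call site, pass the context explicitly:
val f = Future { deleteFile(logInfo) }(executionContext)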

if (waitForCompletion) {
import scala.concurrent.duration._
Await.ready(f, 1 second)
}
}
}
}
@@ -16,7 +16,7 @@
*/
package org.apache.spark.streaming.util

import java.io.{Closeable, EOFException}
import java.io.{IOException, Closeable, EOFException}
import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration
@@ -55,6 +55,19 @@ private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Config
logDebug("Error reading next item, EOF reached", e)
close()
false
case e: IOException =>
logWarning("Error while trying to read data. If the file was deleted, " +
"this should be okay.", e)
close()
if (HdfsUtils.checkFileExists(path, conf)) {
// if file exists, this could be a legitimate error
throw e
} else {
// file was deleted. This can occur when the daemon cleanup thread takes time to
Contributor

super nit: file -> File. Since this comment has multiple sentences, it's odd to have the first one start with lower case.

// delete the file during recovery.
false
}

case e: Exception =>
logWarning("Error while trying to read data from HDFS.", e)
close()
@@ -71,4 +71,11 @@ private[streaming] object HdfsUtils {
case _ => fs
}
}

/** Check if the file exists at the given path. */
def checkFileExists(path: String, conf: Configuration): Boolean = {
val hdpPath = new Path(path)
val fs = getFileSystemForPath(hdpPath, conf)
fs.isFile(hdpPath)
}
}
@@ -32,7 +32,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.util.{WriteAheadLogUtils, FileBasedWriteAheadLogReader}
import org.apache.spark.streaming.util.{WriteAheadLogSuite, WriteAheadLogUtils, FileBasedWriteAheadLogReader}
import org.apache.spark.streaming.util.WriteAheadLogSuite._
import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}

@@ -207,6 +207,87 @@ class ReceivedBlockTrackerSuite
tracker1.isWriteAheadLogEnabled should be (false)
}

test("parallel file deletion in FileBasedWriteAheadLog is robust to deletion error") {
val manualClock = new ManualClock
conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1")
require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1)
val tracker = createTracker(clock = manualClock)

val addBlocks = generateBlockInfos()
val batch1 = addBlocks.slice(0, 1)
val batch2 = addBlocks.slice(1, 3)
val batch3 = addBlocks.slice(3, 6)

def advanceTime(): Unit = manualClock.advance(1000)

assert(getWriteAheadLogFiles().length === 0)

Contributor

Can you add inline comments to explain each step, so that the reader can understand what's going on?

val start = manualClock.getTimeMillis()
manualClock.advance(500)
tracker.cleanupOldBatches(start, waitForCompletion = false)
assert(getWriteAheadLogFiles().length === 1)
advanceTime()
batch1.foreach(tracker.addBlock)
assert(getWriteAheadLogFiles().length === 1)
advanceTime()

val batch1Time = manualClock.getTimeMillis()
tracker.allocateBlocksToBatch(batch1Time)
advanceTime()

batch2.foreach { block =>
tracker.addBlock(block)
advanceTime()
}
assert(getWriteAheadLogFiles().length === 3)

advanceTime()

val batch2Time = manualClock.getTimeMillis()
tracker.allocateBlocksToBatch(batch2Time)

advanceTime()

assert(getWriteAheadLogFiles().length === 4)
tracker.cleanupOldBatches(batch1Time, waitForCompletion = true)
assert(getWriteAheadLogFiles().length === 3)

batch3.foreach { block =>
tracker.addBlock(block)
advanceTime()
}
val batch3Time = manualClock.getTimeMillis()
tracker.allocateBlocksToBatch(batch3Time)

advanceTime()
assert(getWriteAheadLogFiles().length === 4)
advanceTime()
tracker.cleanupOldBatches(batch2Time, waitForCompletion = true)
assert(getWriteAheadLogFiles().length === 3)

def compareTrackers(base: ReceivedBlockTracker, subject: ReceivedBlockTracker): Unit = {
subject.getBlocksOfBatchAndStream(batch3Time, streamId) should be(
base.getBlocksOfBatchAndStream(batch3Time, streamId))
subject.getBlocksOfBatchAndStream(batch2Time, streamId) should be(
base.getBlocksOfBatchAndStream(batch2Time, streamId))
subject.getBlocksOfBatchAndStream(batch1Time, streamId) should be(Nil)
}

val tracker2 = createTracker(recoverFromWriteAheadLog = true, clock = manualClock)
compareTrackers(tracker, tracker2)

WriteAheadLogSuite.writeEventsUsingWriter(getLogFileName(start), Seq(createBatchCleanup(start)))

assert(getWriteAheadLogFiles().length === 4)
val tracker3 = createTracker(recoverFromWriteAheadLog = true, clock = manualClock)
compareTrackers(tracker, tracker3)
WriteAheadLogSuite.writeEventsUsingWriter(getLogFileName(batch1Time),
Seq(createBatchAllocation(batch1Time, batch1)))
assert(getWriteAheadLogFiles().length === 5)
val tracker4 = createTracker(recoverFromWriteAheadLog = true, clock = manualClock)
compareTrackers(tracker, tracker4)
}

/**
* Create tracker object with the optional provided clock. Use fake clock if you
* want to control time by manually incrementing it to test log clean.
@@ -233,6 +314,12 @@ class ReceivedBlockTrackerSuite
getWrittenLogData(Seq(logFile))
}

/** Get the log file name for the given log start time. */
def getLogFileName(time: Long, rollingIntervalSecs: Int = 1): String = {
checkpointDirectory.toString + File.separator + "receivedBlockMetadata" +
File.separator + s"log-$time-${time + rollingIntervalSecs * 1000}"
}

/**
* Get all the data written in the given write ahead log files. By default, it will read all
* files in the test log directory.
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path
import org.scalatest.concurrent.Eventually._
import org.scalatest.BeforeAndAfter

import org.apache.spark.streaming.scheduler.ReceivedBlockTrackerLogEvent
import org.apache.spark.util.{ManualClock, Utils}
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}

@@ -366,6 +367,21 @@ object WriteAheadLogSuite {
segments
}

/**
* Write received block tracker events to a file using the writer class and return an array of
* the file segments written.
*/
def writeEventsUsingWriter(
Contributor

This method is related to ReceivedBlockTracker and should not be in this file, since this file is otherwise independent of it. Move it to ReceivedBlockTrackerSuite.

filePath: String,
events: Seq[ReceivedBlockTrackerLogEvent]): Seq[FileBasedWriteAheadLogSegment] = {
val writer = new FileBasedWriteAheadLogWriter(filePath, hadoopConf)
val segments = events.map {
item => writer.write(ByteBuffer.wrap(Utils.serialize(item)))
}
writer.close()
segments
}

/** Write data to rotating files in log directory using the WriteAheadLog class. */
def writeDataUsingWriteAheadLog(
logDirectory: String,