Skip to content

Commit db2fa60

Browse files
committed
Let 'commitTime' be independent with modificationTime
1 parent 8564ab4 commit db2fa60

2 files changed

Lines changed: 21 additions & 21 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ case class SinkFileStatus(
5757
blockReplication: Int,
5858
blockSize: Long,
5959
action: String) {
60-
// use modification time if we don't know about exact commit time
61-
this(path, size, isDir, modificationTime, blockReplication, blockSize, action, modificationTime)
60+
// use Long.MaxValue if we don't know about exact commit time, which means they will not evicted
61+
this(path, size, isDir, modificationTime, blockReplication, blockSize, action, Long.MaxValue)
6262
}
6363

6464
def toFileStatus: FileStatus = {
@@ -69,15 +69,14 @@ case class SinkFileStatus(
6969

7070
object SinkFileStatus {
7171
def apply(f: FileStatus): SinkFileStatus = {
72-
SinkFileStatus(
73-
path = f.getPath.toUri.toString,
74-
size = f.getLen,
75-
isDir = f.isDirectory,
76-
modificationTime = f.getModificationTime,
77-
blockReplication = f.getReplication,
78-
blockSize = f.getBlockSize,
79-
action = FileStreamSinkLog.ADD_ACTION,
80-
commitTime = f.getModificationTime)
72+
new SinkFileStatus(
73+
f.getPath.toUri.toString,
74+
f.getLen,
75+
f.isDirectory,
76+
f.getModificationTime,
77+
f.getReplication,
78+
f.getBlockSize,
79+
FileStreamSinkLog.ADD_ACTION)
8180
}
8281
}
8382

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -239,17 +239,18 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
239239
}
240240

241241
test("read Spark 2.1.0 log format") {
242+
val maxLong = Long.MaxValue
242243
assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
243244
// SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), -> deleted
244-
SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION, 100),
245-
SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION, 200),
246-
SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION, 300),
247-
SinkFileStatus("/a/b/4", 400, false, 400, 1, 100, FileStreamSinkLog.ADD_ACTION, 400),
248-
SinkFileStatus("/a/b/5", 500, false, 500, 1, 100, FileStreamSinkLog.ADD_ACTION, 500),
249-
SinkFileStatus("/a/b/6", 600, false, 600, 1, 100, FileStreamSinkLog.ADD_ACTION, 600),
250-
SinkFileStatus("/a/b/7", 700, false, 700, 1, 100, FileStreamSinkLog.ADD_ACTION, 700),
251-
SinkFileStatus("/a/b/8", 800, false, 800, 1, 100, FileStreamSinkLog.ADD_ACTION, 800),
252-
SinkFileStatus("/a/b/9", 900, false, 900, 3, 200, FileStreamSinkLog.ADD_ACTION, 900)
245+
SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
246+
SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
247+
SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
248+
SinkFileStatus("/a/b/4", 400, false, 400, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
249+
SinkFileStatus("/a/b/5", 500, false, 500, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
250+
SinkFileStatus("/a/b/6", 600, false, 600, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
251+
SinkFileStatus("/a/b/7", 700, false, 700, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
252+
SinkFileStatus("/a/b/8", 800, false, 800, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
253+
SinkFileStatus("/a/b/9", 900, false, 900, 3, 200, FileStreamSinkLog.ADD_ACTION, maxLong)
253254
))
254255
}
255256

@@ -258,7 +259,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
258259
* in SinkFileStatus.
259260
*/
260261
private def newFakeSinkFileStatus(path: String, action: String): SinkFileStatus =
261-
newFakeSinkFileStatus(path, action, 100L)
262+
newFakeSinkFileStatus(path, action, Long.MaxValue)
262263

263264
/**
264265
* Create a fake SinkFileStatus using path and action, and commit time.

0 commit comments

Comments
 (0)