Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
}

private def putStateIntoStateCache(newVersion: Long, map: MapType): Unit = synchronized {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

putStateIntoStateCache -> cacheMap, to keep consistent with loadedMaps etc.

if (numberOfVersionsToRetainInMemory <= 0) {
if (loadedMaps.size() > 0) loadedMaps.clear()
return
}

while (loadedMaps.size() > numberOfVersionsToRetainInMemory) {
loadedMaps.remove(loadedMaps.lastKey())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,33 +64,36 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
require(!StateStore.isMaintenanceRunning)
}

test("retaining only latest configured size of versions in memory") {
val provider = newStoreProvider(opId = Random.nextInt, partition = 0,
numOfVersToRetainInMemory = 3)

var currentVersion = 0
def updateVersionTo(targetVersion: Int): Unit = {
for (i <- currentVersion + 1 to targetVersion) {
val store = provider.getStore(currentVersion)
put(store, "a", i)
store.commit()
currentVersion += 1
}
require(currentVersion === targetVersion)
def updateVersionTo(provider: StateStoreProvider, currentVersion: => Int,
targetVersion: Int): Int = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is incorrect indenting by spark style guide. should be

def updateVersionTo(
    provider: StateStoreProvider, 
    currentVersion: => Int, 
    targetVersion: Int): Int = {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you using => Int for currentVersion instead of simply using Int?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, since you are frequently incrementing the version by 1 (i.e. targetVersion = currentVersion + 1, always), you can add another convenience method called incrementVersion(provider, currentVersion)

Copy link
Contributor Author

@HeartSaVioR HeartSaVioR Jul 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for correcting style guide. Will fix.
Regarding currentVersion: => Int is somehow I was trying to modify currentVersion itself, and stick with current approach but didn't roll back. Will fix.
And I agree it would be better to have incrementVersion to shorter the code. Will address.

var newCurrentVersion = currentVersion
for (i <- newCurrentVersion + 1 to targetVersion) {
val store = provider.getStore(newCurrentVersion)
put(store, "a", i)
store.commit()
newCurrentVersion += 1
}
require(newCurrentVersion === targetVersion)
newCurrentVersion
}

test("retaining only two latest versions when MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 2") {
val provider = newStoreProvider(opId = Random.nextInt, partition = 0,
numOfVersToRetainInMemory = 2)

def restoreOriginValues(map: provider.MapType): Map[String, Int] = {
map.asScala.map(entry => rowToString(entry._1) -> rowToInt(entry._2)).toMap
}

updateVersionTo(1)
var currentVersion = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: please add comments on each section here to make it clear what are you testing

currentVersion = updateVersionTo(provider, currentVersion, 1)
assert(getData(provider) === Set("a" -> 1))
var loadedMaps = provider.getClonedLoadedMaps()
assert(loadedMaps.size() === 1)
assert(loadedMaps.firstKey() === 1L)
assert(restoreOriginValues(loadedMaps.get(1L)) === Map("a" -> 1))

updateVersionTo(2)
currentVersion = updateVersionTo(provider, currentVersion, 2)
assert(getData(provider) === Set("a" -> 2))
loadedMaps = provider.getClonedLoadedMaps()
assert(loadedMaps.size() === 2)
Expand All @@ -99,43 +102,84 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2))
assert(restoreOriginValues(loadedMaps.get(1L)) === Map("a" -> 1))

updateVersionTo(3)
// this trigger exceeding cache and 1 will be evicted
currentVersion = updateVersionTo(provider, currentVersion, 3)
assert(getData(provider) === Set("a" -> 3))
loadedMaps = provider.getClonedLoadedMaps()
assert(loadedMaps.size() === 3)
assert(loadedMaps.size() === 2)
assert(loadedMaps.firstKey() === 3L)
assert(loadedMaps.lastKey() === 1L)
assert(loadedMaps.lastKey() === 2L)
assert(restoreOriginValues(loadedMaps.get(3L)) === Map("a" -> 3))
assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be boiled down to a convenience method as well to reduce the verbosity def checkVersion(version: Int, expectedData: Map[String, Int])

}

test("failure after committing with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 1") {
val provider = newStoreProvider(opId = Random.nextInt, partition = 0,
numOfVersToRetainInMemory = 1)

var currentVersion = 0

def restoreOriginValues(map: provider.MapType): Map[String, Int] = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just allowed redundant function definition cause there's no way to use provider.MapType in parameter type unless provider is defined. If we really want to get rid of redundant function definition, we may have to change it to ConcurrentMap directly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 one using making it ConcurrentMap. Maybe even better, you can use scala implicit classses to add methods to HDFSBackedStateStoreProvider

implicit class ProviderHelper(provider: StateStoreProvider) {
   def toStringIntMap(): Map[String, Int] = { .... }
}

This should avoid this problem. Either way, I hate having these duplicate methods, so we should fix it one way or the other.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought, if you make the convenience method checkVersion i mentioned above, you may not have to do this at all.

map.asScala.map(entry => rowToString(entry._1) -> rowToInt(entry._2)).toMap
}

currentVersion = updateVersionTo(provider, currentVersion, 1)
assert(getData(provider) === Set("a" -> 1))
var loadedMaps = provider.getClonedLoadedMaps()
assert(loadedMaps.size() === 1)
assert(loadedMaps.firstKey() === 1L)
assert(restoreOriginValues(loadedMaps.get(1L)) === Map("a" -> 1))

// this trigger exceeding cache and 1 will be evicted
updateVersionTo(4)
assert(getData(provider) === Set("a" -> 4))
currentVersion = updateVersionTo(provider, currentVersion, 2)
assert(getData(provider) === Set("a" -> 2))
loadedMaps = provider.getClonedLoadedMaps()
assert(loadedMaps.size() === 3)
assert(loadedMaps.firstKey() === 4L)
assert(loadedMaps.lastKey() === 2L)
assert(restoreOriginValues(loadedMaps.get(4L)) === Map("a" -> 4))
assert(restoreOriginValues(loadedMaps.get(3L)) === Map("a" -> 3))
// now version 1 is evicted and not stored in cache
// this fact ensures cache miss will occur when this partition succeeds commit
// but there's a failure afterwards so have to reprocess previous batch
assert(loadedMaps.size() === 1)
assert(loadedMaps.firstKey() === 2L)
assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2))

// suppose there has been failure after committing, and it decided to reprocess previous batch
currentVersion = 1

val store = provider.getStore(currentVersion)
// negative value to represent reprocessing
put(store, "a", -2)
store.commit()
currentVersion += 1

// make sure newly committed version is reflected to the cache (overwritten)
assert(getData(provider) === Set("a" -> -2))
loadedMaps = provider.getClonedLoadedMaps()
assert(loadedMaps.size() === 1)
assert(loadedMaps.firstKey() === 2L)
assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> -2))
}

test("no cache data with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 0") {
val provider = newStoreProvider(opId = Random.nextInt, partition = 0,
numOfVersToRetainInMemory = 0)

var currentVersion = 0

currentVersion = updateVersionTo(provider, currentVersion, 1)
assert(getData(provider) === Set("a" -> 1))
var loadedMaps = provider.getClonedLoadedMaps()
assert(loadedMaps.size() === 0)

currentVersion = updateVersionTo(provider, currentVersion, 2)
assert(getData(provider) === Set("a" -> 2))
loadedMaps = provider.getClonedLoadedMaps()
assert(loadedMaps.size() === 0)
}

test("snapshotting") {
val provider = newStoreProvider(opId = Random.nextInt, partition = 0, minDeltasForSnapshot = 5)

var currentVersion = 0
def updateVersionTo(targetVersion: Int): Unit = {
for (i <- currentVersion + 1 to targetVersion) {
val store = provider.getStore(currentVersion)
put(store, "a", i)
store.commit()
currentVersion += 1
}
require(currentVersion === targetVersion)
}

updateVersionTo(2)
currentVersion = updateVersionTo(provider, currentVersion, 2)
require(getData(provider) === Set("a" -> 2))
provider.doMaintenance() // should not generate snapshot files
assert(getData(provider) === Set("a" -> 2))
Expand All @@ -146,7 +190,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
}

// After version 6, snapshotting should generate one snapshot file
updateVersionTo(6)
currentVersion = updateVersionTo(provider, currentVersion, 6)
require(getData(provider) === Set("a" -> 6), "store not updated correctly")
provider.doMaintenance() // should generate snapshot files

Expand All @@ -161,7 +205,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
"snapshotting messed up the data of the final version")

// After version 20, snapshotting should generate newer snapshot files
updateVersionTo(20)
currentVersion = updateVersionTo(provider, currentVersion, 20)
require(getData(provider) === Set("a" -> 20), "store not updated correctly")
provider.doMaintenance() // do snapshot

Expand Down