Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ class FeedLoadManager(private val context: Context) {
else -> feedDatabaseManager.outdatedSubscriptionsForGroup(groupId, outdatedThreshold)
}

// like `currentProgress`, but counts the number of extractions that have begun, so they
// can be properly throttled every once in a while (see doOnNext below)
val extractionCount = AtomicInteger()

return outdatedSubscriptions
.take(1)
.doOnNext {
Expand All @@ -105,6 +109,13 @@ class FeedLoadManager(private val context: Context) {
.observeOn(Schedulers.io())
.flatMap { Flowable.fromIterable(it) }
.takeWhile { !cancelSignal.get() }
.doOnNext {
// throttle extractions once every BATCH_SIZE to avoid being throttled
val previousCount = extractionCount.getAndIncrement()
if (previousCount != 0 && previousCount % BATCH_SIZE == 0) {
Thread.sleep(DELAY_BETWEEN_BATCHES_MILLIS.random())
}
}
.parallel(PARALLEL_EXTRACTIONS, PARALLEL_EXTRACTIONS * 2)
.runOn(Schedulers.io(), PARALLEL_EXTRACTIONS * 2)
.filter { !cancelSignal.get() }
Expand Down Expand Up @@ -328,7 +339,19 @@ class FeedLoadManager(private val context: Context) {
/**
* How many extractions will be running in parallel.
*/
private const val PARALLEL_EXTRACTIONS = 6
private const val PARALLEL_EXTRACTIONS = 3

/**
* How many extractions to perform before waiting [DELAY_BETWEEN_BATCHES_MILLIS] to avoid
* being rate limited
*/
private const val BATCH_SIZE = 50

/**
* Wait a random delay in this range once every [BATCH_SIZE] extractions to avoid being
* rate limited
*/
private val DELAY_BETWEEN_BATCHES_MILLIS = (6000L..12000L)

/**
* Number of items to buffer to mass-insert in the database.
Expand Down