Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions app/src/main/java/com/github/libretube/repo/LocalFeedRepository.kt
Original file line number Diff line number Diff line change
Expand Up @@ -81,31 +81,32 @@ class LocalFeedRepository : FeedRepository {
if (channelIds.isEmpty()) return

val totalExtractionCount = AtomicInteger()
val chunkedExtractionCount = AtomicInteger()
val channelExtractionCount = AtomicInteger()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could possibly add some small comments about the different atomic integers and their difference here, but that's not that important

withContext(Dispatchers.Main) {
onProgressUpdate(FeedProgress(0, channelIds.size))
}

for (channelIdChunk in channelIds.chunked(CHUNK_SIZE)) {
// add a delay after each BATCH_SIZE amount of visited channels
val count = chunkedExtractionCount.get();
val count = channelExtractionCount.get();
if (count >= BATCH_SIZE) {
delay(BATCH_DELAY.random())
chunkedExtractionCount.set(0)
// add a delay after each BATCH_SIZE amount of fully-fetched channels
delay(CHANNEL_BATCH_DELAY.random())
channelExtractionCount.set(0)
}

val collectedFeedItems = channelIdChunk.parallelMap { channelId ->
try {
getRelatedStreams(channelId, minimumDateMillis)
getRelatedStreams(channelId, minimumDateMillis).also {
if (it.isNotEmpty())
// increase counter if we had to fully fetch the channel
channelExtractionCount.incrementAndGet()
}
} catch (e: Exception) {
Log.e(channelId, e.stackTraceToString())
null
} finally {
chunkedExtractionCount.incrementAndGet()
val currentProgress = totalExtractionCount.incrementAndGet()

withContext(Dispatchers.Main) {
onProgressUpdate(FeedProgress(currentProgress, channelIds.size))
onProgressUpdate(FeedProgress(totalExtractionCount.incrementAndGet(), channelIds.size))
}
}
}.filterNotNull().flatten().map(StreamItem::toFeedItem)
Expand Down Expand Up @@ -159,17 +160,26 @@ class LocalFeedRepository : FeedRepository {
}

companion object {
private const val CHUNK_SIZE = 2
/**
* Amount of feeds that are fetched concurrently.
*
* Should ideally be a factor of `BATCH_SIZE` to be correctly applied.
*/
private const val CHUNK_SIZE = 5

/**
* Maximum amount of feeds that should be fetched together, before a delay should be applied.
*/
private const val BATCH_SIZE = 50

/**
* Millisecond delay between two consecutive batches to avoid throttling.
* Millisecond delay after fetching `BATCH_SIZE` channels to avoid throttling.
*
* A channel is only counted as fetched when it had a recent upload, requiring to fetch
* the channelInfo via Innertube.
*/
private val BATCH_DELAY = (500L..1500L)
private val CHANNEL_BATCH_DELAY = (500L..1500L)

private const val MAX_FEED_AGE_DAYS = 30L // 30 days
}
}
Loading