Skip to content

Commit 1598e6c

Browse files
authored
Async write for decompression (#2975)
* Async IO decompression: - Added --[no-]asyncio flag for CLI decompression. - Replaced dstBuffer in decompression with a pool of write jobs. - Added an ability to execute write jobs in a separate thread. - Added an ability to wait (join) on all jobs in a thread pool (queued and running).
1 parent 2f03c19 commit 1598e6c

File tree

7 files changed

+390
-99
lines changed

7 files changed

+390
-99
lines changed

build/meson/programs/meson.build

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,24 @@ zstd_programs_sources = [join_paths(zstd_rootdir, 'programs/zstdcli.c'),
2020
join_paths(zstd_rootdir, 'programs/dibio.c'),
2121
join_paths(zstd_rootdir, 'programs/zstdcli_trace.c'),
2222
# needed due to use of private symbol + -fvisibility=hidden
23-
join_paths(zstd_rootdir, 'lib/common/xxhash.c')]
23+
join_paths(zstd_rootdir, 'lib/common/xxhash.c'),
24+
join_paths(zstd_rootdir, 'lib/common/pool.c'),
25+
join_paths(zstd_rootdir, 'lib/common/zstd_common.c'),
26+
join_paths(zstd_rootdir, 'lib/common/error_private.c')]
2427

28+
zstd_deps = [ libzstd_dep ]
2529
zstd_c_args = libzstd_debug_cflags
30+
31+
zstd_frugal_deps = [ libzstd_dep ]
32+
zstd_frugal_c_args = [ '-DZSTD_NOBENCH', '-DZSTD_NODICT', '-DZSTD_NOTRACE' ]
33+
2634
if use_multi_thread
35+
zstd_deps += [ thread_dep ]
2736
zstd_c_args += [ '-DZSTD_MULTITHREAD' ]
37+
zstd_frugal_deps += [ thread_dep ]
38+
zstd_frugal_c_args += [ '-DZSTD_MULTITHREAD' ]
2839
endif
2940

30-
zstd_deps = [ libzstd_dep ]
3141
if use_zlib
3242
zstd_deps += [ zlib_dep ]
3343
zstd_c_args += [ '-DZSTD_GZCOMPRESS', '-DZSTD_GZDECOMPRESS' ]
@@ -69,14 +79,17 @@ zstd = executable('zstd',
6979
zstd_frugal_sources = [join_paths(zstd_rootdir, 'programs/zstdcli.c'),
7080
join_paths(zstd_rootdir, 'programs/timefn.c'),
7181
join_paths(zstd_rootdir, 'programs/util.c'),
72-
join_paths(zstd_rootdir, 'programs/fileio.c')]
82+
join_paths(zstd_rootdir, 'programs/fileio.c'),
83+
join_paths(zstd_rootdir, 'lib/common/pool.c'),
84+
join_paths(zstd_rootdir, 'lib/common/zstd_common.c'),
85+
join_paths(zstd_rootdir, 'lib/common/error_private.c')]
7386

7487
# Minimal target, with only zstd compression and decompression.
7588
# No bench. No legacy.
7689
executable('zstd-frugal',
7790
zstd_frugal_sources,
78-
dependencies: libzstd_dep,
79-
c_args: [ '-DZSTD_NOBENCH', '-DZSTD_NODICT', '-DZSTD_NOTRACE' ],
91+
dependencies: zstd_frugal_deps,
92+
c_args: zstd_frugal_c_args,
8093
install: true)
8194

8295
install_data(join_paths(zstd_rootdir, 'programs/zstdgrep'),

lib/common/pool.c

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,7 @@ static void* POOL_thread(void* opaque) {
9696
/* If the intended queue size was 0, signal after finishing job */
9797
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
9898
ctx->numThreadsBusy--;
99-
if (ctx->queueSize == 1) {
100-
ZSTD_pthread_cond_signal(&ctx->queuePushCond);
101-
}
99+
ZSTD_pthread_cond_signal(&ctx->queuePushCond);
102100
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
103101
}
104102
} /* for (;;) */
@@ -190,6 +188,17 @@ void POOL_free(POOL_ctx *ctx) {
190188
ZSTD_customFree(ctx, ctx->customMem);
191189
}
192190

191+
/*! POOL_joinJobs() :
192+
* Waits for all queued jobs to finish executing.
193+
*/
194+
void POOL_joinJobs(POOL_ctx* ctx) {
195+
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
196+
while(!ctx->queueEmpty || ctx->numThreadsBusy > 0) {
197+
ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
198+
}
199+
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
200+
}
201+
193202
void ZSTD_freeThreadPool (ZSTD_threadPool* pool) {
194203
POOL_free (pool);
195204
}
@@ -330,6 +339,11 @@ void POOL_free(POOL_ctx* ctx) {
330339
(void)ctx;
331340
}
332341

342+
void POOL_joinJobs(POOL_ctx* ctx){
343+
assert(!ctx || ctx == &g_poolCtx);
344+
(void)ctx;
345+
}
346+
333347
int POOL_resize(POOL_ctx* ctx, size_t numThreads) {
334348
(void)ctx; (void)numThreads;
335349
return 0;

lib/common/pool.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
3838
*/
3939
void POOL_free(POOL_ctx* ctx);
4040

41+
42+
/*! POOL_joinJobs() :
43+
* Waits for all queued jobs to finish executing.
44+
*/
45+
void POOL_joinJobs(POOL_ctx* ctx);
46+
4147
/*! POOL_resize() :
4248
* Expands or shrinks pool's number of threads.
4349
* This is more efficient than releasing + creating a new context,

0 commit comments

Comments
 (0)