Skip to content

Commit 9b6dfed

Browse files
committed
Documentation and minor refactor to clarify MT memory management.
1 parent f4a552a commit 9b6dfed

File tree

2 files changed

+21
-10
lines changed

2 files changed

+21
-10
lines changed

lib/compress/zstdmt_compress.c

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,8 @@ typedef struct ZSTDMT_bufferPool_s {
102102
buffer_t bTable[1]; /* variable size */
103103
} ZSTDMT_bufferPool;
104104

105-
static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbWorkers, ZSTD_customMem cMem)
105+
static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned maxNbBuffers, ZSTD_customMem cMem)
106106
{
107-
unsigned const maxNbBuffers = 2*nbWorkers + 3;
108107
ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_customCalloc(
109108
sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
110109
if (bufPool==NULL) return NULL;
@@ -160,9 +159,8 @@ static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const
160159
}
161160

162161

163-
static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, U32 nbWorkers)
162+
static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, unsigned maxNbBuffers)
164163
{
165-
unsigned const maxNbBuffers = 2*nbWorkers + 3;
166164
if (srcBufPool==NULL) return NULL;
167165
if (srcBufPool->totalBuffers >= maxNbBuffers) /* good enough */
168166
return srcBufPool;
@@ -171,7 +169,7 @@ static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool,
171169
size_t const bSize = srcBufPool->bufferSize; /* forward parameters */
172170
ZSTDMT_bufferPool* newBufPool;
173171
ZSTDMT_freeBufferPool(srcBufPool);
174-
newBufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
172+
newBufPool = ZSTDMT_createBufferPool(maxNbBuffers, cMem);
175173
if (newBufPool==NULL) return newBufPool;
176174
ZSTDMT_setBufferSize(newBufPool, bSize);
177175
return newBufPool;
@@ -263,6 +261,16 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
263261
ZSTD_customFree(buf.start, bufPool->cMem);
264262
}
265263

264+
/* We need 2 output buffers per worker since each dstBuff must be flushed after it is released.
265+
* The 3 additional buffers are as follows:
266+
* 1 buffer for input loading
267+
* 1 buffer for "next input" when submitting current one
268+
* 1 buffer stuck in queue */
269+
#define BUF_POOL_MAX_NB_BUFFERS(nbWorkers) 2*nbWorkers + 3
270+
271+
/* After a worker releases its rawSeqStore, it is immediately ready for reuse.
272+
* So we only need one seq buffer per worker. */
273+
#define SEQ_POOL_MAX_NB_BUFFERS(nbWorkers) nbWorkers
266274

267275
/* ===== Seq Pool Wrapper ====== */
268276

@@ -316,7 +324,7 @@ static void ZSTDMT_setNbSeq(ZSTDMT_seqPool* const seqPool, size_t const nbSeq)
316324

317325
static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem)
318326
{
319-
ZSTDMT_seqPool* const seqPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
327+
ZSTDMT_seqPool* const seqPool = ZSTDMT_createBufferPool(SEQ_POOL_MAX_NB_BUFFERS(nbWorkers), cMem);
320328
if (seqPool == NULL) return NULL;
321329
ZSTDMT_setNbSeq(seqPool, 0);
322330
return seqPool;
@@ -329,7 +337,7 @@ static void ZSTDMT_freeSeqPool(ZSTDMT_seqPool* seqPool)
329337

330338
static ZSTDMT_seqPool* ZSTDMT_expandSeqPool(ZSTDMT_seqPool* pool, U32 nbWorkers)
331339
{
332-
return ZSTDMT_expandBufferPool(pool, nbWorkers);
340+
return ZSTDMT_expandBufferPool(pool, SEQ_POOL_MAX_NB_BUFFERS(nbWorkers));
333341
}
334342

335343

@@ -936,7 +944,7 @@ MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers,
936944
mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem);
937945
assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0); /* ensure nbJobs is a power of 2 */
938946
mtctx->jobIDMask = nbJobs - 1;
939-
mtctx->bufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
947+
mtctx->bufPool = ZSTDMT_createBufferPool(BUF_POOL_MAX_NB_BUFFERS(nbWorkers), cMem);
940948
mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem);
941949
mtctx->seqPool = ZSTDMT_createSeqPool(nbWorkers, cMem);
942950
initError = ZSTDMT_serialState_init(&mtctx->serial);
@@ -1039,7 +1047,7 @@ static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)
10391047
{
10401048
if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation);
10411049
FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbWorkers) , "");
1042-
mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers);
1050+
mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, BUF_POOL_MAX_NB_BUFFERS(nbWorkers));
10431051
if (mtctx->bufPool == NULL) return ERROR(memory_allocation);
10441052
mtctx->cctxPool = ZSTDMT_expandCCtxPool(mtctx->cctxPool, nbWorkers);
10451053
if (mtctx->cctxPool == NULL) return ERROR(memory_allocation);

lib/compress/zstdmt_compress.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,11 @@ size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx);
6565
* Private use only. Init streaming operation.
6666
* expects params to be valid.
6767
* must receive dict, or cdict, or none, but not both.
68+
* mtctx can be freshly constructed or reused from a prior compression.
69+
* If mtctx is reused, memory allocations from the prior compression may not be freed,
70+
* even if they are not needed for the current compression.
6871
* @return : 0, or an error code */
69-
size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
72+
size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* mtctx,
7073
const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType,
7174
const ZSTD_CDict* cdict,
7275
ZSTD_CCtx_params params, unsigned long long pledgedSrcSize);

0 commit comments

Comments
 (0)