Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions lib/compress/zstdmt_compress.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,8 @@ typedef struct ZSTDMT_bufferPool_s {
buffer_t bTable[1]; /* variable size */
} ZSTDMT_bufferPool;

static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbWorkers, ZSTD_customMem cMem)
static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned maxNbBuffers, ZSTD_customMem cMem)
{
unsigned const maxNbBuffers = 2*nbWorkers + 3;
ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_customCalloc(
sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
if (bufPool==NULL) return NULL;
Expand Down Expand Up @@ -160,9 +159,8 @@ static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const
}


static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, U32 nbWorkers)
static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, unsigned maxNbBuffers)
{
unsigned const maxNbBuffers = 2*nbWorkers + 3;
if (srcBufPool==NULL) return NULL;
if (srcBufPool->totalBuffers >= maxNbBuffers) /* good enough */
return srcBufPool;
Expand All @@ -171,7 +169,7 @@ static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool,
size_t const bSize = srcBufPool->bufferSize; /* forward parameters */
ZSTDMT_bufferPool* newBufPool;
ZSTDMT_freeBufferPool(srcBufPool);
newBufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
newBufPool = ZSTDMT_createBufferPool(maxNbBuffers, cMem);
if (newBufPool==NULL) return newBufPool;
ZSTDMT_setBufferSize(newBufPool, bSize);
return newBufPool;
Expand Down Expand Up @@ -263,6 +261,16 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
ZSTD_customFree(buf.start, bufPool->cMem);
}

/* We need 2 output buffers per worker since each dstBuff must be flushed after it is released.
* The 3 additional buffers are as follows:
* 1 buffer for input loading
* 1 buffer for "next input" when submitting current one
* 1 buffer stuck in queue */
#define BUF_POOL_MAX_NB_BUFFERS(nbWorkers) 2*nbWorkers + 3

/* After a worker releases its rawSeqStore, it is immediately ready for reuse.
* So we only need one seq buffer per worker. */
#define SEQ_POOL_MAX_NB_BUFFERS(nbWorkers) nbWorkers

/* ===== Seq Pool Wrapper ====== */

Expand Down Expand Up @@ -316,7 +324,7 @@ static void ZSTDMT_setNbSeq(ZSTDMT_seqPool* const seqPool, size_t const nbSeq)

static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem)
{
ZSTDMT_seqPool* const seqPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
ZSTDMT_seqPool* const seqPool = ZSTDMT_createBufferPool(SEQ_POOL_MAX_NB_BUFFERS(nbWorkers), cMem);
if (seqPool == NULL) return NULL;
ZSTDMT_setNbSeq(seqPool, 0);
return seqPool;
Expand All @@ -329,7 +337,7 @@ static void ZSTDMT_freeSeqPool(ZSTDMT_seqPool* seqPool)

static ZSTDMT_seqPool* ZSTDMT_expandSeqPool(ZSTDMT_seqPool* pool, U32 nbWorkers)
{
return ZSTDMT_expandBufferPool(pool, nbWorkers);
return ZSTDMT_expandBufferPool(pool, SEQ_POOL_MAX_NB_BUFFERS(nbWorkers));
}


Expand Down Expand Up @@ -936,7 +944,7 @@ MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers,
mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem);
assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0); /* ensure nbJobs is a power of 2 */
mtctx->jobIDMask = nbJobs - 1;
mtctx->bufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
mtctx->bufPool = ZSTDMT_createBufferPool(BUF_POOL_MAX_NB_BUFFERS(nbWorkers), cMem);
mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem);
mtctx->seqPool = ZSTDMT_createSeqPool(nbWorkers, cMem);
initError = ZSTDMT_serialState_init(&mtctx->serial);
Expand Down Expand Up @@ -1039,7 +1047,7 @@ static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)
{
if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation);
FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbWorkers) , "");
mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers);
mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, BUF_POOL_MAX_NB_BUFFERS(nbWorkers));
if (mtctx->bufPool == NULL) return ERROR(memory_allocation);
mtctx->cctxPool = ZSTDMT_expandCCtxPool(mtctx->cctxPool, nbWorkers);
if (mtctx->cctxPool == NULL) return ERROR(memory_allocation);
Expand Down
5 changes: 4 additions & 1 deletion lib/compress/zstdmt_compress.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx);
* Private use only. Init streaming operation.
* expects params to be valid.
* must receive dict, or cdict, or none, but not both.
* mtctx can be freshly constructed or reused from a prior compression.
* If mtctx is reused, memory allocations from the prior compression may not be freed,
* even if they are not needed for the current compression.
* @return : 0, or an error code */
size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* mtctx,
const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType,
const ZSTD_CDict* cdict,
ZSTD_CCtx_params params, unsigned long long pledgedSrcSize);
Expand Down