diff --git a/programs/fileio.c b/programs/fileio.c index 4c1331432e2..15a0d5387cb 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -1301,6 +1301,9 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, unsigned inputPresented = 0; unsigned inputBlocked = 0; unsigned lastJobID = 0; + UTIL_time_t lastAdaptTime = UTIL_getTime(); + U64 const adaptEveryMicro = REFRESH_RATE; + UTIL_HumanReadableSize_t const file_hrs = UTIL_makeHumanReadableSize(fileSize); DISPLAYLEVEL(6, "compression using zstd format \n"); @@ -1369,14 +1372,106 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, compressedfilesize += outBuff.pos; } - /* display notification; and adapt compression level */ - if (READY_FOR_UPDATE()) { + /* adaptive mode : statistics measurement and speed correction */ + if (prefs->adaptiveMode && UTIL_clockSpanMicro(lastAdaptTime) > adaptEveryMicro) { + ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx); + + lastAdaptTime = UTIL_getTime(); + + /* check output speed */ + if (zfp.currentJobID > 1) { /* only possible if nbWorkers >= 1 */ + + unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced; + unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed; + assert(zfp.produced >= previous_zfp_update.produced); + assert(prefs->nbWorkers >= 1); + + /* test if compression is blocked + * either because output is slow and all buffers are full + * or because input is slow and no job can start while waiting for at least one buffer to be filled. + * note : exclude starting part, since currentJobID > 1 */ + if ( (zfp.consumed == previous_zfp_update.consumed) /* no data compressed : no data available, or no more buffer to compress to, OR compression is really slow (compression of a single block is slower than update rate)*/ + && (zfp.nbActiveWorkers == 0) /* confirmed : no compression ongoing */ + ) { + DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n") + speedChange = slower; + } + + previous_zfp_update = zfp; + + if ( (newlyProduced > (newlyFlushed * 9 / 8)) /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */ + && (flushWaiting == 0) /* flush speed was never slowed by lack of production, so it's operating at max capacity */ + ) { + DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed); + speedChange = slower; + } + flushWaiting = 0; + } + + /* course correct only if there is at least one new job completed */ + if (zfp.currentJobID > lastJobID) { + DISPLAYLEVEL(6, "compression level adaptation check \n") + + /* check input speed */ + if (zfp.currentJobID > (unsigned)(prefs->nbWorkers+1)) { /* warm up period, to fill all workers */ + if (inputBlocked <= 0) { + DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n"); + speedChange = slower; + } else if (speedChange == noChange) { + unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested; + unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed; + unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced; + unsigned long long newlyFlushed = zfp.flushed - previous_zfp_correction.flushed; + previous_zfp_correction = zfp; + assert(inputPresented > 0); + DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n", + inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100, + (unsigned)newlyIngested, (unsigned)newlyConsumed, + (unsigned)newlyFlushed, (unsigned)newlyProduced); + if ( (inputBlocked > inputPresented / 8) /* input is waiting often, because input buffers is full : compression or output too slow */ + && (newlyFlushed * 33 / 32 > newlyProduced) /* flush everything that is produced */ + && (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */ + ) { + DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n", + newlyIngested, newlyConsumed, newlyProduced, newlyFlushed); + speedChange = faster; + } + } + inputBlocked = 0; + inputPresented = 0; + } + + if (speedChange == slower) { + DISPLAYLEVEL(6, "slower speed , higher compression \n") + compressionLevel ++; + if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel(); + if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel; + compressionLevel += (compressionLevel == 0); /* skip 0 */ + ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel); + } + if (speedChange == faster) { + DISPLAYLEVEL(6, "faster speed , lighter compression \n") + compressionLevel --; + if (compressionLevel < prefs->minAdaptLevel) compressionLevel = prefs->minAdaptLevel; + compressionLevel -= (compressionLevel == 0); /* skip 0 */ + ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel); + } + speedChange = noChange; + + lastJobID = zfp.currentJobID; + } /* if (zfp.currentJobID > lastJobID) */ + } /* if (prefs->adaptiveMode && UTIL_clockSpanMicro(lastAdaptTime) > adaptEveryMicro) */ + + /* display notification */ + if (SHOULD_DISPLAY_PROGRESS() && READY_FOR_UPDATE()) { ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx); double const cShare = (double)zfp.produced / (double)(zfp.consumed + !zfp.consumed/*avoid div0*/) * 100; UTIL_HumanReadableSize_t const buffered_hrs = UTIL_makeHumanReadableSize(zfp.ingested - zfp.consumed); UTIL_HumanReadableSize_t const consumed_hrs = UTIL_makeHumanReadableSize(zfp.consumed); UTIL_HumanReadableSize_t const produced_hrs = UTIL_makeHumanReadableSize(zfp.produced); + DELAY_NEXT_UPDATE(); + /* display progress notifications */ DISPLAY_PROGRESS("\r%79s\r", ""); /* Clear out the current displayed line */ if (g_display_prefs.displayLevel >= 3) { @@ -1406,96 +1501,8 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, if (fileSize != UTIL_FILESIZE_UNKNOWN) DISPLAY_PROGRESS("/%6.*f%4s", file_hrs.precision, file_hrs.value, file_hrs.suffix); DISPLAY_PROGRESS(" ==> %2.f%%", cShare); - DELAY_NEXT_UPDATE(); } - - /* adaptive mode : statistics measurement and speed correction */ - if (prefs->adaptiveMode) { - - /* check output speed */ - if (zfp.currentJobID > 1) { /* only possible if nbWorkers >= 1 */ - - unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced; - unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed; - assert(zfp.produced >= previous_zfp_update.produced); - assert(prefs->nbWorkers >= 1); - - /* test if compression is blocked - * either because output is slow and all buffers are full - * or because input is slow and no job can start while waiting for at least one buffer to be filled. - * note : exclude starting part, since currentJobID > 1 */ - if ( (zfp.consumed == previous_zfp_update.consumed) /* no data compressed : no data available, or no more buffer to compress to, OR compression is really slow (compression of a single block is slower than update rate)*/ - && (zfp.nbActiveWorkers == 0) /* confirmed : no compression ongoing */ - ) { - DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n") - speedChange = slower; - } - - previous_zfp_update = zfp; - - if ( (newlyProduced > (newlyFlushed * 9 / 8)) /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */ - && (flushWaiting == 0) /* flush speed was never slowed by lack of production, so it's operating at max capacity */ - ) { - DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed); - speedChange = slower; - } - flushWaiting = 0; - } - - /* course correct only if there is at least one new job completed */ - if (zfp.currentJobID > lastJobID) { - DISPLAYLEVEL(6, "compression level adaptation check \n") - - /* check input speed */ - if (zfp.currentJobID > (unsigned)(prefs->nbWorkers+1)) { /* warm up period, to fill all workers */ - if (inputBlocked <= 0) { - DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n"); - speedChange = slower; - } else if (speedChange == noChange) { - unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested; - unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed; - unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced; - unsigned long long newlyFlushed = zfp.flushed - previous_zfp_correction.flushed; - previous_zfp_correction = zfp; - assert(inputPresented > 0); - DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n", - inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100, - (unsigned)newlyIngested, (unsigned)newlyConsumed, - (unsigned)newlyFlushed, (unsigned)newlyProduced); - if ( (inputBlocked > inputPresented / 8) /* input is waiting often, because input buffers is full : compression or output too slow */ - && (newlyFlushed * 33 / 32 > newlyProduced) /* flush everything that is produced */ - && (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */ - ) { - DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n", - newlyIngested, newlyConsumed, newlyProduced, newlyFlushed); - speedChange = faster; - } - } - inputBlocked = 0; - inputPresented = 0; - } - - if (speedChange == slower) { - DISPLAYLEVEL(6, "slower speed , higher compression \n") - compressionLevel ++; - if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel(); - if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel; - compressionLevel += (compressionLevel == 0); /* skip 0 */ - ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel); - } - if (speedChange == faster) { - DISPLAYLEVEL(6, "faster speed , lighter compression \n") - compressionLevel --; - if (compressionLevel < prefs->minAdaptLevel) compressionLevel = prefs->minAdaptLevel; - compressionLevel -= (compressionLevel == 0); /* skip 0 */ - ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel); - } - speedChange = noChange; - - lastJobID = zfp.currentJobID; - } /* if (zfp.currentJobID > lastJobID) */ - } /* if (g_adaptiveMode) */ - } /* if (READY_FOR_UPDATE()) */ + } /* if (SHOULD_DISPLAY_PROGRESS() && READY_FOR_UPDATE()) */ } /* while ((inBuff.pos != inBuff.size) */ } while (directive != ZSTD_e_end); diff --git a/programs/fileio_common.h b/programs/fileio_common.h index b82d5b6b16b..aec2e8d56fa 100644 --- a/programs/fileio_common.h +++ b/programs/fileio_common.h @@ -38,11 +38,11 @@ extern FIO_display_prefs_t g_display_prefs; extern UTIL_time_t g_displayClock; #define REFRESH_RATE ((U64)(SEC_TO_MICRO / 6)) -#define READY_FOR_UPDATE() ((g_display_prefs.progressSetting != FIO_ps_never) && UTIL_clockSpanMicro(g_displayClock) > REFRESH_RATE) +#define READY_FOR_UPDATE() (UTIL_clockSpanMicro(g_displayClock) > REFRESH_RATE || g_display_prefs.displayLevel >= 4) #define DELAY_NEXT_UPDATE() { g_displayClock = UTIL_getTime(); } #define DISPLAYUPDATE(l, ...) { \ if (g_display_prefs.displayLevel>=l && (g_display_prefs.progressSetting != FIO_ps_never)) { \ - if (READY_FOR_UPDATE() || (g_display_prefs.displayLevel>=4)) { \ + if (READY_FOR_UPDATE()) { \ DELAY_NEXT_UPDATE(); \ DISPLAY(__VA_ARGS__); \ if (g_display_prefs.displayLevel>=4) fflush(stderr); \ diff --git a/tests/cli-tests/compression/adapt.sh b/tests/cli-tests/compression/adapt.sh index 564e955b5ea..30b9afaa03b 100755 --- a/tests/cli-tests/compression/adapt.sh +++ b/tests/cli-tests/compression/adapt.sh @@ -4,3 +4,11 @@ set -e # Test --adapt zstd -f file --adapt -c | zstd -t + +datagen -g100M > file100M + +# Pick parameters to force fast adaptation, even on slow systems +zstd --adapt -vvvv -19 --zstd=wlog=10 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression" + +# Adaption still happens with --no-progress +zstd --no-progress --adapt -vvvv -19 --zstd=wlog=10 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression"