Skip to content

Commit 15f32ad

Browse files
committed
[fileio] Separate parameter adaption from display update rate
Split the logic for parameter adaption from the logic that refreshes the progress display. This decouples the two updates, so changes to how often the display is refreshed no longer affect parameter adaption. Also add a test case that checks that parameter adaption actually happens. This fixes Issue #3353, where --adapt was broken when --no-progress was passed.
1 parent a78c91a commit 15f32ad

File tree

3 files changed

+108
-93
lines changed

3 files changed

+108
-93
lines changed

programs/fileio.c

Lines changed: 98 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -1301,6 +1301,9 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
13011301
unsigned inputPresented = 0;
13021302
unsigned inputBlocked = 0;
13031303
unsigned lastJobID = 0;
1304+
UTIL_time_t lastAdaptTime = UTIL_getTime();
1305+
U64 const adaptEveryMicro = REFRESH_RATE;
1306+
13041307
UTIL_HumanReadableSize_t const file_hrs = UTIL_makeHumanReadableSize(fileSize);
13051308

13061309
DISPLAYLEVEL(6, "compression using zstd format \n");
@@ -1369,14 +1372,106 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
13691372
compressedfilesize += outBuff.pos;
13701373
}
13711374

1372-
/* display notification; and adapt compression level */
1373-
if (READY_FOR_UPDATE()) {
1375+
/* adaptive mode : statistics measurement and speed correction */
1376+
if (prefs->adaptiveMode && UTIL_clockSpanMicro(lastAdaptTime) > adaptEveryMicro) {
1377+
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
1378+
1379+
lastAdaptTime = UTIL_getTime();
1380+
1381+
/* check output speed */
1382+
if (zfp.currentJobID > 1) { /* only possible if nbWorkers >= 1 */
1383+
1384+
unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced;
1385+
unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed;
1386+
assert(zfp.produced >= previous_zfp_update.produced);
1387+
assert(prefs->nbWorkers >= 1);
1388+
1389+
/* test if compression is blocked
1390+
* either because output is slow and all buffers are full
1391+
* or because input is slow and no job can start while waiting for at least one buffer to be filled.
1392+
* note : exclude starting part, since currentJobID > 1 */
1393+
if ( (zfp.consumed == previous_zfp_update.consumed) /* no data compressed : no data available, or no more buffer to compress to, OR compression is really slow (compression of a single block is slower than update rate)*/
1394+
&& (zfp.nbActiveWorkers == 0) /* confirmed : no compression ongoing */
1395+
) {
1396+
DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
1397+
speedChange = slower;
1398+
}
1399+
1400+
previous_zfp_update = zfp;
1401+
1402+
if ( (newlyProduced > (newlyFlushed * 9 / 8)) /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
1403+
&& (flushWaiting == 0) /* flush speed was never slowed by lack of production, so it's operating at max capacity */
1404+
) {
1405+
DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
1406+
speedChange = slower;
1407+
}
1408+
flushWaiting = 0;
1409+
}
1410+
1411+
/* course correct only if there is at least one new job completed */
1412+
if (zfp.currentJobID > lastJobID) {
1413+
DISPLAYLEVEL(6, "compression level adaptation check \n")
1414+
1415+
/* check input speed */
1416+
if (zfp.currentJobID > (unsigned)(prefs->nbWorkers+1)) { /* warm up period, to fill all workers */
1417+
if (inputBlocked <= 0) {
1418+
DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n");
1419+
speedChange = slower;
1420+
} else if (speedChange == noChange) {
1421+
unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested;
1422+
unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed;
1423+
unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced;
1424+
unsigned long long newlyFlushed = zfp.flushed - previous_zfp_correction.flushed;
1425+
previous_zfp_correction = zfp;
1426+
assert(inputPresented > 0);
1427+
DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
1428+
inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
1429+
(unsigned)newlyIngested, (unsigned)newlyConsumed,
1430+
(unsigned)newlyFlushed, (unsigned)newlyProduced);
1431+
if ( (inputBlocked > inputPresented / 8) /* input is waiting often, because input buffers is full : compression or output too slow */
1432+
&& (newlyFlushed * 33 / 32 > newlyProduced) /* flush everything that is produced */
1433+
&& (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */
1434+
) {
1435+
DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
1436+
newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
1437+
speedChange = faster;
1438+
}
1439+
}
1440+
inputBlocked = 0;
1441+
inputPresented = 0;
1442+
}
1443+
1444+
if (speedChange == slower) {
1445+
DISPLAYLEVEL(6, "slower speed , higher compression \n")
1446+
compressionLevel ++;
1447+
if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel();
1448+
if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel;
1449+
compressionLevel += (compressionLevel == 0); /* skip 0 */
1450+
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
1451+
}
1452+
if (speedChange == faster) {
1453+
DISPLAYLEVEL(6, "faster speed , lighter compression \n")
1454+
compressionLevel --;
1455+
if (compressionLevel < prefs->minAdaptLevel) compressionLevel = prefs->minAdaptLevel;
1456+
compressionLevel -= (compressionLevel == 0); /* skip 0 */
1457+
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
1458+
}
1459+
speedChange = noChange;
1460+
1461+
lastJobID = zfp.currentJobID;
1462+
} /* if (zfp.currentJobID > lastJobID) */
1463+
} /* if (prefs->adaptiveMode && UTIL_clockSpanMicro(lastAdaptTime) > adaptEveryMicro) */
1464+
1465+
/* display notification */
1466+
if (SHOULD_DISPLAY_PROGRESS() && READY_FOR_UPDATE()) {
13741467
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
13751468
double const cShare = (double)zfp.produced / (double)(zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
13761469
UTIL_HumanReadableSize_t const buffered_hrs = UTIL_makeHumanReadableSize(zfp.ingested - zfp.consumed);
13771470
UTIL_HumanReadableSize_t const consumed_hrs = UTIL_makeHumanReadableSize(zfp.consumed);
13781471
UTIL_HumanReadableSize_t const produced_hrs = UTIL_makeHumanReadableSize(zfp.produced);
13791472

1473+
DELAY_NEXT_UPDATE();
1474+
13801475
/* display progress notifications */
13811476
DISPLAY_PROGRESS("\r%79s\r", ""); /* Clear out the current displayed line */
13821477
if (g_display_prefs.displayLevel >= 3) {
@@ -1406,96 +1501,8 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
14061501
if (fileSize != UTIL_FILESIZE_UNKNOWN)
14071502
DISPLAY_PROGRESS("/%6.*f%4s", file_hrs.precision, file_hrs.value, file_hrs.suffix);
14081503
DISPLAY_PROGRESS(" ==> %2.f%%", cShare);
1409-
DELAY_NEXT_UPDATE();
14101504
}
1411-
1412-
/* adaptive mode : statistics measurement and speed correction */
1413-
if (prefs->adaptiveMode) {
1414-
1415-
/* check output speed */
1416-
if (zfp.currentJobID > 1) { /* only possible if nbWorkers >= 1 */
1417-
1418-
unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced;
1419-
unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed;
1420-
assert(zfp.produced >= previous_zfp_update.produced);
1421-
assert(prefs->nbWorkers >= 1);
1422-
1423-
/* test if compression is blocked
1424-
* either because output is slow and all buffers are full
1425-
* or because input is slow and no job can start while waiting for at least one buffer to be filled.
1426-
* note : exclude starting part, since currentJobID > 1 */
1427-
if ( (zfp.consumed == previous_zfp_update.consumed) /* no data compressed : no data available, or no more buffer to compress to, OR compression is really slow (compression of a single block is slower than update rate)*/
1428-
&& (zfp.nbActiveWorkers == 0) /* confirmed : no compression ongoing */
1429-
) {
1430-
DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
1431-
speedChange = slower;
1432-
}
1433-
1434-
previous_zfp_update = zfp;
1435-
1436-
if ( (newlyProduced > (newlyFlushed * 9 / 8)) /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
1437-
&& (flushWaiting == 0) /* flush speed was never slowed by lack of production, so it's operating at max capacity */
1438-
) {
1439-
DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
1440-
speedChange = slower;
1441-
}
1442-
flushWaiting = 0;
1443-
}
1444-
1445-
/* course correct only if there is at least one new job completed */
1446-
if (zfp.currentJobID > lastJobID) {
1447-
DISPLAYLEVEL(6, "compression level adaptation check \n")
1448-
1449-
/* check input speed */
1450-
if (zfp.currentJobID > (unsigned)(prefs->nbWorkers+1)) { /* warm up period, to fill all workers */
1451-
if (inputBlocked <= 0) {
1452-
DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n");
1453-
speedChange = slower;
1454-
} else if (speedChange == noChange) {
1455-
unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested;
1456-
unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed;
1457-
unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced;
1458-
unsigned long long newlyFlushed = zfp.flushed - previous_zfp_correction.flushed;
1459-
previous_zfp_correction = zfp;
1460-
assert(inputPresented > 0);
1461-
DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
1462-
inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
1463-
(unsigned)newlyIngested, (unsigned)newlyConsumed,
1464-
(unsigned)newlyFlushed, (unsigned)newlyProduced);
1465-
if ( (inputBlocked > inputPresented / 8) /* input is waiting often, because input buffers is full : compression or output too slow */
1466-
&& (newlyFlushed * 33 / 32 > newlyProduced) /* flush everything that is produced */
1467-
&& (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */
1468-
) {
1469-
DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
1470-
newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
1471-
speedChange = faster;
1472-
}
1473-
}
1474-
inputBlocked = 0;
1475-
inputPresented = 0;
1476-
}
1477-
1478-
if (speedChange == slower) {
1479-
DISPLAYLEVEL(6, "slower speed , higher compression \n")
1480-
compressionLevel ++;
1481-
if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel();
1482-
if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel;
1483-
compressionLevel += (compressionLevel == 0); /* skip 0 */
1484-
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
1485-
}
1486-
if (speedChange == faster) {
1487-
DISPLAYLEVEL(6, "faster speed , lighter compression \n")
1488-
compressionLevel --;
1489-
if (compressionLevel < prefs->minAdaptLevel) compressionLevel = prefs->minAdaptLevel;
1490-
compressionLevel -= (compressionLevel == 0); /* skip 0 */
1491-
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
1492-
}
1493-
speedChange = noChange;
1494-
1495-
lastJobID = zfp.currentJobID;
1496-
} /* if (zfp.currentJobID > lastJobID) */
1497-
} /* if (g_adaptiveMode) */
1498-
} /* if (READY_FOR_UPDATE()) */
1505+
} /* if (SHOULD_DISPLAY_PROGRESS() && READY_FOR_UPDATE()) */
14991506
} /* while ((inBuff.pos != inBuff.size) */
15001507
} while (directive != ZSTD_e_end);
15011508

programs/fileio_common.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@ extern FIO_display_prefs_t g_display_prefs;
3838
extern UTIL_time_t g_displayClock;
3939

4040
#define REFRESH_RATE ((U64)(SEC_TO_MICRO / 6))
41-
#define READY_FOR_UPDATE() ((g_display_prefs.progressSetting != FIO_ps_never) && UTIL_clockSpanMicro(g_displayClock) > REFRESH_RATE)
41+
#define READY_FOR_UPDATE() (UTIL_clockSpanMicro(g_displayClock) > REFRESH_RATE || g_display_prefs.displayLevel >= 4)
4242
#define DELAY_NEXT_UPDATE() { g_displayClock = UTIL_getTime(); }
4343
#define DISPLAYUPDATE(l, ...) { \
4444
if (g_display_prefs.displayLevel>=l && (g_display_prefs.progressSetting != FIO_ps_never)) { \
45-
if (READY_FOR_UPDATE() || (g_display_prefs.displayLevel>=4)) { \
45+
if (READY_FOR_UPDATE()) { \
4646
DELAY_NEXT_UPDATE(); \
4747
DISPLAY(__VA_ARGS__); \
4848
if (g_display_prefs.displayLevel>=4) fflush(stderr); \

tests/cli-tests/compression/adapt.sh

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,11 @@ set -e
44

55
# Test --adapt
66
zstd -f file --adapt -c | zstd -t
7+
8+
datagen -g100M > file100M
9+
10+
# Pick parameters to force fast adaptation, even on slow systems
11+
zstd --adapt -vvvv -19 --zstd=wlog=10 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression"
12+
13+
# Adaption still happens with --no-progress
14+
zstd --no-progress --adapt -vvvv -19 --zstd=wlog=10 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression"

0 commit comments

Comments
 (0)