Skip to content

Commit 1093ac1

Browse files
committed
Fixed issue with concurrent spill during compaction
1 parent 6c4f58c commit 1093ac1

1 file changed

Lines changed: 98 additions & 20 deletions

File tree

engine/src/main/java/com/arcadedb/engine/timeseries/TimeSeriesShard.java

Lines changed: 98 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -307,14 +307,21 @@ private void compactInternal() throws IOException {
307307
// ── Phase 1 (no lock, read-only TX): read full/immutable pages ───────────────────────────
308308
// Full pages are never written to by concurrent appendSamples(); the read-only snapshot TX
309309
// is always conflict-free. Roll it back immediately — nothing is modified.
310+
// phase2Spill holds the raw samples of the last partial chunk from Phase 2 (if any).
311+
// These samples are merged with Phase 4's partial-page data to avoid splitting a single
312+
// sealed block across the Phase-2/Phase-4 page boundary.
313+
Object[] phase2Spill = null;
310314
if (lastFullPage > 0) {
311315
database.begin();
312316
try {
313317
final Object[] snapshotData = mutableBucket.readFullPagesForCompaction(lastFullPage);
314318
if (snapshotData != null)
315319
// ── Phase 2 (lock-free, no TX): sort + compress immutable page data ─────────────
316-
buildCompressedBlocks(snapshotData, allCompressedList, allMetaList, allMinsList, allMaxsList, allSumsList,
317-
allTagDVList);
320+
// returnSpill=true: the last partial chunk is returned as raw data instead of being
321+
// emitted as a block, so Phase 4 can merge it with the partial-page samples and
322+
// produce a single correctly-sized block.
323+
phase2Spill = buildCompressedBlocks(snapshotData, allCompressedList, allMetaList, allMinsList, allMaxsList,
324+
allSumsList, allTagDVList, true);
318325
} finally {
319326
database.rollback(); // read-only: rollback is always safe
320327
}
@@ -344,22 +351,26 @@ private void compactInternal() throws IOException {
344351
try {
345352
// Read pages that were NOT in the Phase 1 snapshot: the partial last page (page N from
346353
// Phase 0) and any new pages (N+1..M) that arrived between Phase 0 and Phase 4.
354+
// Prepend any Phase 2 spill so the last bucket/block is not split across phases.
347355
final int currentDataPageCount = mutableBucket.getDataPageCount();
348-
if (currentDataPageCount > lastFullPage) {
349-
final Object[] remainingData =
350-
mutableBucket.readPagesRangeForCompaction(lastFullPage + 1, currentDataPageCount);
351-
if (remainingData != null) {
352-
final List<byte[][]> remCompressed = new ArrayList<>();
353-
final List<long[]> remMeta = new ArrayList<>();
354-
final List<double[]> remMins = new ArrayList<>();
355-
final List<double[]> remMaxs = new ArrayList<>();
356-
final List<double[]> remSums = new ArrayList<>();
357-
final List<String[][]> remTagDV = new ArrayList<>();
358-
buildCompressedBlocks(remainingData, remCompressed, remMeta, remMins, remMaxs, remSums, remTagDV);
359-
// Append these blocks to the already-written temp file (fast: small data).
360-
sealedStore.appendBlocksToTempFile(
361-
remCompressed, remMeta, remMins, remMaxs, remSums, remTagDV, newBlockDirectory);
362-
}
356+
Object[] remainingData = null;
357+
if (currentDataPageCount > lastFullPage)
358+
remainingData = mutableBucket.readPagesRangeForCompaction(lastFullPage + 1, currentDataPageCount);
359+
if (phase2Spill != null)
360+
remainingData = remainingData != null ? mergeCompactionData(phase2Spill, remainingData) : phase2Spill;
361+
if (remainingData != null) {
362+
final List<byte[][]> remCompressed = new ArrayList<>();
363+
final List<long[]> remMeta = new ArrayList<>();
364+
final List<double[]> remMins = new ArrayList<>();
365+
final List<double[]> remMaxs = new ArrayList<>();
366+
final List<double[]> remSums = new ArrayList<>();
367+
final List<String[][]> remTagDV = new ArrayList<>();
368+
// returnSpill=false: Phase 4 processes ALL remaining data (including Phase 2 spill);
369+
// the last partial chunk must become a block since there is no next phase.
370+
buildCompressedBlocks(remainingData, remCompressed, remMeta, remMins, remMaxs, remSums, remTagDV, false);
371+
// Append these blocks to the already-written temp file (fast: small data).
372+
sealedStore.appendBlocksToTempFile(
373+
remCompressed, remMeta, remMins, remMaxs, remSums, remTagDV, newBlockDirectory);
363374
}
364375

365376
// Atomically swap temp file into the sealed store; updates in-memory blockDirectory.
@@ -391,17 +402,25 @@ private void compactInternal() throws IOException {
391402
* Sorts the given snapshot data by timestamp and builds compressed sealed blocks,
392403
* appending the results to the supplied output lists.
393404
* Called from Phase 2 (lock-free) and Phase 4 (under writeLock).
405+
*
406+
* @param returnSpill when {@code true}, the last partial chunk is NOT emitted as a block but
407+
* is instead returned as raw compaction data (same format as {@code data}).
408+
* The caller (Phase 4) prepends this spill to the partial-page data so the
409+
* bucket/block boundary is never split across phases. Pass {@code false}
410+
* in Phase 4 where all remaining data must become blocks.
411+
* @return the spill raw data array, or {@code null} when {@code returnSpill} is {@code false}
412+
* or there is no partial last chunk.
394413
*/
395-
private void buildCompressedBlocks(
414+
private Object[] buildCompressedBlocks(
396415
final Object[] data,
397416
final List<byte[][]> compressedOut, final List<long[]> metaOut,
398417
final List<double[]> minsOut, final List<double[]> maxsOut, final List<double[]> sumsOut,
399-
final List<String[][]> tagDVOut) {
418+
final List<String[][]> tagDVOut, final boolean returnSpill) {
400419

401420
final long[] timestamps = (long[]) data[0];
402421
final int totalSamples = timestamps.length;
403422
if (totalSamples == 0)
404-
return;
423+
return null;
405424

406425
final int[] sortedIndices = sortIndices(timestamps);
407426
final long[] sortedTs = applyOrder(timestamps, sortedIndices);
@@ -436,6 +455,21 @@ private void buildCompressedBlocks(
436455

437456
chunkEnd = adjustChunkForDictionaryLimit(chunkStart, chunkEnd, colCount, sortedColArrays);
438457

458+
// If this is the last chunk and it may be partial, return it as spill so Phase 4 can
459+
// merge it with partial-page data and produce a single correctly-bounded block.
460+
if (returnSpill && chunkEnd == totalSamples) {
461+
final boolean isPartial;
462+
if (compactionBucketIntervalMs > 0)
463+
// Bucket-aligned: always hold back the last chunk since Phase 4 may have more
464+
// samples in the same bucket.
465+
isPartial = true;
466+
else
467+
// Fixed-size: partial when the chunk is smaller than a full sealed block.
468+
isPartial = (chunkEnd - chunkStart) < SEALED_BLOCK_SIZE;
469+
if (isPartial)
470+
return extractSpillData(sortedTs, sortedColArrays, chunkStart, chunkEnd, colCount);
471+
}
472+
439473
final int chunkLen = chunkEnd - chunkStart;
440474
final long[] chunkTs = Arrays.copyOfRange(sortedTs, chunkStart, chunkEnd);
441475

@@ -491,6 +525,50 @@ private void buildCompressedBlocks(
491525
tagDVOut.add(chunkTagDistinctValues);
492526
chunkStart = chunkEnd;
493527
}
528+
return null; // no spill
529+
}
530+
531+
/**
532+
* Extracts the raw samples in {@code [from, to)} from the sorted compaction arrays and
533+
* returns them in the same format as the {@code data} parameter of
534+
* {@link #buildCompressedBlocks}: element 0 is {@code long[]} timestamps, subsequent
535+
* elements are {@code Object[]} non-timestamp column arrays.
536+
*/
537+
private Object[] extractSpillData(final long[] sortedTs, final Object[][] sortedColArrays,
538+
final int from, final int to, final int colCount) {
539+
final Object[] spill = new Object[colCount];
540+
spill[0] = Arrays.copyOfRange(sortedTs, from, to);
541+
int spillIdx = 1;
542+
for (int c = 0; c < colCount; c++) {
543+
if (sortedColArrays[c] == null)
544+
continue; // TIMESTAMP column — skip; its data lives in element 0
545+
spill[spillIdx++] = Arrays.copyOfRange(sortedColArrays[c], from, to);
546+
}
547+
return spill;
548+
}
549+
550+
/**
551+
* Concatenates two compaction data arrays (same format as the {@code data} parameter of
552+
* {@link #buildCompressedBlocks}). Used to merge Phase 2 spill with Phase 4 partial-page
553+
* data before compressing them together.
554+
*/
555+
private static Object[] mergeCompactionData(final Object[] a, final Object[] b) {
556+
final long[] tsA = (long[]) a[0];
557+
final long[] tsB = (long[]) b[0];
558+
final long[] mergedTs = new long[tsA.length + tsB.length];
559+
System.arraycopy(tsA, 0, mergedTs, 0, tsA.length);
560+
System.arraycopy(tsB, 0, mergedTs, tsA.length, tsB.length);
561+
final Object[] result = new Object[a.length];
562+
result[0] = mergedTs;
563+
for (int i = 1; i < a.length; i++) {
564+
final Object[] colA = (Object[]) a[i];
565+
final Object[] colB = (Object[]) b[i];
566+
final Object[] merged = new Object[colA.length + colB.length];
567+
System.arraycopy(colA, 0, merged, 0, colA.length);
568+
System.arraycopy(colB, 0, merged, colA.length, colB.length);
569+
result[i] = merged;
570+
}
571+
return result;
494572
}
495573

496574
/** Best-effort: clear the compaction-in-progress flag after a non-crash error. */

0 commit comments

Comments
 (0)