diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java index 063df34fc..de9d798ee 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java @@ -480,7 +480,7 @@ private IChunkGroupWriter tryToInitialGroupWriter(IDeviceID deviceId, boolean is groupWriter = new AlignedChunkGroupWriterImpl(deviceId, encryptParam); if (!isUnseq) { // Sequence File ((AlignedChunkGroupWriterImpl) groupWriter) - .setLastTime(alignedDeviceLastTimeMap.getOrDefault(deviceId, -1L)); + .setLastTime(alignedDeviceLastTimeMap.get(deviceId)); } } else { groupWriter = new NonAlignedChunkGroupWriterImpl(deviceId, encryptParam); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java index cfc420430..1047114ce 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java @@ -62,7 +62,8 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter { private final EncryptParameter encryprParam; - private long lastTime = -1; + private long lastTime = Long.MIN_VALUE; + private boolean isInitLastTime = false; public AlignedChunkGroupWriterImpl(IDeviceID deviceId) { this.deviceId = deviceId; @@ -179,6 +180,7 @@ public int write(long time, List data) throws WriteProcessException, } timeChunkWriter.write(time); lastTime = time; + isInitLastTime = true; if (checkPageSizeAndMayOpenANewPage()) { writePageToPageBuffer(); } @@ -269,6 +271,7 @@ public int write(Tablet tablet, int startRowIndex, int endRowIndex) } timeChunkWriter.write(time); lastTime = time; + isInitLastTime = true; if (checkPageSizeAndMayOpenANewPage()) { writePageToPageBuffer(); } @@ -385,7 +388,7 @@ private void sealAllChunks() { } private void checkIsHistoryData(long time) throws WriteProcessException { - if (time <= lastTime) { + if (isInitLastTime && time <= lastTime) { throw new WriteProcessException( "Not allowed to write out-of-order data in timeseries " + deviceId @@ -405,6 +408,9 @@ public Long getLastTime() { } public void setLastTime(Long lastTime) { - this.lastTime = lastTime; + if (lastTime != null) { + this.lastTime = lastTime; + isInitLastTime = true; + } } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java index 197f165e5..5d62c803d 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java @@ -207,7 +207,8 @@ private void sealAllChunks() { } private void checkIsHistoryData(String measurementId, long time) throws WriteProcessException { - if (time <= lastTimeMap.getOrDefault(measurementId, -1L)) { + final Long lastTime = lastTimeMap.get(measurementId); + if (lastTime != null && time <= lastTime) { throw new WriteProcessException( "Not allowed to write out-of-order data in timeseries " + deviceId diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java index 672ca2c11..e68a2f47d 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java @@ -403,6 +403,59 @@ public void writeNonAlignedWithTabletWithNullValue() { } } + @Test + public void writeNonAlignedWithTabletWithNegativeTimestamps() { + setEnv(100, 30); + try (TsFileWriter tsFileWriter = new TsFileWriter(f)) { + measurementSchemas.add(new MeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN)); + measurementSchemas.add(new MeasurementSchema("s2", TSDataType.STRING, TSEncoding.PLAIN)); + measurementSchemas.add(new MeasurementSchema("s3", TSDataType.BLOB, TSEncoding.PLAIN)); + measurementSchemas.add(new MeasurementSchema("s4", TSDataType.DATE, TSEncoding.PLAIN)); + + // register nonAligned timeseries + tsFileWriter.registerTimeseries(new Path(deviceId), measurementSchemas); + + Tablet tablet = new Tablet(deviceId, measurementSchemas); + long[] timestamps = tablet.timestamps; + Object[] values = tablet.values; + tablet.initBitMaps(); + int sensorNum = measurementSchemas.size(); + long startTime = -100; + for (long r = 0; r < 10000; r++) { + int row = tablet.rowSize++; + timestamps[row] = startTime++; + for (int i = 0; i < sensorNum - 1; i++) { + if (i == 1 && r > 1000) { + tablet.bitMaps[i].mark((int) r % tablet.getMaxRowNumber()); + continue; + } + Binary[] textSensor = (Binary[]) values[i]; + textSensor[row] = new Binary("testString.........", TSFileConfig.STRING_CHARSET); + } + if (r > 1000) { + tablet.bitMaps[sensorNum - 1].mark((int) r % tablet.getMaxRowNumber()); + } else { + LocalDate[] dateSensor = (LocalDate[]) values[sensorNum - 1]; + dateSensor[row] = LocalDate.of(2024, 4, 1); + } + // write + if (tablet.rowSize == tablet.getMaxRowNumber()) { + tsFileWriter.write(tablet); + tablet.reset(); + } + } + // write + if (tablet.rowSize != 0) { + tsFileWriter.write(tablet); + tablet.reset(); + } + + } catch (Throwable e) { + e.printStackTrace(); + Assert.fail("Meet errors in test: " + e.getMessage()); + } + } + @Test public void writeAlignedWithTabletWithNullValue() { setEnv(100, 30); @@ -456,6 +509,59 @@ public void writeAlignedWithTabletWithNullValue() { } } + @Test + public void writeDataToTabletsWithNegativeTimestamps() { + setEnv(100, 30); + try (TsFileWriter tsFileWriter = new TsFileWriter(f)) { + measurementSchemas.add(new MeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN)); + measurementSchemas.add(new MeasurementSchema("s2", TSDataType.STRING, TSEncoding.PLAIN)); + measurementSchemas.add(new MeasurementSchema("s3", TSDataType.BLOB, TSEncoding.PLAIN)); + measurementSchemas.add(new MeasurementSchema("s4", TSDataType.DATE, TSEncoding.PLAIN)); + + // register aligned timeseries + tsFileWriter.registerAlignedTimeseries(new Path(deviceId), measurementSchemas); + + Tablet tablet = new Tablet(deviceId, measurementSchemas); + long[] timestamps = tablet.timestamps; + Object[] values = tablet.values; + tablet.initBitMaps(); + int sensorNum = measurementSchemas.size(); + long startTime = -1000; + for (long r = 0; r < 10000; r++) { + int row = tablet.rowSize++; + timestamps[row] = startTime++; + for (int i = 0; i < sensorNum - 1; i++) { + if (i == 1 && r > 1000) { + tablet.bitMaps[i].mark((int) r % tablet.getMaxRowNumber()); + continue; + } + Binary[] textSensor = (Binary[]) values[i]; + textSensor[row] = new Binary("testString.........", TSFileConfig.STRING_CHARSET); + } + if (r > 1000) { + tablet.bitMaps[sensorNum - 1].mark((int) r % tablet.getMaxRowNumber()); + } else { + LocalDate[] dateSensor = (LocalDate[]) values[sensorNum - 1]; + dateSensor[row] = LocalDate.of(2024, 4, 1); + } + // write + if (tablet.rowSize == tablet.getMaxRowNumber()) { + tsFileWriter.writeAligned(tablet); + tablet.reset(); + } + } + // write + if (tablet.rowSize != 0) { + tsFileWriter.writeAligned(tablet); + tablet.reset(); + } + + } catch (Throwable e) { + e.printStackTrace(); + Assert.fail("Meet errors in test: " + e.getMessage()); + } + } + /** Write an empty page and then write a nonEmpty page. */ @Test public void writeAlignedTimeseriesWithEmptyPage() throws IOException, WriteProcessException {