Skip to content

Commit 157cbe7

Browse files
luoluoyuyujt2594838
authored andcommitted
Fixed the issue that the time of the first data item written to TSFile by measurement cannot be a negative number (#297)
(cherry picked from commit 4d93491)
1 parent 7c9f49c commit 157cbe7

File tree

4 files changed

+118
-5
lines changed

4 files changed

+118
-5
lines changed

java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,7 @@ private IChunkGroupWriter tryToInitialGroupWriter(IDeviceID deviceId, boolean is
480480
groupWriter = new AlignedChunkGroupWriterImpl(deviceId, encryptParam);
481481
if (!isUnseq) { // Sequence File
482482
((AlignedChunkGroupWriterImpl) groupWriter)
483-
.setLastTime(alignedDeviceLastTimeMap.getOrDefault(deviceId, -1L));
483+
.setLastTime(alignedDeviceLastTimeMap.get(deviceId));
484484
}
485485
} else {
486486
groupWriter = new NonAlignedChunkGroupWriterImpl(deviceId, encryptParam);

java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
6262

6363
private final EncryptParameter encryprParam;
6464

65-
private long lastTime = -1;
65+
private long lastTime = Long.MIN_VALUE;
66+
private boolean isInitLastTime = false;
6667

6768
public AlignedChunkGroupWriterImpl(IDeviceID deviceId) {
6869
this.deviceId = deviceId;
@@ -179,6 +180,7 @@ public int write(long time, List<DataPoint> data) throws WriteProcessException,
179180
}
180181
timeChunkWriter.write(time);
181182
lastTime = time;
183+
isInitLastTime = true;
182184
if (checkPageSizeAndMayOpenANewPage()) {
183185
writePageToPageBuffer();
184186
}
@@ -269,6 +271,7 @@ public int write(Tablet tablet, int startRowIndex, int endRowIndex)
269271
}
270272
timeChunkWriter.write(time);
271273
lastTime = time;
274+
isInitLastTime = true;
272275
if (checkPageSizeAndMayOpenANewPage()) {
273276
writePageToPageBuffer();
274277
}
@@ -385,7 +388,7 @@ private void sealAllChunks() {
385388
}
386389

387390
private void checkIsHistoryData(long time) throws WriteProcessException {
388-
if (time <= lastTime) {
391+
if (isInitLastTime && time <= lastTime) {
389392
throw new WriteProcessException(
390393
"Not allowed to write out-of-order data in timeseries "
391394
+ deviceId
@@ -405,6 +408,9 @@ public Long getLastTime() {
405408
}
406409

407410
public void setLastTime(Long lastTime) {
408-
this.lastTime = lastTime;
411+
if (lastTime != null) {
412+
this.lastTime = lastTime;
413+
isInitLastTime = true;
414+
}
409415
}
410416
}

java/tsfile/src/main/java/org/apache/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,8 @@ private void sealAllChunks() {
207207
}
208208

209209
private void checkIsHistoryData(String measurementId, long time) throws WriteProcessException {
210-
if (time <= lastTimeMap.getOrDefault(measurementId, -1L)) {
210+
final Long lastTime = lastTimeMap.get(measurementId);
211+
if (lastTime != null && time <= lastTime) {
211212
throw new WriteProcessException(
212213
"Not allowed to write out-of-order data in timeseries "
213214
+ deviceId

java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,59 @@ public void writeNonAlignedWithTabletWithNullValue() {
403403
}
404404
}
405405

406+
@Test
407+
public void writeNonAlignedWithTabletWithNegativeTimestamps() {
408+
setEnv(100, 30);
409+
try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
410+
measurementSchemas.add(new MeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN));
411+
measurementSchemas.add(new MeasurementSchema("s2", TSDataType.STRING, TSEncoding.PLAIN));
412+
measurementSchemas.add(new MeasurementSchema("s3", TSDataType.BLOB, TSEncoding.PLAIN));
413+
measurementSchemas.add(new MeasurementSchema("s4", TSDataType.DATE, TSEncoding.PLAIN));
414+
415+
// register nonAligned timeseries
416+
tsFileWriter.registerTimeseries(new Path(deviceId), measurementSchemas);
417+
418+
Tablet tablet = new Tablet(deviceId, measurementSchemas);
419+
long[] timestamps = tablet.timestamps;
420+
Object[] values = tablet.values;
421+
tablet.initBitMaps();
422+
int sensorNum = measurementSchemas.size();
423+
long startTime = -100;
424+
for (long r = 0; r < 10000; r++) {
425+
int row = tablet.rowSize++;
426+
timestamps[row] = startTime++;
427+
for (int i = 0; i < sensorNum - 1; i++) {
428+
if (i == 1 && r > 1000) {
429+
tablet.bitMaps[i].mark((int) r % tablet.getMaxRowNumber());
430+
continue;
431+
}
432+
Binary[] textSensor = (Binary[]) values[i];
433+
textSensor[row] = new Binary("testString.........", TSFileConfig.STRING_CHARSET);
434+
}
435+
if (r > 1000) {
436+
tablet.bitMaps[sensorNum - 1].mark((int) r % tablet.getMaxRowNumber());
437+
} else {
438+
LocalDate[] dateSensor = (LocalDate[]) values[sensorNum - 1];
439+
dateSensor[row] = LocalDate.of(2024, 4, 1);
440+
}
441+
// write
442+
if (tablet.rowSize == tablet.getMaxRowNumber()) {
443+
tsFileWriter.write(tablet);
444+
tablet.reset();
445+
}
446+
}
447+
// write
448+
if (tablet.rowSize != 0) {
449+
tsFileWriter.write(tablet);
450+
tablet.reset();
451+
}
452+
453+
} catch (Throwable e) {
454+
e.printStackTrace();
455+
Assert.fail("Meet errors in test: " + e.getMessage());
456+
}
457+
}
458+
406459
@Test
407460
public void writeAlignedWithTabletWithNullValue() {
408461
setEnv(100, 30);
@@ -456,6 +509,59 @@ public void writeAlignedWithTabletWithNullValue() {
456509
}
457510
}
458511

512+
@Test
513+
public void writeDataToTabletsWithNegativeTimestamps() {
514+
setEnv(100, 30);
515+
try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
516+
measurementSchemas.add(new MeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN));
517+
measurementSchemas.add(new MeasurementSchema("s2", TSDataType.STRING, TSEncoding.PLAIN));
518+
measurementSchemas.add(new MeasurementSchema("s3", TSDataType.BLOB, TSEncoding.PLAIN));
519+
measurementSchemas.add(new MeasurementSchema("s4", TSDataType.DATE, TSEncoding.PLAIN));
520+
521+
// register aligned timeseries
522+
tsFileWriter.registerAlignedTimeseries(new Path(deviceId), measurementSchemas);
523+
524+
Tablet tablet = new Tablet(deviceId, measurementSchemas);
525+
long[] timestamps = tablet.timestamps;
526+
Object[] values = tablet.values;
527+
tablet.initBitMaps();
528+
int sensorNum = measurementSchemas.size();
529+
long startTime = -1000;
530+
for (long r = 0; r < 10000; r++) {
531+
int row = tablet.rowSize++;
532+
timestamps[row] = startTime++;
533+
for (int i = 0; i < sensorNum - 1; i++) {
534+
if (i == 1 && r > 1000) {
535+
tablet.bitMaps[i].mark((int) r % tablet.getMaxRowNumber());
536+
continue;
537+
}
538+
Binary[] textSensor = (Binary[]) values[i];
539+
textSensor[row] = new Binary("testString.........", TSFileConfig.STRING_CHARSET);
540+
}
541+
if (r > 1000) {
542+
tablet.bitMaps[sensorNum - 1].mark((int) r % tablet.getMaxRowNumber());
543+
} else {
544+
LocalDate[] dateSensor = (LocalDate[]) values[sensorNum - 1];
545+
dateSensor[row] = LocalDate.of(2024, 4, 1);
546+
}
547+
// write
548+
if (tablet.rowSize == tablet.getMaxRowNumber()) {
549+
tsFileWriter.writeAligned(tablet);
550+
tablet.reset();
551+
}
552+
}
553+
// write
554+
if (tablet.rowSize != 0) {
555+
tsFileWriter.writeAligned(tablet);
556+
tablet.reset();
557+
}
558+
559+
} catch (Throwable e) {
560+
e.printStackTrace();
561+
Assert.fail("Meet errors in test: " + e.getMessage());
562+
}
563+
}
564+
459565
/** Write an empty page and then write a nonEmpty page. */
460566
@Test
461567
public void writeAlignedTimeseriesWithEmptyPage() throws IOException, WriteProcessException {

0 commit comments

Comments
 (0)