Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public static void main(String[] args) throws IOException {
tablet.addValue(rowIndex, 0, "id1_field_2");

// id2 column
tablet.addValue(rowIndex, 1, "id1_field_2");
tablet.addValue(rowIndex, 1, "id2_field_2");

// s1 column
tablet.addValue(rowIndex, 2, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class TableSchema {

Expand All @@ -53,18 +54,45 @@ public class TableSchema {
private Map<String, Integer> idColumnOrder;

public TableSchema(String tableName) {
this.tableName = tableName;
this.tableName = tableName.toLowerCase();
this.measurementSchemas = new ArrayList<>();
this.columnCategories = new ArrayList<>();
this.updatable = true;
}

// for deserialize
public TableSchema(
List<IMeasurementSchema> columnSchemas, List<ColumnCategory> columnCategories) {
this.measurementSchemas =
columnSchemas.stream()
.map(
measurementSchema ->
new MeasurementSchema(
measurementSchema.getMeasurementName().toLowerCase(),
measurementSchema.getType(),
measurementSchema.getEncodingType(),
measurementSchema.getCompressor(),
measurementSchema.getProps()))
.collect(Collectors.toList());
this.columnCategories = columnCategories;
}

public TableSchema(
String tableName,
List<IMeasurementSchema> columnSchemas,
List<ColumnCategory> columnCategories) {
this.tableName = tableName;
this.measurementSchemas = columnSchemas;
this.tableName = tableName.toLowerCase();
this.measurementSchemas =
columnSchemas.stream()
.map(
measurementSchema ->
new MeasurementSchema(
measurementSchema.getMeasurementName().toLowerCase(),
measurementSchema.getType(),
measurementSchema.getEncodingType(),
measurementSchema.getCompressor(),
measurementSchema.getProps()))
.collect(Collectors.toList());
this.columnCategories = columnCategories;
}

Expand All @@ -73,22 +101,24 @@ public TableSchema(
List<String> columnNameList,
List<TSDataType> dataTypeList,
List<ColumnCategory> categoryList) {
this.tableName = tableName;
this.tableName = tableName.toLowerCase();
this.measurementSchemas = new ArrayList<>(columnNameList.size());
for (int i = 0; i < columnNameList.size(); i++) {
measurementSchemas.add(new MeasurementSchema(columnNameList.get(i), dataTypeList.get(i)));
measurementSchemas.add(
new MeasurementSchema(columnNameList.get(i).toLowerCase(), dataTypeList.get(i)));
}
this.columnCategories = categoryList;
}

@TsFileApi
public TableSchema(String tableName, List<ColumnSchema> columnSchemaList) {
this.tableName = tableName;
this.tableName = tableName.toLowerCase();
this.measurementSchemas = new ArrayList<>(columnSchemaList.size());
this.columnCategories = new ArrayList<>(columnSchemaList.size());
for (ColumnSchema columnSchema : columnSchemaList) {
this.measurementSchemas.add(
new MeasurementSchema(columnSchema.getColumnName(), columnSchema.getDataType()));
new MeasurementSchema(
columnSchema.getColumnName().toLowerCase(), columnSchema.getDataType()));
this.columnCategories.add(columnSchema.getColumnCategory());
}
}
Expand Down Expand Up @@ -126,12 +156,13 @@ public Map<String, Integer> getIdColumnOrder() {
* @return i if the given column is the i-th column, -1 if the column is not in the schema
*/
public int findColumnIndex(String columnName) {
final String lowerCaseColumnName = columnName.toLowerCase();
return getColumnPosIndex()
.computeIfAbsent(
columnName,
lowerCaseColumnName,
colName -> {
for (int i = 0; i < measurementSchemas.size(); i++) {
if (measurementSchemas.get(i).getMeasurementName().equals(columnName)) {
if (measurementSchemas.get(i).getMeasurementName().equals(lowerCaseColumnName)) {
return i;
}
}
Expand All @@ -144,13 +175,14 @@ public int findColumnIndex(String columnName) {
* not an ID column
*/
public int findIdColumnOrder(String columnName) {
final String lowerCaseColumnName = columnName.toLowerCase();
return getIdColumnOrder()
.computeIfAbsent(
columnName,
lowerCaseColumnName,
colName -> {
int columnOrder = 0;
for (int i = 0; i < measurementSchemas.size(); i++) {
if (measurementSchemas.get(i).getMeasurementName().equals(columnName)
if (measurementSchemas.get(i).getMeasurementName().equals(lowerCaseColumnName)
&& columnCategories.get(i) == ColumnCategory.ID) {
return columnOrder;
} else if (columnCategories.get(i) == ColumnCategory.ID) {
Expand All @@ -162,7 +194,7 @@ public int findIdColumnOrder(String columnName) {
}

public IMeasurementSchema findColumnSchema(String columnName) {
final int columnIndex = findColumnIndex(columnName);
final int columnIndex = findColumnIndex(columnName.toLowerCase());
return columnIndex >= 0 ? measurementSchemas.get(columnIndex) : null;
}

Expand Down Expand Up @@ -230,15 +262,15 @@ public static TableSchema deserialize(ByteBuffer buffer, DeserializeConfig conte
measurementSchemas.add(measurementSchema);
columnCategories.add(ColumnCategory.values()[buffer.getInt()]);
}
return new TableSchema(null, measurementSchemas, columnCategories);
return new TableSchema(measurementSchemas, columnCategories);
}

public String getTableName() {
return tableName;
}

public void setTableName(String tableName) {
this.tableName = tableName;
this.tableName = tableName.toLowerCase();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,30 +74,33 @@ public List<TableSchema> getAllTableSchema() throws IOException {
public Optional<TableSchema> getTableSchemas(String tableName) throws IOException {
TsFileMetadata tsFileMetadata = fileReader.readFileMetadata();
Map<String, TableSchema> tableSchemaMap = tsFileMetadata.getTableSchemaMap();
return Optional.ofNullable(tableSchemaMap.get(tableName));
return Optional.ofNullable(tableSchemaMap.get(tableName.toLowerCase()));
}

@TsFileApi
public ResultSet query(String tableName, List<String> columnNames, long startTime, long endTime)
throws IOException, NoTableException, NoMeasurementException, ReadProcessException {
String lowerCaseTableName = tableName.toLowerCase();
TsFileMetadata tsFileMetadata = fileReader.readFileMetadata();
TableSchema tableSchema = tsFileMetadata.getTableSchemaMap().get(tableName);
TableSchema tableSchema = tsFileMetadata.getTableSchemaMap().get(lowerCaseTableName);
if (tableSchema == null) {
throw new NoTableException(tableName);
}
List<TSDataType> dataTypeList = new ArrayList<>(columnNames.size());
List<String> lowerCaseColumnNames = new ArrayList<>(columnNames.size());
for (String columnName : columnNames) {
Map<String, Integer> column2IndexMap = tableSchema.buildColumnPosIndex();
Integer columnIndex = column2IndexMap.get(columnName);
Integer columnIndex = column2IndexMap.get(columnName.toLowerCase());
if (columnIndex == null) {
throw new NoMeasurementException(columnName);
}
lowerCaseColumnNames.add(columnName.toLowerCase());
dataTypeList.add(tableSchema.getColumnSchemas().get(columnIndex).getType());
}
TsBlockReader tsBlockReader =
queryExecutor.query(
tableName,
columnNames,
lowerCaseTableName,
lowerCaseColumnNames,
new ExpressionTree.TimeBetweenAnd(startTime, endTime),
null,
null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
import org.apache.tsfile.write.chunk.IChunkGroupWriter;
import org.apache.tsfile.write.chunk.NonAlignedChunkGroupWriterImpl;
import org.apache.tsfile.write.chunk.TableChunkGroupWriterImpl;
import org.apache.tsfile.write.record.TSRecord;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.record.datapoint.DataPoint;
Expand Down Expand Up @@ -347,7 +348,7 @@ private boolean checkIsTimeseriesExist(TSRecord record, boolean isAligned)
throws WriteProcessException, IOException {
// initial ChunkGroupWriter of this device in the TSRecord
final IDeviceID deviceID = record.deviceId;
IChunkGroupWriter groupWriter = tryToInitialGroupWriter(deviceID, isAligned);
IChunkGroupWriter groupWriter = tryToInitialGroupWriter(deviceID, isAligned, false);

// initial all SeriesWriters of measurements in this TSRecord
List<IMeasurementSchema> measurementSchemas;
Expand Down Expand Up @@ -411,7 +412,7 @@ private void checkIsTableExistAndSetColumnCategoryList(Tablet tablet)
private void checkIsTimeseriesExist(Tablet tablet, boolean isAligned)
throws WriteProcessException, IOException {
final IDeviceID deviceID = IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId());
IChunkGroupWriter groupWriter = tryToInitialGroupWriter(deviceID, isAligned);
IChunkGroupWriter groupWriter = tryToInitialGroupWriter(deviceID, isAligned, false);

List<IMeasurementSchema> schemas = tablet.getSchemas();
if (getSchema().containsDevice(deviceID)) {
Expand Down Expand Up @@ -495,11 +496,15 @@ private List<IMeasurementSchema> checkIsAllMeasurementsInGroup(
return schemas;
}

private IChunkGroupWriter tryToInitialGroupWriter(IDeviceID deviceId, boolean isAligned) {
private IChunkGroupWriter tryToInitialGroupWriter(
IDeviceID deviceId, boolean isAligned, boolean isTableModel) {
IChunkGroupWriter groupWriter = groupWriters.get(deviceId);
if (groupWriter == null) {
if (isAligned) {
groupWriter = new AlignedChunkGroupWriterImpl(deviceId, encryptParam);
groupWriter =
isTableModel
? new TableChunkGroupWriterImpl(deviceId, encryptParam)
: new AlignedChunkGroupWriterImpl(deviceId, encryptParam);
if (!isUnseq) { // Sequence File
((AlignedChunkGroupWriterImpl) groupWriter)
.setLastTime(alignedDeviceLastTimeMap.get(deviceId));
Expand Down Expand Up @@ -733,7 +738,7 @@ public boolean writeTable(Tablet tablet, List<Pair<IDeviceID, Integer>> deviceId
for (Pair<IDeviceID, Integer> pair : deviceIdEndIndexPairs) {
// get corresponding ChunkGroupWriter and write this Tablet
recordCount +=
tryToInitialGroupWriter(pair.left, isTableWriteAligned)
tryToInitialGroupWriter(pair.left, isTableWriteAligned, true)
.write(tablet, startIndex, pair.right);
startIndex = pair.right;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {

private long lastTime = Long.MIN_VALUE;
private boolean isInitLastTime = false;
private boolean convertColumnNameToLowerCase = false;

public AlignedChunkGroupWriterImpl(IDeviceID deviceId) {
this.deviceId = deviceId;
Expand Down Expand Up @@ -100,17 +101,20 @@ public void tryToAddSeriesWriter(IMeasurementSchema measurementSchema) throws IO

public ValueChunkWriter tryToAddSeriesWriterInternal(IMeasurementSchema measurementSchema)
throws IOException {
ValueChunkWriter valueChunkWriter =
valueChunkWriterMap.get(measurementSchema.getMeasurementName());
String measurementName =
convertColumnNameToLowerCase
? measurementSchema.getMeasurementName().toLowerCase()
: measurementSchema.getMeasurementName();
ValueChunkWriter valueChunkWriter = valueChunkWriterMap.get(measurementName);
if (valueChunkWriter == null) {
valueChunkWriter =
new ValueChunkWriter(
measurementSchema.getMeasurementName(),
measurementName,
measurementSchema.getCompressor(),
measurementSchema.getType(),
measurementSchema.getEncodingType(),
measurementSchema.getValueEncoder());
valueChunkWriterMap.put(measurementSchema.getMeasurementName(), valueChunkWriter);
valueChunkWriterMap.put(measurementName, valueChunkWriter);
tryToAddEmptyPageAndData(valueChunkWriter);
}
return valueChunkWriter;
Expand All @@ -119,15 +123,19 @@ public ValueChunkWriter tryToAddSeriesWriterInternal(IMeasurementSchema measurem
@Override
public void tryToAddSeriesWriter(List<IMeasurementSchema> measurementSchemas) throws IOException {
for (IMeasurementSchema schema : measurementSchemas) {
if (!valueChunkWriterMap.containsKey(schema.getMeasurementName())) {
String measurementName =
convertColumnNameToLowerCase
? schema.getMeasurementName().toLowerCase()
: schema.getMeasurementName();
if (!valueChunkWriterMap.containsKey(measurementName)) {
ValueChunkWriter valueChunkWriter =
new ValueChunkWriter(
schema.getMeasurementName(),
measurementName,
schema.getCompressor(),
schema.getType(),
schema.getEncodingType(),
schema.getValueEncoder());
valueChunkWriterMap.put(schema.getMeasurementName(), valueChunkWriter);
valueChunkWriterMap.put(measurementName, valueChunkWriter);
tryToAddEmptyPageAndData(valueChunkWriter);
}
}
Expand All @@ -138,15 +146,25 @@ public int write(long time, List<DataPoint> data) throws WriteProcessException,
checkIsHistoryData(time);
List<ValueChunkWriter> emptyValueChunkWriters = new ArrayList<>();
Set<String> existingMeasurements =
data.stream().map(DataPoint::getMeasurementId).collect(Collectors.toSet());
data.stream()
.map(
dataPoint ->
convertColumnNameToLowerCase
? dataPoint.getMeasurementId().toLowerCase()
: dataPoint.getMeasurementId())
.collect(Collectors.toSet());
for (Map.Entry<String, ValueChunkWriter> entry : valueChunkWriterMap.entrySet()) {
if (!existingMeasurements.contains(entry.getKey())) {
emptyValueChunkWriters.add(entry.getValue());
}
}
for (DataPoint point : data) {
boolean isNull = point.getValue() == null;
ValueChunkWriter valueChunkWriter = valueChunkWriterMap.get(point.getMeasurementId());
String measurementId =
convertColumnNameToLowerCase
? point.getMeasurementId().toLowerCase()
: point.getMeasurementId();
ValueChunkWriter valueChunkWriter = valueChunkWriterMap.get(measurementId);
switch (point.getType()) {
case BOOLEAN:
valueChunkWriter.write(time, (boolean) point.getValue(), isNull);
Expand Down Expand Up @@ -201,7 +219,11 @@ public int write(Tablet tablet, int startRowIndex, int endRowIndex)
// TODO: should we allow duplicated measurements in a Tablet?
Set<String> existingMeasurements =
measurementSchemas.stream()
.map(IMeasurementSchema::getMeasurementName)
.map(
schema ->
convertColumnNameToLowerCase
? schema.getMeasurementName().toLowerCase()
: schema.getMeasurementName())
.collect(Collectors.toSet());
for (Map.Entry<String, ValueChunkWriter> entry : valueChunkWriterMap.entrySet()) {
if (!existingMeasurements.contains(entry.getKey())) {
Expand Down Expand Up @@ -413,4 +435,8 @@ public void setLastTime(Long lastTime) {
isInitLastTime = true;
}
}

public void setConvertColumnNameToLowerCase(boolean convertColumnNameToLowerCase) {
this.convertColumnNameToLowerCase = convertColumnNameToLowerCase;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.tsfile.write.chunk;

import org.apache.tsfile.encrypt.EncryptParameter;
import org.apache.tsfile.file.metadata.IDeviceID;

public class TableChunkGroupWriterImpl extends AlignedChunkGroupWriterImpl {

public TableChunkGroupWriterImpl(IDeviceID deviceId) {
super(deviceId);
setConvertColumnNameToLowerCase(true);
}

public TableChunkGroupWriterImpl(IDeviceID deviceId, EncryptParameter encryptParam) {
super(deviceId, encryptParam);
setConvertColumnNameToLowerCase(true);
}
}
Loading
Loading