Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class ColumnStats {

GenericRecord genericRecord = (GenericRecord) record;

final Object fieldVal = convertValueForSpecificDataTypes(field.schema(), genericRecord.get(field.name()), true);
final Object fieldVal = convertValueForSpecificDataTypes(field.schema(), genericRecord.get(field.name()), false);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @alexeykudinkin, could you take a look at this line? Is this change expected to work correctly? (It is the same situation with Parquet files: the Parquet max/min for a timestamp is the raw long value, and we use a LongWrapper here.)

final Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(genericRecord.getSchema(), field.name());

if (fieldVal != null && canCompare(fieldSchema)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimeType;
import org.apache.flink.table.types.logical.TimestampType;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -273,29 +271,6 @@ private static Object doUnpack(
Object rawVal,
LogicalType logicalType,
Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter> converters) {
// fix time unit
switch (logicalType.getTypeRoot()) {
case TIME_WITHOUT_TIME_ZONE:
TimeType timeType = (TimeType) logicalType;
if (timeType.getPrecision() == 3) {
// the precision in HoodieMetadata is 6
rawVal = ((Long) rawVal) / 1000;
} else if (timeType.getPrecision() == 9) {
rawVal = ((Long) rawVal) * 1000;
}
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) logicalType;
if (timestampType.getPrecision() == 3) {
// the precision in HoodieMetadata is 6
rawVal = ((Long) rawVal) / 1000;
} else if (timestampType.getPrecision() == 9) {
rawVal = ((Long) rawVal) * 1000;
}
break;
default:
// no operation
}
AvroToRowDataConverters.AvroToRowDataConverter converter =
converters.computeIfAbsent(logicalType, k -> AvroToRowDataConverters.createConverter(logicalType));
return converter.convert(rawVal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;

import javax.validation.constraints.NotNull;

Expand Down Expand Up @@ -523,6 +524,8 @@ private static Object getValAsJavaObj(RowData indexRow, int pos, LogicalType col
// manually encoding corresponding values as int and long w/in the Column Stats Index and
// here we have to decode those back into corresponding logical representation.
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType tsType = (TimestampType) colType;
return indexRow.getTimestamp(pos, tsType.getPrecision()).getMillisecond();
case TIME_WITHOUT_TIME_ZONE:
case DATE:
return indexRow.getLong(pos);
Expand Down