21 changes: 18 additions & 3 deletions api/src/main/java/org/apache/iceberg/types/Types.java
@@ -208,8 +208,9 @@ public String toString() {
}

public static class TimestampType extends PrimitiveType {
private static final TimestampType INSTANCE_WITH_ZONE = new TimestampType(true);
private static final TimestampType INSTANCE_WITHOUT_ZONE = new TimestampType(false);
private static final TimestampType INSTANCE_WITH_ZONE = new TimestampType(true, false);
private static final TimestampType INSTANCE_WITHOUT_ZONE = new TimestampType(false, false);
private static final TimestampType INSTANCE_SPARK_INT96 = new TimestampType(true, true);

public static TimestampType withZone() {
return INSTANCE_WITH_ZONE;
@@ -219,16 +220,30 @@ public static TimestampType withoutZone() {
return INSTANCE_WITHOUT_ZONE;
}

/**
* @return Timestamp type (with timezone) represented as INT96. This is only added for compatibility reasons
* and can only be written using Spark's ParquetWriteSupport. Writing this type should be avoided.
*/
Contributor:
I don't think we should change the type system to support this. INT96 may be something that we can read, but Iceberg cannot write it, per the spec.
Was this needed to build the tests?

Contributor Author:
Agreed. I found a way to run the tests without adding a new type: I created an implementation of ParquetWriter.Builder that uses Spark's ParquetWriteSupport together with Iceberg's ParquetWriteAdapter, which avoids creating a SparkSession.
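For context, here is a minimal sketch of the kind of builder described in that reply. It assumes Parquet's ParquetWriter.Builder and Spark's ParquetWriteSupport APIs are on the classpath; the class name NativeSparkWriterBuilder and the set() helper are hypothetical and not taken from this PR.

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;

// Hypothetical test-only builder: delegates row writing to Spark's ParquetWriteSupport
// so INT96 timestamps can be produced in tests without starting a SparkSession.
class NativeSparkWriterBuilder
    extends ParquetWriter.Builder<InternalRow, NativeSparkWriterBuilder> {
  private final Configuration conf = new Configuration();

  NativeSparkWriterBuilder(Path path) {
    super(path);
  }

  // Spark's write support is configured through Hadoop configuration keys,
  // e.g. the JSON row schema and the output timestamp type.
  NativeSparkWriterBuilder set(String property, String value) {
    conf.set(property, value);
    return self();
  }

  @Override
  protected NativeSparkWriterBuilder self() {
    return this;
  }

  @Override
  protected WriteSupport<InternalRow> getWriteSupport(Configuration configuration) {
    for (Map.Entry<String, String> entry : conf) {
      configuration.set(entry.getKey(), entry.getValue());
    }
    return new ParquetWriteSupport();
  }
}
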
public static TimestampType asSparkInt96() {
return INSTANCE_SPARK_INT96;
}

private final boolean adjustToUTC;
private final boolean asSparkInt96;

private TimestampType(boolean adjustToUTC) {
private TimestampType(boolean adjustToUTC, boolean asSparkInt96) {
this.adjustToUTC = adjustToUTC;
this.asSparkInt96 = asSparkInt96;
}

public boolean shouldAdjustToUTC() {
return adjustToUTC;
}

public boolean shouldRepresentAsInt96() {
return asSparkInt96;
}

@Override
public TypeID typeId() {
return TypeID.TIMESTAMP;
@@ -19,6 +19,8 @@

package org.apache.iceberg.data.parquet;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -28,6 +30,7 @@
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
@@ -299,6 +302,10 @@ public ParquetValueReader<?> primitive(org.apache.iceberg.types.Type.PrimitiveTy
case INT64:
case DOUBLE:
return new ParquetValueReaders.UnboxedReader<>(desc);
case INT96:
// Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
// compatibility we try to read INT96 as timestamps.
return new TimestampInt96Reader(desc);
default:
throw new UnsupportedOperationException("Unsupported type: " + primitive);
}
@@ -345,6 +352,25 @@ public LocalDateTime read(LocalDateTime reuse) {
}
}

private static class TimestampInt96Reader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

private TimestampInt96Reader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalDateTime read(LocalDateTime reuse) {
final ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
Contributor:
Note for reviewers (and future me): toByteBuffer returns a duplicate of the internal buffer, so it is safe for callers to advance the buffer's position with methods like getLong. (A short illustration follows the TimestampInt96Reader class below.)

final long timeOfDayNanos = byteBuffer.getLong();
final int julianDay = byteBuffer.getInt();

return Instant
.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
.plusNanos(timeOfDayNanos).atOffset(ZoneOffset.UTC).toLocalDateTime();
}
}
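// (Illustration, not part of the diff.) The note above relies on Binary.toByteBuffer()
// returning an independent buffer on each call, so consuming one buffer never disturbs
// the Binary or other readers of the same bytes:
//   Binary bin = Binary.fromConstantByteArray(new byte[12]);
//   ByteBuffer buf = bin.toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
//   buf.getLong();                                // advances buf's position to 8
//   assert bin.toByteBuffer().position() == 0;    // a fresh buffer starts at the beginning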

private static class TimestamptzReader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private TimestamptzReader(ColumnDescriptor desc) {
super(desc);
@@ -47,6 +47,13 @@ public Long next() {
return nextLong();
}
};
case INT96:
return (ColumnIterator<T>) new ColumnIterator<Binary>(desc, writerVersion) {
@Override
public Binary next() {
return nextBinary();
}
};
case FLOAT:
return (ColumnIterator<T>) new ColumnIterator<Float>(desc, writerVersion) {
@Override
@@ -59,6 +59,12 @@ public Long next() {
return nextLong();
}
};
case INT96:
return (PageIterator<T>) new PageIterator<Binary>(desc, writerVersion) {
public Binary next() {
return nextBinary();
}
};
case FLOAT:
return (PageIterator<T>) new PageIterator<Float>(desc, writerVersion) {
@Override
@@ -681,6 +681,7 @@ private <E> Setter<I> newSetter(ParquetValueReader<E> reader, Type type) {
return (record, pos, ignored) -> setFloat(record, pos, unboxed.readFloat());
case DOUBLE:
return (record, pos, ignored) -> setDouble(record, pos, unboxed.readDouble());
case INT96:
case FIXED_LEN_BYTE_ARRAY:
case BINARY:
return (record, pos, ignored) -> set(record, pos, unboxed.readBinary());
@@ -45,6 +45,7 @@
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;

public class TypeToMessageType {
public static final int DECIMAL_INT32_MAX_DIGITS = 9;
@@ -136,7 +137,10 @@ public Type primitive(PrimitiveType primitive, Type.Repetition repetition, int i
case TIME:
return Types.primitive(INT64, repetition).as(TIME_MICROS).id(id).named(name);
case TIMESTAMP:
if (((TimestampType) primitive).shouldAdjustToUTC()) {
final TimestampType timestampType = (TimestampType) primitive;
if (timestampType.shouldRepresentAsInt96()) {
return Types.primitive(INT96, repetition).id(id).named(name);
} else if (timestampType.shouldAdjustToUTC()) {
return Types.primitive(INT64, repetition).as(TIMESTAMPTZ_MICROS).id(id).named(name);
} else {
return Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).id(id).named(name);
@@ -22,9 +22,11 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
@@ -277,6 +279,10 @@ public ParquetValueReader<?> primitive(org.apache.iceberg.types.Type.PrimitiveTy
case INT64:
case DOUBLE:
return new UnboxedReader<>(desc);
case INT96:
// Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
// compatibility we try to read INT96 as timestamps.
return new TimestampInt96Reader(desc);
default:
throw new UnsupportedOperationException("Unsupported type: " + primitive);
}
@@ -350,6 +356,29 @@ public long readLong() {
}
}

private static class TimestampInt96Reader extends UnboxedReader<Long> {
private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

TimestampInt96Reader(ColumnDescriptor desc) {
super(desc);
}

@Override
public Long read(Long ignored) {
return readLong();
}

@Override
public long readLong() {
final ByteBuffer byteBuffer = column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
final long timeOfDayNanos = byteBuffer.getLong();
final int julianDay = byteBuffer.getInt();

return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN) +
TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
}
}
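// (Not part of the diff.) Hand-checked example of the Julian-day arithmetic above:
// 2_440_588 is the Julian day of 1970-01-01, so (julianDay = 2_440_589, timeOfDayNanos = 0)
// decodes to exactly one day past the epoch:
//   TimeUnit.DAYS.toMicros(2_440_589L - 2_440_588L)
//       + TimeUnit.NANOSECONDS.toMicros(0L)        == 86_400_000_000L
//   i.e. 1970-01-02T00:00:00 UTC in microseconds since the epoch.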

private static class StringReader extends PrimitiveReader<UTF8String> {
StringReader(ColumnDescriptor desc) {
super(desc);
@@ -21,21 +21,30 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;

import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
import static org.apache.iceberg.types.Types.NestedField.required;

public class TestSparkParquetReader extends AvroDataTest {
@Override
@@ -67,4 +76,41 @@ protected void writeAndValidate(Schema schema) throws IOException {
Assert.assertFalse("Should not have extra rows", rows.hasNext());
}
}

protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
try (CloseableIterable<InternalRow> reader =
Parquet.read(inputFile)
.project(schema)
.createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
.build()) {
return Lists.newArrayList(reader);
}
}

@Test
public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
final Schema schema = new Schema(required(1, "ts", Types.TimestampType.asSparkInt96()));
final StructType sparkSchema = SparkSchemaUtil.convert(schema);
final Path parquetFile = Paths.get(temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
Contributor:
Why not use temp.newFile?

Contributor Author:
I initially tried that, but the writer fails because the file already exists.
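// (Illustration, not from the PR.) temp.newFile("parquet_int96.parquet") creates the file
// on disk up front, and the Parquet writer built below refuses to overwrite an existing
// file, so the test only builds the path without creating it:
//   File precreated = temp.newFile("parquet_int96.parquet");   // file exists -> writer would fail
//   Path pathOnly = Paths.get(temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");  // path only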

final List<InternalRow> rows = Lists.newArrayList(RandomData.generateSpark(schema, 10, 0L));
Contributor:
Nit: we don't use final for local variables.

Contributor Author:
Done. Removed these final modifiers.


try (FileAppender<InternalRow> writer =
Parquet.write(Files.localOutput(parquetFile.toString()))
.writeSupport(
new org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport())
.set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
.set("org.apache.spark.legacyDateTime", "false")
.set("spark.sql.parquet.int96AsTimestamp", "true")
.set("spark.sql.parquet.writeLegacyFormat", "false")
.set("spark.sql.parquet.outputTimestampType", "INT96")
.schema(schema)
Contributor:
I'd prefer to pass in a normal timestamp type and set a property, if needed, to enable INT96 support.

Contributor Author:
I'm not sure I fully understand this comment. I did change my approach here, though: while still writing InternalRow, I removed most of these properties and kept only the ones needed to make sure that Spark writes these timestamps as INT96.

.build()) {
writer.addAll(rows);
}

final List<InternalRow> readRows =
rowsFromFile(Files.localInput(parquetFile.toString()), schema);
Assert.assertEquals(rows.size(), readRows.size());
Assert.assertThat(readRows, CoreMatchers.is(rows));
}
}