@@ -44,6 +44,7 @@ object DateTimeUtils {
final val JULIAN_DAY_OF_EPOCH = 2440588
final val SECONDS_PER_DAY = 60 * 60 * 24L
final val MICROS_PER_SECOND = 1000L * 1000L
final val MILLIS_PER_SECOND = 1000L
final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L
final val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY

@@ -237,6 +238,24 @@ object DateTimeUtils {
(day.toInt, micros * 1000L)
}

/*
 * Converts the timestamp to milliseconds since epoc. In spark timestamp values have microseconds
 * precision, so this conversion is lossy.
 */
def toMillis(us: SQLTimestamp): Long = {
  // When the timestamp is negative i.e before 1970, we need to adjust the milliseconds portion.
  // Example - 1965-01-01 10:11:12.123456 is represented as (-157700927876544) in micro precision.
  // In millis precision the above needs to be represented as (-157700927877).
  Math.floor(us.toDouble / MILLIS_PER_SECOND).toLong
}

Contributor commented: typo: epoch

/*
 * Converts milliseconds since epoch to SQLTimestamp.
 */
def fromMillis(millis: Long): SQLTimestamp = {
  millis * 1000L
}
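
A quick sketch of how the lossy conversion plays out with the two helpers above (the literal values come from the example in the toMillis comment; the rest is illustrative, not part of the patch):

// Round-tripping a pre-1970 timestamp through toMillis/fromMillis drops the
// sub-millisecond remainder (456 microseconds here), because toMillis floors.
val us: Long = -157700927876544L              // 1965-01-01 10:11:12.123456 in microseconds
val ms: Long = DateTimeUtils.toMillis(us)     // -157700927877
val back: Long = DateTimeUtils.fromMillis(ms) // -157700927877000
assert(back == us - 456)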

/**
* Parses a given UTF8 date string to a corresponding [[Long]] value.
* The return type is [[Option]] in order to distinguish between 0L and null. The following
@@ -218,6 +218,13 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val PARQUET_INT64_AS_TIMESTAMP_MILLIS = buildConf("spark.sql.parquet.int64AsTimestampMillis")
.doc("When true, timestamp values will be stored as INT64 with TIMESTAMP_MILLIS as the " +
"extended type. In this mode, the microsecond portion of the timestamp value will be" +
"truncated.")
.booleanConf
.createWithDefault(false)
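
A usage sketch for the new flag (the path and the DataFrame df are placeholders, not part of the patch):

// Write timestamps as INT64 annotated with TIMESTAMP_MILLIS instead of the default INT96.
// The microsecond component of each value is dropped on write.
spark.conf.set("spark.sql.parquet.int64AsTimestampMillis", "true")
df.write.parquet("/tmp/ts-in-millis")
spark.read.parquet("/tmp/ts-in-millis").show()  // values come back with millisecond precision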

val PARQUET_CACHE_METADATA = buildConf("spark.sql.parquet.cacheMetadata")
.doc("Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
.booleanConf
@@ -924,6 +931,8 @@ class SQLConf extends Serializable with Logging {

def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)

def isParquetINT64AsTimestampMillis: Boolean = getConf(PARQUET_INT64_AS_TIMESTAMP_MILLIS)

def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)

def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
@@ -197,6 +197,7 @@ protected void initialize(String path, List<String> columns) throws IOException
config.set("spark.sql.parquet.binaryAsString", "false");
config.set("spark.sql.parquet.int96AsTimestamp", "false");
config.set("spark.sql.parquet.writeLegacyFormat", "false");
config.set("spark.sql.parquet.int64AsTimestampMillis", "false");

this.file = new Path(path);
long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
@@ -28,6 +28,7 @@
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveType;

import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;
@@ -155,9 +156,13 @@ void readBatch(int total, ColumnVector column) throws IOException {
// Read and decode dictionary ids.
defColumn.readIntegers(
num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);

// Timestamp values encoded as INT64 can't be lazily decoded as we need to post process
// the values to add microseconds precision.
if (column.hasDictionary() || (rowId == 0 &&
(descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32 ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64 ||
(descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64 &&
column.dataType() != DataTypes.TimestampType) ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.BINARY))) {

Member commented: Why this check?

Contributor Author replied: We need to convert the time back to micros, and in the case of lazy decoding we don't get that chance.
@@ -250,7 +255,15 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
}
}
} else {
} else if (column.dataType() == DataTypes.TimestampType) {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
column.putLong(i,
DateTimeUtils.fromMillis(dictionary.decodeToLong(dictionaryIds.getDictId(i))));
}
}
} else {
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
}
break;
@@ -362,7 +375,15 @@ private void readLongBatch(int rowId, int num, ColumnVector column) throws IOExc
if (column.dataType() == DataTypes.LongType ||
DecimalType.is64BitDecimalType(column.dataType())) {
defColumn.readLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
Member commented: For vectorized reader, I think we should also add TimestampType support for INT64 in decodeDictionaryIds?

Member commented: @dilipbiswal Per our offline discussion, I think you should add TimestampType support for INT64 in decodeDictionaryIds. In order to test it, a test case of mixing dictionary-encoded values and non dictionary-encoded values is needed.

Member commented: I've tested the following test case:

test("SPARK-10634 timestamp written and read as INT64 - TIMESTAMP_MILLIS") {
  val data = (1 to 1000).map { i =>
    if (i < 500) {
      Row(new java.sql.Timestamp(10))
    } else {
      Row(new java.sql.Timestamp(i))
    }
  }
  val schema = StructType(List(StructField("time", TimestampType, false)).toArray)
  withSQLConf(ParquetOutputFormat.DICTIONARY_PAGE_SIZE -> "64",
      ParquetOutputFormat.PAGE_SIZE -> "128") {
    withSQLConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key -> "true") {
      withTempPath { file =>
        val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
        df.coalesce(1).write.parquet(file.getCanonicalPath)
        ("true" :: Nil).foreach { vectorized =>
          withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
            val df2 = spark.read.parquet(file.getCanonicalPath)
            checkAnswer(df2, df.collect().toSeq)
          }
        }
      }
    }
  }
}

It will cause an exception:

[info]  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 4, localhost): java.lang.UnsupportedOperationException: Unimplemented type: TimestampType
[info]  at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.decodeDictionaryIds(VectorizedColumnReader.java:256)
[info]  at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:177)
[info]  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:230)

Contributor Author (@dilipbiswal, Oct 5, 2016) replied: @viirya Thanks Simon. Very good catch!! I have made the changes.

Contributor commented: Why this change in indentation? If anything, it looks like it should be indented less than the original.

Contributor Author replied: @squito Thank you for reviewing. I have fixed the indentation.

} else if (column.dataType() == DataTypes.TimestampType) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong()));
} else {
column.putNull(rowId + i);
}
}
} else {
throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType());
}
@@ -125,6 +125,10 @@ class ParquetFileFormat
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
sparkSession.sessionState.conf.writeLegacyParquetFormat.toString)

conf.set(
SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key,
sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis.toString)

// Sets compression scheme
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)

@@ -300,6 +304,9 @@ class ParquetFileFormat
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
hadoopConf.setBoolean(
SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key,
sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis)

// Try to push down filters when filter push-down is enabled.
val pushed =
@@ -410,7 +417,8 @@ object ParquetFileFormat extends Logging {
val converter = new ParquetSchemaConverter(
sparkSession.sessionState.conf.isParquetBinaryAsString,
sparkSession.sessionState.conf.isParquetBinaryAsString,
sparkSession.sessionState.conf.writeLegacyParquetFormat)
sparkSession.sessionState.conf.writeLegacyParquetFormat,
sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis)

converter.convert(schema)
}
@@ -510,6 +518,7 @@
sparkSession: SparkSession): Option[StructType] = {
val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
val writeTimestampInMillis = sparkSession.sessionState.conf.isParquetINT64AsTimestampMillis
val writeLegacyParquetFormat = sparkSession.sessionState.conf.writeLegacyParquetFormat
val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())

@@ -554,7 +563,8 @@
new ParquetSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp,
writeLegacyParquetFormat = writeLegacyParquetFormat)
writeLegacyParquetFormat = writeLegacyParquetFormat,
writeTimestampInMillis = writeTimestampInMillis)

if (footers.isEmpty) {
Iterator.empty
@@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
import org.apache.parquet.schema.{GroupType, MessageType, Type}
import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type}
import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64}

@@ -252,6 +252,13 @@ private[parquet] class ParquetRowConverter(
case StringType =>
new ParquetStringConverter(updater)

case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
new ParquetPrimitiveConverter(updater) {
override def addLong(value: Long): Unit = {
updater.setLong(DateTimeUtils.fromMillis(value))
}
}

case TimestampType =>
// TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
new ParquetPrimitiveConverter(updater) {
@@ -51,22 +51,29 @@ import org.apache.spark.sql.types._
* and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]].
* When set to false, use standard format defined in parquet-format spec. This argument only
* affects Parquet write path.
* @param writeTimestampInMillis Whether to write timestamp values as INT64 annotated by logical
* type TIMESTAMP_MILLIS.
*
*/
private[parquet] class ParquetSchemaConverter(
assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get) {
writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
writeTimestampInMillis: Boolean = SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.defaultValue.get) {

def this(conf: SQLConf) = this(
assumeBinaryIsString = conf.isParquetBinaryAsString,
assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
writeLegacyParquetFormat = conf.writeLegacyParquetFormat)
writeLegacyParquetFormat = conf.writeLegacyParquetFormat,
writeTimestampInMillis = conf.isParquetINT64AsTimestampMillis)

def this(conf: Configuration) = this(
assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get.toString).toBoolean)
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get.toString).toBoolean,
writeTimestampInMillis = conf.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key).toBoolean)


/**
* Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
@@ -158,7 +165,7 @@ private[parquet] class ParquetSchemaConverter(
case INT_64 | null => LongType
case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS)
case UINT_64 => typeNotSupported()
case TIMESTAMP_MILLIS => typeNotImplemented()
case TIMESTAMP_MILLIS => TimestampType
case _ => illegalType()
}

@@ -370,10 +377,16 @@ private[parquet] class ParquetSchemaConverter(
// we may resort to microsecond precision in the future.
//
// For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's
// currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using)
// hasn't implemented `TIMESTAMP_MICROS` yet.
// currently not implemented yet because parquet-mr 1.8.1 (the version we're currently using)
// hasn't implemented `TIMESTAMP_MICROS` yet, however it supports TIMESTAMP_MILLIS. We will
// encode timestamp values as TIMESTAMP_MILLIS annotating INT64 if
// 'spark.sql.parquet.int64AsTimestampMillis' is set.
//
// TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that.

case TimestampType if writeTimestampInMillis =>
Types.primitive(INT64, repetition).as(TIMESTAMP_MILLIS).named(field.name)

case TimestampType =>
Types.primitive(INT96, repetition).named(field.name)
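
To make the effect on the generated Parquet schema concrete, a minimal sketch (it assumes code running in the same package as the private[parquet] converter, e.g. a test; the column name is illustrative):

import org.apache.spark.sql.types.{StructField, StructType, TimestampType}

val schema = StructType(StructField("ts", TimestampType) :: Nil)
// With the flag on, a TimestampType column becomes INT64 annotated with TIMESTAMP_MILLIS ...
println(new ParquetSchemaConverter(writeTimestampInMillis = true).convert(schema))
// expected to contain: optional int64 ts (TIMESTAMP_MILLIS)
// ... and with the flag off it keeps the existing INT96 encoding.
println(new ParquetSchemaConverter(writeTimestampInMillis = false).convert(schema))
// expected to contain: optional int96 ts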

@@ -66,6 +66,9 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
// Whether to write data in legacy Parquet format compatible with Spark 1.4 and prior versions
private var writeLegacyParquetFormat: Boolean = _

// Whether to write timestamp values with millisecond precision.
private var writeTimestampInMillis: Boolean = _

// Reusable byte array used to write timestamps as Parquet INT96 values
private val timestampBuffer = new Array[Byte](12)

@@ -80,6 +83,13 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
assert(configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key) != null)
configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean
}

this.writeTimestampInMillis = {
assert(configuration.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key) != null)
configuration.get(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key).toBoolean
}


this.rootFieldWriters = schema.map(_.dataType).map(makeWriter)

val messageType = new ParquetSchemaConverter(configuration).convert(schema)
@@ -153,6 +163,11 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
recordConsumer.addBinary(
Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes))

case TimestampType if writeTimestampInMillis =>
(row: SpecializedGetters, ordinal: Int) =>
val millis = DateTimeUtils.toMillis(row.getLong(ordinal))
recordConsumer.addLong(millis)

case TimestampType =>
(row: SpecializedGetters, ordinal: Int) => {
// TODO Writes `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it
Binary file not shown (the binary test resource read by the new ParquetIOSuite test below).
@@ -107,11 +107,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
| required binary g(ENUM);
| required binary h(DECIMAL(32,0));
| required fixed_len_byte_array(32) i(DECIMAL(32,0));
| required int64 j(TIMESTAMP_MILLIS);
|}
""".stripMargin)

val expectedSparkTypes = Seq(ByteType, ShortType, DateType, DecimalType(1, 0),
DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0))
DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0),
TimestampType)

withTempPath { location =>
val path = new Path(location.getCanonicalPath)
@@ -607,6 +609,18 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}

test("read dictionary and plain encoded timestamp_millis written as INT64") {
("true" :: "false" :: Nil).foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
checkAnswer(
// timestamp column in this file is encoded using combination of plain
// and dictionary encodings.
readResourceParquetFile("test-data/timemillis-in-i64.parquet"),
(1 to 3).map(i => Row(new java.sql.Timestamp(10))))
}
}
}

test("SPARK-12589 copy() on rows returned from reader works for strings") {
withTempPath { dir =>
val data = (1, "abc") ::(2, "helloabcde") :: Nil