From cf4077dfca1355fdfd2f9d7a3a3a2dc7d7b97946 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 22 Mar 2021 11:33:41 +0800 Subject: [PATCH 01/10] [SPARK-34817][SQL] Read parquet unsigned types that stored as int32 physical type --- .../parquet/VectorizedColumnReader.java | 4 + .../parquet/VectorizedPlainValuesReader.java | 9 ++ .../parquet/VectorizedRleValuesReader.java | 39 +++++++++ .../parquet/VectorizedValuesReader.java | 1 + .../parquet/ParquetRowConverter.scala | 5 ++ .../parquet/ParquetSchemaConverter.scala | 8 +- .../datasources/parquet/ParquetIOSuite.scala | 86 ++++++++++++++----- 7 files changed, 126 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index dac18b1abe047..e9217ee38497b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -565,6 +565,10 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) throw canReadAsIntDecimal(column.dataType())) { defColumn.readIntegers( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + } else if (column.dataType() == DataTypes.LongType) { + // We use LongType to handle UINT32 + defColumn.readIntegersAsUnsigned( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else if (column.dataType() == DataTypes.ByteType) { defColumn.readBytes( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java index 994779b618829..ec74c9781d9a8 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java @@ -83,6 +83,15 @@ public final void readIntegers(int total, WritableColumnVector c, int rowId) { } } + @Override + public final void readIntegersAsUnsigned(int total, WritableColumnVector c, int rowId) { + int requiredBytes = total * 4; + ByteBuffer buffer = getBuffer(requiredBytes); + for (int i = 0; i < total; i += 1) { + c.putLong(rowId + i, Integer.toUnsignedLong(buffer.getInt())); + } + } + // A fork of `readIntegers` to rebase the date values. For performance reasons, this method // iterates the values twice: check if we need to rebase first, then go to the optimized branch // if rebase is not needed. 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index a6c8292671d3f..1e5afde06e5d3 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -203,6 +203,40 @@ public void readIntegers( } } + public void readIntegersAsUnsigned( + int total, + WritableColumnVector c, + int rowId, + int level, + VectorizedValuesReader data) throws IOException { + int left = total; + while (left > 0) { + if (this.currentCount == 0) this.readNextGroup(); + int n = Math.min(left, this.currentCount); + switch (mode) { + case RLE: + if (currentValue == level) { + data.readIntegersAsUnsigned(n, c, rowId); + } else { + c.putNulls(rowId, n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (currentBuffer[currentBufferIdx++] == level) { + c.putLong(rowId + i, Integer.toUnsignedLong(data.readInteger())); + } else { + c.putNull(rowId + i); + } + } + break; + } + rowId += n; + left -= n; + currentCount -= n; + } + } + // A fork of `readIntegers`, which rebases the date int value (days) before filling // the Spark column vector. public void readIntegersWithRebase( @@ -602,6 +636,11 @@ public void readIntegers(int total, WritableColumnVector c, int rowId) { } } + @Override + public final void readIntegersAsUnsigned(int total, WritableColumnVector c, int rowId) { + throw new UnsupportedOperationException("only readInts is valid."); + } + @Override public void readIntegersWithRebase( int total, WritableColumnVector c, int rowId, boolean failIfRebase) { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java index 35db8f235ed60..ea9d8128dae74 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java @@ -41,6 +41,7 @@ public interface VectorizedValuesReader { void readBytes(int total, WritableColumnVector c, int rowId); void readIntegers(int total, WritableColumnVector c, int rowId); void readIntegersWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase); + void readIntegersAsUnsigned(int total, WritableColumnVector c, int rowId); void readLongs(int total, WritableColumnVector c, int rowId); void readLongsWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase); void readFloats(int total, WritableColumnVector c, int rowId); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index dca12ff6b4deb..2c610ec539ce7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -252,6 +252,11 @@ private[parquet] class ParquetRowConverter( updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = { catalystType match { + case LongType if parquetType.getOriginalType == 
OriginalType.UINT_32 => + new ParquetPrimitiveConverter(updater) { + override def addInt(value: Int): Unit = + updater.setLong(Integer.toUnsignedLong(value)) + } case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType => new ParquetPrimitiveConverter(updater) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index ff804e25ede4b..d1155fe004553 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -130,13 +130,11 @@ class ParquetToSparkSchemaConverter( case INT32 => originalType match { case INT_8 => ByteType - case INT_16 => ShortType - case INT_32 | null => IntegerType + case INT_16 | UINT_8 => ShortType + case INT_32 | UINT_16 | null => IntegerType case DATE => DateType case DECIMAL => makeDecimalType(Decimal.MAX_INT_DIGITS) - case UINT_8 => typeNotSupported() - case UINT_16 => typeNotSupported() - case UINT_32 => typeNotSupported() + case UINT_32 => LongType case TIME_MILLIS => typeNotImplemented() case _ => illegalType() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index fbe651502296c..547a9ac966e07 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -313,18 +313,21 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession test("SPARK-10113 Support for unsigned Parquet logical types") { val parquetSchema = MessageTypeParser.parseMessageType( """message root { - | required int32 c(UINT_32); + | required INT32 a(UINT_8); + | required INT32 b(UINT_16); + | required INT32 c(UINT_32); + | required INT64 d(UINT_64); |} """.stripMargin) + val expectedSparkTypes = Seq(ShortType, IntegerType, LongType, DecimalType(20, 0)) + withTempPath { location => val path = new Path(location.getCanonicalPath) val conf = spark.sessionState.newHadoopConf() writeMetadata(parquetSchema, path, conf) - val errorMessage = intercept[Throwable] { - spark.read.parquet(path.toString).printSchema() - }.toString - assert(errorMessage.contains("Parquet type not supported")) + val sparkTypes = spark.read.parquet(path.toString).schema.map(_.dataType) + assert(sparkTypes === expectedSparkTypes) } } @@ -381,9 +384,25 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession checkCompressionCodec(CompressionCodecName.SNAPPY) } + private def createParquetWriter(schema: MessageType, path: Path): ParquetWriter[Group] = { + val testWriteSupport = new TestGroupWriteSupport(schema) + /** + * Provide a builder for constructing a parquet writer - after PARQUET-248 directly + * constructing the writer is deprecated and should be done through a builder. The default + * builders include Avro - but for raw Parquet writing we must create our own builder. 
+ */ + class ParquetWriterBuilder() extends + ParquetWriter.Builder[Group, ParquetWriterBuilder](path) { + override def getWriteSupport(conf: Configuration) = testWriteSupport + + override def self() = this + } + new ParquetWriterBuilder().build() + } + test("read raw Parquet file") { def makeRawParquetFile(path: Path): Unit = { - val schema = MessageTypeParser.parseMessageType( + val schemaStr = """ |message root { | required boolean _1; @@ -392,22 +411,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession | required float _4; | required double _5; |} - """.stripMargin) - - val testWriteSupport = new TestGroupWriteSupport(schema) - /** - * Provide a builder for constructing a parquet writer - after PARQUET-248 directly - * constructing the writer is deprecated and should be done through a builder. The default - * builders include Avro - but for raw Parquet writing we must create our own builder. - */ - class ParquetWriterBuilder() extends - ParquetWriter.Builder[Group, ParquetWriterBuilder](path) { - override def getWriteSupport(conf: Configuration) = testWriteSupport - - override def self() = this - } + """.stripMargin + val schema = MessageTypeParser.parseMessageType(schemaStr) + - val writer = new ParquetWriterBuilder().build() + val writer = createParquetWriter(schema, path) (0 until 10).foreach { i => val record = new SimpleGroup(schema) @@ -432,6 +440,42 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } + test("SPARK-34817: Read UINT_8/UINT_16/UINT_32 from parquet") { + def makeRawParquetFile(path: Path): Unit = { + val schemaStr = + """message root { + | required INT32 a(UINT_8); + | required INT32 b(UINT_16); + | required INT32 c(UINT_32); + |} + """.stripMargin + val schema = MessageTypeParser.parseMessageType(schemaStr) + + + val writer = createParquetWriter(schema, path) + + (0 until 10).foreach { i => + val record = new SimpleGroup(schema) + record.add(0, i + Byte.MaxValue) + record.add(1, i + Short.MaxValue) + record.add(2, i + Int.MaxValue) + writer.write(record) + } + writer.close() + } + + withTempDir { dir => + val path = new Path(dir.toURI.toString, "part-r-0.parquet") + makeRawParquetFile(path) + readParquetFile(path.toString) { df => + checkAnswer(df, (0 until 10).map { i => + Row(i + Byte.MaxValue.toShort, i + Short.MaxValue.toInt, i + Int.MaxValue.toLong) + Row(i + Byte.MaxValue, i + Short.MaxValue, i + Int.MaxValue.toLong) + + }) + } + } + } test("write metadata") { val hadoopConf = spark.sessionState.newHadoopConf() withTempPath { file => From 0c8b6d45455744745d2df87793f4acd94d58656e Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 22 Mar 2021 11:49:23 +0800 Subject: [PATCH 02/10] nit --- .../datasources/parquet/VectorizedRleValuesReader.java | 1 + .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index 1e5afde06e5d3..bc7df8c5eed96 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -203,6 +203,7 @@ public void readIntegers( } } + // A fork of `readIntegers`, reading the signed integers as unsigned in long type public void 
readIntegersAsUnsigned( int total, WritableColumnVector c, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 547a9ac966e07..72f8660fcab4f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -316,11 +316,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession | required INT32 a(UINT_8); | required INT32 b(UINT_16); | required INT32 c(UINT_32); - | required INT64 d(UINT_64); |} """.stripMargin) - val expectedSparkTypes = Seq(ShortType, IntegerType, LongType, DecimalType(20, 0)) + val expectedSparkTypes = Seq(ShortType, IntegerType, LongType) withTempPath { location => val path = new Path(location.getCanonicalPath) From 8ff3267b82dc20961a31b6b6a8d565a221497132 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 22 Mar 2021 12:43:36 +0800 Subject: [PATCH 03/10] nit --- .../datasources/parquet/VectorizedRleValuesReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index bc7df8c5eed96..6f8ba11d6964e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -638,7 +638,7 @@ public void readIntegers(int total, WritableColumnVector c, int rowId) { } @Override - public final void readIntegersAsUnsigned(int total, WritableColumnVector c, int rowId) { + public void readIntegersAsUnsigned(int total, WritableColumnVector c, int rowId) { throw new UnsupportedOperationException("only readInts is valid."); } From 0da5d076e5f7a54a9b8b4426e93394a6e5eb37cd Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 24 Mar 2021 18:08:53 +0800 Subject: [PATCH 04/10] dictionary branch --- .../parquet/ParquetDictionary.java | 16 +++- .../parquet/VectorizedColumnReader.java | 12 ++- .../parquet/VectorizedPlainValuesReader.java | 2 +- .../parquet/VectorizedRleValuesReader.java | 6 +- .../parquet/VectorizedValuesReader.java | 2 +- .../datasources/parquet/ParquetIOSuite.scala | 92 ++++++++++--------- 6 files changed, 74 insertions(+), 56 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java index 7aa6b079073c4..a9da357b62de3 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java @@ -21,16 +21,16 @@ public final class ParquetDictionary implements Dictionary { private org.apache.parquet.column.Dictionary dictionary; - private boolean castLongToInt = false; + private boolean needTransform = false; - public ParquetDictionary(org.apache.parquet.column.Dictionary dictionary, boolean castLongToInt) { + public ParquetDictionary(org.apache.parquet.column.Dictionary dictionary, boolean needTransform) { this.dictionary = dictionary; - this.castLongToInt = 
castLongToInt; + this.needTransform = needTransform; } @Override public int decodeToInt(int id) { - if (castLongToInt) { + if (needTransform) { return (int) dictionary.decodeToLong(id); } else { return dictionary.decodeToInt(id); @@ -39,7 +39,13 @@ public int decodeToInt(int id) { @Override public long decodeToLong(int id) { - return dictionary.decodeToLong(id); + if (needTransform) { + // for unsigned int32, it stores as signed int32. We need to decode it + // to int before converting to long + return Integer.toUnsignedLong(dictionary.decodeToInt(id)); + } else { + return dictionary.decodeToLong(id); + } } @Override diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index e9217ee38497b..d7e3de7dc2af0 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -46,7 +46,6 @@ import org.apache.spark.sql.types.DecimalType; import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; import static org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.ValuesReaderIntIterator; import static org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.createRLEIterator; @@ -370,6 +369,13 @@ private void decodeDictionaryIds( column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i))); } } + } else if (column.dataType() == DataTypes.LongType) { + for (int i = rowId; i < rowId + num; ++i) { + if (!column.isNullAt(i)) { + column.putLong(i, + Integer.toUnsignedLong(dictionary.decodeToInt(dictionaryIds.getDictId(i)))); + } + } } else if (column.dataType() == DataTypes.ByteType) { for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { @@ -566,8 +572,8 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) throw defColumn.readIntegers( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else if (column.dataType() == DataTypes.LongType) { - // We use LongType to handle UINT32 - defColumn.readIntegersAsUnsigned( + // We use LongType to handle UINT32. 
In Parquet, UINT32 values are stored as signed integers + defColumn.readUnsignedIntegers( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else if (column.dataType() == DataTypes.ByteType) { defColumn.readBytes( diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java index ec74c9781d9a8..99beb0250a62b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java @@ -84,7 +84,7 @@ public final void readIntegers(int total, WritableColumnVector c, int rowId) { } @Override - public final void readIntegersAsUnsigned(int total, WritableColumnVector c, int rowId) { + public final void readUnsignedIntegers(int total, WritableColumnVector c, int rowId) { int requiredBytes = total * 4; ByteBuffer buffer = getBuffer(requiredBytes); for (int i = 0; i < total; i += 1) { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index 6f8ba11d6964e..8c4289903b601 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -204,7 +204,7 @@ public void readIntegers( } // A fork of `readIntegers`, reading the signed integers as unsigned in long type - public void readIntegersAsUnsigned( + public void readUnsignedIntegers( int total, WritableColumnVector c, int rowId, @@ -217,7 +217,7 @@ public void readIntegersAsUnsigned( switch (mode) { case RLE: if (currentValue == level) { - data.readIntegersAsUnsigned(n, c, rowId); + data.readUnsignedIntegers(n, c, rowId); } else { c.putNulls(rowId, n); } @@ -638,7 +638,7 @@ public void readIntegers(int total, WritableColumnVector c, int rowId) { } @Override - public void readIntegersAsUnsigned(int total, WritableColumnVector c, int rowId) { + public void readUnsignedIntegers(int total, WritableColumnVector c, int rowId) { throw new UnsupportedOperationException("only readInts is valid."); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java index ea9d8128dae74..9f5d944329343 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java @@ -41,7 +41,7 @@ public interface VectorizedValuesReader { void readBytes(int total, WritableColumnVector c, int rowId); void readIntegers(int total, WritableColumnVector c, int rowId); void readIntegersWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase); - void readIntegersAsUnsigned(int total, WritableColumnVector c, int rowId); + void readUnsignedIntegers(int total, WritableColumnVector c, int rowId); void readLongs(int total, WritableColumnVector c, int rowId); void readLongsWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase); void readFloats(int total, 
WritableColumnVector c, int rowId); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 72f8660fcab4f..bf8a09b685449 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -28,17 +28,20 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} import org.apache.parquet.column.{Encoding, ParquetProperties} +import org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0 import org.apache.parquet.example.data.{Group, GroupWriter} -import org.apache.parquet.example.data.simple.SimpleGroup +import org.apache.parquet.example.data.simple.{SimpleGroup, SimpleGroupFactory} import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.hadoop.api.WriteSupport.WriteContext +import org.apache.parquet.hadoop.example.ExampleParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.{MessageType, MessageTypeParser} -import org.apache.spark.{SPARK_VERSION_SHORT, SparkException} import org.apache.spark.sql._ +import org.apache.spark.{SPARK_VERSION_SHORT, SparkException} import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -310,7 +313,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } - test("SPARK-10113 Support for unsigned Parquet logical types") { + test("SPARK-34817: Support for unsigned Parquet logical types") { val parquetSchema = MessageTypeParser.parseMessageType( """message root { | required INT32 a(UINT_8); @@ -383,20 +386,22 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession checkCompressionCodec(CompressionCodecName.SNAPPY) } - private def createParquetWriter(schema: MessageType, path: Path): ParquetWriter[Group] = { - val testWriteSupport = new TestGroupWriteSupport(schema) - /** - * Provide a builder for constructing a parquet writer - after PARQUET-248 directly - * constructing the writer is deprecated and should be done through a builder. The default - * builders include Avro - but for raw Parquet writing we must create our own builder. 
- */ - class ParquetWriterBuilder() extends - ParquetWriter.Builder[Group, ParquetWriterBuilder](path) { - override def getWriteSupport(conf: Configuration) = testWriteSupport + private def createParquetWriter( + schema: MessageType, + path: Path, + dictionaryEnabled: Boolean = false): ParquetWriter[Group] = { + val hadoopConf = spark.sessionState.newHadoopConf() - override def self() = this - } - new ParquetWriterBuilder().build() + ExampleParquetWriter + .builder(path) + .withDictionaryEncoding(dictionaryEnabled) + .withType(schema) + .withWriterVersion(PARQUET_1_0) + .withCompressionCodec(GZIP) + .withRowGroupSize(1024 * 1024) + .withPageSize(1024) + .withConf(hadoopConf) + .build() } test("read raw Parquet file") { @@ -440,41 +445,42 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } test("SPARK-34817: Read UINT_8/UINT_16/UINT_32 from parquet") { - def makeRawParquetFile(path: Path): Unit = { - val schemaStr = - """message root { - | required INT32 a(UINT_8); - | required INT32 b(UINT_16); - | required INT32 c(UINT_32); - |} + Seq(true, false).foreach { dictionaryEnabled => + def makeRawParquetFile(path: Path): Unit = { + val schemaStr = + """message root { + | required INT32 a(UINT_8); + | required INT32 b(UINT_16); + | required INT32 c(UINT_32); + |} """.stripMargin - val schema = MessageTypeParser.parseMessageType(schemaStr) - + val schema = MessageTypeParser.parseMessageType(schemaStr) - val writer = createParquetWriter(schema, path) + val writer = createParquetWriter(schema, path, dictionaryEnabled) - (0 until 10).foreach { i => - val record = new SimpleGroup(schema) - record.add(0, i + Byte.MaxValue) - record.add(1, i + Short.MaxValue) - record.add(2, i + Int.MaxValue) - writer.write(record) + val factory = new SimpleGroupFactory(schema) + (0 until 10).foreach { i => + val group = factory.newGroup() + .append("a", i + Byte.MaxValue) + .append("b", i + Short.MaxValue) + .append("c", i + Int.MaxValue) + writer.write(group) + } + writer.close() } - writer.close() - } - - withTempDir { dir => - val path = new Path(dir.toURI.toString, "part-r-0.parquet") - makeRawParquetFile(path) - readParquetFile(path.toString) { df => - checkAnswer(df, (0 until 10).map { i => - Row(i + Byte.MaxValue.toShort, i + Short.MaxValue.toInt, i + Int.MaxValue.toLong) - Row(i + Byte.MaxValue, i + Short.MaxValue, i + Int.MaxValue.toLong) - }) + withTempDir { dir => + val path = new Path(dir.toURI.toString, "part-r-0.parquet") + makeRawParquetFile(path) + readParquetFile(path.toString) { df => + checkAnswer(df, (0 until 10).map { i => + Row(i + Byte.MaxValue, i + Short.MaxValue, i + Int.MaxValue.toLong) + }) + } } } } + test("write metadata") { val hadoopConf = spark.sessionState.newHadoopConf() withTempPath { file => From 10ed3b9f4f9197a4b5ca98585db92858a04317c1 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 24 Mar 2021 18:38:53 +0800 Subject: [PATCH 05/10] activate dictionary decoding --- .../parquet/VectorizedColumnReader.java | 24 +++++++++++-------- .../datasources/parquet/ParquetIOSuite.scala | 16 +++++++------ 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index d7e3de7dc2af0..017b83dc9aa63 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -278,16 +278,20 @@ void readBatch(int total, WritableColumnVector column) throws IOException { // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some // non-dictionary encoded values have already been added). PrimitiveType primitiveType = descriptor.getPrimitiveType(); - if (primitiveType.getOriginalType() == OriginalType.DECIMAL && - primitiveType.getDecimalMetadata().getPrecision() <= Decimal.MAX_INT_DIGITS() && - primitiveType.getPrimitiveTypeName() == INT64) { - // We need to make sure that we initialize the right type for the dictionary otherwise - // WritableColumnVector will throw an exception when trying to decode to an Int when the - // dictionary is in fact initialized as Long - column.setDictionary(new ParquetDictionary(dictionary, true)); - } else { - column.setDictionary(new ParquetDictionary(dictionary, false)); - } + + // We need to make sure that we initialize the right type for the dictionary otherwise + // WritableColumnVector will throw an exception when trying to decode to an Int when the + // dictionary is in fact initialized as Long + boolean castLongToInt = primitiveType.getOriginalType() == OriginalType.DECIMAL && + primitiveType.getDecimalMetadata().getPrecision() <= Decimal.MAX_INT_DIGITS() && + primitiveType.getPrimitiveTypeName() == INT64; + + // We require a long value, but we need to use dictionary to decode the original + // signed int first + boolean isUnsignedInt32 = primitiveType.getOriginalType() == OriginalType.UINT_32; + + column.setDictionary( + new ParquetDictionary(dictionary, castLongToInt || isUnsignedInt32)); } else { decodeDictionaryIds(rowId, num, column, dictionaryIds); } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index bf8a09b685449..bb3fada9b3508 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -40,8 +40,8 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.{MessageType, MessageTypeParser} -import org.apache.spark.sql._ import org.apache.spark.{SPARK_VERSION_SHORT, SparkException} +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -459,11 +459,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession val writer = createParquetWriter(schema, path, dictionaryEnabled) val factory = new SimpleGroupFactory(schema) - (0 until 10).foreach { i => + (0 until 1000).foreach { i => val group = factory.newGroup() - .append("a", i + Byte.MaxValue) - .append("b", i + Short.MaxValue) - .append("c", i + Int.MaxValue) + .append("a", i % 100 + Byte.MaxValue) + .append("b", i % 100 + Short.MaxValue) + .append("c", i % 100 + Int.MaxValue) writer.write(group) } writer.close() @@ -473,8 +473,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession val path = new Path(dir.toURI.toString, "part-r-0.parquet") makeRawParquetFile(path) readParquetFile(path.toString) { df => 
- checkAnswer(df, (0 until 10).map { i => - Row(i + Byte.MaxValue, i + Short.MaxValue, i + Int.MaxValue.toLong) + checkAnswer(df, (0 until 1000).map { i => + Row(i % 100 + Byte.MaxValue, + i % 100 + Short.MaxValue, + i % 100 + Int.MaxValue.toLong) }) } } From efe9c4af56fa76aac00ca7f819cce50cb03eaf43 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 24 Mar 2021 18:45:16 +0800 Subject: [PATCH 06/10] clean --- .../datasources/parquet/ParquetIOSuite.scala | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index bb3fada9b3508..686f7a0a9f35c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -52,26 +52,6 @@ import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String -// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport -// with an empty configuration (it is after all not intended to be used in this way?) -// and members are private so we need to make our own in order to pass the schema -// to the writer. -private[parquet] class TestGroupWriteSupport(schema: MessageType) extends WriteSupport[Group] { - var groupWriter: GroupWriter = null - - override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { - groupWriter = new GroupWriter(recordConsumer, schema) - } - - override def init(configuration: Configuration): WriteContext = { - new WriteContext(schema, new java.util.HashMap[String, String]()) - } - - override def write(record: Group): Unit = { - groupWriter.write(record) - } -} - /** * A test suite that tests basic Parquet I/O. 
*/ From 3642f91059689d498304ff08fab15608d6e8b9f2 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 24 Mar 2021 19:41:55 +0800 Subject: [PATCH 07/10] clean --- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 686f7a0a9f35c..82e605fc9fd14 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -24,20 +24,16 @@ import scala.collection.mutable import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} import org.apache.parquet.column.{Encoding, ParquetProperties} import org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0 -import org.apache.parquet.example.data.{Group, GroupWriter} +import org.apache.parquet.example.data.Group import org.apache.parquet.example.data.simple.{SimpleGroup, SimpleGroupFactory} import org.apache.parquet.hadoop._ -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.hadoop.api.WriteSupport.WriteContext import org.apache.parquet.hadoop.example.ExampleParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP -import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.{MessageType, MessageTypeParser} import org.apache.spark.{SPARK_VERSION_SHORT, SparkException} From 869dc147220d4bf53c923eb4c3e4d306d6da3b0c Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 25 Mar 2021 00:05:17 +0800 Subject: [PATCH 08/10] comments --- .../execution/datasources/parquet/ParquetDictionary.java | 5 +++-- .../datasources/parquet/VectorizedColumnReader.java | 9 ++++++++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java index a9da357b62de3..ea15607bb8d15 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java @@ -40,8 +40,9 @@ public int decodeToInt(int id) { @Override public long decodeToLong(int id) { if (needTransform) { - // for unsigned int32, it stores as signed int32. We need to decode it - // to int before converting to long + // For unsigned int32, it stores as dictionary encoded signed int32 in Parquet + // whenever dictionary is available. + // Here we lazily decode it to the original signed int value then convert to long(unit32). 
return Integer.toUnsignedLong(dictionary.decodeToInt(id)); } else { return dictionary.decodeToLong(id); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 017b83dc9aa63..92d107c60b42c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -374,6 +374,11 @@ private void decodeDictionaryIds( } } } else if (column.dataType() == DataTypes.LongType) { + // In `ParquetToSparkSchemaConverter`, we map parquet UINT32 to our LongType. + // For unsigned int32, it stores as dictionary encoded signed int32 in Parquet + // whenever dictionary is available. + // Here we eagerly decode it to the original signed int value then convert to + // long(unit32). for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { column.putLong(i, @@ -576,7 +581,9 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) throw defColumn.readIntegers( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else if (column.dataType() == DataTypes.LongType) { - // We use LongType to handle UINT32. In Parquet, UINT32 values are stored as signed integers + // In `ParquetToSparkSchemaConverter`, we map parquet UINT32 to our LongType. + // For unsigned int32, it stores as plain signed int32 in Parquet when dictionary fall backs. + // We use read them as long values. defColumn.readUnsignedIntegers( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else if (column.dataType() == DataTypes.ByteType) { From d9afc7916fa08f3bea9e89ab7a48cfd38c76c190 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 25 Mar 2021 00:07:08 +0800 Subject: [PATCH 09/10] comments --- .../execution/datasources/parquet/VectorizedColumnReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 92d107c60b42c..e091cbac41f75 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -583,7 +583,7 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) throw } else if (column.dataType() == DataTypes.LongType) { // In `ParquetToSparkSchemaConverter`, we map parquet UINT32 to our LongType. // For unsigned int32, it stores as plain signed int32 in Parquet when dictionary fall backs. - // We use read them as long values. + // We read them as long values. 
defColumn.readUnsignedIntegers( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else if (column.dataType() == DataTypes.ByteType) { From f9ab2d533c8fc5877cc39436d2654dce690ad3ca Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 25 Mar 2021 10:29:28 +0800 Subject: [PATCH 10/10] nit --- .../datasources/parquet/VectorizedRleValuesReader.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index 8c4289903b601..384bcb30a17c7 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -205,11 +205,11 @@ public void readIntegers( // A fork of `readIntegers`, reading the signed integers as unsigned in long type public void readUnsignedIntegers( - int total, - WritableColumnVector c, - int rowId, - int level, - VectorizedValuesReader data) throws IOException { + int total, + WritableColumnVector c, + int rowId, + int level, + VectorizedValuesReader data) throws IOException { int left = total; while (left > 0) { if (this.currentCount == 0) this.readNextGroup();
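Editorial aside, not part of the patch series above: a minimal standalone sketch of the sign-bit reinterpretation these patches rely on. The converter maps UINT_8/UINT_16/UINT_32 (all stored with the INT32 physical type) to ShortType/IntegerType/LongType, so each unsigned range fits in the next wider signed Spark type; only UINT_32 needs an explicit conversion, which is what `readUnsignedIntegers` and the dictionary path (`ParquetDictionary.decodeToLong`) perform via `Integer.toUnsignedLong`. The object name below is hypothetical and the code is plain Scala with no Spark or Parquet dependencies.

// Illustration only: how an unsigned 32-bit value stored as a signed int32 is
// recovered as a Spark long.
object UnsignedInt32Sketch {
  def main(args: Array[String]): Unit = {
    // UINT_8 (0..255) and UINT_16 (0..65535) never set the sign bit of the
    // stored int32, so the signed value read back is already correct.
    val uint16Stored: Int = 65535
    assert(uint16Stored >= 0)

    // UINT_32 values above Int.MaxValue do set the sign bit: 4294967295 is
    // stored as the signed int -1. Integer.toUnsignedLong reinterprets the
    // 32 bits as an unsigned value and widens it to a long.
    val uint32Stored: Int = -1 // physical bits of 4294967295
    val recovered: Long = java.lang.Integer.toUnsignedLong(uint32Stored)
    assert(recovered == 4294967295L)
    println(s"stored=$uint32Stored recovered=$recovered")
  }
}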