VectorizedColumnReader.java
@@ -565,6 +565,10 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) throw
canReadAsIntDecimal(column.dataType())) {
defColumn.readIntegers(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else if (column.dataType() == DataTypes.LongType) {
Contributor: Shall we add an extra check to make sure we are reading unsigned values?

Member Author: // We use LongType to handle UINT32

defColumn.readIntegersAsUnsigned(
Contributor: nit: readUnsignedIntegers

Contributor: Can we follow 38fbe56 and check if dictionary encoding also needs an update?

Member Author: OK, checking~

Member Author: Looks irrelevant to me.

Member Author: I have added the dictionary decoding code path and changed the Parquet data generator a bit to produce correctly encoded/plain data.
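
As an aside for readers, here is a small standalone sketch of why the dictionary-encoded path needs the same treatment; it is illustrative only, with made-up arrays, and is not the PR's actual decodeDictionaryIds code. Dictionary entries for a UINT_32 column are still stored as signed 32-bit ints, so each decoded value has to be widened with Integer.toUnsignedLong before it lands in a LongType column vector.

public class DictionaryUnsignedDemo {
  public static void main(String[] args) {
    // Raw signed INT32 dictionary entries, as Parquet stores them.
    int[] dictionary = {0, -1, Integer.MIN_VALUE};
    // Per-row dictionary ids, as produced by the RLE/bit-packed id stream.
    int[] dictionaryIds = {2, 1, 0, 1};
    // Destination column: Spark maps UINT_32 to LongType, hence long[].
    long[] column = new long[dictionaryIds.length];
    for (int i = 0; i < dictionaryIds.length; i++) {
      column[i] = Integer.toUnsignedLong(dictionary[dictionaryIds[i]]);
    }
    for (long v : column) {
      System.out.println(v);  // 2147483648, 4294967295, 0, 4294967295
    }
  }
}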

num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else if (column.dataType() == DataTypes.ByteType) {
defColumn.readBytes(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
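
Relating to the review question above about an extra check: presumably the reasoning is that, after this change, an INT32 Parquet column only maps to a LongType Spark column when it is annotated UINT_32 (see the ParquetToSparkSchemaConverter hunk below), so the dataType check alone identifies the unsigned case. For illustration, a minimal standalone sketch, not part of the PR, of why the widening into a long column is needed at all:

public class UnsignedInt32Demo {
  public static void main(String[] args) {
    // The same 32-bit pattern, read back as signed vs. unsigned.
    int raw = -1;                                // bit pattern 0xFFFFFFFF
    long widened = Integer.toUnsignedLong(raw);  // 4294967295, exceeds Integer.MAX_VALUE
    System.out.println(raw);                     // -1
    System.out.println(widened);                 // 4294967295
    System.out.println(Integer.toUnsignedLong(Integer.MIN_VALUE)); // 2147483648
  }
}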

VectorizedPlainValuesReader.java
@@ -83,6 +83,15 @@ public final void readIntegers(int total, WritableColumnVector c, int rowId) {
}
}

@Override
public final void readIntegersAsUnsigned(int total, WritableColumnVector c, int rowId) {
int requiredBytes = total * 4;
ByteBuffer buffer = getBuffer(requiredBytes);
for (int i = 0; i < total; i += 1) {
c.putLong(rowId + i, Integer.toUnsignedLong(buffer.getInt()));
Member Author (@yaooqinn, Mar 24, 2021): Maybe we can improve this by converting buffer.array() to unsigned values in bulk, but I am not sure it would be faster, or how to do that right now.

}
}
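
On the comment above about converting the buffer in bulk: one possible shape, sketched as a standalone method that is illustrative only, neither benchmarked nor part of the PR, is to drain the ints with a single IntBuffer.get call and widen them afterwards. Note that, unlike repeated buffer.getInt() calls, reading through asIntBuffer() does not advance the original ByteBuffer's position, so a real reader would still have to bump it.

import java.nio.ByteBuffer;

public class BulkUnsignedWiden {
  // Drains `total` ints starting at the buffer's current position and widens
  // each one to an unsigned long. Byte order follows the buffer's own setting.
  static long[] widen(ByteBuffer buffer, int total) {
    int[] signed = new int[total];
    buffer.asIntBuffer().get(signed);   // bulk read; leaves `buffer`'s position untouched
    long[] out = new long[total];
    for (int i = 0; i < total; i++) {
      out[i] = Integer.toUnsignedLong(signed[i]);
    }
    return out;
  }

  public static void main(String[] args) {
    ByteBuffer buffer = ByteBuffer.allocate(8);
    buffer.putInt(-1).putInt(7).flip();  // two ints: 0xFFFFFFFF and 0x00000007
    for (long v : widen(buffer, 2)) {
      System.out.println(v);             // 4294967295, then 7
    }
  }
}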

// A fork of `readIntegers` to rebase the date values. For performance reasons, this method
// iterates the values twice: check if we need to rebase first, then go to the optimized branch
// if rebase is not needed.

VectorizedRleValuesReader.java
@@ -203,6 +203,41 @@ public void readIntegers(
}
}

// A fork of `readIntegers`, reading the signed integers as unsigned in long type
public void readIntegersAsUnsigned(
int total,
WritableColumnVector c,
int rowId,
int level,
VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
int n = Math.min(left, this.currentCount);
switch (mode) {
case RLE:
if (currentValue == level) {
data.readIntegersAsUnsigned(n, c, rowId);
} else {
c.putNulls(rowId, n);
}
break;
case PACKED:
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
c.putLong(rowId + i, Integer.toUnsignedLong(data.readInteger()));
} else {
c.putNull(rowId + i);
}
}
break;
}
rowId += n;
left -= n;
currentCount -= n;
}
}

// A fork of `readIntegers`, which rebases the date int value (days) before filling
// the Spark column vector.
public void readIntegersWithRebase(
@@ -602,6 +637,11 @@ public void readIntegers(int total, WritableColumnVector c, int rowId) {
}
}

@Override
public void readIntegersAsUnsigned(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException("only readInts is valid.");
}

@Override
public void readIntegersWithRebase(
int total, WritableColumnVector c, int rowId, boolean failIfRebase) {

VectorizedValuesReader.java
@@ -41,6 +41,7 @@ public interface VectorizedValuesReader {
void readBytes(int total, WritableColumnVector c, int rowId);
void readIntegers(int total, WritableColumnVector c, int rowId);
void readIntegersWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase);
void readIntegersAsUnsigned(int total, WritableColumnVector c, int rowId);
void readLongs(int total, WritableColumnVector c, int rowId);
void readLongsWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase);
void readFloats(int total, WritableColumnVector c, int rowId);

ParquetRowConverter.scala
@@ -252,6 +252,11 @@ private[parquet] class ParquetRowConverter(
updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = {

catalystType match {
case LongType if parquetType.getOriginalType == OriginalType.UINT_32 =>
new ParquetPrimitiveConverter(updater) {
override def addInt(value: Int): Unit =
updater.setLong(Integer.toUnsignedLong(value))
}
case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType =>
new ParquetPrimitiveConverter(updater)


ParquetSchemaConverter.scala
@@ -130,13 +130,11 @@ class ParquetToSparkSchemaConverter(
case INT32 =>
originalType match {
case INT_8 => ByteType
case INT_16 => ShortType
case INT_32 | null => IntegerType
case INT_16 | UINT_8 => ShortType
case INT_32 | UINT_16 | null => IntegerType
case DATE => DateType
case DECIMAL => makeDecimalType(Decimal.MAX_INT_DIGITS)
case UINT_8 => typeNotSupported()
case UINT_16 => typeNotSupported()
case UINT_32 => typeNotSupported()
case UINT_32 => LongType
Member: These were explicitly unsupported at #9646, per @liancheng's advice (who is also a Parquet committer), so I'm less sure this is something we should support.

Member: But it's very old, almost 6 years ago. @liancheng, do you have a different thought now?

Member Author (@yaooqinn, Mar 22, 2021): Thanks, @HyukjinKwon. Yes, I have checked that PR too; there is also a suggestion there that we support them. Recently, Wenchen created https://issues.apache.org/jira/browse/SPARK-34786 for reading uint64. Since the other unsigned types are not supported either, and they are a bit clearer-cut than uint64 (which needs a decimal), I raised this PR to collect more opinions.

IMO, it is worthwhile for Spark to be able to support more storage-layer features without breaking our own rules.

Contributor: My hunch is that Spark SQL didn't support unsigned integral types at all back then. As long as we support that now, it's OK to have this.

Contributor (@cloud-fan, Mar 22, 2021): It's mostly about compatibility. Spark won't have unsigned types, but Spark should be able to read existing Parquet files written by other systems that do support unsigned types.

case TIME_MILLIS => typeNotImplemented()
case _ => illegalType()
}
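
As a side note on the new mappings, here is a minimal standalone sketch, illustrative only and not the converter's code, of why ShortType and IntegerType are wide enough for UINT_8 and UINT_16: masking recovers the full unsigned range of the narrower type inside the next signed type up, just as Integer.toUnsignedLong does for UINT_32.

public class UnsignedWideningDemo {
  public static void main(String[] args) {
    byte u8Max = (byte) 0xFF;               // UINT_8 max, stored in a signed byte as -1
    short asShort = (short) (u8Max & 0xFF);
    System.out.println(asShort);            // 255, fits ShortType

    short u16Max = (short) 0xFFFF;          // UINT_16 max, stored in a signed short as -1
    int asInt = u16Max & 0xFFFF;
    System.out.println(asInt);              // 65535, fits IntegerType

    int u32Max = 0xFFFFFFFF;                // UINT_32 max, stored in a signed int as -1
    long asLong = Integer.toUnsignedLong(u32Max);
    System.out.println(asLong);             // 4294967295, needs LongType
  }
}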

ParquetIOSuite.scala
@@ -313,18 +313,20 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
test("SPARK-10113 Support for unsigned Parquet logical types") {
val parquetSchema = MessageTypeParser.parseMessageType(
"""message root {
| required int32 c(UINT_32);
| required INT32 a(UINT_8);
| required INT32 b(UINT_16);
| required INT32 c(UINT_32);
|}
""".stripMargin)

val expectedSparkTypes = Seq(ShortType, IntegerType, LongType)

withTempPath { location =>
val path = new Path(location.getCanonicalPath)
val conf = spark.sessionState.newHadoopConf()
writeMetadata(parquetSchema, path, conf)
val errorMessage = intercept[Throwable] {
spark.read.parquet(path.toString).printSchema()
}.toString
assert(errorMessage.contains("Parquet type not supported"))
val sparkTypes = spark.read.parquet(path.toString).schema.map(_.dataType)
assert(sparkTypes === expectedSparkTypes)
}
}

@@ -381,9 +383,25 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
checkCompressionCodec(CompressionCodecName.SNAPPY)
}

private def createParquetWriter(schema: MessageType, path: Path): ParquetWriter[Group] = {
val testWriteSupport = new TestGroupWriteSupport(schema)
/**
* Provide a builder for constructing a parquet writer - after PARQUET-248 directly
* constructing the writer is deprecated and should be done through a builder. The default
* builders include Avro - but for raw Parquet writing we must create our own builder.
*/
class ParquetWriterBuilder() extends
ParquetWriter.Builder[Group, ParquetWriterBuilder](path) {
override def getWriteSupport(conf: Configuration) = testWriteSupport

override def self() = this
}
new ParquetWriterBuilder().build()
}

test("read raw Parquet file") {
def makeRawParquetFile(path: Path): Unit = {
val schema = MessageTypeParser.parseMessageType(
val schemaStr =
"""
|message root {
| required boolean _1;
@@ -392,22 +410,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
| required float _4;
| required double _5;
|}
""".stripMargin)

val testWriteSupport = new TestGroupWriteSupport(schema)
/**
* Provide a builder for constructing a parquet writer - after PARQUET-248 directly
* constructing the writer is deprecated and should be done through a builder. The default
* builders include Avro - but for raw Parquet writing we must create our own builder.
*/
class ParquetWriterBuilder() extends
ParquetWriter.Builder[Group, ParquetWriterBuilder](path) {
override def getWriteSupport(conf: Configuration) = testWriteSupport

override def self() = this
}
""".stripMargin
val schema = MessageTypeParser.parseMessageType(schemaStr)


val writer = new ParquetWriterBuilder().build()
val writer = createParquetWriter(schema, path)

(0 until 10).foreach { i =>
val record = new SimpleGroup(schema)
@@ -432,6 +439,42 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

test("SPARK-34817: Read UINT_8/UINT_16/UINT_32 from parquet") {
def makeRawParquetFile(path: Path): Unit = {
val schemaStr =
"""message root {
| required INT32 a(UINT_8);
| required INT32 b(UINT_16);
| required INT32 c(UINT_32);
|}
""".stripMargin
val schema = MessageTypeParser.parseMessageType(schemaStr)


val writer = createParquetWriter(schema, path)

(0 until 10).foreach { i =>
val record = new SimpleGroup(schema)
record.add(0, i + Byte.MaxValue)
record.add(1, i + Short.MaxValue)
record.add(2, i + Int.MaxValue)
writer.write(record)
}
writer.close()
}

withTempDir { dir =>
val path = new Path(dir.toURI.toString, "part-r-0.parquet")
makeRawParquetFile(path)
readParquetFile(path.toString) { df =>
checkAnswer(df, (0 until 10).map { i =>
Row(i + Byte.MaxValue.toShort, i + Short.MaxValue.toInt, i + Int.MaxValue.toLong)
Row(i + Byte.MaxValue, i + Short.MaxValue, i + Int.MaxValue.toLong)

})
}
}
}
test("write metadata") {
val hadoopConf = spark.sessionState.newHadoopConf()
withTempPath { file =>