@@ -429,7 +429,7 @@ private[parquet] object CatalystWriteSupport {
   def setSchema(schema: StructType, configuration: Configuration): Unit = {
     schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
     configuration.set(SPARK_ROW_SCHEMA, schema.json)
-    configuration.set(
+    configuration.setIfUnset(
       ParquetOutputFormat.WRITER_VERSION,
       ParquetProperties.WriterVersion.PARQUET_1_0.toString)
   }
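With `setIfUnset`, a writer version already configured by the caller is respected instead of being silently overwritten with `PARQUET_1_0`. A minimal sketch of the effect, assuming a Spark 1.x `SQLContext` named `sqlContext` and an illustrative output path:

```scala
import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetOutputFormat

// Request the version 2 writer before any Parquet write happens.
sqlContext.sparkContext.hadoopConfiguration.set(
  ParquetOutputFormat.WRITER_VERSION,
  ParquetProperties.WriterVersion.PARQUET_2_0.toString)

// setSchema no longer clobbers the value above, so this write uses the
// PARQUET_2_0 writer instead of being forced back to PARQUET_1_0.
sqlContext.range(10).write.parquet("/tmp/parquet-v2-example")
```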
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.parquet

 import java.util.Collections

+import org.apache.parquet.column.{Encoding, ParquetProperties}
+
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
@@ -513,6 +515,41 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}

test("SPARK-11044 Parquet writer version fixed as version1 ") {

Contributor: Nit: Remove this empty line.

// For dictionary encoding, Parquet changes the encoding types according to its writer
// version. So, this test checks the encoding types in order to ensure that the file is
// written with writer version 2.
withTempPath { dir =>
val clonedConf = new Configuration(hadoopConfiguration)
try {

Contributor: Nit: Remove this empty line.

// Write a Parquet file with writer version 2
hadoopConfiguration.set(ParquetOutputFormat.WRITER_VERSION,
ParquetProperties.WriterVersion.PARQUET_2_0.toString)

// Dictionary encoding is enabled by default since Parquet 1.2.0, but
// it is enabled explicitly here just in case.
hadoopConfiguration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, true)
val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
sqlContext.range(1 << 16).selectExpr("(id % 4) AS i")
.coalesce(1).write.mode("overwrite").parquet(path)

val blockMetadata = readFooter(new Path(path), hadoopConfiguration).getBlocks.asScala.head
val columnChunkMetadata = blockMetadata.getColumns.asScala.head

// If the file is written with version 2, this should include
// [[Encoding.RLE_DICTIONARY]] type. For version 1, it is Encoding.PLAIN_DICTIONARY
Contributor: BTW, the [[...]] notation is only useful when writing ScalaDoc. In case of inline comments like this, you may either omit the brackets or use backquotes to emphasize that the quoted part is a Scala/Java entity.

assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
} finally {

Contributor: Nit: Remove this empty line.

// Manually clear the hadoop configuration for other tests.
hadoopConfiguration.clear()
clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
}
}
}
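The same footer inspection can be done outside the test harness. A minimal standalone sketch, assuming parquet-hadoop is on the classpath and using an illustrative file path (`readFooter` here is parquet-hadoop's `ParquetFileReader.readFooter`, not the test helper used above):

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader

val footer = ParquetFileReader.readFooter(
  new Configuration(), new Path("/tmp/parquet-v2-example/part-r-0.parquet"))

// Each column chunk records the encodings it was written with:
// RLE_DICTIONARY indicates the version 2 writer, PLAIN_DICTIONARY version 1.
footer.getBlocks.asScala.foreach { block =>
  block.getColumns.asScala.foreach { column =>
    println(s"${column.getPath}: ${column.getEncodings}")
  }
}
```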

test("read dictionary encoded decimals written as INT32") {
checkAnswer(
// Decimal column in this file is encoded using plain dictionary