@@ -429,7 +429,7 @@ private[parquet] object CatalystWriteSupport {
def setSchema(schema: StructType, configuration: Configuration): Unit = {
schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
configuration.set(SPARK_ROW_SCHEMA, schema.json)
- configuration.set(
+ configuration.setIfUnset(
ParquetOutputFormat.WRITER_VERSION,
ParquetProperties.WriterVersion.PARQUET_1_0.toString)
}
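With `setIfUnset`, a writer version that the caller has already placed in the Hadoop configuration is respected instead of being overwritten with `PARQUET_1_0`. A minimal sketch of how a caller could opt into the version2 writer under this change (assuming a Spark 1.x-style `sqlContext` and a placeholder output path):

```scala
import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetOutputFormat

// Request the version2 writer before writing. With the previous set() call in
// setSchema this value was replaced by PARQUET_1_0; with setIfUnset it is kept.
sqlContext.sparkContext.hadoopConfiguration.set(
  ParquetOutputFormat.WRITER_VERSION,
  ParquetProperties.WriterVersion.PARQUET_2_0.toString)

// "/tmp/parquet-v2" is a hypothetical output directory.
sqlContext.range(1 << 10).write.mode("overwrite").parquet("/tmp/parquet-v2")
```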
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.parquet

import java.util.Collections

import org.apache.parquet.column.{Encoding, ParquetProperties}

import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
@@ -513,6 +515,38 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}

test("SPARK-11044 Parquet writer version fixed as version1 ") {
// Parquet uses different dictionary encodings depending on its writer version,
// so this test checks one of the encoding types in order to ensure that
// the file is written with writer version2.
withTempPath { dir =>
val clonedConf = new Configuration(hadoopConfiguration)
try {
// Write a Parquet file with writer version2.
hadoopConfiguration.set(ParquetOutputFormat.WRITER_VERSION,
ParquetProperties.WriterVersion.PARQUET_2_0.toString)

// Dictionary encoding is enabled by default since Parquet 1.2.0, but
// enable it explicitly here just in case.
hadoopConfiguration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, true)
val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
sqlContext.range(1 << 16).selectExpr("(id % 4) AS i")
.coalesce(1).write.mode("overwrite").parquet(path)

val blockMetadata = readFooter(new Path(path), hadoopConfiguration).getBlocks.asScala.head
val columnChunkMetadata = blockMetadata.getColumns.asScala.head

// If the file is written with version2, the encodings should include
// Encoding.RLE_DICTIONARY; with version1 it would be Encoding.PLAIN_DICTIONARY.
assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
} finally {
// Manually restore the original Hadoop configuration so other tests are not affected.
hadoopConfiguration.clear()
clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
}
}
}
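The same footer inspection works outside the test harness; a minimal sketch, assuming parquet-hadoop on the classpath and a hypothetical part file path:

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader

// Read the footer of a single part file and list the encodings of the first
// column chunk in the first row group: RLE_DICTIONARY indicates a version2
// writer, PLAIN_DICTIONARY a version1 writer.
val conf = new Configuration()
val footer = ParquetFileReader.readFooter(conf, new Path("/tmp/parquet-v2/part-r-0.parquet"))
val encodings = footer.getBlocks.asScala.head.getColumns.asScala.head.getEncodings.asScala
println(encodings.mkString(", "))
```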

test("read dictionary encoded decimals written as INT32") {
checkAnswer(
// Decimal column in this file is encoded using plain dictionary