@@ -110,7 +110,7 @@ private[mllib] object Loader {
assert(loadedFields.contains(field.name), s"Unable to parse model data." +
s" Expected field with name ${field.name} was missing in loaded schema:" +
s" ${loadedFields.mkString(", ")}")
- assert(loadedFields(field.name) == field.dataType,
+ assert(loadedFields(field.name).typeName == field.dataType.typeName,
s"Unable to parse model data. Expected field $field but found field" +
s" with different type: ${loadedFields(field.name)}")
}
@@ -28,7 +28,7 @@ import parquet.io.api._
import parquet.schema.MessageType

import org.apache.spark.Logging
- import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
+ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
import org.apache.spark.sql.types._

/**
@@ -381,7 +381,14 @@ private[parquet] object RowWriteSupport {
}

def setSchema(schema: Seq[Attribute], configuration: Configuration) {
- val encoded = ParquetTypesConverter.convertToString(schema)
+ val updatedSchema = schema.map {
+   case a if a.dataType.isInstanceOf[ArrayType] =>
+     val newArray = ArrayType(a.dataType.asInstanceOf[ArrayType].elementType)
+     val newAttr = AttributeReference(a.name, newArray, a.nullable, a.metadata)()
+     newAttr
+   case other => other
+ }
+ val encoded = ParquetTypesConverter.convertToString(updatedSchema)
configuration.set(SPARK_ROW_SCHEMA, encoded)
configuration.set(
ParquetOutputFormat.WRITER_VERSION,
@@ -263,9 +263,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
override protected[sql] lazy val analyzer =
new Analyzer(catalog, functionRegistry, caseSensitive = false) {
override val extendedResolutionRules =
- catalog.PreInsertionCasts ::
catalog.ParquetConversions ::
catalog.CreateTables ::
+ catalog.PreInsertionCasts ::
ExtractPythonUdfs ::
ResolveUdtfsAlias ::
sources.PreInsertCastAndRename ::
@@ -343,6 +343,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {

override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq(
DataSourceStrategy,
+ HiveDataSourceStrategy,
HiveCommandStrategy(self),
HiveDDLStrategy,
DDLStrategy,
@@ -424,7 +424,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
// Collects all `MetastoreRelation`s which should be replaced
val toBeReplaced = plan.collect {
// Write path
- case InsertIntoTable(relation: MetastoreRelation, _, _, _)
+ case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
Contributor: I don't think this is right here. ParquetConversions is an analysis rule, which only processes logical plans. However, InsertIntoHiveTable is a physical plan node.

Member (author): InsertIntoHiveTable is a LogicalPlan defined in HiveMetastoreCatalog.

Contributor: Oh sorry, I mistook this for the physical plan with the same name...
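To make the name clash in this thread concrete, here is a minimal, hypothetical sketch (not the real Spark classes) of a logical plan node and a physical operator sharing the name InsertIntoHiveTable in different namespaces; an analysis rule such as ParquetConversions only ever pattern-matches the logical one, which is the author's point above.

```scala
// Hypothetical sketch only: simplified stand-ins for the two same-named nodes.
object LogicalPlans {
  trait LogicalPlan
  case class MetastoreRelation(database: String, table: String) extends LogicalPlan
  // Shape mirrors the rule's pattern: (table, partition, child, overwrite).
  case class InsertIntoHiveTable(
      table: LogicalPlan,
      partition: Map[String, Option[String]],
      child: LogicalPlan,
      overwrite: Boolean) extends LogicalPlan
}

object PhysicalPlans {
  trait SparkPlan
  // The execution-side operator that happens to reuse the name; analysis rules never see it.
  case class InsertIntoHiveTable(description: String) extends SparkPlan
}

object RuleSketch {
  import LogicalPlans._
  // An analysis rule transforms logical plans, so this match can only refer to
  // LogicalPlans.InsertIntoHiveTable, never to the physical operator.
  def rewrite(plan: LogicalPlan): LogicalPlan = plan match {
    case InsertIntoHiveTable(r: MetastoreRelation, part, child, overwrite) =>
      InsertIntoHiveTable(r, part, child, overwrite) // placeholder rewrite
    case other => other
  }
}
```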

// Inserting into partitioned table is not supported in Parquet data source (yet).
if !relation.hiveQlTable.isPartitioned &&
hive.convertMetastoreParquet &&
@@ -458,6 +458,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with

withAlias
}
+ case InsertIntoHiveTable(r: MetastoreRelation, p, c, o) if relationMap.contains(r) =>
+   val parquetRelation = relationMap(r)
+   InsertIntoHiveTable(parquetRelation, p, c, o)
Contributor: Same here.
case other => other.transformExpressions {
case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
}
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeComman
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.parquet.ParquetRelation
- import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, CreateTableUsing}
+ import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, CreateTableUsing, LogicalRelation, InsertIntoDataSource, InsertableRelation}
import org.apache.spark.sql.types.StringType


@@ -254,4 +254,13 @@ private[hive] trait HiveStrategies {
case _ => Nil
}
}
+
+ object HiveDataSourceStrategy extends Strategy {
+   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+     case i @ InsertIntoHiveTable(
+         l @ LogicalRelation(t: InsertableRelation), part, query, overwrite) if part.isEmpty =>
+       ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
+     case _ => Nil
+   }
+ }
}
@@ -20,6 +20,8 @@ package org.apache.spark.sql.parquet

import java.io.File

+ import scala.collection.mutable.ArrayBuffer
+
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.{SQLConf, QueryTest}
@@ -299,6 +301,37 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
super.afterAll()
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
}
+
+ test("insert array into parquet hive table using data source api") {
Contributor: I just tried this test with our master, and it did not fail. I think you need to first turn off the conversion for the write path and then turn on the conversion for the read path. You can use spark.sql.parquet.useDataSourceApi to control it.

Member (author): spark.sql.parquet.useDataSourceApi is already turned on in the unit test I added. It failed on the master I just pulled.
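As a rough sketch of the Contributor's suggestion, toggling the conversion separately for the write and read paths could look like the following. It assumes the helpers already used in this suite (test, sql, setConf, getConf and SQLConf.PARQUET_USE_DATA_SOURCE_API), uses placeholder table names, and is not part of this change; the test actually added by the pull request follows below.

```scala
// Sketch only: assumes this suite's helpers (sql, setConf, getConf) and that
// "tmp_table" / "some_parquet_table" already exist.
test("write with conversion off, read with conversion on (sketch)") {
  val original = getConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
  try {
    // Write path: disable the data source API so the old Hive write path is used.
    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
    sql("insert into table some_parquet_table select * from tmp_table")

    // Read path: re-enable the data source API so the read goes through the conversion.
    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
    sql("select * from some_parquet_table").collect()
  } finally {
    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, original)
  }
}
```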

+   val data1 = """{ "timestamp": 1422435598, "data_array": [ { "field0": null, "field1": 1, "field2": 2} ] }"""
+   val data2 = """{ "timestamp": 1422435599, "data_array": [ { "field0": 0, "field1": null, "field2": 3} ] }"""
+
+   val json = sparkContext.makeRDD(data1 :: data2 :: Nil)
+   val rdd = jsonRDD(json)
+   rdd.registerTempTable("tmp_table")
+
+   val partitionedTableDir = File.createTempFile("persisted_table", "sparksql")
+   partitionedTableDir.delete()
+   partitionedTableDir.mkdir()
+
+   sql(
+     s"""
+       |create external table persisted_table
+       |(
+       |  data_array ARRAY <STRUCT<field0: BIGINT, field1: BIGINT, field2: BIGINT>>,
+       |  timestamp BIGINT
+       |)
+       |STORED AS PARQUET Location '${partitionedTableDir.getCanonicalPath}'
+     """.stripMargin)
+
+   sql("insert into table persisted_table select * from tmp_table").collect()
+
+   checkAnswer(
+     sql("select data_array.field0, data_array.field1, data_array.field2 from persisted_table"),
+     Row(ArrayBuffer(null), ArrayBuffer(1), ArrayBuffer(2)) ::
+       Row(ArrayBuffer(0), ArrayBuffer(null), ArrayBuffer(3)) :: Nil
+   )
+ }
}

class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase {