10 changes: 10 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -51,6 +51,11 @@ private[spark] object SQLConf {

// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = "spark.sql.sources.default"
// This is used to control when we will split a schema's JSON string into multiple pieces
// in order to fit the JSON string in the metastore's table property (by default, the value
// has a length restriction of 4000 characters). We will split the JSON string of a schema
// if its length exceeds the threshold.
val SCHEMA_STRING_LENGTH_THRESHOLD = "spark.sql.sources.schemaStringLengthThreshold"

// Whether to perform eager analysis when constructing a dataframe.
// Set to false when debugging requires the ability to look at invalid query plans.
@@ -177,6 +182,11 @@ private[sql] class SQLConf extends Serializable {
private[spark] def defaultDataSourceName: String =
getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")

// Do not use a value larger than 4000 as the default value of this property.
// See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
private[spark] def schemaStringLengthThreshold: Int =
getConf(SCHEMA_STRING_LENGTH_THRESHOLD, "4000").toInt

private[spark] def dataFrameEagerAnalysis: Boolean =
getConf(DATAFRAME_EAGER_ANALYSIS, "true").toBoolean

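For context, here is a minimal standalone sketch (not part of the patch) of how a long schema JSON string can be split at this threshold and re-joined losslessly. The JSON shape below is only an illustrative stand-in for what `StructType.json` actually emits:

```scala
object SchemaSplitSketch extends App {
  // Mirrors the default of spark.sql.sources.schemaStringLengthThreshold.
  val threshold = 4000

  // Illustrative stand-in for a wide schema's JSON string; the exact field
  // encoding produced by StructType.json may differ.
  val schemaJson =
    """{"type":"struct","fields":[""" +
      (1 to 5000).map(i => s"""{"name":"c_$i","type":"string","nullable":true}""").mkString(",") +
      "]}"

  // Split into threshold-sized pieces, as the patch does before writing table properties.
  val parts: Seq[String] = schemaJson.grouped(threshold).toSeq

  // Joining the pieces back in order reproduces the original string exactly.
  assert(parts.mkString == schemaJson)
  println(s"schema length: ${schemaJson.length}, parts: ${parts.size}")
}
```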
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -69,13 +69,25 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val table = synchronized {
client.getTable(in.database, in.name)
}
val schemaString = table.getProperty("spark.sql.sources.schema")
val userSpecifiedSchema =
if (schemaString == null) {
None
} else {
Some(DataType.fromJson(schemaString).asInstanceOf[StructType])
val schemaString = Option(table.getProperty("spark.sql.sources.schema"))
.orElse {
// If spark.sql.sources.schema is not defined, we either split the schema into multiple
// parts or the schema was not defined. To determine whether the schema was defined,
// we check spark.sql.sources.schema.numOfParts.
Option(table.getProperty("spark.sql.sources.schema.numOfParts")) match {

Contributor: we can easily consolidate this path and remove the convoluted Option.orElse followed by pattern matching.

case Some(numOfParts) =>
val parts = (0 until numOfParts.toInt).map { index =>
Option(table.getProperty(s"spark.sql.sources.schema.part.${index}"))
.getOrElse("Could not read schema from the metastore because it is corrupted.")
}
// Stick all parts back to a single schema string in the JSON representation.
Some(parts.mkString)
case None => None // The schema was not defined.
}
}

val userSpecifiedSchema =
  schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType])
// It does not appear that the ql client for the metastore has a way to enumerate all the
// SerDe properties directly...
val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap
@@ -119,7 +131,26 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with

tbl.setProperty("spark.sql.sources.provider", provider)
if (userSpecifiedSchema.isDefined) {
tbl.setProperty("spark.sql.sources.schema", userSpecifiedSchema.get.json)
val threshold = hive.conf.schemaStringLengthThreshold
val schemaJsonString = userSpecifiedSchema.get.json
// Check if the size of the JSON string of the schema exceeds the threshold.
if (schemaJsonString.size > threshold) {
// Need to split the string.
val parts = schemaJsonString.grouped(threshold).toSeq
// First, record the total number of parts we have.
tbl.setProperty("spark.sql.sources.schema.numOfParts", parts.size.toString)
// Second, write every part to its own table property.
parts.zipWithIndex.foreach {
case (part, index) =>
tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part)
}
} else {
// The length does not exceed the threshold; just put the whole string in a single table property.
tbl.setProperty("spark.sql.sources.schema.numOfParts", "1")
// We use spark.sql.sources.schema instead of spark.sql.sources.schema.part.0 because
// users may have already created data source tables in the metastore.
tbl.setProperty("spark.sql.sources.schema", schemaJsonString)

Contributor: why don't we just always use schema.part.0? seems easier to consolidate the two code paths

}
}
options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }

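A minimal round-trip sketch of the write and read paths above, assuming a plain mutable Map as a stand-in for the Hive table's property bag (the real code uses tbl.setProperty / table.getProperty); names such as `props` are illustrative only:

```scala
import scala.collection.mutable

object SchemaPropertyRoundTrip extends App {
  val threshold = 4000
  // Stand-in for the metastore table properties; purely illustrative.
  val props = mutable.Map.empty[String, String]

  // Pretend this is a schema JSON string wider than the threshold.
  val schemaJson = "x" * 9500

  // Write path: record the number of parts, then each part under its own key.
  val parts = schemaJson.grouped(threshold).toSeq
  props("spark.sql.sources.schema.numOfParts") = parts.size.toString
  parts.zipWithIndex.foreach { case (part, index) =>
    props(s"spark.sql.sources.schema.part.$index") = part
  }

  // Read path: use numOfParts to collect the pieces in order and re-join them.
  val numOfParts = props("spark.sql.sources.schema.numOfParts").toInt
  val restored = (0 until numOfParts).map(i => props(s"spark.sql.sources.schema.part.$i")).mkString

  assert(restored == schemaJson)
}
```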
sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -591,4 +591,25 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource)
}
}

test("SPARK-6024 wide schema support") {
// We will need 80 splits for this schema if the threshold is 4000.
val schema = StructType((1 to 5000).map(i => StructField(s"c_${i}", StringType, true)))
assert(
schema.json.size > conf.schemaStringLengthThreshold,
"To correctly test the fix of SPARK-6024, the value of " +
s"spark.sql.sources.schemaStringLengthThreshold needs to be less than ${schema.json.size}")
// Manually create a metastore data source table.
catalog.createDataSourceTable(
tableName = "wide_schema",
userSpecifiedSchema = Some(schema),
provider = "json",
options = Map("path" -> "just a dummy path"),
isExternal = false)

invalidateTable("wide_schema")

val actualSchema = table("wide_schema").schema
assert(schema === actualSchema)
}
}
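A quick back-of-the-envelope check of the "80 splits" comment in the test, using an assumed (not measured) JSON length for the 5000-column schema; this can be pasted into a Scala REPL:

```scala
val threshold = 4000
// Assumed, illustrative length for the 5000-column schema's JSON string.
val schemaJsonLength = 320000
// Ceiling division: how many threshold-sized parts grouped() will produce.
val numParts = (schemaJsonLength + threshold - 1) / threshold
println(numParts)  // 80
```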