[SPARK-17620][SQL] Determine Serde by hive.default.fileformat when Creating Hive Serde Tables #15190
Conversation
Test build #65742 has finished for PR 15190 at commit
```scala
  assert(partition2.get.apply("c") == "1" && partition2.get.apply("d") == "2")
}

test("Test default fileformat") {
```
Please update the test case name to `Test the default fileformat for Hive-serde tables`.
| s""" | ||
| |CREATE TABLE IF NOT EXISTS fileformat_test (id int) | ||
| """.stripMargin | ||
| val (desc, exists) = extractTableDesc(s1) |
val (desc, exists) = extractTableDesc("CREATE TABLE IF NOT EXISTS fileformat_test (id int)")| s""" | ||
| |CREATE TABLE IF NOT EXISTS fileformat_test (id int) | ||
| """.stripMargin | ||
| val (desc, exists) = extractTableDesc(s1) |
The same here.
@gatorsmile Thanks! I have updated it as per your comments.
Please update the PR description. This is not for …
Test build #65744 has finished for PR 15190 at commit
Test build #65752 has finished for PR 15190 at commit
```scala
inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
  .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
  .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
// Note: Keep this unspecified because we use the presence of the serde to decide
```
I think this is kept unspecified because it is intended to write the table through the Hive write path. If we specify a serde here, it will be converted to a data source table. Is that ok? cc @cloud-fan
cc @yhuai to confirm
The comment is not valid now. This was removed by PR #13386 (see the code changes made in HiveMetastoreCatalog.scala).
The current checking conditions are based on `ctx.createFileFormat` and `ctx.rowFormat`. Thus, I think this PR looks ok. : )
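For readers following along, here is a tiny self-contained sketch of the kind of gating being described. `CreateTableContext`, its fields, and `shouldConvertCtas` are stand-ins invented for illustration, not the real parser types:

```scala
// Stand-in for the ANTLR-generated parser context; in Spark the real fields
// live on the generated SqlBaseParser contexts.
case class CreateTableContext(createFileFormat: AnyRef, rowFormat: AnyRef)

// Conversion to a data source table is gated on whether the statement
// specified any storage clauses, not on whether a serde happens to be
// present in the table descriptor.
def shouldConvertCtas(convertCTAS: Boolean, ctx: CreateTableContext): Boolean = {
  val hasStorageProperties = ctx.createFileFormat != null || ctx.rowFormat != null
  convertCTAS && !hasStorageProperties
}
```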
@viirya @cloud-fan Actually I am not sure the above comment is in sync with the code. When we had this comment, we used `CreateTableAsSelectLogicalPlan` to represent the CTAS case, and we checked for the serde's presence to decide whether or not to convert it to a data source table, like the following:

```scala
if (sessionState.convertCTAS && table.storage.serde.isEmpty) {
  // Do the conversion when spark.sql.hive.convertCTAS is true and the query
  // does not specify any storage format (file format and storage handler).
  if (table.identifier.database.isDefined) {
    throw new AnalysisException(
      "Cannot specify database name in a CTAS statement " +
        "when spark.sql.hive.convertCTAS is set to true.")
  }
  val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
  CreateTableUsingAsSelect(
    TableIdentifier(desc.identifier.table),
    conf.defaultDataSourceName,
    temporary = false,
    Array.empty[String],
    bucketSpec = None,
    mode,
    options = Map.empty[String, String],
    child
  )
} else {
  val desc = if (table.storage.serde.isEmpty) {
    // add default serde
    table.withNewStorage(
      serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
  } else {
    table
  }
```

I think this code has changed and moved to SparkSqlParser?
yeah. looks ok now.
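As an aside, here is a minimal, self-contained sketch of the fallback logic shown in the diff earlier in this thread. The `HiveSerDe` case class and the lookup map below are simplified stand-ins for Spark's internals, not the real API:

```scala
// Simplified stand-in for Spark's internal HiveSerDe descriptor.
case class HiveSerDe(
    inputFormat: Option[String] = None,
    outputFormat: Option[String] = None,
    serde: Option[String] = None)

object DefaultStorageSketch {
  // Hypothetical mapping from hive.default.fileformat values to serde info.
  private val serdeMap: Map[String, HiveSerDe] = Map(
    "orc" -> HiveSerDe(
      inputFormat = Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
      outputFormat = Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
      serde = Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")))

  // Fall back to the hard-coded text format when the configured fileformat
  // is unknown, mirroring the .orElse chain in the diff.
  def defaultFormats(fileformat: String): (Option[String], Option[String]) = {
    val defaultHiveSerde = serdeMap.get(fileformat.toLowerCase)
    val inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
      .orElse(Some("org.apache.hadoop.mapred.TextInputFormat"))
    val outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
      .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
    (inputFormat, outputFormat)
  }
}
```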
PR title should be …
```diff
-      // Note: Keep this unspecified because we use the presence of the serde to decide
-      // whether to convert a table created by CTAS to a datasource table.
-      serde = None,
+      serde = defaultHiveSerde.flatMap(_.serde),
```
In the `DataSinks` strategy, we set a default serde for `CreateTable` if `tableDesc.storage.serde.isEmpty`. I think we should also remove it and add `.orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))` here.
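A sketch of what that suggestion would amount to, reusing the simplified `HiveSerDe` stand-in from the sketch above (hedged; the real change would sit inside the `CatalogStorageFormat` construction shown in the diff):

```scala
// Suggested fallback: when hive.default.fileformat maps to no serde
// (e.g. textfile), default to LazySimpleSerDe here rather than in the
// DataSinks strategy.
def serdeWithFallback(defaultHiveSerde: Option[HiveSerDe]): Option[String] =
  defaultHiveSerde.flatMap(_.serde)
    .orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
```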
@viirya Please see my comment below
```scala
  assert(partition2.get.apply("c") == "1" && partition2.get.apply("d") == "2")
}

test("Test the default fileformat for Hive-serde tables") {
```
Let's add a test for `withSQLConf("hive.default.fileformat" -> "")`?
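A hedged sketch of what such a test could look like; `withSQLConf` and `extractTableDesc` come from the suite touched in this PR, while the test name and the asserted fallback values are assumptions based on the defaults discussed above:

```scala
test("Test the default fileformat for Hive-serde tables with empty setting") {
  withSQLConf("hive.default.fileformat" -> "") {
    val (desc, exists) = extractTableDesc(
      "CREATE TABLE IF NOT EXISTS fileformat_test (id int)")
    assert(exists)
    // With an empty fileformat we expect the hard-coded text defaults.
    assert(desc.storage.inputFormat ==
      Some("org.apache.hadoop.mapred.TextInputFormat"))
    assert(desc.storage.outputFormat ==
      Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
  }
}
```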
As we set the default serde/inputFormat/outputFormat now, I think we don't need to set default values for them in …
@viirya I think we can come here from multiple code paths, like `visitCreateTableUsing`. I think we can come to `DataSinks`'s `CreateTable` case without the serde being set. Let me know what you think.
Oh, right. Should we set the default serde in …
Currently the default serde setting path looks scattered. I think it is better to avoid that.
@viirya In my understanding, that's the data source table code path. I am not sure we should look at the hive.default.fileformat property to set the default storage for data source tables. In my opinion, it's probably better to leave it the way it is, so we always have a default setting in the common path and don't miss it.
Hmmm, as we prohibit …
@dilipbiswal Yeah, I see. I still think setting the default serde in different places doesn't look good, especially since in the other places it is hard-coded.
@dilipbiswal nvm. It is not related to this change.
Can you try `create table ... as select ...`?
@yhuai Hi Yin, `create table ... as select ...` would respect the setting of `hive.default.fileformat`:

```scala
scala> spark.sql("SET hive.default.fileformat=parquet")
res14: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.sql("CREATE TABLE tmp_default4 select * from tmp_default")
res11: org.apache.spark.sql.DataFrame = []

scala> spark.sql("DESC FORMATTED tmp_default4").collect.foreach(println)
...
[# Storage Information,,]
[SerDe Library:,org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe,]
[InputFormat:,org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat,]
[OutputFormat:,org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat,]
...

scala> spark.sql("SET hive.default.fileformat=orc")
res14: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.sql("CREATE TABLE tmp_default5 select * from tmp_default")
res15: org.apache.spark.sql.DataFrame = []

scala> spark.sql("DESC FORMATTED tmp_default5").collect.foreach(println)
...
[# Storage Information,,]
[SerDe Library:,org.apache.hadoop.hive.ql.io.orc.OrcSerde,]
[InputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcInputFormat,]
[OutputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat,]
...
```
@dilipbiswal Are they converted to data source tables?
@gatorsmile Thanks, I didn't realize we wanted to find out the CTAS behaviour. Here is the result:

```scala
scala> spark.sql("SET spark.sql.hive.convertCTAS=true")
res23: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.sql("CREATE TABLE tmp_default6 as select * from tmp_default")
res24: org.apache.spark.sql.DataFrame = []

scala> spark.sql("DESC extended tmp_default6").collect.foreach(println)
[# Detailed Table Information,CatalogTable(
Provider: parquet
Storage(Location: file:/home/mygit/apache/spark/bin/spark-warehouse/tmp_default6, InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat, Serde: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Properties: [path=file:/user/hive/warehouse/tmp_default6, serialization.format=1])),]
```
@dilipbiswal Based on your tests and the source code, I think your fix does not break anything.
Looks good.
cc @yhuai @cloud-fan Based on the above PR discussion, it sounds like this PR is ok to merge. What do you think? Thank you!
If we have …
@yhuai We will use the Parquet format in your example; we look at `spark.sql.sources.default`. Here is the output for your perusal:

```sql
spark-sql> set spark.sql.hive.convertCTAS=true;
spark.sql.hive.convertCTAS true
Time taken: 3.309 seconds, Fetched 1 row(s)
spark-sql> set hive.default.fileformat=orc;
hive.default.fileformat orc
Time taken: 0.053 seconds, Fetched 1 row(s)
spark-sql> CREATE TABLE IF NOT EXISTS test select 1 from foo;
spark-sql> describe formatted test;
...
# Storage Information
SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
```

Now change `spark.sql.sources.default`:

```sql
spark-sql> set spark.sql.sources.default=orc;
spark.sql.sources.default orc
spark-sql> CREATE TABLE IF NOT EXISTS test2 select 1 from foo;
Time taken: 0.451 seconds
spark-sql> describe formatted test2;
...
# Storage Information
SerDe Library: org.apache.hadoop.hive.ql.io.orc.OrcSerde
InputFormat: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
```

Please let me know if you have any further questions.
Added them to your test cases. : )
Test build #66919 has finished for PR 15190 at commit
retest this please
Test build #66944 has finished for PR 15190 at commit
retest this please
Test build #66949 has finished for PR 15190 at commit
@gatorsmile @yhuai I have added a new test. Can we please take a look at this again?
@dilipbiswal That makes sense. Thank you for testing that. I do not have any other questions.
LGTM
Merging to master! Thanks!
Thank you @yhuai @gatorsmile @cloud-fan @viirya @dafrista
I am reverting this patch. Sorry.
Reverted
@yhuai Very sorry, Yin. Let me look at what happened here. Is there a way to reopen this pull request, or do I need to open a new one?
It sounds like the other merged PRs made a change that impacts your code. Please resolve it. Thanks!
Yea. Looks like so. No worries. Let's get it tested again.
@gatorsmile @yhuai It's due to a difference between the Scala 2.10 and 2.11 compilers in the way they deal with named parameters. Looks like 2.10 is less forgiving :-). I have opened #15495 with the fix.
[SPARK-17620][SQL] Determine Serde by hive.default.fileformat when Creating Hive Serde Tables

## What changes were proposed in this pull request?
Reopens the closed PR #15190 (please refer to the above link for review comments on the PR).

Make sure the hive.default.fileformat is used when creating the storage format metadata.

Output
``` SQL
scala> spark.sql("SET hive.default.fileformat=orc")
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.sql("CREATE TABLE tmp_default(id INT)")
res2: org.apache.spark.sql.DataFrame = []
```
Before
```SQL
scala> spark.sql("DESC FORMATTED tmp_default").collect.foreach(println)
..
[# Storage Information,,]
[SerDe Library:,org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,]
[InputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcInputFormat,]
[OutputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat,]
[Compressed:,No,]
[Storage Desc Parameters:,,]
[  serialization.format,1,]
```
After
```SQL
scala> spark.sql("DESC FORMATTED tmp_default").collect.foreach(println)
..
[# Storage Information,,]
[SerDe Library:,org.apache.hadoop.hive.ql.io.orc.OrcSerde,]
[InputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcInputFormat,]
[OutputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat,]
[Compressed:,No,]
[Storage Desc Parameters:,,]
[  serialization.format,1,]
```

## How was this patch tested?
Added new tests to HiveDDLCommandSuite, SQLQuerySuite

Author: Dilip Biswal <[email protected]>

Closes #15495 from dilipbiswal/orc2.
[SPARK-17620][SQL] Determine Serde by hive.default.fileformat when Creating Hive Serde Tables
## What changes were proposed in this pull request?
Make sure the hive.default.fileformat is used when creating the storage format metadata.
Output
``` SQL
scala> spark.sql("SET hive.default.fileformat=orc")
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]
scala> spark.sql("CREATE TABLE tmp_default(id INT)")
res2: org.apache.spark.sql.DataFrame = []
```
Before
```SQL
scala> spark.sql("DESC FORMATTED tmp_default").collect.foreach(println)
..
[# Storage Information,,]
[SerDe Library:,org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,]
[InputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcInputFormat,]
[OutputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat,]
[Compressed:,No,]
[Storage Desc Parameters:,,]
[ serialization.format,1,]
```
After
```SQL
scala> spark.sql("DESC FORMATTED tmp_default").collect.foreach(println)
..
[# Storage Information,,]
[SerDe Library:,org.apache.hadoop.hive.ql.io.orc.OrcSerde,]
[InputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcInputFormat,]
[OutputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat,]
[Compressed:,No,]
[Storage Desc Parameters:,,]
[ serialization.format,1,]
```
## How was this patch tested?
Added new tests to HiveDDLCommandSuite
Author: Dilip Biswal <[email protected]>
Closes apache#15190 from dilipbiswal/orc.