[SPARK-4131] Support "Writing data into the filesystem from queries" #18975
@@ -142,10 +142,14 @@ object UnsupportedOperationChecker {
          "Distinct aggregations are not supported on streaming DataFrames/Datasets. Consider " +
            "using approx_count_distinct() instead.")

      case _: Command =>
        throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
          "streaming DataFrames/Datasets")

      case _: InsertIntoDir =>
        throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets")

      // mapGroupsWithState and flatMapGroupsWithState
      case m: FlatMapGroupsWithState if m.isStreaming =>
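These new cases extend the checks that UnsupportedOperationChecker applies to streaming plans: writing a streaming Dataset straight to a directory is rejected, just like the other command-style nodes. A stripped-down sketch that mirrors the shape of such a check (illustrative only; the real checker reports errors through its throwError helper, which raises an AnalysisException):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{Command, InsertIntoDir, LogicalPlan}

// Illustrative walker in the spirit of UnsupportedOperationChecker: visit the plan
// bottom-up and reject node types that make no sense for a streaming query.
object StreamingPlanGuardSketch {
  def check(plan: LogicalPlan): Unit = plan.foreachUp {
    case _: Command =>
      // The real checker raises an AnalysisException via throwError; a plain
      // exception keeps this sketch self-contained.
      throw new UnsupportedOperationException(
        "Commands like CreateTable*, AlterTable*, Show* are not supported with " +
          "streaming DataFrames/Datasets")
    case _: InsertIntoDir =>
      throw new UnsupportedOperationException(
        "InsertIntoDir is not supported with streaming DataFrames/Datasets")
    case _ => // other node types are handled by the remaining cases of the checker
  }
}
```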
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
- import org.apache.spark.sql.catalyst.catalog.CatalogTable
+ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._

@@ -359,6 +359,18 @@ case class InsertIntoTable(
  override lazy val resolved: Boolean = false
}

case class InsertIntoDir(
Member: This should be …

Contributor (author): ok. added.

Member: Could you add function descriptions and parameter descriptions?

Contributor (author): added.
    path: String,
    isLocal: Boolean,
    rowStorage: CatalogStorageFormat,
    fileStorage: CatalogStorageFormat,
    child: LogicalPlan)
  extends LogicalPlan {

  override def children: Seq[LogicalPlan] = child :: Nil
Member: Nit: We can simply extend …

Contributor (author): updated.
  override def output: Seq[Attribute] = Seq.empty
}

/**
 * A container for holding the view description(CatalogTable), and the output of the view. The
 * child should be a logical plan parsed from the `CatalogTable.viewText`, should throw an error
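The thread above asks for function and parameter descriptions on the new node. A rough idea of what the documented definition could look like, based only on the fields visible in this revision (the actual wording added by the author, and the parent trait the reviewer suggested, are not shown in this excerpt):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/**
 * Writes the result of `child` out to a filesystem directory, i.e. the logical node behind
 * "INSERT OVERWRITE [LOCAL] DIRECTORY path ... query".
 *
 * @param path        the target directory for the written files
 * @param isLocal     whether `path` refers to the local filesystem or the default one (e.g. HDFS)
 * @param rowStorage  storage format derived from the ROW FORMAT clause, if any
 * @param fileStorage storage format derived from the STORED AS / file-format clause, if any
 * @param child       the query whose result is written out
 */
case class InsertIntoDir(
    path: String,
    isLocal: Boolean,
    rowStorage: CatalogStorageFormat,
    fileStorage: CatalogStorageFormat,
    child: LogicalPlan)
  extends LogicalPlan {

  override def children: Seq[LogicalPlan] = child :: Nil
  override def output: Seq[Attribute] = Seq.empty
}
```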
@@ -26,11 +26,12 @@ import org.antlr.v4.runtime.tree.TerminalNode

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
- import org.apache.spark.sql.catalyst.plans.logical._
+ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, _}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable, _}
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}

@@ -1499,4 +1500,34 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
      query: LogicalPlan): LogicalPlan = {
    RepartitionByExpression(expressions, query, conf.numShufflePartitions)
  }
  /**
   * Add an INSERT INTO [TABLE]/INSERT OVERWRITE TABLE or INSERT INTO [LOCAL] DIRECTORY
   * operation to the logical plan.
   */
  protected override def withInsertInto(
      ctx: InsertIntoContext,
      query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
    val tableIdent = Option(ctx.tableIdentifier).map(visitTableIdentifier)
    val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty)

    val dynamicPartitionKeys = partitionKeys.filter(_._2.isEmpty)
    if (ctx.EXISTS != null && dynamicPartitionKeys.nonEmpty) {
      throw new ParseException(s"Dynamic partitions do not support IF NOT EXISTS. Specified " +
        "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
    }

    validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
    val rowStorage = Option(ctx.rowFormat).map(visitRowFormat)
      .getOrElse(CatalogStorageFormat.empty)
    val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
      .getOrElse(CatalogStorageFormat.empty)

    tableIdent match {
      case Some(ti: TableIdentifier) =>
        InsertIntoTable(UnresolvedRelation(ti), partitionKeys, query,
          ctx.OVERWRITE != null, ctx.EXISTS != null)
      case _ =>
        InsertIntoDir(string(ctx.path), ctx.LOCAL != null, rowStorage, fileStorage, query)
    }
  }
}
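For reference, the kind of statement this new parser branch is meant to handle looks roughly like the following (a hedged example; the exact grammar accepted by SqlBase.g4 is not shown in this excerpt, and the path, view name, and clauses are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession

object InsertIntoDirExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("insert-into-dir-example")
      .enableHiveSupport()
      .getOrCreate()

    spark.range(10).selectExpr("id", "id * 2 AS doubled").createOrReplaceTempView("src")

    // Write the query result straight to a directory instead of a table.
    // LOCAL targets the driver-local filesystem; without it the default
    // (e.g. HDFS) filesystem is used. Path and formats are illustrative.
    spark.sql(
      """
        |INSERT OVERWRITE LOCAL DIRECTORY '/tmp/insert_into_dir_example'
        |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
        |STORED AS TEXTFILE
        |SELECT id, doubled FROM src
      """.stripMargin)

    spark.stop()
  }
}
```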
@@ -0,0 +1,136 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.execution

import java.net.URI
import java.util.Properties

import scala.language.existentials

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred._

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.util.Utils

case class InsertIntoDirCommand(
    path: String,
    isLocal: Boolean,
    rowStorage: CatalogStorageFormat,
    fileStorage: CatalogStorageFormat,
    query: LogicalPlan) extends SaveAsHiveFile {

  override def children: Seq[LogicalPlan] = query :: Nil

  override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
    assert(children.length == 1)

    val Array(cols, types) = children.head.output.foldLeft(Array("", "")) { case (r, a) =>
      r(0) = r(0) + a.name + ","
      r(1) = r(1) + a.dataType.catalogString + ":"
      r
    }

    val properties = new Properties()
    properties.put("columns", cols.dropRight(1))
    properties.put("columns.types", types.dropRight(1))
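The fold above packs the child plan's schema into the comma-separated `columns` and colon-separated `columns.types` properties that Hive SerDes expect. As a quick illustration of what those strings end up looking like (a standalone sketch with made-up column names, not Spark code):

```scala
// Standalone illustration of the "columns" / "columns.types" encoding built above.
// For a schema (id BIGINT, name STRING) the fold yields:
//   columns       -> "id,name"
//   columns.types -> "bigint:string"
object ColumnsPropertiesSketch {
  def main(args: Array[String]): Unit = {
    val schema = Seq(("id", "bigint"), ("name", "string"))

    val Array(cols, types) = schema.foldLeft(Array("", "")) { case (r, (name, tpe)) =>
      r(0) = r(0) + name + ","
      r(1) = r(1) + tpe + ":"
      r
    }

    println(cols.dropRight(1))   // id,name
    println(types.dropRight(1))  // bigint:string
  }
}
```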
    val sqlContext = sparkSession.sqlContext

    val defaultStorage: CatalogStorageFormat = {
      val defaultStorageType =
        sqlContext.conf.getConfString("hive.default.fileformat", "textfile")
      val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType)
      CatalogStorageFormat(
        locationUri = None,
        inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
          .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
        outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
          .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
        serde = defaultHiveSerde.flatMap(_.serde),
        compressed = false,
        properties = Map())
    }

    val pathUri = if (isLocal) Utils.resolveURI(path) else new URI(path)
    val storage = CatalogStorageFormat(
      locationUri = Some(pathUri),
      inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
      outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
      serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
      compressed = false,
      properties = rowStorage.properties ++ fileStorage.properties)
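The `orElse` chain above encodes the precedence: an explicit STORED AS clause wins over the `hive.default.fileformat` default, and a SerDe named in the ROW FORMAT clause wins over one implied by the file format. A tiny standalone sketch of that resolution order with placeholder values (the serde names below are not real classes):

```scala
// Standalone sketch of the Option-based precedence used when merging storage formats.
object StoragePrecedenceSketch {
  def main(args: Array[String]): Unit = {
    val fileStorageSerde: Option[String] = None                      // no serde from STORED AS
    val rowStorageSerde: Option[String]  = Some("MyDelimitedSerDe")  // from ROW FORMAT SERDE ...
    val defaultSerde: Option[String]     = Some("LazySimpleSerDe")   // from hive.default.fileformat

    // Mirrors: rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde)
    val resolved = rowStorageSerde.orElse(fileStorageSerde).orElse(defaultSerde)
    println(resolved) // Some(MyDelimitedSerDe)
  }
}
```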
    properties.put(serdeConstants.SERIALIZATION_LIB,
      storage.serde.getOrElse(classOf[LazySimpleSerDe].getName))

    import scala.collection.JavaConverters._
    properties.putAll(rowStorage.properties.asJava)
    properties.putAll(fileStorage.properties.asJava)

    var tableDesc = new TableDesc(
      Utils.classForName(storage.inputFormat.get).asInstanceOf[Class[_ <: InputFormat[_, _]]],
      Utils.classForName(storage.outputFormat.get),
      properties
    )

    val hadoopConf = sparkSession.sessionState.newHadoopConf()
    val jobConf = new JobConf(hadoopConf)

    val targetPath = new Path(path)
    val writeToPath =
      if (isLocal) {
        val localFileSystem = FileSystem.getLocal(jobConf)
        val localPath = localFileSystem.makeQualified(targetPath)
        if (localFileSystem.exists(localPath)) {
          localFileSystem.delete(localPath, true)
        }
        localPath
      } else {
        val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf)
        val dfs = qualifiedPath.getFileSystem(jobConf)
        if (dfs.exists(qualifiedPath)) {
          dfs.delete(qualifiedPath, true)
        } else {
          dfs.mkdirs(qualifiedPath.getParent)
        }
        qualifiedPath
      }

    val fileSinkConf = new org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc(
      writeToPath.toString, tableDesc, false)

    saveAsHiveFile(
      sparkSession = sparkSession,
      plan = children.head,
      hadoopConf = hadoopConf,
      fileSinkConf = fileSinkConf,
      outputLocation = path)

    Seq.empty[Row]
  }
}
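Not shown in this excerpt is the analyzer/planner side that turns the logical `InsertIntoDir` node produced by the parser into this runnable command. Assuming it follows the usual pattern for Hive commands, it would be a rule of roughly this shape (the object name and guard condition here are illustrative, not the PR's actual rule):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.hive.execution.InsertIntoDirCommand

// Illustrative sketch of a resolution rule mapping the logical node to the Hive command.
object ResolveInsertIntoDirSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case InsertIntoDir(path, isLocal, rowStorage, fileStorage, child) if child.resolved =>
      InsertIntoDirCommand(path, isLocal, rowStorage, fileStorage, child)
  }
}
```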
@@ -32,14 +32,12 @@ import org.apache.hadoop.hive.ql.exec.TaskRunner
import org.apache.hadoop.hive.ql.plan.TableDesc

import org.apache.spark.SparkException
- import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
- import org.apache.spark.sql.execution.command.{CommandUtils, DataWritingCommand}
- import org.apache.spark.sql.execution.datasources.FileFormatWriter
+ import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive.client.{HiveClientImpl, HiveVersion}
@@ -80,7 +78,7 @@ case class InsertIntoHiveTable(
    partition: Map[String, Option[String]],
    query: LogicalPlan,
    overwrite: Boolean,
-   ifPartitionNotExists: Boolean) extends DataWritingCommand {
+   ifPartitionNotExists: Boolean) extends SaveAsHiveFile {

  override def children: Seq[LogicalPlan] = query :: Nil

@@ -234,10 +232,9 @@ case class InsertIntoHiveTable(
  override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
    assert(children.length == 1)

-   val sessionState = sparkSession.sessionState
    val externalCatalog = sparkSession.sharedState.externalCatalog
    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
-   val hadoopConf = sessionState.newHadoopConf()
+   val hadoopConf = sparkSession.sessionState.newHadoopConf()
    val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
Member: How about keeping …

Contributor (author): ok. added it back.
    val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")

@@ -257,20 +254,6 @@ case class InsertIntoHiveTable(
    val tmpLocation =
      getExternalTmpPath(tableLocation, hiveVersion, hadoopConf, stagingDir, scratchDir)
    val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
-   val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
-
-   if (isCompressed) {
-     // Please note that isCompressed, "mapreduce.output.fileoutputformat.compress",
-     // "mapreduce.output.fileoutputformat.compress.codec", and
-     // "mapreduce.output.fileoutputformat.compress.type"
-     // have no impact on ORC because it uses table properties to store compression information.
-     hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
-     fileSinkConf.setCompressed(true)
-     fileSinkConf.setCompressCodec(hadoopConf
-       .get("mapreduce.output.fileoutputformat.compress.codec"))
-     fileSinkConf.setCompressType(hadoopConf
-       .get("mapreduce.output.fileoutputformat.compress.type"))
-   }

    val numDynamicPartitions = partition.values.count(_.isEmpty)
    val numStaticPartitions = partition.values.count(_.nonEmpty)
@@ -332,29 +315,20 @@ case class InsertIntoHiveTable(
      case _ => // do nothing since table has no bucketing
    }

-   val committer = FileCommitProtocol.instantiate(
-     sparkSession.sessionState.conf.fileCommitProtocolClass,
-     jobId = java.util.UUID.randomUUID().toString,
-     outputPath = tmpLocation.toString)
-
    val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
      query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
        throw new AnalysisException(
          s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
      }.asInstanceOf[Attribute]
    }

-   FileFormatWriter.write(
+   saveAsHiveFile(
      sparkSession = sparkSession,
      plan = children.head,
-     fileFormat = new HiveFileFormat(fileSinkConf),
-     committer = committer,
-     outputSpec = FileFormatWriter.OutputSpec(tmpLocation.toString, Map.empty),
      hadoopConf = hadoopConf,
-     partitionColumns = partitionAttributes,
-     bucketSpec = None,
-     statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
-     options = Map.empty)
+     fileSinkConf = fileSinkConf,
+     outputLocation = tmpLocation.toString,
+     partitionAttributes = partitionAttributes)

    if (partition.nonEmpty) {
      if (numDynamicPartitions > 0) {
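Both this call site and the one in InsertIntoDirCommand funnel into the new SaveAsHiveFile trait, whose body is not part of this excerpt. Judging only from the named arguments used above, its entry point presumably looks roughly like the following (a hedged reconstruction; the actual parameter defaults, return type, and the FileFormatWriter plumbing moved out of this file may differ):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}

// Hedged reconstruction of the shared helper implied by the two call sites above.
private[hive] trait SaveAsHiveFile extends DataWritingCommand {

  protected def saveAsHiveFile(
      sparkSession: SparkSession,
      plan: SparkPlan,
      hadoopConf: Configuration,
      fileSinkConf: FileSinkDesc,
      outputLocation: String,
      partitionAttributes: Seq[Attribute] = Nil): Unit = {
    // Presumably: configure compression from "hive.exec.compress.output",
    // instantiate a FileCommitProtocol, and delegate to FileFormatWriter.write,
    // i.e. the logic removed from InsertIntoHiveTable in this diff.
    ???
  }
}
```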
Reviewer: Could you add it to TableIdentifierParserSuite?

Author: it is already in TableIdentifierParserSuite.
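TableIdentifierParserSuite checks that SQL keywords stay usable as plain identifiers; the exchange above presumably concerns the keyword introduced for this syntax (e.g. DIRECTORY). A hypothetical shape for such a check, with helper and suite names that may differ from the real test:

```scala
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Hypothetical sketch: verify a non-reserved keyword can still be parsed as a table name.
class DirectoryKeywordSketchSuite extends SparkFunSuite {
  test("non-reserved keyword used as a table identifier") {
    assert(CatalystSqlParser.parseTableIdentifier("directory") === TableIdentifier("directory"))
  }
}
```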