
Commit 8032fca

Merge branch 'apache:master' into hudiCatalog
2 parents c5051b5 + f20acb8

File tree: 3 files changed, +482 -0 lines changed
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTableProcedure.scala

Lines changed: 128 additions & 0 deletions
@@ -0,0 +1,128 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hudi.command.procedures

import org.apache.hudi.DataSourceReadOptions
import org.apache.spark.internal.Logging
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode}

import java.util.function.Supplier

class CopyToTableProcedure extends BaseProcedure with ProcedureBuilder with Logging {

  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "query_type", DataTypes.StringType, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL),
    ProcedureParameter.required(2, "new_table", DataTypes.StringType, None),
    ProcedureParameter.optional(3, "begin_instance_time", DataTypes.StringType, ""),
    ProcedureParameter.optional(4, "end_instance_time", DataTypes.StringType, ""),
    ProcedureParameter.optional(5, "as_of_instant", DataTypes.StringType, ""),
    ProcedureParameter.optional(6, "save_mode", DataTypes.StringType, "overwrite")
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("status", DataTypes.IntegerType, nullable = true, Metadata.empty))
  )

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val queryType = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
    val newTableName = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
    val beginInstance = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]
    val endInstance = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
    val asOfInstant = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[String]
    val saveModeStr = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[String]

    assert(saveModeStr.nonEmpty, "save_mode (append, overwrite) cannot be empty.")

    // Resolve the requested save mode; only append and overwrite are supported.
    val saveMode: Any = saveModeStr.toLowerCase match {
      case "overwrite" => SaveMode.Overwrite
      case "append" => SaveMode.Append
      case _ => assert(assertion = false, s"save_mode $saveModeStr is not supported.")
    }

    val tablePath = getBasePath(tableName)

    // Build the source DataFrame according to the requested query type.
    val sourceDataFrame = queryType match {
      case DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL => if (asOfInstant.nonEmpty) {
        // Snapshot query at a specific instant (time travel).
        sparkSession.read
          .format("org.apache.hudi")
          .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
          .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, asOfInstant)
          .load(tablePath)
      } else {
        sparkSession.read
          .format("org.apache.hudi")
          .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
          .load(tablePath)
      }
      case DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL =>
        assert(beginInstance.nonEmpty && endInstance.nonEmpty, "when query_type is incremental, begin_instance_time and end_instance_time cannot be empty.")
        sparkSession.read
          .format("org.apache.hudi")
          .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
          .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, beginInstance)
          .option(DataSourceReadOptions.END_INSTANTTIME.key, endInstance)
          .load(tablePath)
      case DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL =>
        sparkSession.read
          .format("org.apache.hudi")
          .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
          .load(tablePath)
    }

    if (sparkSession.catalog.tableExists(newTableName)) {
      // Target table exists: align columns with the target schema before writing.
      val schema = sparkSession.read.table(newTableName).schema
      val selectColumns = schema.fields.toStream.map(_.name)
      sourceDataFrame.selectExpr(selectColumns: _*)
        .write
        .mode(saveMode.toString)
        .saveAsTable(newTableName)
    } else {
      sourceDataFrame.write
        .mode(saveMode.toString)
        .saveAsTable(newTableName)
    }

    Seq(Row(0))
  }

  override def build = new CopyToTableProcedure()
}

object CopyToTableProcedure {
  val NAME = "copy_to_table"

  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get() = new CopyToTableProcedure()
  }
}
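Once registered (see the HoodieProcedures change below), the procedure can be invoked through Hudi's CALL syntax with named arguments. A minimal sketch, assuming a SparkSession with the Hudi SQL extensions enabled; the table names and instant times are placeholders:

// Hypothetical incremental copy; table names and instant times are placeholders.
spark.sql(
  """CALL copy_to_table(
    |  table => 'hudi_source_tbl',
    |  new_table => 'hudi_backup_tbl',
    |  query_type => 'incremental',
    |  begin_instance_time => '20220101000000',
    |  end_instance_time => '20220102000000',
    |  save_mode => 'append'
    |)""".stripMargin
).show()

The returned single-column result is the procedure's status row (0 on success), matching OUTPUT_TYPE above.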

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala

Lines changed: 1 addition & 0 deletions
@@ -74,6 +74,7 @@ object HoodieProcedures {
     mapBuilder.put(ShowMetadataStatsProcedure.NAME, ShowMetadataStatsProcedure.builder)
     mapBuilder.put(ValidateMetadataFilesProcedure.NAME, ValidateMetadataFilesProcedure.builder)
     mapBuilder.put(ShowFsPathDetailProcedure.NAME, ShowFsPathDetailProcedure.builder)
+    mapBuilder.put(CopyToTableProcedure.NAME, CopyToTableProcedure.builder)
     mapBuilder.build
   }
 }
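With this one-line registration, copy_to_table resolves by name like any other Hudi procedure. A sketch of the simplest case, a snapshot copy at a past instant, which exercises the time-travel branch; names and instant are again placeholders:

// Hypothetical snapshot (time-travel) copy; defaults: query_type => 'snapshot', save_mode => 'overwrite'.
spark.sql(
  """CALL copy_to_table(
    |  table => 'hudi_source_tbl',
    |  new_table => 'hudi_snapshot_tbl',
    |  as_of_instant => '20220101000000'
    |)""".stripMargin
).show()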
