Closed

Commits (29)
All commits are by lianhuiwang.

f2433c2  init commit (Jun 16, 2016)
0b93636  fix unit test (Jun 16, 2016)
301e950  update (Jun 18, 2016)
808a5fa  update createTempMacro (Jun 18, 2016)
f4ed3bc  address comments (Jun 20, 2016)
af0136d  update (Jun 20, 2016)
5550496  based master (Nov 10, 2016)
9fe1881  update code (Nov 10, 2016)
b8ffdc9  fix function (Nov 10, 2016)
fb8b57a  update comment (Nov 11, 2016)
e895a9c  update comments (Nov 11, 2016)
8d520eb  Merge branch 'master' of https://github.com/apache/spark into macro (May 27, 2017)
651b485  Merge branch 'master' of https://github.com/apache/spark into macro (May 27, 2017)
277ba9f  Merge branch 'macro' of https://github.com/lianhuiwang/spark into macro (May 27, 2017)
314913d  Merge branch 'macro' of https://github.com/lianhuiwang/spark into macro (May 27, 2017)
3d05e4f  reformat code. (May 27, 2017)
22d8b1a  reformat code. (May 27, 2017)
d91f633  reformat code. (May 27, 2017)
1eb23c7  reformat code. (May 27, 2017)
ad85109  remove type check for macro as same with hive. (May 27, 2017)
b52698f  add import (May 27, 2017)
3eacebc  treat macro as temp function like hive (May 27, 2017)
fce1121  add Modifier for FunctionRegistry. (May 27, 2017)
eaff4e9  update comments. (May 27, 2017)
97632a9  add dropMacro(). (May 27, 2017)
4ee32e9  reformat code style (May 27, 2017)
b539e94  address some comments. (May 30, 2017)
1563f12  address some comments. (May 30, 2017)
4d8e843  update (May 30, 2017)

@@ -97,6 +97,9 @@ statement
| CREATE TEMPORARY? FUNCTION qualifiedName AS className=STRING
(USING resource (',' resource)*)? #createFunction
| DROP TEMPORARY? FUNCTION (IF EXISTS)? qualifiedName #dropFunction
| CREATE TEMPORARY MACRO macroName=identifier
Contributor: Does Hive also support non-temporary macros?

Contributor (author): No, currently Hive only supports temporary macros.

'('(columns=colTypeList)?')' expression #createMacro
Contributor: Nit: get rid of the parentheses; you can also just use colTypeList?

| DROP TEMPORARY MACRO (IF EXISTS)? macroName=identifier #dropMacro
| EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN)? statement #explain
| SHOW TABLES ((FROM | IN) db=identifier)?
(LIKE? pattern=STRING)? #showTables
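
For reference, statements accepted by the new #createMacro and #dropMacro alternatives look like the following. This is a usage sketch only (it assumes a running SparkSession named spark); the SIGMOID macro is the one that appears in the HiveQuerySuite change at the end of this diff.

// Usage sketch, not part of the diff.
spark.sql("CREATE TEMPORARY MACRO sigmoid(x DOUBLE) 1.0 / (1.0 + EXP(-x))")
spark.sql("SELECT sigmoid(2.0)").show()
spark.sql("DROP TEMPORARY MACRO IF EXISTS sigmoid")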

@@ -26,6 +26,7 @@ import org.antlr.v4.runtime.tree.TerminalNode
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}

@@ -589,6 +590,38 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ctx.TEMPORARY != null)
}

/**
* Create a [[CreateMacroCommand]] command.
*
* For example:
* {{{
* CREATE TEMPORARY MACRO macro_name([col_name col_type, ...]) expression;
* }}}
*/
override def visitCreateMacro(ctx: CreateMacroContext): LogicalPlan = withOrigin(ctx) {
val arguments = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns).map { col =>
Contributor (@hvanhovell, Jun 17, 2016): Call visitColTypeList and you will get a Seq[StructField]; that also makes it easier to construct the AttributeReference (you don't need to parse the data type).

AttributeReference(col.name, CatalystSqlParser.parseDataType(col.dataType))()
}
val e = expression(ctx.expression)
CreateMacroCommand(
ctx.macroName.getText,
MacroFunctionWrapper(arguments, e))
}
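
A rough sketch of the alternative suggested in the review comment above: reuse visitColTypeList, which already returns a Seq[StructField], so the data types never round-trip through strings. This is a hypothetical illustration, not the code in this PR.

// Hypothetical variant of visitCreateMacro, following the review suggestion.
override def visitCreateMacro(ctx: CreateMacroContext): LogicalPlan = withOrigin(ctx) {
  // visitColTypeList yields StructFields, so no CatalystSqlParser.parseDataType call is needed.
  val arguments = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil).map { field =>
    AttributeReference(field.name, field.dataType, field.nullable)()
  }
  CreateMacroCommand(
    ctx.macroName.getText,
    MacroFunctionWrapper(arguments, expression(ctx.expression)))
}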

/**
* Create a [[DropMacroCommand]] command.
*
* For example:
* {{{
* DROP TEMPORARY MACRO [IF EXISTS] macro_name;
* }}}
*/
override def visitDropMacro(ctx: DropMacroContext): LogicalPlan = withOrigin(ctx) {
DropMacroCommand(
ctx.macroName.getText,
ctx.EXISTS != null)
}

/**
* Create a [[DropTableCommand]] command.
*/

@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions._

/**
* This class provides arguments and body expression of the macro.
*/
case class MacroFunctionWrapper(arguments: Seq[AttributeReference], body: Expression)
Contributor: Not needed.

/**
* The DDL command that creates a macro.
* To create a temporary macro, the syntax of using this command in SQL is:
* {{{
* CREATE TEMPORARY MACRO macro_name([col_name col_type, ...]) expression;
* }}}
*/
case class CreateMacroCommand(macroName: String, macroFunction: MacroFunctionWrapper)
extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val inputSet = AttributeSet(macroFunction.arguments)
Contributor: Not needed.

val colNames = macroFunction.arguments.map(_.name)
Contributor: We should check that all parameter names are unique.

val colToIndex: Map[String, Int] = colNames.zipWithIndex.toMap
macroFunction.body.transformUp {
case u @ UnresolvedAttribute(nameParts) =>
assert(nameParts.length == 1)
Contributor: Why? The only thing that matters is that the name is in the parameter list.

colToIndex.get(nameParts.head).getOrElse(
throw new AnalysisException(s"Cannot create temporary macro '$macroName', " +
s"cannot resolve: [${u}] given input columns: [${inputSet}]"))
Contributor: colNames? (i.e., report colNames here rather than inputSet)

u
Contributor: Why not replace this with a BoundReference? Then we don't need to do a map lookup every time the macro is used.

case _: SubqueryExpression =>
throw new AnalysisException(s"Cannot create temporary macro '$macroName', " +
s"cannot support subquery for macro.")
}

val macroInfo = macroFunction.arguments.mkString(",") + "->" + macroFunction.body.toString
val info = new ExpressionInfo(macroInfo, macroName)
val builder = (children: Seq[Expression]) => {
if (children.size != colNames.size) {
throw new AnalysisException(s"actual number of arguments: ${children.size} != " +
s"expected number of arguments: ${colNames.size} for Macro $macroName")
}
macroFunction.body.transformUp {
case u @ UnresolvedAttribute(nameParts) =>
Contributor: A BoundReference would allow you to directly get the index. See my previous comment.

assert(nameParts.length == 1)
colToIndex.get(nameParts.head).map(children(_)).getOrElse(
throw new AnalysisException(s"Macro '$macroInfo' cannot resolve '$u' " +
s"given input expressions: [${children.mkString(",")}]"))
}
}
catalog.createTempFunction(macroName, info, builder, ignoreIfExists = false)
Seq.empty[Row]
}
}
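
To make the review comments about duplicate parameter names and BoundReference concrete, here is a rough sketch of how the body could be bound once inside CreateMacroCommand.run, so each invocation only substitutes children by position. This is a hypothetical fragment (it assumes the same imports as the file above, including BoundReference from catalyst.expressions), not the code in this PR.

// Hypothetical fragment for CreateMacroCommand.run.
val colNames = macroFunction.arguments.map(_.name)
// Review suggestion: reject duplicate parameter names up front.
if (colNames.distinct.length != colNames.length) {
  throw new AnalysisException(
    s"Cannot create temporary macro '$macroName': duplicate parameter names [${colNames.mkString(", ")}]")
}
val colToIndex = colNames.zipWithIndex.toMap
// Resolve the body once, at CREATE time, replacing each parameter reference
// with a BoundReference that carries its positional index.
val boundBody = macroFunction.body.transformUp {
  case u @ UnresolvedAttribute(nameParts) =>
    val index = colToIndex.getOrElse(nameParts.mkString("."),
      throw new AnalysisException(s"Cannot create temporary macro '$macroName', " +
        s"cannot resolve: [$u] given input columns: [${colNames.mkString(", ")}]"))
    BoundReference(index, macroFunction.arguments(index).dataType, nullable = true)
  case _: SubqueryExpression =>
    throw new AnalysisException(s"Cannot create temporary macro '$macroName', " +
      "cannot support subquery for macro.")
}
val builder = (children: Seq[Expression]) => {
  if (children.size != colNames.size) {
    throw new AnalysisException(s"actual number of arguments: ${children.size} != " +
      s"expected number of arguments: ${colNames.size} for Macro $macroName")
  }
  // Substitution is purely positional; no per-invocation map lookup.
  boundBody.transformUp { case b: BoundReference => children(b.ordinal) }
}

With this shape the registered builder closes over an already-resolved body, so the lookup cost at call sites does not depend on the number of parameters.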

/**
* The DDL command that drops a macro.
* ifExists: returns an error if the macro doesn't exist, unless this is true.
* {{{
* DROP TEMPORARY MACRO [IF EXISTS] macro_name;
* }}}
*/
case class DropMacroCommand(macroName: String, ifExists: Boolean)
extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
Contributor: This will drop any function... Can we make it macro-specific?

val catalog = sparkSession.sessionState.catalog
if (FunctionRegistry.builtin.functionExists(macroName)) {
throw new AnalysisException(s"Cannot drop native function '$macroName'")
}
catalog.dropTempFunction(macroName, ifExists)
Seq.empty[Row]
}
}
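
One way to address the review comment that DropMacroCommand will drop any temporary function would be to track which names were registered as macros. The helper below is purely illustrative and hypothetical (no such registry exists in Spark); it only sketches the shape such a check could take.

// Hypothetical session-level registry of macro names; illustrative only.
class TempMacroNames {
  private val names = scala.collection.mutable.Set.empty[String]
  def add(name: String): Unit = synchronized { names += name.toLowerCase }
  def remove(name: String): Unit = synchronized { names -= name.toLowerCase }
  def contains(name: String): Boolean = synchronized { names.contains(name.toLowerCase) }
}

// DropMacroCommand.run could then refuse to drop a plain temporary function
// (macroNames below is a hypothetical instance of the registry above):
//   if (!macroNames.contains(macroName)) {
//     if (!ifExists) {
//       throw new AnalysisException(s"Temporary macro '$macroName' does not exist")
//     }
//   } else {
//     catalog.dropTempFunction(macroName, ifExists)
//     macroNames.remove(macroName)
//   }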

@@ -1309,4 +1309,26 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assertUnsupported("TRUNCATE TABLE my_tab PARTITION (age=10)")
}

test("create/drop temporary macro") {
intercept[AnalysisException] {
sql(s"CREATE TEMPORARY MACRO simple_add_error(x int) x + y")
}
sql("CREATE TEMPORARY MACRO fixed_number() 42")
checkAnswer(sql("SELECT fixed_number()"), Row(42))
sql("CREATE TEMPORARY MACRO string_len_plus_two(x string) length(x) + 2")
checkAnswer(sql("SELECT string_len_plus_two('abc')"), Row(5))
sql("CREATE TEMPORARY MACRO simple_add(x int, y int) x + y")
checkAnswer(sql("SELECT simple_add(1, 2)"), Row(3))
intercept[AnalysisException] {
sql(s"SELECT simple_add(1)")
}
sql("DROP TEMPORARY MACRO fixed_number")
intercept[AnalysisException] {
sql(s"DROP TEMPORARY MACRO abs")
}
intercept[AnalysisException] {
sql("DROP TEMPORARY MACRO SOME_MACRO")
}
sql("DROP TEMPORARY MACRO IF EXISTS SOME_MACRO")
}
}

@@ -1205,13 +1205,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
assertUnsupportedFeature {
sql("ALTER INDEX my_index ON my_table set IDXPROPERTIES (\"prop1\"=\"val1_new\")")}
}

test("create/drop macro commands are not supported") {
assertUnsupportedFeature {
sql("CREATE TEMPORARY MACRO SIGMOID (x DOUBLE) 1.0 / (1.0 + EXP(-x))")
}
assertUnsupportedFeature { sql("DROP TEMPORARY MACRO SIGMOID") }
}
}

// for SPARK-2180 test