From b2cdcdf2d935bda3218cc97a32bdf5f08318c0af Mon Sep 17 00:00:00 2001 From: huangxingbo Date: Sat, 15 May 2021 15:24:43 +0800 Subject: [PATCH 1/4] [FLINK-22650][python][table-planner-blink] Support StreamExecPythonCorrelate json serialization/deserialization --- flink-python/pyflink/table/tests/test_udtf.py | 50 ++- .../exec/batch/BatchExecPythonCorrelate.java | 11 +- .../common/CommonExecPythonCorrelate.java | 22 +- .../stream/StreamExecPythonCorrelate.java | 38 +- .../logical/PythonCorrelateSplitRule.java | 66 +++- .../rules/logical/PythonCalcSplitRule.scala | 10 +- .../exec/stream/JsonSerdeCoverageTest.java | 1 - .../stream/PythonCorrelateJsonPlanTest.java | 75 ++++ .../testPythonTableFunction.out | 329 ++++++++++++++++++ .../CalcPythonCorrelateTransposeRuleTest.xml | 8 +- .../logical/PythonCorrelateSplitRuleTest.xml | 17 +- 11 files changed, 593 insertions(+), 34 deletions(-) create mode 100644 flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java create mode 100644 flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out diff --git a/flink-python/pyflink/table/tests/test_udtf.py b/flink-python/pyflink/table/tests/test_udtf.py index 98ca8f5c8f8db..976b2f6dc8dcc 100644 --- a/flink-python/pyflink/table/tests/test_udtf.py +++ b/flink-python/pyflink/table/tests/test_udtf.py @@ -78,7 +78,55 @@ class PyFlinkStreamUserDefinedTableFunctionTests(UserDefinedTableFunctionTests, class PyFlinkBlinkStreamUserDefinedFunctionTests(UserDefinedTableFunctionTests, PyFlinkBlinkStreamTableTestCase): - pass + def test_execute_from_json_plan(self): + # create source file path + tmp_dir = self.tempdir + data = ['1,1', '3,2', '2,1'] + source_path = tmp_dir + '/test_execute_from_json_plan_input.csv' + sink_path = tmp_dir + '/test_execute_from_json_plan_out' + with open(source_path, 'w') as fd: + for ele in data: + fd.write(ele + '\n') + + source_table = """ + CREATE TABLE source_table ( + a BIGINT, + b BIGINT + ) WITH ( + 'connector' = 'filesystem', + 'path' = '%s', + 'format' = 'csv' + ) + """ % source_path + self.t_env.execute_sql(source_table) + + self.t_env.execute_sql(""" + CREATE TABLE sink_table ( + a BIGINT, + b BIGINT, + c BIGINT + ) WITH ( + 'connector' = 'filesystem', + 'path' = '%s', + 'format' = 'csv' + ) + """ % sink_path) + + self.t_env.create_temporary_system_function( + "multi_emit", udtf(MultiEmit(), result_types=[DataTypes.BIGINT(), DataTypes.BIGINT()])) + + json_plan = self.t_env._j_tenv.getJsonPlan("INSERT INTO sink_table " + "SELECT a, x, y FROM source_table " + "LEFT JOIN LATERAL TABLE(multi_emit(a, b))" + " as T(x, y)" + " ON TRUE") + from py4j.java_gateway import get_method + get_method(self.t_env._j_tenv.executeJsonPlan(json_plan), "await")() + + import glob + lines = [line.strip() for file in glob.glob(sink_path + '/*') for line in open(file, 'r')] + lines.sort() + self.assertEqual(lines, ['1,1,0', '2,2,0', '3,3,0', '3,3,1']) class PyFlinkBlinkBatchUserDefinedFunctionTests(UserDefinedTableFunctionTests, diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.java index 9e4945ff10b87..1ee749acf6a66 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.java @@ -27,6 +27,8 @@ import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexNode; +import java.util.Collections; + /** Batch exec node which matches along with join a Python user defined table function. */ public class BatchExecPythonCorrelate extends CommonExecPythonCorrelate implements BatchExecNode { @@ -38,6 +40,13 @@ public BatchExecPythonCorrelate( InputProperty inputProperty, RowType outputType, String description) { - super(joinType, invocation, condition, inputProperty, outputType, description); + super( + joinType, + invocation, + condition, + getNewNodeId(), + Collections.singletonList(inputProperty), + outputType, + description); } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java index 12720d12c833b..6aeac8660bb18 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java @@ -38,31 +38,47 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; import java.lang.reflect.Constructor; -import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; /** Base {@link ExecNode} which matches along with join a Python user defined table function. */ +@JsonIgnoreProperties(ignoreUnknown = true) public abstract class CommonExecPythonCorrelate extends ExecNodeBase implements SingleTransformationTranslator { + + public static final String FIELD_NAME_JOIN_TYPE = "joinType"; + public static final String FIELD_NAME_FUNCTION_CALL = "functionCall"; + public static final String FIELD_NAME_CONDITION = "condition"; + private static final String PYTHON_TABLE_FUNCTION_OPERATOR_NAME = "org.apache.flink.table.runtime.operators.python.table.RowDataPythonTableFunctionOperator"; + @JsonProperty(FIELD_NAME_JOIN_TYPE) private final FlinkJoinType joinType; + + @JsonProperty(FIELD_NAME_FUNCTION_CALL) private final RexCall invocation; public CommonExecPythonCorrelate( FlinkJoinType joinType, RexCall invocation, RexNode condition, - InputProperty inputProperty, + int id, + List inputProperties, RowType outputType, String description) { - super(Collections.singletonList(inputProperty), outputType, description); + super(id, inputProperties, outputType, description); + checkArgument(inputProperties.size() == 1); this.joinType = joinType; this.invocation = invocation; if (joinType == FlinkJoinType.LEFT && condition != null) { diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java index e1e6be73ade32..88bad60232c21 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java @@ -24,10 +24,20 @@ import org.apache.flink.table.runtime.operators.join.FlinkJoinType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexNode; +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; + /** Stream exec node which matches along with join a Python user defined table function. */ +@JsonIgnoreProperties(ignoreUnknown = true) public class StreamExecPythonCorrelate extends CommonExecPythonCorrelate implements StreamExecNode { public StreamExecPythonCorrelate( @@ -37,6 +47,32 @@ public StreamExecPythonCorrelate( InputProperty inputProperty, RowType outputType, String description) { - super(joinType, invocation, condition, inputProperty, outputType, description); + this( + joinType, + invocation, + condition, + getNewNodeId(), + Collections.singletonList(inputProperty), + outputType, + description); + } + + @JsonCreator + public StreamExecPythonCorrelate( + @JsonProperty(FIELD_NAME_JOIN_TYPE) FlinkJoinType joinType, + @JsonProperty(FIELD_NAME_FUNCTION_CALL) RexNode invocation, + @JsonProperty(FIELD_NAME_CONDITION) @Nullable RexNode condition, + @JsonProperty(FIELD_NAME_ID) int id, + @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List inputProperties, + @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType, + @JsonProperty(FIELD_NAME_DESCRIPTION) String description) { + super( + joinType, + (RexCall) invocation, + condition, + id, + inputProperties, + outputType, + description); } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java index 80b2f1936885e..829a4bd7be45e 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java @@ -29,14 +29,17 @@ import org.apache.calcite.plan.hep.HepRelVertex; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexCorrelVariable; import org.apache.calcite.rex.RexFieldAccess; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexProgram; import org.apache.calcite.rex.RexProgramBuilder; import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.rex.RexVisitorImpl; import org.apache.calcite.sql.validate.SqlValidatorUtil; import java.util.LinkedList; @@ -112,10 +115,36 @@ private List createNewFieldNames( for (int i = 0; i < primitiveFieldCount; i++) { calcProjects.add(RexInputRef.of(i, rowType)); } + // change RexCorrelVariable to RexInputRef. + RexVisitorImpl visitor = + new RexVisitorImpl(true) { + @Override + public RexNode visitFieldAccess(RexFieldAccess fieldAccess) { + RexNode expr = fieldAccess.getReferenceExpr(); + if (expr instanceof RexCorrelVariable) { + RelDataTypeField field = fieldAccess.getField(); + return new RexInputRef(field.getIndex(), field.getType()); + } else { + return rexBuilder.makeFieldAccess( + expr.accept(this), fieldAccess.getField().getIndex()); + } + } + }; // add the fields of the extracted rex calls. Iterator iterator = extractedRexNodes.iterator(); while (iterator.hasNext()) { - calcProjects.add(iterator.next()); + RexNode rexNode = iterator.next(); + if (rexNode instanceof RexCall) { + RexCall rexCall = (RexCall) rexNode; + List newProjects = + rexCall.getOperands().stream() + .map(x -> x.accept(visitor)) + .collect(Collectors.toList()); + RexCall newRexCall = rexCall.clone(rexCall.getType(), newProjects); + calcProjects.add(newRexCall); + } else { + calcProjects.add(rexNode); + } } List nameList = new LinkedList<>(); @@ -252,18 +281,31 @@ public void onMatch(RelOptRuleCall call) { mergedCalc.copy(mergedCalc.getTraitSet(), newScan, mergedCalc.getProgram()); } - FlinkLogicalCalc leftCalc = - createNewLeftCalc(left, rexBuilder, extractedRexNodes, correlate); + FlinkLogicalCorrelate newCorrelate; + if (extractedRexNodes.size() > 0) { + FlinkLogicalCalc leftCalc = + createNewLeftCalc(left, rexBuilder, extractedRexNodes, correlate); - FlinkLogicalCorrelate newCorrelate = - new FlinkLogicalCorrelate( - correlate.getCluster(), - correlate.getTraitSet(), - leftCalc, - rightNewInput, - correlate.getCorrelationId(), - correlate.getRequiredColumns(), - correlate.getJoinType()); + newCorrelate = + new FlinkLogicalCorrelate( + correlate.getCluster(), + correlate.getTraitSet(), + leftCalc, + rightNewInput, + correlate.getCorrelationId(), + correlate.getRequiredColumns(), + correlate.getJoinType()); + } else { + newCorrelate = + new FlinkLogicalCorrelate( + correlate.getCluster(), + correlate.getTraitSet(), + left, + rightNewInput, + correlate.getCorrelationId(), + correlate.getRequiredColumns(), + correlate.getJoinType()); + } FlinkLogicalCalc newTopCalc = createTopCalc( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRule.scala index 10596b3b96e34..c6c9985dff9f9 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRule.scala @@ -22,7 +22,7 @@ import java.util.function.Function import org.apache.calcite.plan.RelOptRule.{any, operand} import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} -import org.apache.calcite.rex.{RexBuilder, RexCall, RexFieldAccess, RexInputRef, RexLocalRef, RexNode, RexProgram} +import org.apache.calcite.rex.{RexBuilder, RexCall, RexCorrelVariable, RexFieldAccess, RexInputRef, RexLocalRef, RexNode, RexProgram} import org.apache.calcite.sql.validate.SqlValidatorUtil import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.table.functions.python.PythonFunctionKind @@ -393,7 +393,13 @@ private class ScalarFunctionSplitter( expr match { case localRef: RexLocalRef if containsPythonCall(program.expandLocalRef(localRef)) => getExtractedRexFieldAccess(fieldAccess, localRef.getIndex) - case _ => getExtractedRexNode(fieldAccess) + case _: RexCorrelVariable => + val field = fieldAccess.getField + new RexInputRef(field.getIndex, field.getType) + case _ => + val newFieldAccess = rexBuilder.makeFieldAccess( + expr.accept(this), fieldAccess.getField.getIndex) + getExtractedRexNode(newFieldAccess) } } else { fieldAccess diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java index e3539a3282698..6ed1b2cac8e2a 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java @@ -48,7 +48,6 @@ public class JsonSerdeCoverageTest { "StreamExecGroupTableAggregate", "StreamExecPythonGroupTableAggregate", "StreamExecPythonOverAggregate", - "StreamExecPythonCorrelate", "StreamExecSort", "StreamExecMultipleInput", "StreamExecValues"); diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java new file mode 100644 index 0000000000000..c725d7b2130af --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction; +import org.apache.flink.table.planner.utils.MockPythonTableFunction; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Before; +import org.junit.Test; + +/** Test json serialization/deserialization for correlate. */ +public class PythonCorrelateJsonPlanTest extends TableTestBase { + private StreamTableTestUtil util; + private TableEnvironment tEnv; + + @Before + public void setup() { + TableConfig tableConfig = TableConfig.getDefault(); + util = streamTestUtil(tableConfig); + util.addFunction( + "TableFunc", new MockPythonTableFunction(), new RowTypeInfo(Types.INT, Types.INT)); + util.addFunction("pyFunc", new PythonScalarFunction("pyFunc")); + tEnv = util.getTableEnv(); + + String srcTableDdl = + "CREATE TABLE MyTable (\n" + + " a int,\n" + + " b int,\n" + + " c int,\n" + + " d timestamp(3)\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'false')"; + tEnv.executeSql(srcTableDdl); + } + + @Test + public void testPythonTableFunction() { + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " a int,\n" + + " b int\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'table-sink-class' = 'DEFAULT')"; + tEnv.executeSql(sinkTableDdl); + + String sqlQuery = + "INSERT INTO MySink SELECT x, y FROM MyTable " + + "LEFT JOIN LATERAL TABLE(TableFunc(a * a, pyFunc(a, b))) AS T(x, y) ON TRUE"; + util.verifyJsonPlan(sqlQuery); + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out new file mode 100644 index 0000000000000..c2b71f2b9dcd6 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out @@ -0,0 +1,329 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "INT", + "schema.3.name" : "d", + "connector" : "values", + "schema.0.data-type" : "INT", + "schema.2.name" : "c", + "schema.1.name" : "b", + "bounded" : "false", + "schema.0.name" : "a", + "schema.1.data-type" : "INT" + } + }, + "id" : 1, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "INT" + }, { + "c" : "INT" + }, { + "d" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "*", + "kind" : "TIMES", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "condition" : null, + "id" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "INT" + }, { + "c" : "INT" + }, { + "d" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "f0" : "INT" + } ] + }, + "description" : "Calc(select=[a, b, c, d, (a * a) AS f0])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate", + "joinType" : "LEFT", + "functionCall" : { + "kind" : "REX_CALL", + "operator" : { + "name" : "TableFunc", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "pyFunc", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION", + "displayName" : "pyFunc", + "functionKind" : "SCALAR", + "instance" : "rO0ABXNyAGBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRTY2FsYXJGdW5jdGlvbnMkUHl0aG9uU2NhbGFyRnVuY3Rpb275pBZGRJT8qAIAAUwABG5hbWV0ABJMamF2YS9sYW5nL1N0cmluZzt4cgAvb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuU2NhbGFyRnVuY3Rpb26383IwrjqOqQIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cHQABnB5RnVuYw" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "type" : { + "structKind" : "FULLY_QUALIFIED", + "nullable" : false, + "fields" : [ { + "typeName" : "INTEGER", + "nullable" : true, + "fieldName" : "f0" + }, { + "typeName" : "INTEGER", + "nullable" : true, + "fieldName" : "f1" + } ] + } + }, + "id" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "INT" + }, { + "c" : "INT" + }, { + "d" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "f0" : "INT" + }, { + "f00" : "INT" + }, { + "f1" : "INT" + } ] + }, + "description" : "PythonCorrelate(invocation=[TableFunc($4, pyFunc($0, $1))], correlate=[table(TableFunc(f0,pyFunc(a, b)))], select=[a,b,c,d,f0,f00,f1], rowType=[RecordType(INTEGER a, INTEGER b, INTEGER c, TIMESTAMP(3) d, INTEGER f0, INTEGER f00, INTEGER f1)], joinType=[LEFT])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "condition" : null, + "id" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "x" : "INT" + }, { + "y" : "INT" + } ] + }, + "description" : "Calc(select=[f00 AS x, f1 AS y])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "table-sink-class" : "DEFAULT", + "connector" : "values", + "schema.0.data-type" : "INT", + "schema.1.name" : "b", + "schema.0.name" : "a", + "schema.1.data-type" : "INT" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "id" : 5, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "x" : "INT" + }, { + "y" : "INT" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[x, y])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRuleTest.xml index a9664372a4950..72ac8d846689a 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRuleTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRuleTest.xml @@ -31,12 +31,12 @@ LogicalProject(a=[$0], b=[$1], c=[$2], x=[$3], y=[$4]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRuleTest.xml index 0d4bf9e6f10f4..c77a62519f836 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRuleTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRuleTest.xml @@ -32,9 +32,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], x=[$3]) @@ -53,11 +52,11 @@ LogicalProject(a=[$0], b=[$1], c=[$2], x=[$3], y=[$4]) @@ -78,7 +77,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], x=[$4]) FlinkLogicalCalc(select=[a, b, c, f00 AS x]) +- FlinkLogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}]) :- FlinkLogicalCalc(select=[a, b, c, d, pyFunc(f0) AS f0]) - : +- FlinkLogicalCalc(select=[a, b, c, d, $cor0.d._1 AS f0]) + : +- FlinkLogicalCalc(select=[a, b, c, d, d._1 AS f0]) : +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d]) +- FlinkLogicalTableFunctionScan(invocation=[javaFunc($4)], rowType=[RecordType(VARCHAR(2147483647) f0)], elementType=[class [Ljava.lang.Object;]) ]]> @@ -100,9 +99,9 @@ LogicalProject(a=[$0], b=[$1], c=[$2], x=[$4], y=[$5]) From 81b70108c20a70dab8cbd8fb715ab7fc8ea61b9c Mon Sep 17 00:00:00 2001 From: huangxingbo Date: Mon, 17 May 2021 11:06:36 +0800 Subject: [PATCH 2/4] add filter test --- .../stream/PythonCorrelateJsonPlanTest.java | 17 + .../testJoinWithFilter.out | 431 ++++++++++++++++++ 2 files changed, 448 insertions(+) create mode 100644 flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java index c725d7b2130af..7b2739ee46e62 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java @@ -72,4 +72,21 @@ public void testPythonTableFunction() { + "LEFT JOIN LATERAL TABLE(TableFunc(a * a, pyFunc(a, b))) AS T(x, y) ON TRUE"; util.verifyJsonPlan(sqlQuery); } + + @Test + public void testJoinWithFilter() { + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " a int,\n" + + " b int\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'table-sink-class' = 'DEFAULT')"; + tEnv.executeSql(sinkTableDdl); + + String sqlQuery = + "INSERT INTO MySink SELECT x, y FROM MyTable, " + + "LATERAL TABLE(TableFunc(a * a, pyFunc(a, b))) AS T(x, y) WHERE x = a and y + 1 = y * y"; + util.verifyJsonPlan(sqlQuery); + } } diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out new file mode 100644 index 0000000000000..fe5899b938e72 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out @@ -0,0 +1,431 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "INT", + "schema.3.name" : "d", + "connector" : "values", + "schema.0.data-type" : "INT", + "schema.2.name" : "c", + "schema.1.name" : "b", + "bounded" : "false", + "schema.0.name" : "a", + "schema.1.data-type" : "INT" + } + }, + "id" : 1, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "INT" + }, { + "c" : "INT" + }, { + "d" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "*", + "kind" : "TIMES", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "condition" : null, + "id" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "INT" + }, { + "c" : "INT" + }, { + "d" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "f0" : "INT" + } ] + }, + "description" : "Calc(select=[a, b, c, d, (a * a) AS f0])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate", + "joinType" : "INNER", + "functionCall" : { + "kind" : "REX_CALL", + "operator" : { + "name" : "TableFunc", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "pyFunc", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION", + "displayName" : "pyFunc", + "functionKind" : "SCALAR", + "instance" : "rO0ABXNyAGBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRTY2FsYXJGdW5jdGlvbnMkUHl0aG9uU2NhbGFyRnVuY3Rpb275pBZGRJT8qAIAAUwABG5hbWV0ABJMamF2YS9sYW5nL1N0cmluZzt4cgAvb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuU2NhbGFyRnVuY3Rpb26383IwrjqOqQIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cHQABnB5RnVuYw" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "type" : { + "structKind" : "FULLY_QUALIFIED", + "nullable" : false, + "fields" : [ { + "typeName" : "INTEGER", + "nullable" : true, + "fieldName" : "f0" + }, { + "typeName" : "INTEGER", + "nullable" : true, + "fieldName" : "f1" + } ] + } + }, + "id" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "INT" + }, { + "c" : "INT" + }, { + "d" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "f0" : "INT" + }, { + "f00" : "INT" + }, { + "f1" : "INT" + } ] + }, + "description" : "PythonCorrelate(invocation=[TableFunc($4, pyFunc($0, $1))], correlate=[table(TableFunc(f0,pyFunc(a, b)))], select=[a,b,c,d,f0,f00,f1], rowType=[RecordType(INTEGER a, INTEGER b, INTEGER c, TIMESTAMP(3) d, INTEGER f0, INTEGER f00, INTEGER f1)], joinType=[INNER])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "condition" : { + "kind" : "REX_CALL", + "operator" : { + "name" : "AND", + "kind" : "AND", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "REX_CALL", + "operator" : { + "name" : "=", + "kind" : "EQUALS", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "REX_CALL", + "operator" : { + "name" : "+", + "kind" : "PLUS", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "LITERAL", + "value" : "1", + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "*", + "kind" : "TIMES", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "type" : { + "typeName" : "BOOLEAN", + "nullable" : true + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "=", + "kind" : "EQUALS", + "syntax" : "BINARY" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "type" : { + "typeName" : "BOOLEAN", + "nullable" : true + } + } ], + "type" : { + "typeName" : "BOOLEAN", + "nullable" : true + } + }, + "id" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "x" : "INT" + }, { + "y" : "INT" + } ] + }, + "description" : "Calc(select=[f00 AS x, f1 AS y], where=[(((f1 + 1) = (f1 * f1)) AND (f00 = a))])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "table-sink-class" : "DEFAULT", + "connector" : "values", + "schema.0.data-type" : "INT", + "schema.1.name" : "b", + "schema.0.name" : "a", + "schema.1.data-type" : "INT" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "id" : 5, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "x" : "INT" + }, { + "y" : "INT" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[x, y])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file From ca251c70438c994771ea375f648203f8555e4965 Mon Sep 17 00:00:00 2001 From: huangxingbo Date: Mon, 17 May 2021 15:13:59 +0800 Subject: [PATCH 3/4] rebase master and move the logic of condition check from CommonExecPythonCorrelate to physical node --- .../nodes/exec/batch/BatchExecPythonCorrelate.java | 3 --- .../exec/common/CommonExecPythonCorrelate.java | 6 ------ .../exec/stream/StreamExecPythonCorrelate.java | 6 ------ .../batch/BatchPhysicalPythonCorrelate.scala | 10 +++++++++- .../stream/StreamPhysicalPythonCorrelate.scala | 13 +++++++++++-- 5 files changed, 20 insertions(+), 18 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.java index 1ee749acf6a66..4372ff9e76cfb 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.java @@ -25,7 +25,6 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexNode; import java.util.Collections; @@ -36,14 +35,12 @@ public class BatchExecPythonCorrelate extends CommonExecPythonCorrelate public BatchExecPythonCorrelate( FlinkJoinType joinType, RexCall invocation, - RexNode condition, InputProperty inputProperty, RowType outputType, String description) { super( joinType, invocation, - condition, getNewNodeId(), Collections.singletonList(inputProperty), outputType, diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java index 6aeac8660bb18..a5492d228d3b9 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java @@ -58,7 +58,6 @@ public abstract class CommonExecPythonCorrelate extends ExecNodeBase public static final String FIELD_NAME_JOIN_TYPE = "joinType"; public static final String FIELD_NAME_FUNCTION_CALL = "functionCall"; - public static final String FIELD_NAME_CONDITION = "condition"; private static final String PYTHON_TABLE_FUNCTION_OPERATOR_NAME = "org.apache.flink.table.runtime.operators.python.table.RowDataPythonTableFunctionOperator"; @@ -72,7 +71,6 @@ public abstract class CommonExecPythonCorrelate extends ExecNodeBase public CommonExecPythonCorrelate( FlinkJoinType joinType, RexCall invocation, - RexNode condition, int id, List inputProperties, RowType outputType, @@ -81,10 +79,6 @@ public CommonExecPythonCorrelate( checkArgument(inputProperties.size() == 1); this.joinType = joinType; this.invocation = invocation; - if (joinType == FlinkJoinType.LEFT && condition != null) { - throw new TableException( - "Currently Python correlate does not support conditions in left join."); - } } @SuppressWarnings("unchecked") diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java index 88bad60232c21..702ee8f89fc05 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java @@ -31,8 +31,6 @@ import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexNode; -import javax.annotation.Nullable; - import java.util.Collections; import java.util.List; @@ -43,14 +41,12 @@ public class StreamExecPythonCorrelate extends CommonExecPythonCorrelate public StreamExecPythonCorrelate( FlinkJoinType joinType, RexCall invocation, - RexNode condition, InputProperty inputProperty, RowType outputType, String description) { this( joinType, invocation, - condition, getNewNodeId(), Collections.singletonList(inputProperty), outputType, @@ -61,7 +57,6 @@ public StreamExecPythonCorrelate( public StreamExecPythonCorrelate( @JsonProperty(FIELD_NAME_JOIN_TYPE) FlinkJoinType joinType, @JsonProperty(FIELD_NAME_FUNCTION_CALL) RexNode invocation, - @JsonProperty(FIELD_NAME_CONDITION) @Nullable RexNode condition, @JsonProperty(FIELD_NAME_ID) int id, @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List inputProperties, @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType, @@ -69,7 +64,6 @@ public StreamExecPythonCorrelate( super( joinType, (RexCall) invocation, - condition, id, inputProperties, outputType, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala index 53f8405cc47a3..298476c0ce1df 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.table.planner.plan.nodes.physical.batch +import org.apache.flink.table.api.TableException import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonCorrelate import org.apache.flink.table.planner.plan.nodes.exec.{InputProperty, ExecNode} @@ -64,10 +65,17 @@ class BatchPhysicalPythonCorrelate( } override def translateToExecNode(): ExecNode[_] = { + if (condition.orNull != null) { + if (joinType == JoinRelType.LEFT) { + throw new TableException("Currently Python correlate does not support conditions" + + " in left join.") + } + throw new TableException("The condition of BatchPhysicalPythonCorrelate should be null.") + } + new BatchExecPythonCorrelate( JoinTypeUtil.getFlinkJoinType(joinType), scan.getCall.asInstanceOf[RexCall], - condition.orNull, InputProperty.DEFAULT, FlinkTypeFactory.toLogicalRowType(getRowType), getRelDetailedDescription diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala index c3acaf3572d4c..434f163a37b80 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala @@ -17,16 +17,18 @@ */ package org.apache.flink.table.planner.plan.nodes.physical.stream +import org.apache.flink.table.api.TableException import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate import org.apache.flink.table.planner.plan.nodes.exec.{InputProperty, ExecNode} import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan +import org.apache.flink.table.planner.plan.utils.JoinTypeUtil + import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.JoinRelType import org.apache.calcite.rex.{RexCall, RexNode} -import org.apache.flink.table.planner.plan.utils.JoinTypeUtil /** * Flink RelNode which matches along with join a python user defined table function. @@ -63,10 +65,17 @@ class StreamPhysicalPythonCorrelate( } override def translateToExecNode(): ExecNode[_] = { + if (condition.orNull != null) { + if (joinType == JoinRelType.LEFT) { + throw new TableException("Currently Python correlate does not support conditions" + + " in left join.") + } + throw new TableException("The condition of StreamPhysicalPythonCorrelate should be null.") + } + new StreamExecPythonCorrelate( JoinTypeUtil.getFlinkJoinType(joinType), scan.getCall.asInstanceOf[RexCall], - condition.orNull, InputProperty.DEFAULT, FlinkTypeFactory.toLogicalRowType(getRowType), getRelDetailedDescription From 74e822ff7dc2b31756dd1ead6f00b7a7c4cff76d Mon Sep 17 00:00:00 2001 From: huangxingbo Date: Mon, 17 May 2021 16:44:20 +0800 Subject: [PATCH 4/4] fix checkstyle --- .../plan/nodes/exec/stream/StreamExecPythonCorrelate.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java index 702ee8f89fc05..91585e9e6fea3 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java @@ -61,12 +61,6 @@ public StreamExecPythonCorrelate( @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List inputProperties, @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType, @JsonProperty(FIELD_NAME_DESCRIPTION) String description) { - super( - joinType, - (RexCall) invocation, - id, - inputProperties, - outputType, - description); + super(joinType, (RexCall) invocation, id, inputProperties, outputType, description); } }