Skip to content

Commit 0dea984

Browse files
committed
[FLINK-22649][python][table-planner-blink] Support StreamExecPythonCalc json serialization/deserialization
1 parent 62f91de commit 0dea984

9 files changed

Lines changed: 356 additions & 27 deletions

File tree

flink-python/pyflink/table/tests/test_udf.py

Lines changed: 48 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -748,6 +748,54 @@ def local_zoned_timestamp_func(local_zoned_timestamp_param):
748748
actual = source_sink_utils.results()
749749
self.assert_equals(actual, ["+I[1970-01-01T00:00:00.123Z]"])
750750

751+
def test_execute_from_json_plan(self):
752+
# create source file path
753+
tmp_dir = self.tempdir
754+
data = ['1,1', '3,3', '2,2']
755+
source_path = tmp_dir + '/test_execute_from_json_plan_input.csv'
756+
sink_path = tmp_dir + '/test_execute_from_json_plan_out'
757+
with open(source_path, 'w') as fd:
758+
for ele in data:
759+
fd.write(ele + '\n')
760+
761+
source_table = """
762+
CREATE TABLE source_table (
763+
a BIGINT,
764+
b BIGINT
765+
) WITH (
766+
'connector' = 'filesystem',
767+
'path' = '%s',
768+
'format' = 'csv'
769+
)
770+
""" % source_path
771+
self.t_env.execute_sql(source_table)
772+
773+
self.t_env.execute_sql("""
774+
CREATE TABLE sink_table (
775+
id BIGINT,
776+
data BIGINT
777+
) WITH (
778+
'connector' = 'filesystem',
779+
'path' = '%s',
780+
'format' = 'csv'
781+
)
782+
""" % sink_path)
783+
784+
add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
785+
self.t_env.create_temporary_system_function("add_one", add_one)
786+
787+
json_plan = self.t_env._j_tenv.getJsonPlan("INSERT INTO sink_table SELECT "
788+
"a, "
789+
"add_one(b) "
790+
"FROM source_table")
791+
from py4j.java_gateway import get_method
792+
get_method(self.t_env._j_tenv.executeJsonPlan(json_plan), "await")()
793+
794+
import glob
795+
lines = [line.strip() for file in glob.glob(sink_path + '/*') for line in open(file, 'r')]
796+
lines.sort()
797+
self.assertEqual(lines, ['1,2', '2,3', '3,4'])
798+
751799

752800
class PyFlinkBlinkBatchUserDefinedFunctionTests(UserDefinedFunctionTests,
753801
PyFlinkBlinkBatchTableTestCase):

flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCalc.java

Lines changed: 26 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -24,16 +24,39 @@
2424
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCalc;
2525
import org.apache.flink.table.types.logical.RowType;
2626

27-
import org.apache.calcite.rex.RexProgram;
27+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
28+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
29+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
30+
31+
import org.apache.calcite.rex.RexNode;
32+
33+
import java.util.Collections;
34+
import java.util.List;
2835

2936
/** Batch {@link ExecNode} for Python ScalarFunctions. */
37+
@JsonIgnoreProperties(ignoreUnknown = true)
3038
public class BatchExecPythonCalc extends CommonExecPythonCalc implements BatchExecNode<RowData> {
3139

3240
public BatchExecPythonCalc(
33-
RexProgram calcProgram,
41+
List<RexNode> projection,
3442
InputProperty inputProperty,
3543
RowType outputType,
3644
String description) {
37-
super(calcProgram, inputProperty, outputType, description);
45+
this(
46+
projection,
47+
getNewNodeId(),
48+
Collections.singletonList(inputProperty),
49+
outputType,
50+
description);
51+
}
52+
53+
@JsonCreator
54+
public BatchExecPythonCalc(
55+
@JsonProperty(FIELD_NAME_PROJECTION) List<RexNode> projection,
56+
@JsonProperty(FIELD_NAME_ID) int id,
57+
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
58+
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
59+
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
60+
super(projection, id, inputProperties, outputType, description);
3861
}
3962
}

flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java

Lines changed: 22 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -40,23 +40,30 @@
4040
import org.apache.flink.table.types.logical.LogicalType;
4141
import org.apache.flink.table.types.logical.RowType;
4242

43+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
44+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
45+
4346
import org.apache.calcite.rex.RexCall;
4447
import org.apache.calcite.rex.RexFieldAccess;
4548
import org.apache.calcite.rex.RexInputRef;
4649
import org.apache.calcite.rex.RexNode;
47-
import org.apache.calcite.rex.RexProgram;
4850

4951
import java.lang.reflect.Constructor;
5052
import java.util.ArrayList;
51-
import java.util.Collections;
5253
import java.util.LinkedHashMap;
5354
import java.util.List;
5455
import java.util.stream.Collectors;
5556

57+
import static org.apache.flink.util.Preconditions.checkArgument;
58+
import static org.apache.flink.util.Preconditions.checkNotNull;
59+
5660
/** Base class for exec Python Calc. */
61+
@JsonIgnoreProperties(ignoreUnknown = true)
5762
public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
5863
implements SingleTransformationTranslator<RowData> {
5964

65+
public static final String FIELD_NAME_PROJECTION = "projection";
66+
6067
private static final String PYTHON_SCALAR_FUNCTION_OPERATOR_NAME =
6168
"org.apache.flink.table.runtime.operators.python.scalar."
6269
+ "RowDataPythonScalarFunctionOperator";
@@ -65,15 +72,18 @@ public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
6572
"org.apache.flink.table.runtime.operators.python.scalar.arrow."
6673
+ "RowDataArrowPythonScalarFunctionOperator";
6774

68-
private final RexProgram calcProgram;
75+
@JsonProperty(FIELD_NAME_PROJECTION)
76+
private final List<RexNode> projection;
6977

7078
public CommonExecPythonCalc(
71-
RexProgram calcProgram,
72-
InputProperty inputProperty,
79+
List<RexNode> projection,
80+
int id,
81+
List<InputProperty> inputProperties,
7382
RowType outputType,
7483
String description) {
75-
super(Collections.singletonList(inputProperty), outputType, description);
76-
this.calcProgram = calcProgram;
84+
super(id, inputProperties, outputType, description);
85+
checkArgument(inputProperties.size() == 1);
86+
this.projection = checkNotNull(projection);
7787
}
7888

7989
@SuppressWarnings("unchecked")
@@ -85,29 +95,23 @@ protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
8595
final Configuration config =
8696
CommonPythonUtil.getMergedConfig(planner.getExecEnv(), planner.getTableConfig());
8797
OneInputTransformation<RowData, RowData> ret =
88-
createPythonOneInputTransformation(
89-
inputTransform, calcProgram, getDescription(), config);
98+
createPythonOneInputTransformation(inputTransform, getDescription(), config);
9099
if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(config)) {
91100
ret.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
92101
}
93102
return ret;
94103
}
95104

96105
private OneInputTransformation<RowData, RowData> createPythonOneInputTransformation(
97-
Transformation<RowData> inputTransform,
98-
RexProgram calcProgram,
99-
String name,
100-
Configuration config) {
106+
Transformation<RowData> inputTransform, String name, Configuration config) {
101107
List<RexCall> pythonRexCalls =
102-
calcProgram.getProjectList().stream()
103-
.map(calcProgram::expandLocalRef)
108+
projection.stream()
104109
.filter(x -> x instanceof RexCall)
105110
.map(x -> (RexCall) x)
106111
.collect(Collectors.toList());
107112

108113
List<Integer> forwardedFields =
109-
calcProgram.getProjectList().stream()
110-
.map(calcProgram::expandLocalRef)
114+
projection.stream()
111115
.filter(x -> x instanceof RexInputRef)
112116
.map(x -> ((RexInputRef) x).getIndex())
113117
.collect(Collectors.toList());
@@ -142,7 +146,7 @@ private OneInputTransformation<RowData, RowData> createPythonOneInputTransformat
142146
pythonUdfInputOffsets,
143147
pythonFunctionInfos,
144148
forwardedFields.stream().mapToInt(x -> x).toArray(),
145-
calcProgram.getExprList().stream()
149+
pythonRexCalls.stream()
146150
.anyMatch(
147151
x ->
148152
PythonUtil.containsPythonCall(

flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCalc.java

Lines changed: 26 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -24,16 +24,39 @@
2424
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCalc;
2525
import org.apache.flink.table.types.logical.RowType;
2626

27-
import org.apache.calcite.rex.RexProgram;
27+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
28+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
29+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
30+
31+
import org.apache.calcite.rex.RexNode;
32+
33+
import java.util.Collections;
34+
import java.util.List;
2835

2936
/** Stream {@link ExecNode} for Python ScalarFunctions. */
37+
@JsonIgnoreProperties(ignoreUnknown = true)
3038
public class StreamExecPythonCalc extends CommonExecPythonCalc implements StreamExecNode<RowData> {
3139

3240
public StreamExecPythonCalc(
33-
RexProgram calcProgram,
41+
List<RexNode> projection,
3442
InputProperty inputProperty,
3543
RowType outputType,
3644
String description) {
37-
super(calcProgram, inputProperty, outputType, description);
45+
this(
46+
projection,
47+
getNewNodeId(),
48+
Collections.singletonList(inputProperty),
49+
outputType,
50+
description);
51+
}
52+
53+
@JsonCreator
54+
public StreamExecPythonCalc(
55+
@JsonProperty(FIELD_NAME_PROJECTION) List<RexNode> projection,
56+
@JsonProperty(FIELD_NAME_ID) int id,
57+
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
58+
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
59+
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
60+
super(projection, id, inputProperties, outputType, description);
3861
}
3962
}

flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCalc.scala

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,8 @@ import org.apache.calcite.rel.`type`.RelDataType
2828
import org.apache.calcite.rel.core.Calc
2929
import org.apache.calcite.rex.RexProgram
3030

31+
import scala.collection.JavaConversions._
32+
3133
/**
3234
* Batch physical RelNode for Python ScalarFunctions.
3335
*/
@@ -49,8 +51,10 @@ class BatchPhysicalPythonCalc(
4951
}
5052

5153
override def translateToExecNode(): ExecNode[_] = {
54+
val projection = calcProgram.getProjectList.map(calcProgram.expandLocalRef)
55+
5256
new BatchExecPythonCalc(
53-
getProgram,
57+
projection,
5458
InputProperty.DEFAULT,
5559
FlinkTypeFactory.toLogicalRowType(getRowType),
5660
getRelDetailedDescription)

flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCalc.scala

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,8 @@ import org.apache.calcite.rel.`type`.RelDataType
2828
import org.apache.calcite.rel.core.Calc
2929
import org.apache.calcite.rex.RexProgram
3030

31+
import scala.collection.JavaConversions._
32+
3133
/**
3234
* Stream physical RelNode for Python ScalarFunctions.
3335
*/
@@ -49,8 +51,10 @@ class StreamPhysicalPythonCalc(
4951
}
5052

5153
override def translateToExecNode(): ExecNode[_] = {
54+
val projection = calcProgram.getProjectList.map(calcProgram.expandLocalRef)
55+
5256
new StreamExecPythonCalc(
53-
getProgram,
57+
projection,
5458
InputProperty.DEFAULT,
5559
FlinkTypeFactory.toLogicalRowType(getRowType),
5660
getRelDetailedDescription)

flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -49,7 +49,6 @@ public class JsonSerdeCoverageTest {
4949
"StreamExecPythonGroupTableAggregate",
5050
"StreamExecPythonOverAggregate",
5151
"StreamExecPythonCorrelate",
52-
"StreamExecPythonCalc",
5352
"StreamExecSort",
5453
"StreamExecMultipleInput",
5554
"StreamExecValues");
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java (new file; path inferred from the package declaration and class name below)

Original file line number | Diff line number | Diff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.nodes.exec.stream;
20+
21+
import org.apache.flink.table.api.TableConfig;
22+
import org.apache.flink.table.api.TableEnvironment;
23+
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction;
24+
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
25+
import org.apache.flink.table.planner.utils.TableTestBase;
26+
27+
import org.junit.Before;
28+
import org.junit.Test;
29+
30+
/** Test json serialization/deserialization for calc. */
31+
public class PythonCalcJsonPlanTest extends TableTestBase {
32+
33+
private StreamTableTestUtil util;
34+
private TableEnvironment tEnv;
35+
36+
@Before
37+
public void setup() {
38+
util = streamTestUtil(TableConfig.getDefault());
39+
tEnv = util.getTableEnv();
40+
41+
String srcTableDdl =
42+
"CREATE TABLE MyTable (\n"
43+
+ " a bigint,\n"
44+
+ " b int not null,\n"
45+
+ " c varchar,\n"
46+
+ " d timestamp(3)\n"
47+
+ ") with (\n"
48+
+ " 'connector' = 'values',\n"
49+
+ " 'bounded' = 'false')";
50+
tEnv.executeSql(srcTableDdl);
51+
}
52+
53+
@Test
54+
public void testPythonCalc() {
55+
tEnv.createTemporaryFunction("pyFunc", new PythonScalarFunction("pyFunc"));
56+
String sinkTableDdl =
57+
"CREATE TABLE MySink (\n"
58+
+ " a bigint,\n"
59+
+ " b int\n"
60+
+ ") with (\n"
61+
+ " 'connector' = 'values',\n"
62+
+ " 'table-sink-class' = 'DEFAULT')";
63+
tEnv.executeSql(sinkTableDdl);
64+
util.verifyJsonPlan("insert into MySink select a, pyFunc(b, b) from MyTable");
65+
}
66+
}

0 commit comments

Comments (0)