From f8377d75af31e0e419d6019f83a453b2bea2ddbb Mon Sep 17 00:00:00 2001 From: "shuo.cs" Date: Wed, 12 May 2021 21:04:38 +0800 Subject: [PATCH 1/5] [FLINK-19796][table] Explicit casting shoule be made if the type of an element in `ARRAY` not equals with the derived component type --- .../functions/sql/FlinkSqlOperatorTable.java | 3 +- .../functions/sql/SqlArrayFunction.java | 71 +++++++++++++++++++ .../runtime/stream/sql/CalcITCase.scala | 13 ++++ 3 files changed, 85 insertions(+), 2 deletions(-) create mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlArrayFunction.java diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java index 5cb3e8746a19d..8a43da847b9ad 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java @@ -1054,8 +1054,7 @@ public List getAuxiliaryFunctions() { public static final SqlAggFunction SINGLE_VALUE = SqlStdOperatorTable.SINGLE_VALUE; // ARRAY OPERATORS - public static final SqlOperator ARRAY_VALUE_CONSTRUCTOR = - SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR; + public static final SqlOperator ARRAY_VALUE_CONSTRUCTOR = new SqlArrayFunction(); public static final SqlOperator ELEMENT = SqlStdOperatorTable.ELEMENT; // MAP OPERATORS diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlArrayFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlArrayFunction.java new file mode 100644 index 0000000000000..cdd977e8c8a38 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlArrayFunction.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.sql; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlCallBinding; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperatorBinding; +import org.apache.calcite.sql.fun.SqlArrayValueConstructor; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.SqlTypeUtil; + +import java.util.List; + +/** + * {@link SqlFunction} for ARRAY, which makes explicit casting if the element type not + * equals the derived component type. + */ +public class SqlArrayFunction extends SqlArrayValueConstructor { + @Override + public RelDataType inferReturnType(SqlOperatorBinding opBinding) { + RelDataType type = + getComponentType(opBinding.getTypeFactory(), opBinding.collectOperandTypes()); + if (null == type) { + return null; + } + + // explicit cast elements to component type if they are not same + if (opBinding instanceof SqlCallBinding) { + SqlCall call = ((SqlCallBinding) opBinding).getCall(); + List operandTypes = opBinding.collectOperandTypes(); + List operands = call.getOperandList(); + int idx = -1; + for (RelDataType opType : operandTypes) { + idx += 1; + if (opType.equalsSansFieldNames(type)) { + continue; + } + call.setOperand(idx, castTo(operands.get(idx), type)); + } + } + + return SqlTypeUtil.createArrayType(opBinding.getTypeFactory(), type, false); + } + + private SqlNode castTo(SqlNode node, RelDataType type) { + return SqlStdOperatorTable.CAST.createCall( + SqlParserPos.ZERO, + node, + SqlTypeUtil.convertTypeToSpec(type).withNullable(type.isNullable())); + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala index 723800f5d94fb..36e28d524b66f 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala @@ -407,4 +407,17 @@ class CalcITCase extends StreamingTestBase { List("1,HI,1111,true,111","2,HELLO,2222,false,222", "3,HELLO WORLD,3333,true,333") assertEquals(expected.sorted, sink.getAppendResults.sorted) } + + @Test + def testDecimalArrayWithDifferentPrecision(): Unit = { + val sqlQuery = "SELECT ARRAY[0.12, 0.5, 0.99]" + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List("[0.12, 0.50, 0.99]") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } } From 6b64bb74108c81f4cb74b74e6d377832844aba82 Mon Sep 17 00:00:00 2001 From: "shuo.cs" Date: Wed, 12 May 2021 21:30:21 +0800 Subject: [PATCH 2/5] add plan test --- .../planner/plan/stream/sql/CalcTest.xml | 21 +++++++++++++++++-- .../planner/plan/stream/sql/CalcTest.scala | 5 +++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml index fac487597703c..e9247ea641e17 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml @@ -22,13 +22,13 @@ limitations under the License. @@ -410,4 +410,21 @@ Calc(select=[ROW(1, _UTF-16LE'Hi', a) AS EXPR$0]) ]]> + + + + + + + + + + + diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala index 7b1453662c341..68dde468cbf0c 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala @@ -170,4 +170,9 @@ class CalcTest extends TableTestBase { def testOrWithIsNullInIf(): Unit = { util.verifyExecPlan("SELECT IF(c = '' OR c IS NULL, 'a', 'b') FROM MyTable") } + + @Test + def testDecimalArrayWithDifferentPrecision(): Unit = { + util.verifyExecPlan("SELECT ARRAY[0.12, 0.5, 0.99]") + } } From 26e541325a507d36910dbd9a7c56c990587ae656 Mon Sep 17 00:00:00 2001 From: "shuo.cs" Date: Thu, 13 May 2021 09:49:20 +0800 Subject: [PATCH 3/5] fix tests --- .../table/planner/plan/batch/sql/CalcTest.xml | 21 ++++++++++++++++-- .../planner/plan/stream/sql/CalcTest.xml | 22 +++++++++---------- .../planner/plan/batch/sql/CalcTest.scala | 5 +++++ 3 files changed, 35 insertions(+), 13 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml index ccce38d4d5615..7b87a586c6887 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml @@ -22,13 +22,13 @@ limitations under the License. @@ -409,4 +409,21 @@ Calc(select=[ROW(1, _UTF-16LE'Hi', a) AS EXPR$0]) ]]> + + + + + + + + + + + diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml index e9247ea641e17..0c696f3eae522 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml @@ -410,21 +410,21 @@ Calc(select=[ROW(1, _UTF-16LE'Hi', a) AS EXPR$0]) ]]> - - - - - - + + + + + - - - + + - - + + diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala index d6fe1890dd96c..b0241b30ce47b 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala @@ -171,4 +171,9 @@ class CalcTest extends TableTestBase { def testOrWithIsNullInIf(): Unit = { util.verifyExecPlan("SELECT IF(c = '' OR c IS NULL, 'a', 'b') FROM MyTable") } + + @Test + def testDecimalArrayWithDifferentPrecision(): Unit = { + util.verifyExecPlan("SELECT ARRAY[0.12, 0.5, 0.99]") + } } From d1baacbc486e54423a659cbab87ed5c45f159edf Mon Sep 17 00:00:00 2001 From: "shuo.cs" Date: Thu, 13 May 2021 20:11:45 +0800 Subject: [PATCH 4/5] Fix `MAP` too --- .../functions/sql/FlinkSqlOperatorTable.java | 5 +- ...Function.java => SqlArrayConstructor.java} | 37 ++------ .../functions/sql/SqlMapConstructor.java | 42 ++++++++++ .../functions/utils/SqlValidatorUtils.java | 84 +++++++++++++++++++ .../table/planner/plan/batch/sql/CalcTest.xml | 55 +++++++----- .../planner/plan/stream/sql/CalcTest.xml | 55 +++++++----- .../planner/plan/batch/sql/CalcTest.scala | 5 ++ .../planner/plan/stream/sql/CalcTest.scala | 5 ++ .../runtime/stream/sql/CalcITCase.scala | 13 +++ 9 files changed, 230 insertions(+), 71 deletions(-) rename flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/{SqlArrayFunction.java => SqlArrayConstructor.java} (56%) create mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlMapConstructor.java create mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/SqlValidatorUtils.java diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java index 8a43da847b9ad..07e0772319746 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java @@ -1054,12 +1054,11 @@ public List getAuxiliaryFunctions() { public static final SqlAggFunction SINGLE_VALUE = SqlStdOperatorTable.SINGLE_VALUE; // ARRAY OPERATORS - public static final SqlOperator ARRAY_VALUE_CONSTRUCTOR = new SqlArrayFunction(); + public static final SqlOperator ARRAY_VALUE_CONSTRUCTOR = new SqlArrayConstructor(); public static final SqlOperator ELEMENT = SqlStdOperatorTable.ELEMENT; // MAP OPERATORS - public static final SqlOperator MAP_VALUE_CONSTRUCTOR = - SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR; + public static final SqlOperator MAP_VALUE_CONSTRUCTOR = new SqlMapConstructor(); // ARRAY MAP SHARED OPERATORS public static final SqlOperator ITEM = SqlStdOperatorTable.ITEM; diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlArrayFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlArrayConstructor.java similarity index 56% rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlArrayFunction.java rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlArrayConstructor.java index cdd977e8c8a38..e7bbd382bf1b6 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlArrayFunction.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlArrayConstructor.java @@ -18,24 +18,20 @@ package org.apache.flink.table.planner.functions.sql; +import org.apache.flink.table.planner.functions.utils.SqlValidatorUtils; + import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.sql.SqlCall; -import org.apache.calcite.sql.SqlCallBinding; -import org.apache.calcite.sql.SqlFunction; -import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.SqlOperatorBinding; import org.apache.calcite.sql.fun.SqlArrayValueConstructor; -import org.apache.calcite.sql.fun.SqlStdOperatorTable; -import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.sql.type.SqlTypeUtil; -import java.util.List; - /** - * {@link SqlFunction} for ARRAY, which makes explicit casting if the element type not + * {@link SqlOperator} for ARRAY, which makes explicit casting if the element type not * equals the derived component type. */ -public class SqlArrayFunction extends SqlArrayValueConstructor { +public class SqlArrayConstructor extends SqlArrayValueConstructor { + @Override public RelDataType inferReturnType(SqlOperatorBinding opBinding) { RelDataType type = @@ -45,27 +41,8 @@ public RelDataType inferReturnType(SqlOperatorBinding opBinding) { } // explicit cast elements to component type if they are not same - if (opBinding instanceof SqlCallBinding) { - SqlCall call = ((SqlCallBinding) opBinding).getCall(); - List operandTypes = opBinding.collectOperandTypes(); - List operands = call.getOperandList(); - int idx = -1; - for (RelDataType opType : operandTypes) { - idx += 1; - if (opType.equalsSansFieldNames(type)) { - continue; - } - call.setOperand(idx, castTo(operands.get(idx), type)); - } - } + SqlValidatorUtils.adjustTypeForArrayConstructor(type, opBinding); return SqlTypeUtil.createArrayType(opBinding.getTypeFactory(), type, false); } - - private SqlNode castTo(SqlNode node, RelDataType type) { - return SqlStdOperatorTable.CAST.createCall( - SqlParserPos.ZERO, - node, - SqlTypeUtil.convertTypeToSpec(type).withNullable(type.isNullable())); - } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlMapConstructor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlMapConstructor.java new file mode 100644 index 0000000000000..d47e9ff3f8275 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlMapConstructor.java @@ -0,0 +1,42 @@ +package org.apache.flink.table.planner.functions.sql; + +import org.apache.flink.table.planner.functions.utils.SqlValidatorUtils; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlOperatorBinding; +import org.apache.calcite.sql.fun.SqlMapValueConstructor; +import org.apache.calcite.sql.type.SqlTypeUtil; +import org.apache.calcite.util.Pair; +import org.apache.calcite.util.Util; + +import java.util.List; + +/** + * {@link SqlOperator} for MAP, which makes explicit casting if the element type not + * equals the derived component type. + */ +public class SqlMapConstructor extends SqlMapValueConstructor { + + @Override + public RelDataType inferReturnType(SqlOperatorBinding opBinding) { + Pair type = + getComponentTypes(opBinding.getTypeFactory(), opBinding.collectOperandTypes()); + if (null == type) { + return null; + } + + // explicit cast elements to component type if they are not same + SqlValidatorUtils.adjustTypeForMapConstructor(type, opBinding); + + return SqlTypeUtil.createMapType(opBinding.getTypeFactory(), type.left, type.right, false); + } + + private Pair getComponentTypes( + RelDataTypeFactory typeFactory, List argTypes) { + return Pair.of( + typeFactory.leastRestrictive(Util.quotientList(argTypes, 2, 0)), + typeFactory.leastRestrictive(Util.quotientList(argTypes, 2, 1))); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/SqlValidatorUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/SqlValidatorUtils.java new file mode 100644 index 0000000000000..279541eec739f --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/SqlValidatorUtils.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.utils; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlCallBinding; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperatorBinding; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.SqlTypeUtil; +import org.apache.calcite.util.Pair; + +import java.util.List; + +/** Utility methods related to SQL validation. */ +public class SqlValidatorUtils { + + public static void adjustTypeForArrayConstructor( + RelDataType componentType, SqlOperatorBinding opBinding) { + if (opBinding instanceof SqlCallBinding) { + adjustTypeForMultisetConstructor( + componentType, componentType, (SqlCallBinding) opBinding); + } + } + + public static void adjustTypeForMapConstructor( + Pair componentType, SqlOperatorBinding opBinding) { + if (opBinding instanceof SqlCallBinding) { + adjustTypeForMultisetConstructor( + componentType.getKey(), componentType.getValue(), (SqlCallBinding) opBinding); + } + } + + /** + * When the element element does not equal with the component type, making explicit casting. + * + * @param evenType derived type for element with even index + * @param oddType derived type for element with odd index + * @param sqlCallBinding description of call + */ + private static void adjustTypeForMultisetConstructor( + RelDataType evenType, RelDataType oddType, SqlCallBinding sqlCallBinding) { + SqlCall call = sqlCallBinding.getCall(); + List operandTypes = sqlCallBinding.collectOperandTypes(); + List operands = call.getOperandList(); + RelDataType elementType; + for (int i = 0; i < operands.size(); i++) { + if (i % 2 == 0) { + elementType = evenType; + } else { + elementType = oddType; + } + if (operandTypes.get(i).equalsSansFieldNames(elementType)) { + continue; + } + call.setOperand(i, castTo(operands.get(i), elementType)); + } + } + + private static SqlNode castTo(SqlNode node, RelDataType type) { + return SqlStdOperatorTable.CAST.createCall( + SqlParserPos.ZERO, + node, + SqlTypeUtil.convertTypeToSpec(type).withNullable(type.isNullable())); + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml index 7b87a586c6887..fdd65b4b34b2a 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml @@ -65,6 +65,40 @@ LogicalProject(a=[$0], b=[$1], c=[$2]) 20))]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + + + + + + + + + + + + + + + + + + + + + + @@ -342,13 +376,13 @@ LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable4, sourc @@ -409,21 +443,4 @@ Calc(select=[ROW(1, _UTF-16LE'Hi', a) AS EXPR$0]) ]]> - - - - - - - - - - - diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml index 0c696f3eae522..eee8884b7cd76 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml @@ -48,6 +48,40 @@ LogicalProject(a=[$0], b=[$1], c=[$2]) 20))]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + + + + + + + + + + + + + + + + + + + + + + @@ -343,13 +377,13 @@ LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable4, sourc @@ -410,21 +444,4 @@ Calc(select=[ROW(1, _UTF-16LE'Hi', a) AS EXPR$0]) ]]> - - - - - - - - - - - diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala index b0241b30ce47b..cdd34e22df15b 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala @@ -176,4 +176,9 @@ class CalcTest extends TableTestBase { def testDecimalArrayWithDifferentPrecision(): Unit = { util.verifyExecPlan("SELECT ARRAY[0.12, 0.5, 0.99]") } + + @Test + def testDecimalMapWithDifferentPrecision(): Unit = { + util.verifyExecPlan("SELECT MAP['a', 0.12, 'b', 0.5]") + } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala index 68dde468cbf0c..659c345649ce0 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala @@ -175,4 +175,9 @@ class CalcTest extends TableTestBase { def testDecimalArrayWithDifferentPrecision(): Unit = { util.verifyExecPlan("SELECT ARRAY[0.12, 0.5, 0.99]") } + + @Test + def testDecimalMapWithDifferentPrecision(): Unit = { + util.verifyExecPlan("SELECT MAP['a', 0.12, 'b', 0.5]") + } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala index 36e28d524b66f..1bc6141cb7171 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala @@ -420,4 +420,17 @@ class CalcITCase extends StreamingTestBase { val expected = List("[0.12, 0.50, 0.99]") assertEquals(expected.sorted, sink.getAppendResults.sorted) } + + @Test + def testDecimalMapWithDifferentPrecision(): Unit = { + val sqlQuery = "SELECT Map['a', 0.12, 'b', 0.5]" + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List("{a=0.12, b=0.50}") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } } From e525c3ddf9151a8d8594c0320b31bb4ebdd1b4c4 Mon Sep 17 00:00:00 2001 From: "shuo.cs" Date: Fri, 14 May 2021 09:38:57 +0800 Subject: [PATCH 5/5] fix style --- .../functions/sql/SqlMapConstructor.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlMapConstructor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlMapConstructor.java index d47e9ff3f8275..a54cac5664102 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlMapConstructor.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlMapConstructor.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.table.planner.functions.sql; import org.apache.flink.table.planner.functions.utils.SqlValidatorUtils;