Skip to content

Commit 1088b87

Browse files
committed
[FLINK-22523][table-planner-blink] Window TVF should throw helpful exception when specifying offset parameter (#15803)
1 parent ce7c78c commit 1088b87

4 files changed

Lines changed: 67 additions & 5 deletions

File tree

flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFail
5757
if (!checkIntervalOperands(callBinding, 2)) {
5858
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
5959
}
60+
if (callBinding.getOperandCount() == 5) {
61+
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
62+
}
6063
// check time attribute
6164
return throwExceptionOrReturnFalse(
6265
checkTimeColumnDescriptorOperand(callBinding, 1), throwOnFailure);
@@ -66,7 +69,7 @@ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFail
6669
public String getAllowedSignatures(SqlOperator op, String opName) {
6770
return opName
6871
+ "(TABLE table_name, DESCRIPTOR(timecol), "
69-
+ "datetime interval, datetime interval[, datetime interval])";
72+
+ "datetime interval, datetime interval)";
7073
}
7174
}
7275
}

flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlHopTableFunction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFail
5757
if (!checkIntervalOperands(callBinding, 2)) {
5858
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
5959
}
60+
if (callBinding.getOperandCount() == 5) {
61+
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
62+
}
6063
// check time attribute
6164
return throwExceptionOrReturnFalse(
6265
checkTimeColumnDescriptorOperand(callBinding, 1), throwOnFailure);
@@ -66,7 +69,7 @@ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFail
6669
public String getAllowedSignatures(SqlOperator op, String opName) {
6770
return opName
6871
+ "(TABLE table_name, DESCRIPTOR(timecol), "
69-
+ "datetime interval, datetime interval[, datetime interval])";
72+
+ "datetime interval, datetime interval)";
7073
}
7174
}
7275
}

flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTumbleTableFunction.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,16 +55,17 @@ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFail
5555
if (!checkIntervalOperands(callBinding, 2)) {
5656
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
5757
}
58+
if (callBinding.getOperandCount() == 4) {
59+
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
60+
}
5861
// check time attribute
5962
return throwExceptionOrReturnFalse(
6063
checkTimeColumnDescriptorOperand(callBinding, 1), throwOnFailure);
6164
}
6265

6366
@Override
6467
public String getAllowedSignatures(SqlOperator op, String opName) {
65-
return opName
66-
+ "(TABLE table_name, DESCRIPTOR(timecol), datetime interval"
67-
+ "[, datetime interval])";
68+
return opName + "(TABLE table_name, DESCRIPTOR(timecol), datetime interval)";
6869
}
6970
}
7071
}

flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,4 +140,59 @@ class WindowTableFunctionTest extends TableTestBase {
140140
util.verifyExplain(sql)
141141
}
142142

143+
@Test
144+
def testInvalidTumbleParameters(): Unit = {
145+
val sql =
146+
"""
147+
|SELECT *
148+
|FROM TABLE(TUMBLE(
149+
| TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE, INTERVAL '5' MINUTE))
150+
|""".stripMargin
151+
152+
thrown.expectMessage("Supported form(s): " +
153+
"TUMBLE(TABLE table_name, DESCRIPTOR(timecol), datetime interval)")
154+
thrown.expect(classOf[ValidationException])
155+
util.verifyExplain(sql)
156+
}
157+
158+
@Test
159+
def testInvalidHopParameters(): Unit = {
160+
val sql =
161+
"""
162+
|SELECT *
163+
|FROM TABLE(
164+
| HOP(
165+
| TABLE MyTable,
166+
| DESCRIPTOR(rowtime),
167+
| INTERVAL '1' MINUTE,
168+
| INTERVAL '15' MINUTE,
169+
| INTERVAL '5' MINUTE))
170+
|""".stripMargin
171+
172+
thrown.expectMessage("Supported form(s): " +
173+
"HOP(TABLE table_name, DESCRIPTOR(timecol), datetime interval, datetime interval)")
174+
thrown.expect(classOf[ValidationException])
175+
util.verifyExplain(sql)
176+
}
177+
178+
@Test
179+
def testInvalidCumulateParameters(): Unit = {
180+
val sql =
181+
"""
182+
|SELECT *
183+
|FROM TABLE(
184+
| CUMULATE(
185+
| TABLE MyTable,
186+
| DESCRIPTOR(rowtime),
187+
| INTERVAL '1' MINUTE,
188+
| INTERVAL '15' MINUTE,
189+
| INTERVAL '5' MINUTE))
190+
|""".stripMargin
191+
192+
thrown.expectMessage("Supported form(s): " +
193+
"CUMULATE(TABLE table_name, DESCRIPTOR(timecol), datetime interval, datetime interval)")
194+
thrown.expect(classOf[ValidationException])
195+
util.verifyExplain(sql)
196+
}
197+
143198
}

0 commit comments

Comments
 (0)