Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public WindowAttachedWindowingStrategy(
@JsonProperty(FIELD_NAME_WINDOW_START) int windowStart,
@JsonProperty(FIELD_NAME_WINDOW_END) int windowEnd) {
super(window, timeAttributeType);
checkArgument(windowEnd >= 0 && windowStart >= 0);
checkArgument(windowEnd >= 0);
this.windowStart = windowStart;
this.windowEnd = windowEnd;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
Expand Down Expand Up @@ -185,4 +186,36 @@ public void testProcTimeCumulateWindow() {
+ " INTERVAL '15' SECOND))\n"
+ "GROUP BY b, window_start, window_end");
}

@Test
public void testDistinctSplitEnabled() {
tEnv.getConfig()
.getConfiguration()
.setBoolean(
OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
String sinkTableDdl =
"CREATE TABLE MySink (\n"
+ " a bigint,\n"
+ " window_start timestamp(3),\n"
+ " window_end timestamp(3),\n"
+ " cnt_star bigint,\n"
+ " sum_b bigint,\n"
+ " cnt_distinct_c bigint\n"
+ ") with (\n"
+ " 'connector' = 'values',\n"
+ " 'sink-insert-only' = 'false',\n"
+ " 'table-sink-class' = 'DEFAULT')";
tEnv.executeSql(sinkTableDdl);

util.verifyJsonPlan(
"insert into MySink select a, "
+ " window_start, "
+ " window_end, "
+ " count(*), "
+ " sum(b), "
+ " count(distinct c) AS uv "
+ "FROM TABLE ("
+ " CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR)) "
+ "GROUP BY a, window_start, window_end");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class WindowAggregateJsonITCase extends JsonPlanTestBase {
public static Object[] parameters() {
return new Object[][] {
new Object[] {AggregatePhaseStrategy.ONE_PHASE},
new Object[] {AggregatePhaseStrategy.ONE_PHASE}
new Object[] {AggregatePhaseStrategy.TWO_PHASE}
};
}

Expand Down Expand Up @@ -180,4 +180,47 @@ public void testEventTimeCumulateWindow() throws Exception {
"+I[null, 1]"),
result);
}

@Test
public void testDistinctSplitEnabled() throws Exception {
tableEnv.getConfig()
.getConfiguration()
.setBoolean(
OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
createTestValuesSinkTable(
"MySink", "name STRING", "max_double DOUBLE", "cnt_distinct_int BIGINT");

String jsonPlan =
tableEnv.getJsonPlan(
"insert into MySink select name, "
+ " max(`double`),\n"
+ " count(distinct `int`) "
+ "FROM TABLE ("
+ " CUMULATE(\n"
+ " TABLE MyTable,\n"
+ " DESCRIPTOR(rowtime),\n"
+ " INTERVAL '5' SECOND,\n"
+ " INTERVAL '15' SECOND))"
+ "GROUP BY name, window_start, window_end");
tableEnv.executeJsonPlan(jsonPlan).await();

List<String> result = TestValuesTableFactory.getResults("MySink");
assertResult(
Arrays.asList(
"+I[a, 5.0, 3]",
"+I[a, 5.0, 4]",
"+I[a, 5.0, 4]",
"+I[b, 3.0, 1]",
"+I[b, 3.0, 1]",
"+I[b, 3.0, 1]",
"+I[b, 4.0, 1]",
"+I[b, 4.0, 1]",
"+I[b, 4.0, 1]",
"+I[b, 6.0, 2]",
"+I[b, 6.0, 2]",
"+I[null, 7.0, 1]",
"+I[null, 7.0, 1]",
"+I[null, 7.0, 1]"),
result);
}
}
Loading