Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1358,7 +1358,11 @@ private List<Row> execSelectSql(TableEnvironment tEnv, String select, String sin
TableResult tableResult = tEnv.executeSql("insert into sink " + select);
// wait for the timeout then cancels the job
TimeUnit.SECONDS.sleep(timeout);
tableResult.getJobClient().ifPresent(JobClient::cancel);
try {
tableResult.getJobClient().ifPresent(JobClient::cancel);
} catch (IllegalStateException e) {
log.info("MiniCluster has already been shut down, do nothing.");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix @xicm ! I have applied your patch but it does not solve the problem.

Actually the test is flaky because in recent change:

We have tweaked the compaction scheduling strategy: when there is a pending compaction, we can not schedule another one, the test is flaky based on whether there is pending compaction when scheduling.

My initial idea is we change the test itself:
4053.patch.zip
Hope it helps, or maybe you have better idea :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I didn't go into this test. my changes is not the root cause, let me close this pr and you push your patch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found the testWriteAndReadDebeziumJson is flaky a few days ago,it resolved with my changes. But I haven't reproduced it today. It seems I haven't found the root cause.

tEnv.executeSql("DROP TABLE IF EXISTS sink");
return CollectSinkTableFactory.RESULT.values().stream()
.flatMap(Collection::stream)
Expand Down