
Conversation

@yuxiqian
Member

@yuxiqian yuxiqian commented Feb 8, 2025

This closes FLINK-37278.

Currently, the regular schema evolution (SE) topology uses the following process to drain in-flight DataChangeEvents from the pipeline:

  1. SchemaOperator ("client") emits FlushEvent to downstream.
  2. The "client" keeps polling the SchemaCoordinator ("server") at a 1-second interval.
  3. The "server" rejects all requests from clients until it has collected enough FlushSuccessEvent notifications from the Sink.

As a result, every schema change request takes at least 1 second to finish, since it must wait for at least one polling interval.
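The fixed polling cost described above can be sketched with a toy client/server model. All class, field, and method names here are illustrative, not the actual Flink CDC API; the point is only that a flush finishing in milliseconds still costs a full polling round:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of the old polling protocol: the client emits a flush request,
// then re-polls the coordinator once per second until every sink has
// reported a FlushSuccessEvent.
public class PollingSketch {
    static final int REQUIRED_FLUSHES = 3;                // e.g. sink parallelism
    static final AtomicInteger flushed = new AtomicInteger();

    // "Server" side: reject schema change requests until all flushes arrived.
    static boolean tryApplySchemaChange() {
        return flushed.get() >= REQUIRED_FLUSHES;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate sinks reporting FlushSuccessEvent shortly after the flush.
        new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException ignored) {
            }
            flushed.addAndGet(REQUIRED_FLUSHES);
        }).start();

        long start = System.nanoTime();
        // "Client" side: poll at a fixed 1-second interval, so even a flush
        // that completes in ~100 ms costs a full polling round.
        while (!tryApplySchemaChange()) {
            TimeUnit.SECONDS.sleep(1);
        }
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("schema change finished after ~" + elapsedMs + " ms");
    }
}
```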


This PR replaces the polling code with a pending schema change request queue: the SchemaCoordinator manages all pending clients and effectively blocks them from handling upstream events. The schema evolution process can start immediately after the FlushSuccessEvents are reported, without waiting for polling requests from clients.
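The queue-based protocol can be sketched as follows. Names and types are illustrative (the real coordinator works through Flink's coordination RPC, not plain method calls); the sketch only shows how parking each request as a future removes the polling delay:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Toy model of the new protocol: instead of clients re-polling, the
// coordinator parks each schema change request as a future and completes
// it the moment the last FlushSuccessEvent arrives.
public class QueueSketch {
    private final Queue<CompletableFuture<String>> pendingRequests = new ArrayDeque<>();
    private final int requiredFlushes;
    private int flushCount;

    QueueSketch(int requiredFlushes) {
        this.requiredFlushes = requiredFlushes;
    }

    // Client side: submit once, then block on the future (no polling loop).
    synchronized CompletableFuture<String> requestSchemaChange() {
        CompletableFuture<String> response = new CompletableFuture<>();
        pendingRequests.add(response);
        return response;
    }

    // Server side: called for each FlushSuccessEvent reported by a sink.
    synchronized void onFlushSuccess() {
        if (++flushCount == requiredFlushes) {
            // Evolution starts immediately; unblock every queued client.
            while (!pendingRequests.isEmpty()) {
                pendingRequests.poll().complete("SUCCESS");
            }
        }
    }

    public static void main(String[] args) {
        QueueSketch coordinator = new QueueSketch(2);
        CompletableFuture<String> response = coordinator.requestSchemaChange();
        coordinator.onFlushSuccess();
        coordinator.onFlushSuccess();       // last flush completes the request
        System.out.println(response.join()); // completes with no fixed delay
    }
}
```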


With this change, the running time of the testRegularTablesSourceInMultipleParallelism test case has been reduced from ~6 minutes to ~50 seconds.

@yuxiqian
Member Author

yuxiqian commented Feb 8, 2025

Would @hiliuxg like to take a look?

@yuxiqian yuxiqian marked this pull request as ready for review February 8, 2025 08:58
Contributor

@Shawn-Hx Shawn-Hx left a comment


It seems that SchemaChangeResponse#ResponseCode can only be SUCCESS now. Can we remove SchemaChangeResponse#ResponseCode and simplify the logic in SchemaOperator#handleSchemaChangeEvent?

@yuxiqian
Member Author

Thanks for Shawn's kind review, comments addressed.

Contributor

@Shawn-Hx Shawn-Hx left a comment


LGTM

@gongzexin

> It seems that SchemaChangeResponse#ResponseCode can only be SUCCESS now. Can we remove SchemaChangeResponse#ResponseCode and simplify the logic in SchemaOperator#handleSchemaChangeEvent?

@yuxiqian @Shawn-Hx
Hi, have you noticed that during fault tolerance, the same table will be flushed multiple times (related to the task parallelism)? So I think SchemaChangeResponse#ResponseCode#DUPLICATE should not be deleted, but rather strengthened.

@yuxiqian
Member Author

> It seems that SchemaChangeResponse#ResponseCode can only be SUCCESS now. Can we remove SchemaChangeResponse#ResponseCode and simplify the logic in SchemaOperator#handleSchemaChangeEvent?

> @yuxiqian @Shawn-Hx Hi, have you noticed that during fault tolerance, the same table will be flushed multiple times (related to the task parallelism)? So I think SchemaChangeResponse#ResponseCode#DUPLICATE should not be deleted, but rather strengthened.

Thanks for @gongzexin's report. IIUC, the root cause of this problem is that PreTransformOperator invokes getUnionListState to store persisted schemas: all subTasks of SchemaOperator will obtain the same set of table schemas when restoring from state, so SchemaCoordinator is expected to receive $N$ duplicate requests ($N$ = parallelism). Worse still, UnionListState will block the checkpointing process when some subTasks have entered the FINISHED state (FLINK-37368).
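The restore-time duplication described above can be illustrated with a toy coordinator. All names are hypothetical; the sketch only shows why a DUPLICATE response code is useful when $N$ subtasks restore the same schema from union list state and re-send the same request:

```java
import java.util.HashSet;
import java.util.Set;

// Toy illustration: with union list state, every subtask restores the full
// schema set, so the coordinator sees the same request N times
// (N = parallelism). A DUPLICATE response lets it acknowledge repeats
// without re-applying them.
public class DuplicateRequestSketch {
    enum ResponseCode { SUCCESS, DUPLICATE }

    private final Set<String> appliedChanges = new HashSet<>();

    synchronized ResponseCode handleSchemaChangeRequest(String tableId, String change) {
        String key = tableId + "#" + change;
        // The first subtask to report the change actually applies it...
        if (appliedChanges.add(key)) {
            return ResponseCode.SUCCESS;
        }
        // ...the other N-1 restored copies are acknowledged as duplicates.
        return ResponseCode.DUPLICATE;
    }

    public static void main(String[] args) {
        DuplicateRequestSketch coordinator = new DuplicateRequestSketch();
        int parallelism = 4;
        int applied = 0;
        // All subtasks restored the same CreateTableEvent from union state.
        for (int subTask = 0; subTask < parallelism; subTask++) {
            if (coordinator.handleSchemaChangeRequest("db.orders", "CREATE TABLE")
                    == ResponseCode.SUCCESS) {
                applied++;
            }
        }
        System.out.println("applied " + applied + " of " + parallelism + " requests");
    }
}
```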

I wonder if we can handle it in another PR, and focus on modifying the schema evolution request queueing logic here?

Contributor

@lvyanquan lvyanquan left a comment


LGTM.

@lvyanquan
Contributor

Hi, @leonardBang @ruanhang1993, could you take a look at this?

@leonardBang leonardBang merged commit 602abde into apache:master Mar 5, 2025
25 of 26 checks passed
SML0127 pushed a commit to SML0127/flink-cdc-connectors that referenced this pull request Mar 12, 2025
@nihaoya2025

@yuxiqian I have a table that I want to delete and re-synchronize with both full and incremental data. However, the schema recorded by the SchemaManager is not automatically deleted when the table is dropped, so the new table creation statement gets skipped. What should I do?

```java
// For redundant schema change events (possibly coming from duplicate emitted
// CreateTableEvents in snapshot stage), we just skip them.
if (!SchemaUtils.isSchemaChangeEventRedundant(currentUpstreamSchema, originalEvent)) {
    schemaManager.applyOriginalSchemaChange(originalEvent);
    deducedSchemaChangeEvents.addAll(deduceEvolvedSchemaChanges(originalEvent));
}
```
