1717
1818package org .apache .flink .cdc .runtime .operators .schema .regular ;
1919
20+ import org .apache .flink .api .java .tuple .Tuple2 ;
2021import org .apache .flink .cdc .common .event .CreateTableEvent ;
2122import org .apache .flink .cdc .common .event .SchemaChangeEvent ;
2223import org .apache .flink .cdc .common .event .TableId ;
6263import java .util .concurrent .ConcurrentHashMap ;
6364import java .util .concurrent .ExecutorService ;
6465import java .util .concurrent .Executors ;
66+ import java .util .concurrent .TimeoutException ;
6567import java .util .stream .Collectors ;
6668
6769import static org .apache .flink .cdc .runtime .operators .schema .common .CoordinationResponseUtils .wrap ;
@@ -74,15 +76,22 @@ public class SchemaCoordinator extends SchemaRegistry {
7476 private final ExecutorService schemaChangeThreadPool ;
7577
7678 /**
77- * Atomic flag indicating if current RequestHandler could accept more schema changes for now.
79+ * Sink writers which have sent flush success events for the request.<br>
80+ * {@code MapEntry.Key} is an {@code Integer}, indicating the upstream subTaskId that initiates
81+ * this schema change request.<br>
82+ * {@code MapEntry.Value} is a {@code Set<Integer>}, containing downstream subTasks' ID that
83+ * have acknowledged and successfully flushed pending events for this schema change event.
7884 */
79- private transient RequestStatus schemaChangeStatus ;
80-
81- /** Sink writers which have sent flush success events for the request. */
8285 private transient ConcurrentHashMap <Integer , Set <Integer >> flushedSinkWriters ;
8386
84- /** Currently handling request's completable future. */
85- private transient CompletableFuture <CoordinationResponse > pendingResponseFuture ;
87+ /**
88+ * Schema evolution requests that we're currently handling.<br>
89+ * For each subTask executing in parallelism, they may initiate requests simultaneously, so we
90+ * use each task's unique subTaskId as Map key to distinguish each of them.
91+ */
92+ private transient Map <
93+ Integer , Tuple2 <SchemaChangeRequest , CompletableFuture <CoordinationResponse >>>
94+ pendingRequests ;
8695
8796 // Static fields
8897 public SchemaCoordinator (
@@ -108,7 +117,7 @@ public SchemaCoordinator(
108117 public void start () throws Exception {
109118 super .start ();
110119 this .flushedSinkWriters = new ConcurrentHashMap <>();
111- this .schemaChangeStatus = RequestStatus . IDLE ;
120+ this .pendingRequests = new ConcurrentHashMap <>() ;
112121 }
113122
114123 @ Override
@@ -185,7 +194,7 @@ protected void handleCustomCoordinationRequest(
185194 }
186195
187196 @ Override
188- protected void handleFlushSuccessEvent (FlushSuccessEvent event ) {
197+ protected void handleFlushSuccessEvent (FlushSuccessEvent event ) throws TimeoutException {
189198 int sinkSubtask = event .getSinkSubTaskId ();
190199 int sourceSubtask = event .getSourceSubTaskId ();
191200 LOG .info (
@@ -200,16 +209,25 @@ protected void handleFlushSuccessEvent(FlushSuccessEvent event) {
200209 "Currently flushed sink writers for source task {} are: {}" ,
201210 sourceSubtask ,
202211 flushedSinkWriters .get (sourceSubtask ));
212+
213+ if (flushedSinkWriters .get (sourceSubtask ).size () >= currentParallelism ) {
214+ LOG .info (
215+ "Source SubTask {} have collected enough flush success event. Will start evolving schema changes..." ,
216+ sourceSubtask );
217+ flushedSinkWriters .remove (sourceSubtask );
218+ startSchemaChangesEvolve (sourceSubtask );
219+ }
203220 }
204221
205222 @ Override
206223 protected void handleUnrecoverableError (String taskDescription , Throwable t ) {
207224 super .handleUnrecoverableError (taskDescription , t );
208225
209- // There's a pending future, release it exceptionally before quitting
210- if (pendingResponseFuture != null ) {
211- pendingResponseFuture .completeExceptionally (t );
212- }
226+ // For each pending future, release it exceptionally before quitting
227+ pendingRequests .forEach (
228+ (index , tuple ) -> {
229+ tuple .f1 .completeExceptionally (t );
230+ });
213231 }
214232
215233 /**
@@ -219,73 +237,14 @@ protected void handleUnrecoverableError(String taskDescription, Throwable t) {
219237 */
220238 public void handleSchemaChangeRequest (
221239 SchemaChangeRequest request , CompletableFuture <CoordinationResponse > responseFuture ) {
222-
223- // We use subTaskId to identify each schema change request
224- int subTaskId = request .getSubTaskId ();
225-
226- if (schemaChangeStatus == RequestStatus .IDLE ) {
227- if (activeSinkWriters .size () < currentParallelism ) {
228- LOG .info (
229- "Not all active sink writers have been registered. Current {}, expected {}." ,
230- activeSinkWriters .size (),
231- currentParallelism );
232- responseFuture .complete (wrap (SchemaChangeResponse .waitingForFlush ()));
233- return ;
234- }
235-
236- if (!activeSinkWriters .equals (flushedSinkWriters .get (subTaskId ))) {
237- LOG .info (
238- "Not all active sink writers have completed flush. Flushed writers: {}, expected: {}." ,
239- flushedSinkWriters .get (subTaskId ),
240- activeSinkWriters );
241- responseFuture .complete (wrap (SchemaChangeResponse .waitingForFlush ()));
242- return ;
243- }
244-
245- LOG .info (
246- "All sink writers have flushed for subTaskId {}. Switching to APPLYING state and starting schema evolution..." ,
247- subTaskId );
248- flushedSinkWriters .remove (subTaskId );
249- schemaChangeStatus = RequestStatus .APPLYING ;
250- pendingResponseFuture = responseFuture ;
251- startSchemaChangesEvolve (request , responseFuture );
252- } else {
253- responseFuture .complete (wrap (SchemaChangeResponse .busy ()));
254- }
240+ pendingRequests .put (request .getSubTaskId (), Tuple2 .of (request , responseFuture ));
255241 }
256242
257- private void startSchemaChangesEvolve (
258- SchemaChangeRequest request , CompletableFuture <CoordinationResponse > responseFuture ) {
259- SchemaChangeEvent originalEvent = request .getSchemaChangeEvent ();
260- TableId originalTableId = originalEvent .tableId ();
261- Schema currentUpstreamSchema =
262- schemaManager .getLatestOriginalSchema (originalTableId ).orElse (null );
263-
264- List <SchemaChangeEvent > deducedSchemaChangeEvents = new ArrayList <>();
265-
266- // For redundant schema change events (possibly coming from duplicate emitted
267- // CreateTableEvents in snapshot stage), we just skip them.
268- if (!SchemaUtils .isSchemaChangeEventRedundant (currentUpstreamSchema , originalEvent )) {
269- schemaManager .applyOriginalSchemaChange (originalEvent );
270- deducedSchemaChangeEvents .addAll (deduceEvolvedSchemaChanges (originalEvent ));
271- } else {
272- LOG .info (
273- "Schema change event {} is redundant for current schema {}, just skip it." ,
274- originalEvent ,
275- currentUpstreamSchema );
276- }
277-
278- LOG .info (
279- "All sink subtask have flushed for table {}. Start to apply schema change request: \n \t {}\n that extracts to:\n \t {}" ,
280- request .getTableId ().toString (),
281- request ,
282- deducedSchemaChangeEvents .stream ()
283- .map (SchemaChangeEvent ::toString )
284- .collect (Collectors .joining ("\n \t " )));
243+ private void startSchemaChangesEvolve (int sourceSubTaskId ) {
285244 schemaChangeThreadPool .submit (
286245 () -> {
287246 try {
288- applySchemaChange (originalEvent , deducedSchemaChangeEvents );
247+ applySchemaChange (sourceSubTaskId );
289248 } catch (Throwable t ) {
290249 failJob (
291250 "Schema change applying task" ,
@@ -379,8 +338,54 @@ private List<SchemaChangeEvent> deduceEvolvedSchemaChanges(SchemaChangeEvent eve
379338 }
380339
381340 /** Applies the schema change to the external system. */
382- private void applySchemaChange (
383- SchemaChangeEvent originalEvent , List <SchemaChangeEvent > deducedSchemaChangeEvents ) {
341+ private void applySchemaChange (int sourceSubTaskId ) {
342+ try {
343+ loopUntil (
344+ () -> pendingRequests .containsKey (sourceSubTaskId ),
345+ () ->
346+ LOG .info (
347+ "SchemaOperator {} has not submitted schema change request yet. Waiting..." ,
348+ sourceSubTaskId ),
349+ rpcTimeout ,
350+ Duration .ofMillis (100 ));
351+ } catch (TimeoutException e ) {
352+ throw new RuntimeException (
353+ "Timeout waiting for schema change request from SchemaOperator." , e );
354+ }
355+
356+ Tuple2 <SchemaChangeRequest , CompletableFuture <CoordinationResponse >> requestBody =
357+ pendingRequests .get (sourceSubTaskId );
358+ SchemaChangeRequest request = requestBody .f0 ;
359+ CompletableFuture <CoordinationResponse > responseFuture = requestBody .f1 ;
360+
361+ SchemaChangeEvent originalEvent = request .getSchemaChangeEvent ();
362+
363+ TableId originalTableId = originalEvent .tableId ();
364+ Schema currentUpstreamSchema =
365+ schemaManager .getLatestOriginalSchema (originalTableId ).orElse (null );
366+
367+ List <SchemaChangeEvent > deducedSchemaChangeEvents = new ArrayList <>();
368+
369+ // For redundant schema change events (possibly coming from duplicate emitted
370+ // CreateTableEvents in snapshot stage), we just skip them.
371+ if (!SchemaUtils .isSchemaChangeEventRedundant (currentUpstreamSchema , originalEvent )) {
372+ schemaManager .applyOriginalSchemaChange (originalEvent );
373+ deducedSchemaChangeEvents .addAll (deduceEvolvedSchemaChanges (originalEvent ));
374+ } else {
375+ LOG .info (
376+ "Schema change event {} is redundant for current schema {}, just skip it." ,
377+ originalEvent ,
378+ currentUpstreamSchema );
379+ }
380+
381+ LOG .info (
382+ "All sink subtask have flushed for table {}. Start to apply schema change request: \n \t {}\n that extracts to:\n \t {}" ,
383+ request .getTableId ().toString (),
384+ request ,
385+ deducedSchemaChangeEvents .stream ()
386+ .map (SchemaChangeEvent ::toString )
387+ .collect (Collectors .joining ("\n \t " )));
388+
384389 if (SchemaChangeBehavior .EXCEPTION .equals (behavior )) {
385390 if (deducedSchemaChangeEvents .stream ()
386391 .anyMatch (evt -> !(evt instanceof CreateTableEvent ))) {
@@ -415,18 +420,14 @@ private void applySchemaChange(
415420 }
416421
417422 // And returns all successfully applied schema change events to SchemaOperator.
418- pendingResponseFuture .complete (
419- wrap (
420- SchemaChangeResponse .success (
421- appliedSchemaChangeEvents , refreshedEvolvedSchemas )));
422- pendingResponseFuture = null ;
423-
424- Preconditions .checkState (
425- schemaChangeStatus == RequestStatus .APPLYING ,
426- "Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not "
427- + schemaChangeStatus );
428- schemaChangeStatus = RequestStatus .IDLE ;
429- LOG .info ("SchemaChangeStatus switched from APPLYING to IDLE." );
423+ responseFuture .complete (
424+ wrap (new SchemaChangeResponse (appliedSchemaChangeEvents , refreshedEvolvedSchemas )));
425+
426+ pendingRequests .remove (sourceSubTaskId );
427+ LOG .info (
428+ "Finished handling schema change request from {}. Pending requests: {}" ,
429+ sourceSubTaskId ,
430+ pendingRequests );
430431 }
431432
432433 private boolean applyAndUpdateEvolvedSchemaChange (SchemaChangeEvent schemaChangeEvent ) {
0 commit comments