diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java index 23edeccb404..8e0d626ef31 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.runtime.operators.schema.regular; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; @@ -62,6 +63,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import static org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils.wrap; @@ -74,15 +76,22 @@ public class SchemaCoordinator extends SchemaRegistry { private final ExecutorService schemaChangeThreadPool; /** - * Atomic flag indicating if current RequestHandler could accept more schema changes for now. + * Sink writers which have sent flush success events for the request.
+ * {@code MapEntry.Key} is an {@code Integer}, indicating the upstream subTaskId that initiates + * this schema change request.
+ * {@code MapEntry.Value} is a {@code Set}, containing downstream subTasks' ID that + * have acknowledged and successfully flushed pending events for this schema change event. */ - private transient RequestStatus schemaChangeStatus; - - /** Sink writers which have sent flush success events for the request. */ private transient ConcurrentHashMap> flushedSinkWriters; - /** Currently handling request's completable future. */ - private transient CompletableFuture pendingResponseFuture; + /** + * Schema evolution requests that we're currently handling.
+ * For each subTask executing in parallelism, they may initiate requests simultaneously, so we + * use each task's unique subTaskId as Map key to distinguish each of them. + */ + private transient Map< + Integer, Tuple2>> + pendingRequests; // Static fields public SchemaCoordinator( @@ -108,7 +117,7 @@ public SchemaCoordinator( public void start() throws Exception { super.start(); this.flushedSinkWriters = new ConcurrentHashMap<>(); - this.schemaChangeStatus = RequestStatus.IDLE; + this.pendingRequests = new ConcurrentHashMap<>(); } @Override @@ -185,7 +194,7 @@ protected void handleCustomCoordinationRequest( } @Override - protected void handleFlushSuccessEvent(FlushSuccessEvent event) { + protected void handleFlushSuccessEvent(FlushSuccessEvent event) throws TimeoutException { int sinkSubtask = event.getSinkSubTaskId(); int sourceSubtask = event.getSourceSubTaskId(); LOG.info( @@ -200,16 +209,25 @@ protected void handleFlushSuccessEvent(FlushSuccessEvent event) { "Currently flushed sink writers for source task {} are: {}", sourceSubtask, flushedSinkWriters.get(sourceSubtask)); + + if (flushedSinkWriters.get(sourceSubtask).size() >= currentParallelism) { + LOG.info( + "Source SubTask {} have collected enough flush success event. Will start evolving schema changes...", + sourceSubtask); + flushedSinkWriters.remove(sourceSubtask); + startSchemaChangesEvolve(sourceSubtask); + } } @Override protected void handleUnrecoverableError(String taskDescription, Throwable t) { super.handleUnrecoverableError(taskDescription, t); - // There's a pending future, release it exceptionally before quitting - if (pendingResponseFuture != null) { - pendingResponseFuture.completeExceptionally(t); - } + // For each pending future, release it exceptionally before quitting + pendingRequests.forEach( + (index, tuple) -> { + tuple.f1.completeExceptionally(t); + }); } /** @@ -219,73 +237,14 @@ protected void handleUnrecoverableError(String taskDescription, Throwable t) { */ public void handleSchemaChangeRequest( SchemaChangeRequest request, CompletableFuture responseFuture) { - - // We use subTaskId to identify each schema change request - int subTaskId = request.getSubTaskId(); - - if (schemaChangeStatus == RequestStatus.IDLE) { - if (activeSinkWriters.size() < currentParallelism) { - LOG.info( - "Not all active sink writers have been registered. Current {}, expected {}.", - activeSinkWriters.size(), - currentParallelism); - responseFuture.complete(wrap(SchemaChangeResponse.waitingForFlush())); - return; - } - - if (!activeSinkWriters.equals(flushedSinkWriters.get(subTaskId))) { - LOG.info( - "Not all active sink writers have completed flush. Flushed writers: {}, expected: {}.", - flushedSinkWriters.get(subTaskId), - activeSinkWriters); - responseFuture.complete(wrap(SchemaChangeResponse.waitingForFlush())); - return; - } - - LOG.info( - "All sink writers have flushed for subTaskId {}. Switching to APPLYING state and starting schema evolution...", - subTaskId); - flushedSinkWriters.remove(subTaskId); - schemaChangeStatus = RequestStatus.APPLYING; - pendingResponseFuture = responseFuture; - startSchemaChangesEvolve(request, responseFuture); - } else { - responseFuture.complete(wrap(SchemaChangeResponse.busy())); - } + pendingRequests.put(request.getSubTaskId(), Tuple2.of(request, responseFuture)); } - private void startSchemaChangesEvolve( - SchemaChangeRequest request, CompletableFuture responseFuture) { - SchemaChangeEvent originalEvent = request.getSchemaChangeEvent(); - TableId originalTableId = originalEvent.tableId(); - Schema currentUpstreamSchema = - schemaManager.getLatestOriginalSchema(originalTableId).orElse(null); - - List deducedSchemaChangeEvents = new ArrayList<>(); - - // For redundant schema change events (possibly coming from duplicate emitted - // CreateTableEvents in snapshot stage), we just skip them. - if (!SchemaUtils.isSchemaChangeEventRedundant(currentUpstreamSchema, originalEvent)) { - schemaManager.applyOriginalSchemaChange(originalEvent); - deducedSchemaChangeEvents.addAll(deduceEvolvedSchemaChanges(originalEvent)); - } else { - LOG.info( - "Schema change event {} is redundant for current schema {}, just skip it.", - originalEvent, - currentUpstreamSchema); - } - - LOG.info( - "All sink subtask have flushed for table {}. Start to apply schema change request: \n\t{}\nthat extracts to:\n\t{}", - request.getTableId().toString(), - request, - deducedSchemaChangeEvents.stream() - .map(SchemaChangeEvent::toString) - .collect(Collectors.joining("\n\t"))); + private void startSchemaChangesEvolve(int sourceSubTaskId) { schemaChangeThreadPool.submit( () -> { try { - applySchemaChange(originalEvent, deducedSchemaChangeEvents); + applySchemaChange(sourceSubTaskId); } catch (Throwable t) { failJob( "Schema change applying task", @@ -379,8 +338,54 @@ private List deduceEvolvedSchemaChanges(SchemaChangeEvent eve } /** Applies the schema change to the external system. */ - private void applySchemaChange( - SchemaChangeEvent originalEvent, List deducedSchemaChangeEvents) { + private void applySchemaChange(int sourceSubTaskId) { + try { + loopUntil( + () -> pendingRequests.containsKey(sourceSubTaskId), + () -> + LOG.info( + "SchemaOperator {} has not submitted schema change request yet. Waiting...", + sourceSubTaskId), + rpcTimeout, + Duration.ofMillis(100)); + } catch (TimeoutException e) { + throw new RuntimeException( + "Timeout waiting for schema change request from SchemaOperator.", e); + } + + Tuple2> requestBody = + pendingRequests.get(sourceSubTaskId); + SchemaChangeRequest request = requestBody.f0; + CompletableFuture responseFuture = requestBody.f1; + + SchemaChangeEvent originalEvent = request.getSchemaChangeEvent(); + + TableId originalTableId = originalEvent.tableId(); + Schema currentUpstreamSchema = + schemaManager.getLatestOriginalSchema(originalTableId).orElse(null); + + List deducedSchemaChangeEvents = new ArrayList<>(); + + // For redundant schema change events (possibly coming from duplicate emitted + // CreateTableEvents in snapshot stage), we just skip them. + if (!SchemaUtils.isSchemaChangeEventRedundant(currentUpstreamSchema, originalEvent)) { + schemaManager.applyOriginalSchemaChange(originalEvent); + deducedSchemaChangeEvents.addAll(deduceEvolvedSchemaChanges(originalEvent)); + } else { + LOG.info( + "Schema change event {} is redundant for current schema {}, just skip it.", + originalEvent, + currentUpstreamSchema); + } + + LOG.info( + "All sink subtask have flushed for table {}. Start to apply schema change request: \n\t{}\nthat extracts to:\n\t{}", + request.getTableId().toString(), + request, + deducedSchemaChangeEvents.stream() + .map(SchemaChangeEvent::toString) + .collect(Collectors.joining("\n\t"))); + if (SchemaChangeBehavior.EXCEPTION.equals(behavior)) { if (deducedSchemaChangeEvents.stream() .anyMatch(evt -> !(evt instanceof CreateTableEvent))) { @@ -415,18 +420,14 @@ private void applySchemaChange( } // And returns all successfully applied schema change events to SchemaOperator. - pendingResponseFuture.complete( - wrap( - SchemaChangeResponse.success( - appliedSchemaChangeEvents, refreshedEvolvedSchemas))); - pendingResponseFuture = null; - - Preconditions.checkState( - schemaChangeStatus == RequestStatus.APPLYING, - "Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not " - + schemaChangeStatus); - schemaChangeStatus = RequestStatus.IDLE; - LOG.info("SchemaChangeStatus switched from APPLYING to IDLE."); + responseFuture.complete( + wrap(new SchemaChangeResponse(appliedSchemaChangeEvents, refreshedEvolvedSchemas))); + + pendingRequests.remove(sourceSubTaskId); + LOG.info( + "Finished handling schema change request from {}. Pending requests: {}", + sourceSubTaskId, + pendingRequests); } private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent schemaChangeEvent) { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java index 7d62d4c0737..3ef720d5fc9 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java @@ -57,7 +57,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT; @@ -172,45 +171,26 @@ private void handleSchemaChangeEvent(SchemaChangeEvent originalEvent) throws Exc // Then, queue to request schema change to SchemaCoordinator. SchemaChangeResponse response = requestSchemaChange(tableId, originalEvent); - if (response.isSuccess()) { - LOG.info("{}> Successfully requested schema change.", subTaskId); - LOG.info( - "{}> Finished schema change events: {}", - subTaskId, - response.getAppliedSchemaChangeEvents()); - LOG.info("{}> Refreshed evolved schemas: {}", subTaskId, response.getEvolvedSchemas()); + LOG.info("{}> Successfully requested schema change.", subTaskId); + LOG.info( + "{}> Finished schema change events: {}", + subTaskId, + response.getAppliedSchemaChangeEvents()); + LOG.info("{}> Refreshed evolved schemas: {}", subTaskId, response.getEvolvedSchemas()); - // After this request got successfully applied to DBMS, we can... - List finishedSchemaChangeEvents = - response.getAppliedSchemaChangeEvents(); + // After this request got successfully applied to DBMS, we can... + List finishedSchemaChangeEvents = + response.getAppliedSchemaChangeEvents(); - // Update local evolved schema map's cache - evolvedSchemaMap.putAll(response.getEvolvedSchemas()); + // Update local evolved schema map's cache + evolvedSchemaMap.putAll(response.getEvolvedSchemas()); - // and emit the finished event to downstream - for (SchemaChangeEvent finishedEvent : finishedSchemaChangeEvents) { - output.collect(new StreamRecord<>(finishedEvent)); - } - - schemaOperatorMetrics.increaseFinishedSchemaChangeEvents( - finishedSchemaChangeEvents.size()); - } else if (response.isDuplicate()) { - LOG.info( - "{}> Schema change event {} has been handled in another subTask already.", - subTaskId, - originalEvent); - - schemaOperatorMetrics.increaseIgnoredSchemaChangeEvents(1); - } else if (response.isIgnored()) { - LOG.info( - "{}> Schema change event {} has been ignored. No schema evolution needed.", - subTaskId, - originalEvent); - - schemaOperatorMetrics.increaseIgnoredSchemaChangeEvents(1); - } else { - throw new IllegalStateException("Unexpected response status: " + response); + // and emit the finished event to downstream + for (SchemaChangeEvent finishedEvent : finishedSchemaChangeEvents) { + output.collect(new StreamRecord<>(finishedEvent)); } + + schemaOperatorMetrics.increaseFinishedSchemaChangeEvents(finishedSchemaChangeEvents.size()); } private void handleDataChangeEvent(DataChangeEvent dataChangeEvent) { @@ -243,31 +223,9 @@ private void handleDataChangeEvent(DataChangeEvent dataChangeEvent) { } private SchemaChangeResponse requestSchemaChange( - TableId tableId, SchemaChangeEvent schemaChangeEvent) - throws InterruptedException, TimeoutException { - long deadline = System.currentTimeMillis() + rpcTimeout.toMillis(); - while (true) { - SchemaChangeResponse response = - sendRequestToCoordinator( - new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId)); - if (System.currentTimeMillis() < deadline) { - if (response.isRegistryBusy()) { - LOG.info( - "{}> Schema Registry is busy now, waiting for next request...", - subTaskId); - Thread.sleep(1000); - } else if (response.isWaitingForFlush()) { - LOG.info( - "{}> Schema change event has not collected enough flush success events from writers, waiting...", - subTaskId); - Thread.sleep(1000); - } else { - return response; - } - } else { - throw new TimeoutException("Timeout when requesting schema change."); - } - } + TableId tableId, SchemaChangeEvent schemaChangeEvent) { + return sendRequestToCoordinator( + new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId)); } private diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeResponse.java index 282557552c3..f8e747bf746 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeResponse.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeResponse.java @@ -24,7 +24,6 @@ import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -44,62 +43,13 @@ public class SchemaChangeResponse implements CoordinationResponse { private final Map evolvedSchemas; - private final ResponseCode responseCode; - - public static SchemaChangeResponse success( - List schemaChangeEvents, Map evolvedSchemas) { - return new SchemaChangeResponse(ResponseCode.SUCCESS, schemaChangeEvents, evolvedSchemas); - } - - public static SchemaChangeResponse busy() { - return new SchemaChangeResponse(ResponseCode.BUSY); - } - - public static SchemaChangeResponse duplicate() { - return new SchemaChangeResponse(ResponseCode.DUPLICATE); - } - - public static SchemaChangeResponse ignored() { - return new SchemaChangeResponse(ResponseCode.IGNORED); - } - - public static SchemaChangeResponse waitingForFlush() { - return new SchemaChangeResponse(ResponseCode.WAITING_FOR_FLUSH); - } - - private SchemaChangeResponse(ResponseCode responseCode) { - this(responseCode, Collections.emptyList(), Collections.emptyMap()); - } - - private SchemaChangeResponse( - ResponseCode responseCode, + public SchemaChangeResponse( List appliedSchemaChangeEvents, Map evolvedSchemas) { - this.responseCode = responseCode; this.appliedSchemaChangeEvents = appliedSchemaChangeEvents; this.evolvedSchemas = evolvedSchemas; } - public boolean isSuccess() { - return ResponseCode.SUCCESS.equals(responseCode); - } - - public boolean isRegistryBusy() { - return ResponseCode.BUSY.equals(responseCode); - } - - public boolean isDuplicate() { - return ResponseCode.DUPLICATE.equals(responseCode); - } - - public boolean isIgnored() { - return ResponseCode.IGNORED.equals(responseCode); - } - - public boolean isWaitingForFlush() { - return ResponseCode.WAITING_FOR_FLUSH.equals(responseCode); - } - public List getAppliedSchemaChangeEvents() { return appliedSchemaChangeEvents; } @@ -118,43 +68,21 @@ public boolean equals(Object o) { } SchemaChangeResponse response = (SchemaChangeResponse) o; return Objects.equals(appliedSchemaChangeEvents, response.appliedSchemaChangeEvents) - && responseCode == response.responseCode; + && Objects.equals(evolvedSchemas, response.evolvedSchemas); } @Override public int hashCode() { - return Objects.hash(appliedSchemaChangeEvents, responseCode); + return Objects.hash(appliedSchemaChangeEvents, evolvedSchemas); } @Override public String toString() { return "SchemaChangeResponse{" - + "schemaChangeEvents=" + + "appliedSchemaChangeEvents=" + appliedSchemaChangeEvents - + ", responseCode=" - + responseCode + + ", evolvedSchemas=" + + evolvedSchemas + '}'; } - - /** - * Schema Change Response status code. - * - *

- Accepted: Requested schema change request has been accepted exclusively. Any other - * schema change requests will be blocked. - * - *

- Busy: Schema registry is currently busy processing another schema change request. - * - *

- Duplicate: This schema change request has been submitted before, possibly by another - * paralleled subTask. - * - *

- Ignored: This schema change request has been assessed, but no actual evolution is - * required. Possibly caused by LENIENT mode or merging table strategies. - */ - public enum ResponseCode { - SUCCESS, - BUSY, - DUPLICATE, - IGNORED, - WAITING_FOR_FLUSH - } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkOperatorWithSchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkOperatorWithSchemaEvolveTest.java index fbeb325066f..19753675101 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkOperatorWithSchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkOperatorWithSchemaEvolveTest.java @@ -87,16 +87,12 @@ private void processSchemaChangeEvent( // Create the flush event to process before the schema change event FlushEvent flushEvent = createFlushEvent(tableId, event); - // Send schema change request to coordinator - schemaOperatorHarness.requestSchemaChangeEvent(tableId, event); - // Send flush event to SinkWriterOperator dataSinkWriterOperator.processElement(new StreamRecord<>(flushEvent)); - // Wait for coordinator to complete the schema change and get the finished schema change - // events + // Send schema change request to coordinator SchemaChangeResponse schemaEvolveResponse = - schemaOperatorHarness.requestSchemaChangeResult(tableId, event); + schemaOperatorHarness.requestSchemaChangeEvent(tableId, event); List finishedSchemaChangeEvents = schemaEvolveResponse.getAppliedSchemaChangeEvents(); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java index 11a97064984..ea4de7a14e7 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java @@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils; @@ -66,10 +67,8 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT; import static org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils.unwrap; /** @@ -91,6 +90,9 @@ public class RegularEventOperatorTestHarness with(OP operator, int numOutputs) { operator, numOutputs, null, - null, + DEFAULT_RPC_TIMEOUT, SchemaChangeBehavior.EVOLVE, Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet()), Collections.emptySet()); @@ -143,7 +145,7 @@ RegularEventOperatorTestHarness withDuration( operator, numOutputs, evolveDuration, - null, + DEFAULT_RPC_TIMEOUT, SchemaChangeBehavior.EVOLVE, Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet()), Collections.emptySet()); @@ -159,7 +161,7 @@ RegularEventOperatorTestHarness withDurationAndBehavior( operator, numOutputs, evolveDuration, - null, + DEFAULT_RPC_TIMEOUT, behavior, Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet()), Collections.emptySet()); @@ -176,7 +178,7 @@ RegularEventOperatorTestHarness withDurationAndFineGrainedBehavior( operator, numOutputs, evolveDuration, - null, + DEFAULT_RPC_TIMEOUT, behavior, enabledEventTypes, Collections.emptySet()); @@ -195,7 +197,7 @@ RegularEventOperatorTestHarness withDurationAndFineGrainedBehaviorWithErr operator, numOutputs, evolveDuration, - null, + DEFAULT_RPC_TIMEOUT, behavior, enabledEventTypes, errorOnEventTypes); @@ -240,31 +242,6 @@ public SchemaChangeResponse requestSchemaChangeEvent(TableId tableId, SchemaChan .get()); } - public SchemaChangeResponse requestSchemaChangeResult(TableId tableId, SchemaChangeEvent event) - throws ExecutionException, InterruptedException, TimeoutException { - long rpcTimeOutInMillis = DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT.toMillis(); - long deadline = System.currentTimeMillis() + rpcTimeOutInMillis; - while (true) { - LOG.info("request schema change result"); - SchemaChangeResponse response = requestSchemaChangeEvent(tableId, event); - if (System.currentTimeMillis() < deadline) { - if (response.isRegistryBusy()) { - LOG.info("{}> Schema Registry is busy now, waiting for next request...", 0); - Thread.sleep(1000); - } else if (response.isWaitingForFlush()) { - LOG.info( - "{}> Schema change event has not collected enough flush success events from writers, waiting...", - 0); - Thread.sleep(1000); - } else { - return response; - } - } else { - throw new TimeoutException("Timeout when requesting schema change."); - } - } - } - public Schema getLatestOriginalSchema(TableId tableId) throws Exception { return ((GetOriginalSchemaResponse) unwrap(