Skip to content

Commit 2fa29b6

Browse files
committed
fix ci
1 parent 2be3374 commit 2fa29b6

File tree

3 files changed

+13
-13
lines changed

3 files changed

+13
-13
lines changed

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,8 +336,8 @@ private void applySchemaChange(int sourceSubTaskId) {
336336
LOG.info(
337337
"SchemaOperator {} has not submitted schema change request yet. Waiting...",
338338
sourceSubTaskId),
339-
Duration.ofMillis(100),
340-
rpcTimeout);
339+
rpcTimeout,
340+
Duration.ofMillis(100));
341341
} catch (TimeoutException e) {
342342
throw new RuntimeException(
343343
"Timeout waiting for schema change request from SchemaOperator.", e);

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkOperatorWithSchemaEvolveTest.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,16 +87,12 @@ private void processSchemaChangeEvent(
8787
// Create the flush event to process before the schema change event
8888
FlushEvent flushEvent = createFlushEvent(tableId, event);
8989

90-
// Send schema change request to coordinator
91-
schemaOperatorHarness.requestSchemaChangeEvent(tableId, event);
92-
9390
// Send flush event to SinkWriterOperator
9491
dataSinkWriterOperator.processElement(new StreamRecord<>(flushEvent));
9592

96-
// Wait for coordinator to complete the schema change and get the finished schema change
97-
// events
93+
// Send schema change request to coordinator
9894
SchemaChangeResponse schemaEvolveResponse =
99-
schemaOperatorHarness.requestSchemaChangeResult(tableId, event);
95+
schemaOperatorHarness.requestSchemaChangeEvent(tableId, event);
10096
List<SchemaChangeEvent> finishedSchemaChangeEvents =
10197
schemaEvolveResponse.getAppliedSchemaChangeEvents();
10298

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
2424
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
2525
import org.apache.flink.cdc.common.event.TableId;
26+
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
2627
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
2728
import org.apache.flink.cdc.common.schema.Schema;
2829
import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
@@ -91,6 +92,9 @@ public class RegularEventOperatorTestHarness<OP extends AbstractStreamOperator<E
9192

9293
public static final OperatorID SINK_OPERATOR_ID = new OperatorID(15214L, 15514L);
9394

95+
private static final Duration DEFAULT_RPC_TIMEOUT =
96+
PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT;
97+
9498
private final OP operator;
9599
private final int numOutputs;
96100
private final SchemaCoordinator schemaRegistry;
@@ -130,7 +134,7 @@ RegularEventOperatorTestHarness<OP, E> with(OP operator, int numOutputs) {
130134
operator,
131135
numOutputs,
132136
null,
133-
null,
137+
DEFAULT_RPC_TIMEOUT,
134138
SchemaChangeBehavior.EVOLVE,
135139
Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet()),
136140
Collections.emptySet());
@@ -143,7 +147,7 @@ RegularEventOperatorTestHarness<OP, E> withDuration(
143147
operator,
144148
numOutputs,
145149
evolveDuration,
146-
null,
150+
DEFAULT_RPC_TIMEOUT,
147151
SchemaChangeBehavior.EVOLVE,
148152
Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet()),
149153
Collections.emptySet());
@@ -159,7 +163,7 @@ RegularEventOperatorTestHarness<OP, E> withDurationAndBehavior(
159163
operator,
160164
numOutputs,
161165
evolveDuration,
162-
null,
166+
DEFAULT_RPC_TIMEOUT,
163167
behavior,
164168
Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet()),
165169
Collections.emptySet());
@@ -176,7 +180,7 @@ RegularEventOperatorTestHarness<OP, E> withDurationAndFineGrainedBehavior(
176180
operator,
177181
numOutputs,
178182
evolveDuration,
179-
null,
183+
DEFAULT_RPC_TIMEOUT,
180184
behavior,
181185
enabledEventTypes,
182186
Collections.emptySet());
@@ -195,7 +199,7 @@ RegularEventOperatorTestHarness<OP, E> withDurationAndFineGrainedBehaviorWithErr
195199
operator,
196200
numOutputs,
197201
evolveDuration,
198-
null,
202+
DEFAULT_RPC_TIMEOUT,
199203
behavior,
200204
enabledEventTypes,
201205
errorOnEventTypes);

0 commit comments

Comments
 (0)