@@ -137,7 +137,6 @@ public void processElement(I value, ProcessFunction<I, Object>.Context ctx, Coll

@Override
public void close() {
- super.close();
if (this.writeClient != null) {
this.writeClient.cleanHandlesGracefully();
this.writeClient.close();
@@ -402,6 +401,11 @@ private void bufferRecord(HoodieRecord<?> value) {
}
}

+ private boolean hasData() {
+ return this.buckets.size() > 0
+ && this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
+ }

@SuppressWarnings("unchecked, rawtypes")
private boolean flushBucket(DataBucket bucket) {
String instant = instantToWrite(true);
@@ -435,7 +439,7 @@ private boolean flushBucket(DataBucket bucket) {

@SuppressWarnings("unchecked, rawtypes")
private void flushRemaining(boolean endInput) {
- this.currentInstant = instantToWrite(false);
+ this.currentInstant = instantToWrite(hasData());
if (this.currentInstant == null) {
// in case there are empty checkpoints that has no input data
throw new HoodieException("No inflight instant when flushing data!");
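The two hunks above work together: the new hasData() reports whether any write bucket still buffers records, and flushRemaining now passes that flag into instantToWrite, so an empty checkpoint no longer insists on a fresh instant before flushing. A minimal, self-contained sketch of the bucket check, using plain collections in place of Hudi's DataBucket (all names below are illustrative, not the actual Hudi types):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the hasData() idea, with plain collections instead of
// Hudi's DataBucket (the bucket key and record type here are made up).
public class BucketBufferSketch {
  private final Map<String, List<String>> buckets = new HashMap<>();

  // True only when at least one bucket actually buffers a record; an empty
  // map and a map of empty buckets both mean "nothing to flush".
  boolean hasData() {
    return !buckets.isEmpty()
        && buckets.values().stream().anyMatch(records -> !records.isEmpty());
  }

  public static void main(String[] args) {
    BucketBufferSketch sketch = new BucketBufferSketch();
    System.out.println(sketch.hasData());                       // false: no buckets yet
    sketch.buckets.put("partition-0/file-0", new ArrayList<>());
    System.out.println(sketch.hasData());                       // false: bucket exists but is empty
    sketch.buckets.get("partition-0/file-0").add("record-1");
    System.out.println(sketch.hasData());                       // true: something to flush
  }
}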
StreamWriteOperatorCoordinator.java
@@ -30,9 +30,8 @@
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
+ import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
- import org.apache.hudi.sink.message.MessageBus;
- import org.apache.hudi.sink.message.MessageDriver;
import org.apache.hudi.sink.utils.HiveSyncContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.StreamerUtil;
@@ -42,6 +41,7 @@
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+ import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -136,11 +136,6 @@ public class StreamWriteOperatorCoordinator
*/
private transient TableState tableState;

- /**
-  * The message driver.
-  */
- private MessageDriver messageDriver;

/**
* Constructs a StreamingSinkOperatorCoordinator.
*
@@ -179,7 +174,6 @@ public void start() throws Exception {
if (tableState.syncMetadata) {
initMetadataSync();
}
- this.messageDriver = MessageBus.getDriver(this.metaClient.getFs(), metaClient.getBasePath());
}

@Override
@@ -197,9 +191,6 @@ public void close() throws Exception {
writeClient.close();
}
this.eventBuffer = null;
- if (this.messageDriver != null) {
- this.messageDriver.close();
- }
}

@Override
@@ -236,7 +227,7 @@ public void notifyCheckpointComplete(long checkpointId) {
writeClient.scheduleCompaction(Option.empty());
}
// start new instant.
- startInstant(checkpointId);
+ startInstant();
// sync Hive if is enabled
syncHiveIfEnabled();
}
@@ -246,7 +237,12 @@ public void notifyCheckpointComplete(long checkpointId) {

@Override
public void notifyCheckpointAborted(long checkpointId) {
- this.messageDriver.abortCkp(checkpointId);
+ // once the checkpoint was aborted, unblock the writer tasks to
+ // reuse the last instant.
+ if (!WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) {
+ executor.execute(() -> sendCommitAckEvents(checkpointId),
+ "unblock data write with aborted checkpoint %s", checkpointId);
+ }
}
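This is the coordinator half of the handshake that replaces the removed message bus: on an aborted checkpoint the coordinator proactively broadcasts CommitAckEvent so that write tasks blocked in their flush path reuse the current instant instead of waiting until the ack timeout; the guard on WriteMetadataEvent.BOOTSTRAP_INSTANT skips the broadcast before any real instant exists. A toy sketch of the unblock pattern, using a CompletableFuture in place of Flink's operator-event plumbing (the names and the 30-second timeout are illustrative only):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Toy handshake: a "writer" blocks until the "coordinator" acks, and the ack
// tells it which instant to keep using. None of this is the real Hudi/Flink API.
public class AckHandshakeSketch {
  public static void main(String[] args) throws Exception {
    CompletableFuture<String> ack = new CompletableFuture<>();

    Thread writer = new Thread(() -> {
      try {
        // Writer side: wait for the coordinator's decision, bounded by a timeout.
        String instantToUse = ack.get(30, TimeUnit.SECONDS);
        System.out.println("writer flushes against instant " + instantToUse);
      } catch (Exception e) {
        throw new RuntimeException("no commit ack before the timeout", e);
      }
    });
    writer.start();

    // Coordinator side: the checkpoint was aborted, so unblock the writer and
    // let it reuse the last inflight instant instead of waiting for a new one.
    ack.complete("20211201120000");
    writer.join();
  }
}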

@Override
@@ -337,19 +333,12 @@ private void addEventToBuffer(WriteMetadataEvent event) {
}

private void startInstant() {
- // the flink checkpoint id starts from 1,
- // see AbstractStreamWriteFunction#ackInstant
- startInstant(MessageBus.INITIAL_CKP_ID);
- }
-
- private void startInstant(long checkpoint) {
final String instant = HoodieActiveTimeline.createNewInstantTime();
this.writeClient.startCommitWithTime(instant, tableState.commitAction);
- this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, instant);
- this.writeClient.upgradeDowngrade(instant);
- this.messageDriver.commitCkp(checkpoint, this.instant, instant);
this.instant = instant;
LOG.info("Create instant [{}] for table [{}] with type [{}]", instant,
this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant);
this.writeClient.upgradeDowngrade(this.instant);
LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
}
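For context on what the reworked startInstant() drives: a Hudi instant passes through the REQUESTED, INFLIGHT and COMPLETED states, and the coordinator starts the commit and immediately transitions it to INFLIGHT so that write tasks can discover it as the last pending instant. A rough sketch of that lifecycle under those assumptions (the enum below is an illustration only; the real transitions live in HoodieActiveTimeline and the write client):

// Rough sketch of the instant lifecycle; the real state handling lives in
// HoodieActiveTimeline, this only shows the order of the transitions.
public class InstantLifecycleSketch {
  enum State { REQUESTED, INFLIGHT, COMPLETED }

  static final class Instant {
    final String time;          // an instant time string, e.g. "20211201093000"
    State state = State.REQUESTED;
    Instant(String time) { this.time = time; }
  }

  // Mirrors the shape of startInstant(): create the instant (REQUESTED via
  // startCommitWithTime) and move it straight to INFLIGHT so write tasks can
  // pick it up as the last pending instant.
  static Instant startInstant(String newTime) {
    Instant instant = new Instant(newTime);
    instant.state = State.INFLIGHT;
    return instant;
  }

  public static void main(String[] args) {
    Instant current = startInstant("20211201093000");
    System.out.println(current.time + " is " + current.state);
  }
}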

@@ -408,6 +397,33 @@ private void handleWriteMetaEvent(WriteMetadataEvent event) {
addEventToBuffer(event);
}

+ /**
+  * The coordinator reuses the instant if there is no data for this round of checkpoint,
+  * sends the commit ack events to unblock the flushing.
+  */
+ private void sendCommitAckEvents(long checkpointId) {
+ CompletableFuture<?>[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull)
+ .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId)))
+ .toArray(CompletableFuture<?>[]::new);
+ try {
+ CompletableFuture.allOf(futures).get();
+ } catch (Throwable throwable) {
+ if (!sendToFinishedTasks(throwable)) {
+ throw new HoodieException("Error while waiting for the commit ack events to finish sending", throwable);
+ }
+ }
+ }
+
+ /**
+  * Decides whether the given exception is caused by sending events to FINISHED tasks.
+  *
+  * <p>Ugly impl: the exception may change in the future.
+  */
+ private static boolean sendToFinishedTasks(Throwable throwable) {
+ return throwable.getCause() instanceof TaskNotRunningException
+ || throwable.getCause().getMessage().contains("running");
+ }
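These two helpers form the broadcast side of the ack protocol: send CommitAckEvent to every registered subtask gateway, wait for all of the sends, and tolerate only those failures that mean the receiving task has already finished. A self-contained sketch of the same pattern, with a hypothetical Gateway interface and exception type standing in for the Flink classes:

import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

// Sketch of "broadcast an event, wait for every send, but ignore tasks that
// already finished". Gateway and TaskFinishedException are stand-ins here.
public class BroadcastAckSketch {
  interface Gateway {
    CompletableFuture<Void> sendEvent(String event);
  }

  static class TaskFinishedException extends RuntimeException {
    TaskFinishedException() {
      super("Task is not running");
    }
  }

  static void sendAckToAll(Gateway[] gateways, long checkpointId) {
    CompletableFuture<?>[] futures = Arrays.stream(gateways)
        .filter(Objects::nonNull)                          // a subtask may not be registered yet
        .map(gw -> gw.sendEvent("commit-ack-" + checkpointId))
        .toArray(CompletableFuture<?>[]::new);
    try {
      CompletableFuture.allOf(futures).get();              // wait for every send to complete
    } catch (Exception e) {
      // Tolerate only the failures that mean "the receiving task already finished".
      if (!(e.getCause() instanceof TaskFinishedException)) {
        throw new RuntimeException("Error while sending commit ack events", e);
      }
    }
  }

  public static void main(String[] args) {
    Gateway running = event -> CompletableFuture.completedFuture(null);
    Gateway finished = event -> {
      CompletableFuture<Void> failed = new CompletableFuture<>();
      failed.completeExceptionally(new TaskFinishedException());
      return failed;
    };
    sendAckToAll(new Gateway[] {running, finished, null}, 42L);
    System.out.println("acks sent; the finished task was ignored");
  }
}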

/**
* Commits the instant.
*/
@@ -435,7 +451,8 @@ private boolean commitInstant(String instant, long checkpointId) {
if (writeResults.size() == 0) {
// No data has written, reset the buffer and returns early
reset();
- messageDriver.commitCkp(checkpointId, this.instant, this.instant);
+ // Send commit ack event to the write function to unblock the flushing
+ sendCommitAckEvents(checkpointId);
return false;
}
doCommit(instant, writeResults);
BulkInsertWriteFunction.java
@@ -21,13 +21,11 @@
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.WriteOperationType;
- import org.apache.hudi.common.util.Option;
+ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
- import org.apache.hudi.sink.message.MessageBus;
- import org.apache.hudi.sink.message.MessageClient;
import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.util.StreamerUtil;

@@ -40,8 +38,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

- import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
@@ -83,20 +79,25 @@ public class BulkInsertWriteFunction<I>
*/
private int taskID;

+ /**
+  * Meta Client.
+  */
+ private transient HoodieTableMetaClient metaClient;

/**
* Write Client.
*/
private transient HoodieFlinkWriteClient writeClient;

/**
- * Gateway to send operator events to the operator coordinator.
+ * The initial inflight instant when start up.
*/
- private transient OperatorEventGateway eventGateway;
+ private volatile String initInstant;

/**
- * The message client.
+ * Gateway to send operator events to the operator coordinator.
*/
- private MessageClient messageClient;
+ private transient OperatorEventGateway eventGateway;

/**
* Constructs a StreamingSinkFunction.
@@ -111,8 +112,9 @@ public BulkInsertWriteFunction(Configuration config, RowType rowType) {
@Override
public void open(Configuration parameters) throws IOException {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+ this.metaClient = StreamerUtil.createMetaClient(this.config);
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
- this.messageClient = MessageBus.getClient(config.getString(FlinkOptions.PATH));
+ this.initInstant = StreamerUtil.getLastPendingInstant(this.metaClient, false);
sendBootstrapEvent();
initWriterHelper();
}
@@ -128,9 +130,6 @@ public void close() {
this.writeClient.cleanHandlesGracefully();
this.writeClient.close();
}
- if (this.messageClient != null) {
- this.messageClient.close();
- }
}

/**
@@ -184,32 +183,23 @@ private void sendBootstrapEvent() {
LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
}

- /**
-  * Returns the next instant to write from the message bus.
-  */
- @Nullable
- private String ackInstant() {
- Option<MessageBus.CkpMessage> ckpMessageOption = this.messageClient.getCkpMessage(MessageBus.INITIAL_CKP_ID);
- return ckpMessageOption.map(message -> message.inflightInstant).orElse(null);
- }

private String instantToWrite() {
- String instant = ackInstant();
+ String instant = StreamerUtil.getLastPendingInstant(this.metaClient);
// if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout threshold hits.
TimeWait timeWait = TimeWait.builder()
.timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
.action("instant initialize")
.throwsT(true)
.build();
- while (instant == null) {
+ while (instant == null || instant.equals(this.initInstant)) {
// wait condition:
// 1. there is no inflight instant
// 2. the inflight instant does not change
// sleep for a while
timeWait.waitFor();
// refresh the inflight instant
- instant = ackInstant();
+ instant = StreamerUtil.getLastPendingInstant(this.metaClient);
}
return instant;
}
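With the message client removed, the bulk-insert writer goes back to polling the timeline: open() records the last pending instant as initInstant, and instantToWrite() keeps refreshing the lookup, bounded by the commit-ack timeout, until the coordinator has started a different instant. A generic sketch of that bounded wait, with the timeline lookup replaced by a plain Supplier (names and the poll interval are illustrative):

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Generic "wait until the observed value changes" loop; the Supplier stands in
// for looking up the last pending instant on the timeline.
public class BoundedWaitSketch {
  static String waitForNewValue(Supplier<String> lookup, String initialValue, long timeoutMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    String value = lookup.get();
    while (value == null || value.equals(initialValue)) {
      if (System.currentTimeMillis() > deadline) {
        throw new IllegalStateException("Timed out waiting for a new instant");
      }
      try {
        Thread.sleep(100);          // back off before polling again
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for a new instant", e);
      }
      value = lookup.get();         // refresh the observed instant
    }
    return value;
  }

  public static void main(String[] args) {
    AtomicInteger polls = new AtomicInteger();
    // Simulated timeline: the new instant only shows up on the third lookup.
    Supplier<String> lookup = () ->
        polls.incrementAndGet() < 3 ? "20211201090000" : "20211201091000";
    System.out.println("next instant: " + waitForNewValue(lookup, "20211201090000", 10_000));
  }
}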