@@ -184,6 +184,11 @@ public static class Config {
       .defaultValue(false)
       .withDocumentation("Automatically submits offset to kafka.");
 
+  public static final ConfigProperty<Boolean> ENABLE_FAIL_ON_DATA_LOSS = ConfigProperty
+      .key("hoodie.deltastreamer.source.kafka.enable.failOnDataLoss")
+      .defaultValue(false)
+      .withDocumentation("Fail when checkpoint goes out of bounds instead of seeking to earliest offsets.");
+
   public static final ConfigProperty<Long> MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = ConfigProperty
       .key("hoodie.deltastreamer.kafka.source.maxEvents")
       .defaultValue(5000000L)
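Usage note (not part of the diff): the new flag defaults to false, so existing pipelines keep the old warn-and-reset-to-earliest behaviour. Below is a minimal sketch of opting in, assuming the properties end up on the Kafka source (e.g. through DeltaStreamer's --props file); everything except the key name is illustrative.

// Minimal sketch, not code from this PR: setting the new key on the job's properties.
import java.util.Properties;

public class FailOnDataLossExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Key introduced above; defaults to false (warn and reset to the earliest offsets).
    props.setProperty("hoodie.deltastreamer.source.kafka.enable.failOnDataLoss", "true");
    // With the flag on, an out-of-bounds checkpoint fails the sync with
    // HoodieDeltaStreamerException instead of being silently reset.
    System.out.println(props);
  }
}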
@@ -329,9 +334,19 @@ private Map<TopicPartition, Long> fetchValidOffsets(KafkaConsumer consumer,
                                                       Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
     Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
     Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
-    boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
+    boolean isCheckpointOutOfBounds = checkpointOffsets.entrySet().stream()
         .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
-    return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
+    if (isCheckpointOutOfBounds) {
+      if (this.props.getBoolean(Config.ENABLE_FAIL_ON_DATA_LOSS.key(), Config.ENABLE_FAIL_ON_DATA_LOSS.defaultValue())) {
+        throw new HoodieDeltaStreamerException("Some data may have been lost because they are not available in Kafka any more;"
+            + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.");
+      } else {
+        LOG.warn("Some data may have been lost because they are not available in Kafka any more;"
+            + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed."
+            + " If you want delta streamer to fail on such cases, set \"" + Config.ENABLE_FAIL_ON_DATA_LOSS.key() + "\" to \"true\".");
+      }
+    }
+    return isCheckpointOutOfBounds ? earliestOffsets : checkpointOffsets;
   }
 
   /**
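For readability, here is a small self-contained sketch of the same out-of-bounds check, with plain maps standing in for KafkaConsumer#beginningOffsets and CheckpointUtils#strToOffsets (illustrative only, not code from this PR); the new test below exercises the real path end to end.

import java.util.HashMap;
import java.util.Map;

public class CheckpointBoundsSketch {
  public static void main(String[] args) {
    // partition -> earliest offset Kafka still retains
    Map<String, Long> earliestOffsets = new HashMap<>();
    // partition -> offset recorded in the last checkpoint
    Map<String, Long> checkpointOffsets = new HashMap<>();
    earliestOffsets.put("topic-0", 120L);
    checkpointOffsets.put("topic-0", 100L); // behind earliest => records were aged out

    boolean isCheckpointOutOfBounds = checkpointOffsets.entrySet().stream()
        .anyMatch(e -> e.getValue() < earliestOffsets.get(e.getKey()));

    // KafkaOffsetGen throws HoodieDeltaStreamerException here when failOnDataLoss is enabled,
    // otherwise it logs a warning and resumes from earliestOffsets.
    System.out.println("checkpoint out of bounds: " + isCheckpointOutOfBounds); // prints true
  }
}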
@@ -26,6 +26,7 @@
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
+import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
 
@@ -48,6 +49,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Properties;
 import java.util.UUID;
 
 import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
@@ -367,4 +369,41 @@ public void testCommitOffsetToKafka() {
     props.remove(ConsumerConfig.GROUP_ID_CONFIG);
     assertThrows(HoodieNotSupportedException.class, () -> kafkaSource.getSource().onCommit(""));
   }
+
+  @Test
+  public void testFailOnDataLoss() throws Exception {
+    // create a topic with a very short retention period
+    final String topic = TEST_TOPIC_PREFIX + "testFailOnDataLoss";
+    Properties topicConfig = new Properties();
+    topicConfig.setProperty("retention.ms", "10000");
+    testUtils.createTopic(topic, 1, topicConfig);
+
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    TypedProperties failOnDataLossProps = createPropsForJsonSource(topic, null, "earliest");
+    failOnDataLossProps.setProperty(Config.ENABLE_FAIL_ON_DATA_LOSS.key(), Boolean.toString(true));
+
+    Source jsonSource = new JsonKafkaSource(failOnDataLossProps, jsc(), spark(), schemaProvider, metrics);
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 10)));
+    // 10 records were sent; extract only 2 so the checkpoint for the next batch points at early offsets
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 2);
+    assertEquals(2, fetch1.getBatch().get().count());
+
+    // wait out the retention period so the checkpointed offsets are no longer available in Kafka
+    Thread.sleep(10001);
+    Throwable t = assertThrows(HoodieDeltaStreamerException.class, () -> {
+      kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    });
+    assertEquals(
+        "Some data may have been lost because they are not available in Kafka any more;"
+            + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.",
+        t.getMessage());
+    t = assertThrows(HoodieDeltaStreamerException.class, () -> {
+      kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    });
+    assertEquals(
+        "Some data may have been lost because they are not available in Kafka any more;"
+            + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.",
+        t.getMessage());
+  }
 }