@@ -104,16 +104,18 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
/**
* Hudi backed table metadata writer.
*
- * @param hadoopConf - Hadoop configuration to use for the metadata writer
- * @param writeConfig - Writer config
- * @param engineContext - Engine context
- * @param actionMetadata - Optional action metadata to help decide bootstrap operations
- * @param <T> - Action metadata types extending Avro generated SpecificRecordBase
+ * @param hadoopConf - Hadoop configuration to use for the metadata writer
+ * @param writeConfig - Writer config
+ * @param engineContext - Engine context
+ * @param actionMetadata - Optional action metadata to help decide bootstrap operations
+ * @param <T> - Action metadata types extending Avro generated SpecificRecordBase
+ * @param inflightInstantTimestamp - Timestamp of any instant in progress
*/
protected <T extends SpecificRecordBase> HoodieBackedTableMetadataWriter(Configuration hadoopConf,
HoodieWriteConfig writeConfig,
HoodieEngineContext engineContext,
- Option<T> actionMetadata) {
+ Option<T> actionMetadata,
+ Option<String> inflightInstantTimestamp) {
this.dataWriteConfig = writeConfig;
this.engineContext = engineContext;
this.hadoopConf = new SerializableConfiguration(hadoopConf);
@@ -137,14 +139,19 @@ protected <T extends SpecificRecordBase> HoodieBackedTableMetadataWriter(Configu
initRegistry();
this.dataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
- initialize(engineContext, actionMetadata);
+ initialize(engineContext, actionMetadata, inflightInstantTimestamp);
initTableMetadata();
} else {
enabled = false;
this.metrics = Option.empty();
}
}

+ public HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig,
+ HoodieEngineContext engineContext) {
+ this(hadoopConf, writeConfig, engineContext, Option.empty(), Option.empty());
+ }

protected abstract void initRegistry();

/**
@@ -234,11 +241,17 @@ public HoodieBackedTableMetadata metadata() {

/**
* Initialize the metadata table if it does not exist.
- * <p>
+ *
+ * If the metadata table does not exist, then file and partition listing is used to bootstrap the table.
+ *
+ * @param engineContext
+ * @param actionMetadata Action metadata types extending Avro generated SpecificRecordBase
+ * @param inflightInstantTimestamp Timestamp of an instant in progress on the dataset. This instant is ignored
+ * while deciding to bootstrap the metadata table.
*/
protected abstract <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext,
- Option<T> actionMetadata);
+ Option<T> actionMetadata,
+ Option<String> inflightInstantTimestamp);

public void initTableMetadata() {
try {
@@ -260,11 +273,13 @@ public void initTableMetadata() {
* @param dataMetaClient - Meta client for the data table
* @param actionMetadata - Optional action metadata
* @param <T> - Action metadata types extending Avro generated SpecificRecordBase
+ * @param inflightInstantTimestamp - Timestamp of an instant in progress on the dataset. This instant is ignored
+ * while deciding to bootstrap the metadata table.
* @throws IOException
*/
protected <T extends SpecificRecordBase> void bootstrapIfNeeded(HoodieEngineContext engineContext,
HoodieTableMetaClient dataMetaClient,
- Option<T> actionMetadata) throws IOException {
+ Option<T> actionMetadata,
+ Option<String> inflightInstantTimestamp) throws IOException {
HoodieTimer timer = new HoodieTimer().startTimer();

boolean exists = dataMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(),
@@ -291,7 +306,7 @@ protected <T extends SpecificRecordBase> bootstrapIfNeeded(HoodieEngineCont

if (!exists) {
// Initialize for the first time by listing partitions and files directly from the file system
- if (bootstrapFromFilesystem(engineContext, dataMetaClient)) {
+ if (bootstrapFromFilesystem(engineContext, dataMetaClient, inflightInstantTimestamp)) {
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
}
}
@@ -347,23 +362,29 @@ private <T extends SpecificRecordBase> boolean isBootstrapNeeded(Option<HoodieIn
* Initialize the Metadata Table by listing files and partitions from the file system.
*
* @param dataMetaClient {@code HoodieTableMetaClient} for the dataset.
+ * @param inflightInstantTimestamp Timestamp of an instant in progress on the dataset; ignored while checking for pending operations
*/
- private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient) throws IOException {
+ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient,
+ Option<String> inflightInstantTimestamp) throws IOException {
ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled");

// We can only bootstrap if there are no pending operations on the dataset
- Option<HoodieInstant> pendingDataInstant = Option.fromJavaOptional(dataMetaClient.getActiveTimeline()
- .getReverseOrderedInstants().filter(i -> !i.isCompleted()).findFirst());
- if (pendingDataInstant.isPresent()) {
+ List<HoodieInstant> pendingDataInstant = dataMetaClient.getActiveTimeline()
+ .getInstants().filter(i -> !i.isCompleted())
+ .filter(i -> !inflightInstantTimestamp.isPresent() || !i.getTimestamp().equals(inflightInstantTimestamp.get()))
+ .collect(Collectors.toList());
+
+ if (!pendingDataInstant.isEmpty()) {
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
LOG.warn("Cannot bootstrap metadata table as operation is in progress in dataset: " + pendingDataInstant.get());
LOG.warn("Cannot bootstrap metadata table as operation(s) are in progress on the dataset: "
+ Arrays.toString(pendingDataInstant.toArray()));
return false;
}

// If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
- // Otherwise, we use the latest commit timestamp.
- String createInstantTime = dataMetaClient.getActiveTimeline().getReverseOrderedInstants().findFirst()
- .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+ // Otherwise, we use the timestamp of the latest completed action.
+ String createInstantTime = dataMetaClient.getActiveTimeline().filterCompletedInstants()
+ .getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);

HoodieTableMetaClient.withPropertyBuilder()
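The behavioral core of this file's change is the pending-instant check in bootstrapFromFilesystem. Previously, any incomplete instant on the data timeline blocked bootstrap, so a writer whose own commit was already in flight could block itself indefinitely; the new code filters out the caller's inflightInstantTimestamp before deciding. A minimal, self-contained sketch of the filter semantics, with plain strings standing in for HoodieInstant and an invented timeline (illustrative only, not the Hudi API):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Optional;
    import java.util.stream.Collectors;

    public class PendingInstantFilterSketch {
      public static void main(String[] args) {
        // Invented pending instants on the data timeline; the second belongs to the caller itself.
        List<String> pending = Arrays.asList("20210401101500", "20210401102000");
        Optional<String> ownInflight = Optional.of("20210401102000");

        // Mirrors the new filter: drop the caller's own in-flight instant before deciding.
        List<String> blocking = pending.stream()
            .filter(ts -> !ownInflight.isPresent() || !ts.equals(ownInflight.get()))
            .collect(Collectors.toList());

        // One foreign instant remains, so bootstrap would still be refused; had the caller's
        // instant been the only pending one, bootstrap could now proceed.
        System.out.println("Blocking instants: " + blocking);
      }
    }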
@@ -62,7 +62,7 @@ <T extends SpecificRecordBase> FlinkHoodieBackedTableMetadataWriter(Configuratio
HoodieWriteConfig writeConfig,
HoodieEngineContext engineContext,
Option<T> actionMetadata) {
- super(hadoopConf, writeConfig, engineContext, actionMetadata);
+ super(hadoopConf, writeConfig, engineContext, actionMetadata, Option.empty());
}

@Override
@@ -78,10 +78,11 @@ protected void initRegistry() {

@Override
protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext,
- Option<T> actionMetadata) {
+ Option<T> actionMetadata,
+ Option<String> inflightInstantTimestamp) {
try {
if (enabled) {
- bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata);
+ bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata, inflightInstantTimestamp);
}
} catch (IOException e) {
LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
@@ -84,22 +84,32 @@ public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig client

@Deprecated
public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
- super(context, writeConfig);
+ this(context, writeConfig, Option.empty());
}

@Deprecated
public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending,
Option<EmbeddedTimelineService> timelineService) {
- super(context, writeConfig, timelineService);
+ this(context, writeConfig, timelineService);
}

public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService);
+ initializeMetadataTable(Option.empty());
+ }
+
+ private void initializeMetadataTable(Option<String> inflightInstantTimestamp) {
if (config.isMetadataTableEnabled()) {
- // If the metadata table does not exist, it should be bootstrapped here
- // TODO: Check if we can remove this requirement - auto bootstrap on commit
- SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context);
+ // Defer bootstrap if upgrade / downgrade is pending
+ HoodieTableMetaClient metaClient = createMetaClient(true);
+ UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade(
+ metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance());
+ if (!upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
+ // TODO: Check if we can remove this requirement - auto bootstrap on commit
+ SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context, Option.empty(),
+ inflightInstantTimestamp);
+ }
}
}

@@ -213,7 +223,6 @@ public HoodieWriteResult insertOverwrite(JavaRDD<HoodieRecord<T>> records, final
return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
}


/**
* Removes all existing records of the Hoodie table and inserts the given HoodieRecords, into the table.

@@ -369,7 +378,7 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstan
private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
String clusteringCommitTime) {

List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
e.getValue().stream()).collect(Collectors.toList());

@@ -439,6 +448,9 @@ protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<W
upgradeDowngrade.run(HoodieTableVersion.current(), instantTime);
}
metaClient.reloadActiveTimeline();

+ // re-bootstrap metadata table if required
+ initializeMetadataTable(Option.of(instantTime));
}
metaClient.validateTableProperties(config.getProps(), operationType);
return getTableAndInitCtx(metaClient, operationType, instantTime);
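With this change, SparkRDDWriteClient no longer bootstraps the metadata table unconditionally at construction. Bootstrap is skipped while a table upgrade or downgrade is pending, and getTableAndInitCtx re-attempts it after the upgrade runs, passing the write's own instant so that instant does not block the bootstrap it triggers. A hedged usage sketch of the resulting flow; jsc, writeConfig, and recordsRdd are assumed setup, and the payload type is chosen for illustration:

    // Sketch only, assuming jsc (a JavaSparkContext), writeConfig (with hoodie.metadata.enable=true)
    // and recordsRdd (a JavaRDD<HoodieRecord<HoodieAvroPayload>>) already exist.
    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
    SparkRDDWriteClient<HoodieAvroPayload> client =
        new SparkRDDWriteClient<>(engineContext, writeConfig, Option.empty());

    // Construction attempted bootstrap with no in-flight instant; if an upgrade/downgrade was
    // pending, it deferred. The first write re-attempts bootstrap, excluding its own instant.
    String instantTime = client.startCommit();
    client.upsert(recordsRdd, instantTime);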
@@ -48,23 +48,41 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad

private static final Logger LOG = LogManager.getLogger(SparkHoodieBackedTableMetadataWriter.class);

- public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig,
- HoodieEngineContext context) {
- return create(conf, writeConfig, context, Option.empty());
- }

+ /**
+ * Return a Spark based implementation of {@code HoodieTableMetadataWriter} which can be used to
+ * write to the metadata table.
+ *
+ * If the metadata table does not exist, an attempt is made to bootstrap it, but there is no guarantee that the
+ * table will end up bootstrapped at this time.
+ *
+ * @param conf
+ * @param writeConfig
+ * @param context
+ * @param actionMetadata
+ * @param inflightInstantTimestamp Timestamp of an instant which is in-progress. This instant is ignored while
+ * attempting to bootstrap the table.
+ * @return An instance of the {@code HoodieTableMetadataWriter}
+ */
public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Configuration conf,
HoodieWriteConfig writeConfig,
HoodieEngineContext context,
- Option<T> actionMetadata) {
- return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata);
+ Option<T> actionMetadata,
+ Option<String> inflightInstantTimestamp) {
+ return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata,
+ inflightInstantTimestamp);
}

+ public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig,
+ HoodieEngineContext context) {
+ return create(conf, writeConfig, context, Option.empty(), Option.empty());
+ }

<T extends SpecificRecordBase> SparkHoodieBackedTableMetadataWriter(Configuration hadoopConf,
HoodieWriteConfig writeConfig,
HoodieEngineContext engineContext,
- Option<T> actionMetadata) {
- super(hadoopConf, writeConfig, engineContext, actionMetadata);
+ Option<T> actionMetadata,
+ Option<String> inflightInstantTimestamp) {
+ super(hadoopConf, writeConfig, engineContext, actionMetadata, inflightInstantTimestamp);
}

@Override
@@ -84,7 +102,8 @@ protected void initRegistry() {

@Override
protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext,
- Option<T> actionMetadata) {
+ Option<T> actionMetadata,
+ Option<String> inflightInstantTimestamp) {
try {
metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> {
if (registry instanceof DistributedRegistry) {
@@ -94,7 +113,7 @@ protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext eng
});

if (enabled) {
- bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata);
+ bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata, inflightInstantTimestamp);
}
} catch (IOException e) {
LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
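All creation now funnels through the five-argument create, and the original three-argument form becomes a convenience that passes Option.empty() twice. An illustrative call from a writer that already has an instant in flight; conf, writeConfig, and context are assumed to exist, and the timestamp is an example value:

    HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(
        conf,                         // Hadoop Configuration for the data table
        writeConfig,                  // HoodieWriteConfig of the data table
        context,                      // HoodieEngineContext
        Option.empty(),               // no action metadata to consult for the bootstrap decision
        Option.of("20210401102000")); // this writer's in-flight instant, ignored by the pending check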
@@ -128,7 +128,7 @@ public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetad
}
if (isMetadataTableAvailable) {
return Option.of(SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context,
- actionMetadata));
+ actionMetadata, Option.empty()));
} else {
return Option.empty();
}