Merged
@@ -255,44 +255,74 @@ public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, Sche
*/
public void refreshTimeline() throws IOException {
if (fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).setPayloadClassName(cfg.payloadClassName).build();
switch (meta.getTableType()) {
case COPY_ON_WRITE:
this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline());
break;
case MERGE_ON_READ:
this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants());
this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline());
break;
default:
throw new HoodieException("Unsupported table type :" + meta.getTableType());
try {
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).setPayloadClassName(cfg.payloadClassName).build();
switch (meta.getTableType()) {
case COPY_ON_WRITE:
this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline());
break;
case MERGE_ON_READ:
this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants());
this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline());
break;
default:
throw new HoodieException("Unsupported table type :" + meta.getTableType());
}
} catch (HoodieIOException e) {
LOG.warn("Full exception msg " + e.getMessage());
if (e.getMessage().contains("Could not load Hoodie properties") && e.getMessage().contains(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
String basePathWithForwardSlash = cfg.targetBasePath.endsWith("/") ? cfg.targetBasePath : String.format("%s/", cfg.targetBasePath);
String pathToHoodieProps = String.format("%s%s/%s", basePathWithForwardSlash, HoodieTableMetaClient.METAFOLDER_NAME, HoodieTableConfig.HOODIE_PROPERTIES_FILE);
String pathToHoodiePropsBackup = String.format("%s%s/%s", basePathWithForwardSlash, HoodieTableMetaClient.METAFOLDER_NAME, HoodieTableConfig.HOODIE_PROPERTIES_FILE_BACKUP);
boolean hoodiePropertiesExists = fs.exists(new Path(basePathWithForwardSlash))
&& fs.exists(new Path(pathToHoodieProps))
&& fs.exists(new Path(pathToHoodiePropsBackup));
if (!hoodiePropertiesExists) {
LOG.warn("Base path exists, but table is not fully initialized. Re-initializing it");
initializeEmptyTable();
// Reload the timeline from the metaClient and validate that it is an empty table. If any instants are found, fail the pipeline, because hoodie.properties was deleted by mistake.
HoodieTableMetaClient metaClientToValidate = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).build();
if (metaClientToValidate.reloadActiveTimeline().getInstants().count() > 0) {
// Deleting the recreated hoodie.properties and throwing exception.
fs.delete(new Path(String.format("%s%s/%s", basePathWithForwardSlash, HoodieTableMetaClient.METAFOLDER_NAME, HoodieTableConfig.HOODIE_PROPERTIES_FILE)));
throw new HoodieIOException("hoodie.properties is missing, likely deleted by an external process. Please restore hoodie.properties and restart the pipeline.",
e.getIOException());
}
}
} else {
throw e;
}
}
} else {
this.commitTimelineOpt = Option.empty();
this.allCommitsTimelineOpt = Option.empty();
String partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(cfg.tableType)
.setTableName(cfg.targetTableName)
.setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
.setPayloadClassName(cfg.payloadClassName)
.setBaseFileFormat(cfg.baseFileFormat)
.setPartitionFields(partitionColumns)
.setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
.setPopulateMetaFields(props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(),
HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))
.setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
SimpleKeyGenerator.class.getName()))
.setPreCombineField(cfg.sourceOrderingField)
.setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
.setShouldDropPartitionColumns(isDropPartitionColumns())
.initTable(new Configuration(jssc.hadoopConfiguration()),
cfg.targetBasePath);
initializeEmptyTable();
}
}

private void initializeEmptyTable() throws IOException {
this.commitTimelineOpt = Option.empty();
this.allCommitsTimelineOpt = Option.empty();
String partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(cfg.tableType)
.setTableName(cfg.targetTableName)
.setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
.setPayloadClassName(cfg.payloadClassName)
.setBaseFileFormat(cfg.baseFileFormat)
.setPartitionFields(partitionColumns)
.setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
.setPopulateMetaFields(props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(),
HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))
.setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(),
SimpleKeyGenerator.class.getName()))
.setPreCombineField(cfg.sourceOrderingField)
.setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
.setShouldDropPartitionColumns(isDropPartitionColumns())
.initTable(new Configuration(jssc.hadoopConfiguration()),
cfg.targetBasePath);
}

/**
* Run one round of delta sync and return new compaction instant if one got scheduled.
*/
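Condensed, the new catch branch in refreshTimeline() above recovers only when the hoodie.properties load failed and the table still has no completed instants; anything else is rethrown. The sketch below restates that decision using only classes and members already shown in the hunk: the helper name recoverFromMissingHoodieProperties is illustrative, the extra existence checks on the .hoodie folder are omitted, and the code is written as if it lived inside DeltaSync (so fs, cfg, and initializeEmptyTable() are the members above), not as part of this PR.

// Illustrative condensation of the recovery path in refreshTimeline(); not the PR's actual code.
private void recoverFromMissingHoodieProperties(HoodieIOException e) throws IOException {
  boolean propsLoadFailed = e.getMessage().contains("Could not load Hoodie properties")
      && e.getMessage().contains(HoodieTableConfig.HOODIE_PROPERTIES_FILE);
  if (!propsLoadFailed) {
    throw e; // unrelated IO failure: surface it unchanged
  }
  // Re-create the table metadata, then confirm the timeline is still empty.
  initializeEmptyTable();
  HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
      .setConf(new Configuration(fs.getConf()))
      .setBasePath(cfg.targetBasePath)
      .build();
  if (meta.reloadActiveTimeline().getInstants().count() > 0) {
    // Completed instants exist, so hoodie.properties was deleted from a live table:
    // fail fast instead of silently rewriting the table config.
    throw new HoodieIOException("hoodie.properties is missing for a non-empty table", e.getIOException());
  }
}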
@@ -45,7 +45,6 @@
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -85,6 +84,9 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.lang.String.format;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;

/**
* A utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target
* table. Does not maintain any state; it queries at runtime to see how far behind the target table is from the source
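For readers skimming the service changes below, this is roughly how the class is driven programmatically; the sketch mirrors the tests at the bottom of this diff. Only Config fields that appear elsewhere in the diff are set directly; the no-arg Config constructor and the sourceClassName field are assumptions not shown in this diff, and the remaining source/properties wiring is reduced to placeholders.

// Hedged usage sketch, not part of this PR.
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class DeltaStreamerUsageSketch {
  public static void main(String[] args) throws Exception {
    JavaSparkContext jsc = new JavaSparkContext(
        new SparkConf().setMaster("local[2]").setAppName("delta-streamer-sketch"));
    HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); // assumes the public no-arg constructor
    cfg.targetBasePath = "/tmp/hudi/trips";   // placeholder base path
    cfg.targetTableName = "trips";            // placeholder table name
    cfg.tableType = "COPY_ON_WRITE";
    cfg.sourceOrderingField = "timestamp";
    cfg.operation = WriteOperationType.INSERT;
    cfg.sourceClassName = "org.apache.hudi.utilities.sources.ParquetDFSSource"; // assumed field name; props wiring omitted
    new HoodieDeltaStreamer(cfg, jsc).sync(); // one ingest round, as the tests below do
    jsc.stop();
  }
}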
Expand Down Expand Up @@ -618,7 +620,7 @@ public static class DeltaSyncService extends HoodieAsyncService {
/**
* Table Type.
*/
private final HoodieTableType tableType;
private HoodieTableType tableType;

/**
* Delta Sync.
@@ -638,32 +640,35 @@ public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Config
TerminationStrategyUtils.createPostWriteTerminationStrategy(properties.get(), cfg.postWriteTerminationStrategyClass);

if (fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(false).build();
tableType = meta.getTableType();
// This will guarantee there is no surprise with table type
ValidationUtils.checkArgument(tableType.equals(HoodieTableType.valueOf(cfg.tableType)),
"Hoodie table is of type " + tableType + " but passed in CLI argument is " + cfg.tableType);

// Load base file format
// This will guarantee there is no surprise with base file type
String baseFileFormat = meta.getTableConfig().getBaseFileFormat().toString();
ValidationUtils.checkArgument(baseFileFormat.equals(cfg.baseFileFormat) || cfg.baseFileFormat == null,
"Hoodie table's base file format is of type " + baseFileFormat + " but passed in CLI argument is "
+ cfg.baseFileFormat);
cfg.baseFileFormat = baseFileFormat;
this.cfg.baseFileFormat = baseFileFormat;
Map<String,String> propsToValidate = new HashMap<>();
properties.get().forEach((k,v) -> propsToValidate.put(k.toString(),v.toString()));
HoodieWriterUtils.validateTableConfig(this.sparkSession, org.apache.hudi.HoodieConversionUtils.mapAsScalaImmutableMap(propsToValidate), meta.getTableConfig());
} else {
tableType = HoodieTableType.valueOf(cfg.tableType);
if (cfg.baseFileFormat == null) {
cfg.baseFileFormat = "PARQUET"; // default for backward compatibility
try {
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(false).build();
tableType = meta.getTableType();
// This will guarantee there is no surprise with table type
checkArgument(tableType.equals(HoodieTableType.valueOf(cfg.tableType)), "Hoodie table is of type " + tableType + " but passed in CLI argument is " + cfg.tableType);

// Load base file format
// This will guarantee there is no surprise with base file type
String baseFileFormat = meta.getTableConfig().getBaseFileFormat().toString();
checkArgument(baseFileFormat.equals(cfg.baseFileFormat) || cfg.baseFileFormat == null,
format("Hoodie table's base file format is of type %s but passed in CLI argument is %s", baseFileFormat, cfg.baseFileFormat));
cfg.baseFileFormat = baseFileFormat;
this.cfg.baseFileFormat = baseFileFormat;
Map<String, String> propsToValidate = new HashMap<>();
properties.get().forEach((k, v) -> propsToValidate.put(k.toString(), v.toString()));
HoodieWriterUtils.validateTableConfig(this.sparkSession, org.apache.hudi.HoodieConversionUtils.mapAsScalaImmutableMap(propsToValidate), meta.getTableConfig());
} catch (HoodieIOException e) {
LOG.warn("Full exception msg " + e.getLocalizedMessage() + ", msg " + e.getMessage());
if (e.getMessage().contains("Could not load Hoodie properties") && e.getMessage().contains(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
initializeTableTypeAndBaseFileFormat();
} else {
throw e;
}
}
} else {
initializeTableTypeAndBaseFileFormat();
}

ValidationUtils.checkArgument(!cfg.filterDupes || cfg.operation != WriteOperationType.UPSERT,
checkArgument(!cfg.filterDupes || cfg.operation != WriteOperationType.UPSERT,
"'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");

this.props = properties.get();
@@ -681,6 +686,13 @@ public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, F
this(cfg, jssc, fs, conf, Option.empty());
}

private void initializeTableTypeAndBaseFileFormat() {
tableType = HoodieTableType.valueOf(cfg.tableType);
if (cfg.baseFileFormat == null) {
cfg.baseFileFormat = "PARQUET"; // default for backward compatibility
}
}

public DeltaSync getDeltaSync() {
return deltaSync;
}
@@ -824,7 +836,7 @@ protected Boolean onInitializingWriteClient(SparkRDDWriteClient writeClient) {
.setBasePath(cfg.targetBasePath)
.setLoadActiveTimelineOnLoad(true).build();
List<HoodieInstant> pending = ClusteringUtils.getPendingClusteringInstantTimes(meta);
LOG.info(String.format("Found %d pending clustering instants ", pending.size()));
LOG.info(format("Found %d pending clustering instants ", pending.size()));
pending.forEach(hoodieInstant -> asyncClusteringService.get().enqueuePendingAsyncServiceInstant(hoodieInstant));
asyncClusteringService.get().start(error -> true);
try {
@@ -88,6 +88,7 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -1853,6 +1854,62 @@ public void testParquetDFSSourceForEmptyBatch() throws Exception {
testParquetDFSSource(false, null, true);
}

@Test
public void testDeltaStreamerRestartAfterMissingHoodieProps() throws Exception {
testDeltaStreamerRestartAfterMissingHoodieProps(true);
}

@Test
public void testDeltaStreamerRestartAfterMissingHoodiePropsAfterValidCommit() throws Exception {
testDeltaStreamerRestartAfterMissingHoodieProps(false);
}

private void testDeltaStreamerRestartAfterMissingHoodieProps(boolean testInitFailure) throws Exception {
PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfs" + testNum;
int parquetRecordsCount = 10;
boolean hasTransformer = false;
boolean useSchemaProvider = false;
prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
PARQUET_SOURCE_ROOT, false, "partition_path", "0");

String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, testInitFailure ? TestParquetDFSSourceEmptyBatch.class.getName() : ParquetDFSSource.class.getName(),
null, PROPS_FILENAME_TEST_PARQUET, false,
useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
deltaStreamer.sync();

if (testInitFailure) {
FileStatus[] fileStatuses = dfs.listStatus(new Path(tableBasePath + "/.hoodie/"));
Arrays.stream(fileStatuses).filter(entry -> entry.getPath().getName().contains("commit") || entry.getPath().getName().contains("inflight")).forEach(entry -> {
try {
dfs.delete(entry.getPath());
} catch (IOException e) {
e.printStackTrace();
}
});
}
// delete hoodie.properties
dfs.delete(new Path(tableBasePath + "/.hoodie/hoodie.properties"));

// restart the pipeline.
if (testInitFailure) { // should succeed.
deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
null, PROPS_FILENAME_TEST_PARQUET, false,
useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
} else {
assertThrows(org.apache.hudi.exception.HoodieIOException.class, () -> new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
null, PROPS_FILENAME_TEST_PARQUET, false,
useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc));
}
testNum++;
}

@Test
public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
testParquetDFSSource(false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));