Merged
Changes from 10 commits

@@ -149,6 +149,14 @@ public void setSuccessRecordKeys(List<String> successRecordKeys) {
this.successRecordKeys = successRecordKeys;
}

public double getFailureFraction() {
return failureFraction;
}

public boolean isTrackSuccessRecords() {
return trackSuccessRecords;
}

@Override
public String toString() {
return "PartitionPath " + partitionPath + ", FileID " + fileId + ", Success records "
@@ -325,9 +325,9 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("When true, spins up an instance of the timeline server (meta server that serves cached file listings, statistics),"
+ "running on each writer's driver process, accepting requests during the write from executors.");

public static final ConfigProperty<String> EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED = ConfigProperty
public static final ConfigProperty<Boolean> EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED = ConfigProperty
.key("hoodie.embed.timeline.server.reuse.enabled")
.defaultValue("false")
.defaultValue(false)
.withDocumentation("Controls whether the timeline server instance should be cached and reused across the JVM (across task lifecycles)"
+ "to avoid startup costs. This should rarely be changed.");

@@ -1084,7 +1084,7 @@ public boolean isEmbeddedTimelineServerEnabled() {
}

public boolean isEmbeddedTimelineServerReuseEnabled() {
return Boolean.parseBoolean(getStringOrDefault(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED));
return getBoolean(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED);
Contributor: We need to do getBooleanOrDefault, otherwise it might NPE (due to unboxing).
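For context, the NPE mentioned here comes from auto-unboxing a null Boolean into a primitive boolean; a minimal standalone illustration (not part of this PR):

  public class UnboxingNpeExample {
    public static void main(String[] args) {
      Boolean reuseEnabled = null;      // e.g. a config with no default and no user-provided value
      boolean flag = reuseEnabled;      // throws NullPointerException via auto-unboxing
    }
  }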

Contributor (author): EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED is a ConfigProperty; getBoolean will handle this:

public <T> Boolean getBoolean(ConfigProperty<T> configProperty) {
    if (configProperty.hasDefaultValue()) {
      return getBooleanOrDefault(configProperty);
    }
    Option<Object> rawValue = getRawValue(configProperty);
    return rawValue.map(v -> Boolean.parseBoolean(v.toString())).orElse(null);
  }

}

public int getEmbeddedTimelineServerPort() {
@@ -37,6 +37,7 @@
import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FutureUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -47,8 +48,11 @@
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -68,9 +72,13 @@
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -98,10 +106,18 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood
// execute clustering for each group async and collect WriteStatus
Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
clusteringPlan.getInputGroups().stream()
.map(inputGroup -> runClusteringForGroupAsync(inputGroup,
clusteringPlan.getStrategy().getStrategyParams(),
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
instantTime))
.map(inputGroup -> {
if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
Contributor: Let's abstract this as a method in WriteConfig.

Contributor (author): HoodieWriteConfig holds common Hudi configuration, while hoodie.datasource.write.row.writer.enable is specific to Spark; moving this config from DataSourceOptions to HoodieWriteConfig may not be a good idea.
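A minimal sketch of the suggested helper, assuming the key stays a plain string inside HoodieWriteConfig; the method name isRowWriterEnabled is hypothetical and is not part of this PR:

  // Hypothetical helper in HoodieWriteConfig (name and placement are assumptions, not part of this PR)
  public boolean isRowWriterEnabled() {
    return getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false);
  }

The call site above would then read getWriteConfig().isRowWriterEnabled().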

return runClusteringForGroupAsyncWithRow(inputGroup,
clusteringPlan.getStrategy().getStrategyParams(),
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
Contributor: Please extract common expression (to shouldPreserveMetadata).
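A sketch of the extraction being requested, reusing the exact expression from the PR (the variable name comes from the review comment):

  // Evaluate once, before mapping over the input groups
  boolean shouldPreserveMetadata =
      Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);
  // ...then pass shouldPreserveMetadata to runClusteringForGroupAsyncWithRow / ...WithRDD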

instantTime);
}
return runClusteringForGroupAsyncWithRDD(inputGroup,
clusteringPlan.getStrategy().getStrategyParams(),
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
instantTime);
})
.collect(Collectors.toList()))
.join()
.stream();
@@ -113,6 +129,15 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood
return writeMetadata;
}

/**
* Execute clustering to write inputRecords into new files based on strategyParams.
* Different from {@link performClusteringWithRecordsRDD}, this method takes a {@link Dataset<Row>}
* as input.
*/
public abstract HoodieData<WriteStatus> performClusteringWithRecordsRow(final Dataset<Row> inputRecords, final int numOutputGroups, final String instantTime,
Contributor: This style of wrapping (while acceptable under a recognized code style guide) makes it quite hard to read on a laptop screen:

[screenshot: wrapped method signature as rendered on a laptop, 2022-09-12]

Contributor: Stacked one for comparison:

[screenshot: stacked parameter list, 2022-09-12]

final Map<String, String> strategyParams, final Schema schema,
final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata);

/**
* Execute clustering to write inputRecords into new files as defined by rules in strategy parameters.
* The number of new file groups created is bounded by numOutputGroups.
@@ -131,14 +156,25 @@ public abstract HoodieData<WriteStatus> performClusteringWithRecordsRDD(final Ho
final Map<String, String> strategyParams, final Schema schema,
final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata);

protected BulkInsertPartitioner<Dataset<Row>> getRowPartitioner(Map<String, String> strategyParams,
Schema schema) {
return getPartitioner(strategyParams, schema, true);
}

protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getRDDPartitioner(Map<String, String> strategyParams,
Schema schema) {
return getPartitioner(strategyParams, schema, false);
}

/**
* Create {@link BulkInsertPartitioner} based on strategy params.
*
* @param strategyParams Strategy parameters containing columns to sort the data by when clustering.
* @param schema Schema of the data including metadata fields.
* @return {@link RDDCustomColumnsSortPartitioner} if sort columns are provided, otherwise empty.
*/
protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategyParams,
Schema schema,
boolean isRowPartitioner) {
Option<String[]> orderByColumnsOpt =
Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
.map(listStr -> listStr.split(","));
@@ -148,29 +184,34 @@ protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getPartitioner(Map<Str
switch (layoutOptStrategy) {
case ZORDER:
case HILBERT:
return new RDDSpatialCurveSortPartitioner(
return isRowPartitioner
? new RowSpatialCurveSortPartitioner(getWriteConfig())
: new RDDSpatialCurveSortPartitioner(
(HoodieSparkEngineContext) getEngineContext(),
Contributor: Please fix alignment.

orderByColumns,
layoutOptStrategy,
getWriteConfig().getLayoutOptimizationCurveBuildMethod(),
HoodieAvroUtils.addMetadataFields(schema));
case LINEAR:
return new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema),
return isRowPartitioner
? new RowCustomColumnsSortPartitioner(orderByColumns)
: new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema),
getWriteConfig().isConsistentLogicalTimestampEnabled());
Contributor: Same as above.

default:
throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
}
}).orElse(BulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode()));
}).orElse(isRowPartitioner ? BulkInsertInternalPartitionerWithRowsFactory.get(getWriteConfig().getBulkInsertSortMode()) :
BulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode()));
}

/**
* Submit job to execute clustering for the group.
* Submit job to execute clustering for the group with RDD APIs.
*/
private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
boolean preserveHoodieMetadata, String instantTime) {
private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncWithRDD(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
Contributor: Let's stay consistent with how the rest of the codebase identifies Row vs. Avro workflows:

  • Avro/HoodieRecord variants keep their existing names
  • Row counterparts get an AsRow suffix

In this case I suggest going with runClusteringForGroupAsync for the existing one and runClusteringForGroupAsyncAsRow for the new one. WDYT?

Contributor (author): Yeah, will change it to stay consistent with the rest of the code (though I think adding an RDD suffix reads more clearly, since the method takes the same params and also returns HoodieData, just like the AsRow variant).

Contributor: The RDD suffix is misleading though (the Row-based ones also rely on RDD internally).

boolean preserveHoodieMetadata, String instantTime) {
return CompletableFuture.supplyAsync(() -> {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
HoodieData<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime);
HoodieData<HoodieRecord<T>> inputRecords = readRecordsForGroupAsRDD(jsc, clusteringGroup, instantTime);
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()
.map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId()))
@@ -179,10 +220,26 @@ private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsync(Ho
});
}

/**
* Submit job to execute clustering for the group with dataset APIs.
*/
private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncWithRow(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
boolean preserveHoodieMetadata, String instantTime) {
return CompletableFuture.supplyAsync(() -> {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
Dataset<Row> inputRecords = readRecordsForGroupAsRow(jsc, clusteringGroup, instantTime);
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()
.map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId()))
.collect(Collectors.toList());
return performClusteringWithRecordsRow(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, preserveHoodieMetadata);
});
}

/**
* Get RDD of all records for the group. This includes all records from file slice (Apply updates from log files, if any).
*/
private HoodieData<HoodieRecord<T>> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime) {
private HoodieData<HoodieRecord<T>> readRecordsForGroupAsRDD(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime) {
List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
if (hasLogFiles) {
@@ -273,6 +330,60 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContex
.map(record -> transform(record, writeConfig)));
}

/**
* Get dataset of all records for the group. This includes all records from file slice (Apply updates from log files, if any).
*/
private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc,
HoodieClusteringGroup clusteringGroup,
String instantTime) {
List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream()
.map(ClusteringOperation::create).collect(Collectors.toList());
boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
SQLContext sqlContext = new SQLContext(jsc.sc());

String[] baseFilePaths = clusteringOps
.stream()
.map(op -> {
ArrayList<String> readPaths = new ArrayList<>();
if (op.getBootstrapFilePath() != null) {
readPaths.add(op.getBootstrapFilePath());
}
if (op.getDataFilePath() != null) {
readPaths.add(op.getDataFilePath());
}
return readPaths;
})
.flatMap(Collection::stream)
.filter(path -> !path.isEmpty())
.toArray(String[]::new);
String[] deltaPaths = clusteringOps
.stream()
.filter(op -> !op.getDeltaFilePaths().isEmpty())
.flatMap(op -> op.getDeltaFilePaths().stream())
.toArray(String[]::new);

Dataset<Row> inputRecords;
if (hasLogFiles) {
String compactionFractor = Option.ofNullable(getWriteConfig().getString("compaction.memory.fraction"))
Contributor: The only part that differs between these branches is the options composition. Let's extract the common part and keep only the options composition under the conditional.

Contributor (author): Done.

.orElse("0.75");
String[] paths = CollectionUtils.combine(baseFilePaths, deltaPaths);
inputRecords = sqlContext.read()
Contributor: Same feedback as with the write path: we can't use the Spark DataSource in here, for mostly the same reasons -- it violates layering and could lead to subtle bugs.

Instead, let's extract the following portion of the createRelation method and reuse it directly here:
https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala#L125

Like the following:

val relation = DefaultSource.createRelation(...)
val df = sparkSession.baseRelationToDataFrame(relation)

Contributor (author): Yeah, I was also thinking maybe we could change this as well when fixing the write path. But I found it quite difficult: to access DefaultSource here we may need to move a lot of dependent code (possibly the whole hudi-spark-common package) from hudi-spark-common to hudi-spark-client (BaseFileOnlyRelation, HoodieFileIndex, etc.), and I'm not sure it would work even after moving them; we might hit other problems.

Contributor: Good call. We shouldn't be moving any of these classes; we can use SparkAdapter to provide us with an interface to access it.

Contributor (author): Good idea! I'll give it a try.

.format("org.apache.hudi")
.option("hoodie.datasource.query.type", "snapshot")
.option("compaction.memory.fraction", compactionFractor)
.option("as.of.instant", instantTime)
.option("hoodie.datasource.read.paths", String.join(",", paths))
.load();
} else {
inputRecords = sqlContext.read()
.format("org.apache.hudi")
.option("as.of.instant", instantTime)
Contributor: We already collect all the parquet files that need to be clustered; do we still need to set "as.of.instant"?

Contributor (author): Just setting this to keep the commits aligned with the parquet files; it should do no harm to remove it, but it's better to keep them consistent. What do you think?

.option("hoodie.datasource.read.paths", String.join(",", baseFilePaths))
.load();
}
return inputRecords;
}
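To illustrate the "options composition" suggestion from the review thread above, here is a sketch of readRecordsForGroupAsRow with only the option map under the conditional. It assumes the same reader options as the two branches in the PR and elides the locals computed earlier in the method (it would also need java.util.HashMap and java.util.Map imports); it is not the code that was merged:

  private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc,
                                                HoodieClusteringGroup clusteringGroup,
                                                String instantTime) {
    // ...compute clusteringOps, hasLogFiles, baseFilePaths and deltaPaths exactly as above...
    SQLContext sqlContext = new SQLContext(jsc.sc());
    Map<String, String> readOpts = new HashMap<>();
    readOpts.put("as.of.instant", instantTime);
    if (hasLogFiles) {
      String compactionFraction = Option.ofNullable(getWriteConfig().getString("compaction.memory.fraction"))
          .orElse("0.75");
      readOpts.put("hoodie.datasource.query.type", "snapshot");
      readOpts.put("compaction.memory.fraction", compactionFraction);
      readOpts.put("hoodie.datasource.read.paths",
          String.join(",", CollectionUtils.combine(baseFilePaths, deltaPaths)));
    } else {
      readOpts.put("hoodie.datasource.read.paths", String.join(",", baseFilePaths));
    }
    // Only the option composition differs; the read itself is shared
    return sqlContext.read().format("org.apache.hudi").options(readOpts).load();
  }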

/**
* Stream to array conversion with generic type is not straightforward.
* Implement a utility method to abstract high level logic. This needs to be improved in future
@@ -33,8 +33,11 @@
import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;

import org.apache.avro.Schema;
import org.apache.hudi.table.action.commit.SparkBulkInsertRowWriter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.util.List;
import java.util.Map;
@@ -54,6 +57,35 @@ public SparkSingleFileSortExecutionStrategy(HoodieTable table,
super(table, engineContext, writeConfig);
}

@Override
public HoodieData<WriteStatus> performClusteringWithRecordsRow(Dataset<Row> inputRecords,
int numOutputGroups,
String instantTime,
Map<String, String> strategyParams,
Schema schema,
List<HoodieFileGroupId> fileGroupIdList,
boolean preserveHoodieMetadata) {
if (numOutputGroups != 1 || fileGroupIdList.size() != 1) {
throw new HoodieClusteringException("Expect only one file group for strategy: " + getClass().getName());
}
LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);

HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
.withBulkInsertParallelism(numOutputGroups)
.withProps(getWriteConfig().getProps()).build();

boolean shouldPreserveHoodieMetadata = preserveHoodieMetadata;
if (!newConfig.populateMetaFields() && preserveHoodieMetadata) {
LOG.warn("Will setting preserveHoodieMetadata to false as populateMetaFields is false");
shouldPreserveHoodieMetadata = false;
}

newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));
Contributor: Let's duplicate the comment from the original method as well.


return SparkBulkInsertRowWriter.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
getRowPartitioner(strategyParams, schema), numOutputGroups, shouldPreserveHoodieMetadata);
}

@Override
public HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<HoodieRecord<T>> inputRecords,
int numOutputGroups,
@@ -74,6 +106,6 @@ public HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<Hoodie
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));

return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
false, getPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata));
false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata));
}
}