Closed

26 commits
0ebb868
Updated all usages to reference `HoodieSparkKryoRegistrar`
Jan 19, 2023
b52d080
Fixing MT table to be always created in a _reusable_ state by default…
Sep 28, 2022
eb4e2fb
Inlined "reuse" flag to be true by default for `HoodieBackedTableMeta…
Sep 28, 2022
b929ba5
Added `TransientLazy` utility (`Serializable` counterpart of `Lazy`)
Sep 28, 2022
091b81d
Rebased `HoodieTable` to rely on `TransientLazy` to manage lifecycle …
Sep 28, 2022
28c2b1c
Made `SparkRDDReadClient` closeable;
Sep 28, 2022
59df98b
Fixing compilation
Sep 28, 2022
9bf6a20
Fixing `TransientLazy` to disallow eager init
Sep 28, 2022
80525b3
Fixing compilation after rebase
Jan 11, 2023
f1979a2
Fixed `HoodieTable` to be broadcasted to make sure we have single obj…
Jan 12, 2023
c654bac
Revisited `Transient` to allow eager initialization
Jan 12, 2023
1ae9995
Fixed `HoodieTable` to use proper context
Jan 12, 2023
822e1dc
Revisited `HoodieTable` to hold
Jan 12, 2023
df71694
Make `HoodieBackedTableMetadata` serializable
Jan 12, 2023
c76049a
Added `LeakTrackingFSDataInputStream` to track "leaked" (non-closed) …
Jan 14, 2023
10516b1
Fixing compilation
Jan 18, 2023
9ba06fe
Reduced MT buffer size from 10Mb to 10Kb
Jan 14, 2023
e560c75
Fixed `HoodieTable` close to re-throw unchecked
Jan 20, 2023
47d2bcf
Avoid wiring `HoodieSparkSessionExtension` in "hudi-spark-client"
Jan 23, 2023
058a991
Killed superfluous Hadoop's `Configuration`
Jan 24, 2023
c8f0524
Reverted `HoodieTableFileSystemView` to be serializable in `HoodieBac…
Jan 24, 2023
8c7ffe0
Make sure `ByteBuffer` is being duplicated prior to being read out
Jan 24, 2023
9709b1b
Added `destroy` method to `Transient` allowing to free up any referen…
Jan 24, 2023
64f35dc
Revisited `HoodieSparkTable` to hold `Transient` relying on `Broadcas…
Jan 24, 2023
e63c45c
Avoid holding additional `SerializableConfiguration` refs
Jan 24, 2023
e3b6af9
Reverted `HoodieTable` to hold `HoodieTableMetadata` directly instead…
Jan 24, 2023
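
The central new utility in this change-set is `org.apache.hudi.util.Transient`, a `Serializable` counterpart of `Lazy` that drops its held value during serialization and re-creates it from a supplied initializer when accessed again (see commits b929ba5, c654bac, and 9709b1b). The class itself is not shown in this excerpt; the following is a minimal sketch of how such a holder could look, based only on the factory methods (`eager`, `lazy`), `get()`, and `destroy()` referenced in the diff — the real implementation's names and behavior may differ.

```java
import java.io.Serializable;
import java.util.function.Supplier;

/**
 * Minimal sketch of a serializable "transient" holder: the value itself is never
 * serialized; only the (serializable) initializer travels across the wire, and the
 * value is lazily re-created on first access after deserialization.
 */
public class TransientHolder<T> implements Serializable {

  /** Initializer must itself be serializable so it can be shipped to executors. */
  public interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

  private final SerializableSupplier<T> initializer;
  // Marked transient: dropped during Java serialization
  private transient T value;

  private TransientHolder(T value, SerializableSupplier<T> initializer) {
    this.value = value;
    this.initializer = initializer;
  }

  /** Holds an already-created value, falling back to the initializer after deserialization. */
  public static <T> TransientHolder<T> eager(T value, SerializableSupplier<T> initializer) {
    return new TransientHolder<>(value, initializer);
  }

  /** Defers creation entirely to the initializer. */
  public static <T> TransientHolder<T> lazy(SerializableSupplier<T> initializer) {
    return new TransientHolder<>(null, initializer);
  }

  /** Returns the value, re-initializing it if it was dropped (or never created). */
  public synchronized T get() {
    if (value == null) {
      value = initializer.get();
    }
    return value;
  }

  /** Frees the held reference; a subsequent get() re-initializes it. */
  public synchronized void destroy() {
    value = null;
  }
}
```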
HoodieTable.java
@@ -32,7 +32,6 @@
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -85,6 +84,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.util.Transient;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -114,32 +114,42 @@
* @param <K> Type of keys
* @param <O> Type of outputs
*/
public abstract class HoodieTable<T, I, K, O> implements Serializable {
public abstract class HoodieTable<T, I, K, O> implements Serializable, AutoCloseable {

private static final Logger LOG = LogManager.getLogger(HoodieTable.class);

protected final HoodieWriteConfig config;
protected final HoodieTableMetaClient metaClient;
protected final HoodieIndex<?, ?> index;
private SerializableConfiguration hadoopConfiguration;
protected final TaskContextSupplier taskContextSupplier;

private final HoodieTableMetadata metadata;

private final HoodieStorageLayout storageLayout;

private transient FileSystemViewManager viewManager;
protected final transient HoodieEngineContext context;
// NOTE: These are managed by {@code Transient} to implement transient semantics,
// where corresponding values (if initialized) are dropped during serialization
// and later re-initialized when accessed again
private final Transient<FileSystemViewManager> viewManager;

private final Transient<HoodieEngineContext> context;

protected HoodieTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
this.config = config;
this.hadoopConfiguration = context.getHadoopConf();
this.context = context;
// NOTE: We keep the context as [[Transient]] so that the [[HoodieTable]] object can be passed
// from the driver to the executors: the full context can't be propagated to the executors,
// so it is instead re-created there as a [[HoodieLocalEngineContext]]
this.context = Transient.eager(context, () -> new HoodieLocalEngineContext(metaClient.getHadoopConf()));

HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(config.getMetadataConfig().getProps())
.build();
this.metadata = HoodieTableMetadata.create(context, metadataConfig, config.getBasePath(),
this.metadata = HoodieTableMetadata.create(context, config.getMetadataConfig(), config.getBasePath(),
FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());

this.viewManager = FileSystemViewManager.createViewManager(context, config.getMetadataConfig(), config.getViewStorageConfig(), config.getCommonConfig(), () -> metadata);
this.viewManager = Transient.lazy(() ->
// NOTE: It's critical we use {@code getContext()} here since {@code context} is
// also a transient field
FileSystemViewManager.createViewManager(getContext(), config.getMetadataConfig(),
config.getViewStorageConfig(), config.getCommonConfig(), this::getMetadataTable));

this.metaClient = metaClient;
this.index = getIndex(config, context);
this.storageLayout = getStorageLayout(config);
@@ -148,19 +158,12 @@ protected HoodieTable(HoodieWriteConfig config, HoodieEngineContext context, Hoo

protected abstract HoodieIndex<?, ?> getIndex(HoodieWriteConfig config, HoodieEngineContext context);

protected HoodieStorageLayout getStorageLayout(HoodieWriteConfig config) {
return HoodieLayoutFactory.createLayout(config);
}

private synchronized FileSystemViewManager getViewManager() {
if (null == viewManager) {
viewManager = FileSystemViewManager.createViewManager(getContext(), config.getMetadataConfig(), config.getViewStorageConfig(), config.getCommonConfig(), () -> metadata);
}
return viewManager;
public HoodieTableMetadata getMetadataTable() {
return metadata;
}

public HoodieTableMetadata getMetadata() {
return metadata;
protected HoodieStorageLayout getStorageLayout(HoodieWriteConfig config) {
return HoodieLayoutFactory.createLayout(config);
}

/**
@@ -303,21 +306,21 @@ public TableFileSystemView getFileSystemView() {
* Get the base file only view of the file system for this table.
*/
public BaseFileOnlyView getBaseFileOnlyView() {
return getViewManager().getFileSystemView(metaClient);
return viewManager.get().getFileSystemView(metaClient);
}

/**
* Get the full view of the file system for this table.
*/
public SliceView getSliceView() {
return getViewManager().getFileSystemView(metaClient);
return viewManager.get().getFileSystemView(metaClient);
}

/**
* Get complete view of the file system for this table with ability to force sync.
*/
public SyncableFileSystemView getHoodieView() {
return getViewManager().getFileSystemView(metaClient);
return viewManager.get().getFileSystemView(metaClient);
}

/**
@@ -622,8 +625,8 @@ private void rollbackInflightInstant(HoodieInstant inflightInstant,
Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
-> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
rollback(context, commitTime, inflightInstant, false, false);
scheduleRollback(getContext(), commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
rollback(getContext(), commitTime, inflightInstant, false, false);
getActiveTimeline().revertInstantFromInflightToRequested(inflightInstant);
}

@@ -636,8 +639,8 @@ private void rollbackInflightInstant(HoodieInstant inflightInstant,
public void rollbackInflightLogCompaction(HoodieInstant inflightInstant, Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
-> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
rollback(context, commitTime, inflightInstant, true, false);
scheduleRollback(getContext(), commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
rollback(getContext(), commitTime, inflightInstant, true, false);
}

/**
@@ -677,7 +680,7 @@ private void deleteInvalidFilesByPartitions(HoodieEngineContext context, Map<Str
* Returns the possible invalid data file name with given marker files.
*/
protected Set<String> getInvalidDataPaths(WriteMarkers markers) throws IOException {
return markers.createdAndMergedDataPaths(context, config.getFinalizeWriteParallelism());
return markers.createdAndMergedDataPaths(getContext(), config.getFinalizeWriteParallelism());
}

/**
@@ -860,9 +863,7 @@ public boolean requireSortedRecords() {
}

public HoodieEngineContext getContext() {
// This is to handle scenarios where this is called at the executor tasks which do not have access
// to engine context, and it ends up being null (as its not serializable and marked transient here).
return context == null ? new HoodieLocalEngineContext(hadoopConfiguration.get()) : context;
return context.get();
}

/**
@@ -922,7 +923,7 @@ public void maybeDeleteMetadataTable() {
if (shouldExecuteMetadataTableDeletion()) {
try {
LOG.info("Deleting metadata table because it is disabled in writer.");
deleteMetadataTable(config.getBasePath(), context);
deleteMetadataTable(config.getBasePath(), getContext());
clearMetadataTablePartitionsConfig(Option.empty(), true);
} catch (HoodieMetadataException e) {
throw new HoodieException("Failed to delete metadata table.", e);
@@ -938,8 +939,8 @@ public void deleteMetadataIndexIfNecessary() {
if (shouldDeleteMetadataPartition(partitionType)) {
try {
LOG.info("Deleting metadata partition because it is disabled in writer: " + partitionType.name());
if (metadataPartitionExists(metaClient.getBasePath(), context, partitionType)) {
deleteMetadataPartition(metaClient.getBasePath(), context, partitionType);
if (metadataPartitionExists(metaClient.getBasePath(), getContext(), partitionType)) {
deleteMetadataPartition(metaClient.getBasePath(), getContext(), partitionType);
}
clearMetadataTablePartitionsConfig(Option.of(partitionType), false);
} catch (HoodieMetadataException e) {
Expand Down Expand Up @@ -1003,11 +1004,25 @@ private void clearMetadataTablePartitionsConfig(Option<MetadataPartitionType> pa
}
}

public HoodieTableMetadata getMetadataTable() {
return this.metadata;
}

public Runnable getPreExecuteRunnable() {
return Functions.noop();
}

@Override
public void close() {
try {
metadata.close();
} catch (Exception e) {
throw new HoodieException(e);
}
}

protected static HoodieTableMetadata createMetadataTable(HoodieEngineContext context, HoodieWriteConfig config) {
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(config.getMetadataConfig().getProps())
.build();

return HoodieTableMetadata.create(context, metadataConfig, config.getBasePath(),
FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
}
}
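
To illustrate the lifecycle the new `HoodieTable` constructor relies on — `Transient.eager(context, () -> new HoodieLocalEngineContext(...))` on the driver, with the context re-created locally wherever the table is deserialized — here is a small round-trip using the hypothetical `TransientHolder` sketched above. It demonstrates only the drop-and-reinitialize semantics with plain Java serialization, not Hudi's actual engine-context classes; the same pattern is what lets `close()` and `Transient.destroy()` release references such as the metadata-table reader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class TransientRoundTripDemo {
  public static void main(String[] args) throws Exception {
    // "Driver" side: hold an eagerly-created value, with a fallback initializer
    // that is used wherever the object is deserialized (e.g. on an executor).
    TransientHolder<String> ctx =
        TransientHolder.eager("driver-context", () -> "re-created-local-context");

    System.out.println(ctx.get());           // -> driver-context

    // Simulate shipping the holder to an executor via Java serialization.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(ctx);
    }

    TransientHolder<String> onExecutor;
    try (ObjectInputStream in =
             new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      @SuppressWarnings("unchecked")
      TransientHolder<String> deserialized = (TransientHolder<String>) in.readObject();
      onExecutor = deserialized;
    }

    // The transient value was dropped during serialization; the fallback
    // initializer rebuilds it on first access.
    System.out.println(onExecutor.get());     // -> re-created-local-context
  }
}
```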
HoodieFlinkTable.java
@@ -53,6 +53,27 @@ protected HoodieFlinkTable(HoodieWriteConfig config, HoodieEngineContext context
super(config, context, metaClient);
}

@Override
[Author comment] This has just been moved; no changes (there were other changes, but those were reverted).

protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
return FlinkHoodieIndexFactory.createIndex((HoodieFlinkEngineContext) context, config);
}

/**
* Fetch instance of {@link HoodieTableMetadataWriter}.
*
* @return instance of {@link HoodieTableMetadataWriter}
*/
@Override
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
Option<T> actionMetadata) {
if (config.isMetadataTableEnabled()) {
return Option.of(FlinkHoodieBackedTableMetadataWriter.create(getContext().getHadoopConf().get(), config,
getContext(), actionMetadata, Option.of(triggeringInstantTimestamp)));
} else {
return Option.empty();
}
}

public static <T> HoodieFlinkTable<T> create(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
@@ -87,27 +108,6 @@ public static HoodieWriteMetadata<List<WriteStatus>> convertMetadata(
return metadata.clone(metadata.getWriteStatuses().collectAsList());
}

@Override
protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
return FlinkHoodieIndexFactory.createIndex((HoodieFlinkEngineContext) context, config);
}

/**
* Fetch instance of {@link HoodieTableMetadataWriter}.
*
* @return instance of {@link HoodieTableMetadataWriter}
*/
@Override
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
Option<T> actionMetadata) {
if (config.isMetadataTableEnabled()) {
return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
context, actionMetadata, Option.of(triggeringInstantTimestamp)));
} else {
return Option.empty();
}
}

private static void setLatestInternalSchema(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
Option<InternalSchema> internalSchema = new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata();
if (internalSchema.isPresent()) {
HoodieJavaTable.java
@@ -37,10 +37,16 @@

public abstract class HoodieJavaTable<T>
extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {

protected HoodieJavaTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
super(config, context, metaClient);
}

@Override
protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
[Author comment] No changes.

return JavaHoodieIndexFactory.createIndex(config);
}

public static <T> HoodieJavaTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
@@ -66,9 +72,4 @@ public static HoodieWriteMetadata<List<WriteStatus>> convertMetadata(
HoodieWriteMetadata<HoodieData<WriteStatus>> metadata) {
return metadata.clone(metadata.getWriteStatuses().collectAsList());
}

@Override
protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
return JavaHoodieIndexFactory.createIndex(config);
}
}
SparkRDDReadClient.java
@@ -58,7 +58,7 @@
/**
* Provides an RDD based API for accessing/filtering Hoodie tables, based on keys.
*/
public class SparkRDDReadClient<T> implements Serializable {
public class SparkRDDReadClient<T> implements Serializable, AutoCloseable {

private static final long serialVersionUID = 1L;

@@ -67,7 +67,7 @@ public class SparkRDDReadClient<T> implements Serializable {
* base path pointing to the table. Until, then just always assume a BloomIndex
*/
private final transient HoodieIndex<?, ?> index;
private HoodieTable hoodieTable;
private final HoodieTable hoodieTable;
private transient Option<SQLContext> sqlContextOpt;
private final transient HoodieSparkEngineContext context;
private final transient Configuration hadoopConf;
@@ -230,4 +230,9 @@ public List<Pair<String, HoodieCompactionPlan>> getPendingCompactions() {
instantWorkloadPair -> Pair.of(instantWorkloadPair.getKey().getTimestamp(), instantWorkloadPair.getValue()))
.collect(Collectors.toList());
}

@Override
public void close() throws Exception {
hoodieTable.close();
}
}
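
With `SparkRDDReadClient` now implementing `AutoCloseable` and delegating `close()` to the underlying `HoodieTable`, callers can scope the client with try-with-resources. A hedged usage sketch follows; the constructor overload (engine context + write config) and the config builder calls are assumptions and should be checked against the actual Hudi API in this version.

```java
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;

public class ReadClientUsageSketch {

  // Hypothetical helper: the SparkRDDReadClient constructor used here
  // (engine context + write config) is an assumption, not confirmed by this diff.
  static void readWithAutoClose(HoodieSparkEngineContext engineContext, String basePath) throws Exception {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .build();

    // try-with-resources ensures close() runs, which now propagates to
    // hoodieTable.close() and releases the metadata-table reader.
    try (SparkRDDReadClient<?> readClient = new SparkRDDReadClient<>(engineContext, config)) {
      // ... key-based lookups / filtering against the table ...
    }
  }
}
```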
HoodieMetadataBloomIndexCheckFunction.java
@@ -60,6 +60,7 @@ public class HoodieMetadataBloomIndexCheckFunction implements
// Assuming each file bloom filter takes up 512K, sizing the max file count
// per batch so that the total fetched bloom filters would not cross 128 MB.
private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256;

private final HoodieTable hoodieTable;

public HoodieMetadataBloomIndexCheckFunction(HoodieTable hoodieTable) {
@@ -92,7 +93,9 @@ protected List<HoodieKeyLookupResult> computeNext() {
final String partitionPath = entry._2.getPartitionPath();
final String fileId = entry._1;
if (!fileIDBaseFileMap.containsKey(fileId)) {
Option<HoodieBaseFile> baseFile = hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
Option<HoodieBaseFile> baseFile =
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);

if (!baseFile.isPresent()) {
throw new HoodieIndexException("Failed to find the base file for partition: " + partitionPath
+ ", fileId: " + fileId);