Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoa
return HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(loadActiveTimelineOnLoad).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
.setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
.setFileSystemRetryConfig(config.getFileSystemRetryConfig())
.setProperties(config.getProps()).build();
}

public Option<EmbeddedTimelineService> getTimelineServer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,9 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {

private Stream<HoodieInstant> getInstantsToArchive() {
Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
if (config.isMetastoreEnabled()) {
return Stream.empty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is Stream.empty() returned here when isMetastoreEnabled is turned on?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the Hudi metastore, the timeline no longer needs to be archived.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the metastore implementation, would it be possible to introduce something new, like a MetastoreHoodieTimelineArchiver implementation?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MetastoreHoodieTimelineArchiver

This is a good idea and I think it can be implemented in steps.

}

// For archiving and cleaning instants, we need to include intermediate state files if they exist
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieMetastoreConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
Expand Down Expand Up @@ -495,6 +496,7 @@ public class HoodieWriteConfig extends HoodieConfig {
private FileSystemViewStorageConfig viewStorageConfig;
private HoodiePayloadConfig hoodiePayloadConfig;
private HoodieMetadataConfig metadataConfig;
private HoodieMetastoreConfig metastoreConfig;
private HoodieCommonConfig commonConfig;
private EngineType engineType;

Expand Down Expand Up @@ -886,6 +888,7 @@ protected HoodieWriteConfig(EngineType engineType, Properties props) {
this.viewStorageConfig = clientSpecifiedViewStorageConfig;
this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build();
this.metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(props).build();
this.metastoreConfig = HoodieMetastoreConfig.newBuilder().fromProperties(props).build();
this.commonConfig = HoodieCommonConfig.newBuilder().fromProperties(props).build();
}

Expand Down Expand Up @@ -2100,6 +2103,13 @@ public HoodieStorageLayout.LayoutType getLayoutType() {
return HoodieStorageLayout.LayoutType.valueOf(getString(HoodieLayoutConfig.LAYOUT_TYPE));
}

/**
 * Metastore configs.
 *
 * Reports whether the metastore is enabled, as answered by the
 * {@code HoodieMetastoreConfig} held by this write config.
 *
 * @return the result of {@code metastoreConfig.enableMetastore()}
 */
public boolean isMetastoreEnabled() {
  final boolean metastoreEnabled = metastoreConfig.enableMetastore();
  return metastoreEnabled;
}

public static class Builder {

protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieW
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
.setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
.setFileSystemRetryConfig(config.getFileSystemRetryConfig())
.setProperties(config.getProps()).build();
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, refreshTimeline);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.config;

import javax.annotation.concurrent.Immutable;
import java.util.Properties;

/**
* Configurations used by the Hudi Metastore.
*/
@Immutable
@ConfigClassProperty(name = "Metastore Configs",
groupName = ConfigGroups.Names.WRITE_CLIENT,
description = "Configurations used by the Hudi Metastore.")
public class HoodieMetastoreConfig extends HoodieConfig {

public static final String METASTORE_PREFIX = "hoodie.metastore";

public static final ConfigProperty<Boolean> METASTORE_ENABLE = ConfigProperty
.key(METASTORE_PREFIX + ".enable")
.defaultValue(false)
.withDocumentation("Use metastore server to store hoodie table metadata");

public static final ConfigProperty<String> METASTORE_URLS = ConfigProperty
.key(METASTORE_PREFIX + ".uris")
.defaultValue("thrift://localhost:9090")
.withDocumentation("Metastore server uris");

public static final ConfigProperty<Integer> METASTORE_CONNECTION_RETRIES = ConfigProperty
.key(METASTORE_PREFIX + ".connect.retries")
.defaultValue(3)
.withDocumentation("Number of retries while opening a connection to metastore");

public static final ConfigProperty<Integer> METASTORE_CONNECTION_RETRY_DELAY = ConfigProperty
.key(METASTORE_PREFIX + ".connect.retry.delay")
.defaultValue(1)
.withDocumentation("Number of seconds for the client to wait between consecutive connection attempts");

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we also need the parameter 'connectionlifetimeinmillis' to control MetaStore Client socket lifetime. WDYT

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestions. Will add it in the following PR #5064

/**
 * Creates a fresh, empty builder for {@link HoodieMetastoreConfig}.
 *
 * @return a new {@link HoodieMetastoreConfig.Builder}
 */
public static HoodieMetastoreConfig.Builder newBuilder() {
  final HoodieMetastoreConfig.Builder freshBuilder = new HoodieMetastoreConfig.Builder();
  return freshBuilder;
}

/**
 * Tells whether the metastore server should be used to store hoodie table metadata.
 *
 * <p>Resolved through the "or default" accessor, like the other getters of this class,
 * so the default declared on {@link #METASTORE_ENABLE} ({@code false}) applies when the
 * key is absent from the underlying properties, e.g. on an instance that was created
 * directly rather than through {@link Builder#build()}.
 *
 * @return {@code true} if the metastore is enabled, {@code false} otherwise
 */
public boolean enableMetastore() {
  return getBooleanOrDefault(METASTORE_ENABLE);
}

/**
 * Returns the metastore server uris configured under {@link #METASTORE_URLS},
 * falling back to that property's default.
 *
 * @return the metastore server uris
 */
public String getMetastoreUris() {
  final String metastoreUris = getStringOrDefault(METASTORE_URLS);
  return metastoreUris;
}

/**
 * Returns the number of retries while opening a connection to the metastore,
 * read from {@link #METASTORE_CONNECTION_RETRIES} or its default.
 *
 * @return the connection retry limit
 */
public int getConnectionRetryLimit() {
  final int retryLimit = getIntOrDefault(METASTORE_CONNECTION_RETRIES);
  return retryLimit;
}

/**
 * Returns the number of seconds the client waits between consecutive connection
 * attempts, read from {@link #METASTORE_CONNECTION_RETRY_DELAY} or its default.
 *
 * @return the delay between connection attempts, in seconds
 */
public int getConnectionRetryDelay() {
  final int retryDelaySeconds = getIntOrDefault(METASTORE_CONNECTION_RETRY_DELAY);
  return retryDelaySeconds;
}

/**
 * Builder for {@link HoodieMetastoreConfig}.
 */
public static class Builder {
  // The single config instance that is populated by the setters and handed out by build().
  private final HoodieMetastoreConfig metastoreConfig = new HoodieMetastoreConfig();

  /**
   * Copies every entry of the given properties into the config being built.
   *
   * @param props properties to copy from
   * @return this builder
   */
  public Builder fromProperties(Properties props) {
    metastoreConfig.getProps().putAll(props);
    return this;
  }

  /**
   * Sets the value stored under {@link #METASTORE_URLS}.
   *
   * @param uris metastore server uris
   * @return this builder
   */
  public Builder setUris(String uris) {
    metastoreConfig.setValue(METASTORE_URLS, uris);
    return this;
  }

  /**
   * Applies the defaults registered for {@link HoodieMetastoreConfig} and returns the config.
   *
   * @return the populated config instance
   */
  public HoodieMetastoreConfig build() {
    final String configClassName = HoodieMetastoreConfig.class.getName();
    metastoreConfig.setDefaults(configClassName);
    return metastoreConfig;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.common.table;

import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetastoreConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
Expand All @@ -38,6 +39,7 @@
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
Expand Down Expand Up @@ -98,21 +100,22 @@ public class HoodieTableMetaClient implements Serializable {
// NOTE: Since those two parameters lay on the hot-path of a lot of computations, we
// use tailored extension of the {@code Path} class allowing to avoid repetitive
// computations secured by its immutability
private SerializablePath basePath;
private SerializablePath metaPath;
protected SerializablePath basePath;
protected SerializablePath metaPath;

private transient HoodieWrapperFileSystem fs;
private boolean loadActiveTimelineOnLoad;
private SerializableConfiguration hadoopConf;
protected SerializableConfiguration hadoopConf;
private HoodieTableType tableType;
private TimelineLayoutVersion timelineLayoutVersion;
private HoodieTableConfig tableConfig;
private HoodieActiveTimeline activeTimeline;
protected HoodieTableConfig tableConfig;
protected HoodieActiveTimeline activeTimeline;
private HoodieArchivedTimeline archivedTimeline;
private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build();
protected HoodieMetastoreConfig metastoreConfig;

private HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
protected HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
ConsistencyGuardConfig consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why make these variables protected scope ? The class has no sub-classes ?

String payloadClassName, FileSystemRetryConfig fileSystemRetryConfig) {
LOG.info("Loading HoodieTableMetaClient from " + basePath);
Expand Down Expand Up @@ -367,6 +370,13 @@ public synchronized HoodieArchivedTimeline getArchivedTimeline() {
return archivedTimeline;
}

public HoodieMetastoreConfig getMetastoreConfig() {
if (metastoreConfig == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method thread safe ?

metastoreConfig = new HoodieMetastoreConfig();
}
return metastoreConfig;
}

/**
* Returns fresh new archived commits as a timeline from startTs (inclusive).
*
Expand Down Expand Up @@ -451,7 +461,8 @@ public static HoodieTableMetaClient initTableAndGetMetaClient(Configuration hado
HoodieTableConfig.create(fs, metaPathDir, props);
// We should not use fs.getConf as this might be different from the original configuration
// used to create the fs in unit tests
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath)
.setProperties(props).build();
LOG.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath);
return metaClient;
}
Expand Down Expand Up @@ -620,6 +631,21 @@ public void initializeBootstrapDirsIfNotExists() throws IOException {
initializeBootstrapDirsIfNotExists(getHadoopConf(), basePath.toString(), getFs());
}

private static HoodieTableMetaClient newMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
ConsistencyGuardConfig consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion,
String payloadClassName, FileSystemRetryConfig fileSystemRetryConfig, Properties props) {
HoodieMetastoreConfig metastoreConfig = null == props
? new HoodieMetastoreConfig.Builder().build()
: new HoodieMetastoreConfig.Builder().fromProperties(props).build();
return metastoreConfig.enableMetastore()
? (HoodieTableMetaClient) ReflectionUtils.loadClass("org.apache.hudi.common.table.HoodieTableMetastoreClient",
Copy link
Contributor

@XuQianJin-Stars XuQianJin-Stars May 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some instructions to remove this reflection after the HoodieTableMetastoreClient is fully integrated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HoodieTableMetastoreClient is under hudi-metastore, so it has to use reflection here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hudi-metastore

Once hudi-metastore is ready, this reflection can be removed. Could you add a note about that to this code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hudi-metastore depends on the hudi-common, reflection is necessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hudi-metastore depends on the hudi-common, reflection is necessary.

well, ok

new Class<?>[]{Configuration.class, ConsistencyGuardConfig.class, FileSystemRetryConfig.class, String.class, String.class, HoodieMetastoreConfig.class},
conf, consistencyGuardConfig, fileSystemRetryConfig,
props.getProperty(HoodieTableConfig.DATABASE_NAME.key()), props.getProperty(HoodieTableConfig.NAME.key()), metastoreConfig)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you check the NPE here ?

: new HoodieTableMetaClient(conf, basePath,
loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName, fileSystemRetryConfig);
}

public static Builder builder() {
return new Builder();
}
Expand All @@ -636,6 +662,7 @@ public static class Builder {
private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build();
private Option<TimelineLayoutVersion> layoutVersion = Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION);
private Properties props;

public Builder setConf(Configuration conf) {
this.conf = conf;
Expand Down Expand Up @@ -672,11 +699,16 @@ public Builder setLayoutVersion(Option<TimelineLayoutVersion> layoutVersion) {
return this;
}

/**
 * Records the properties on this builder so they are available when the
 * meta client is built.
 *
 * @param properties properties to keep on the builder; stored as given, without copying
 * @return this builder
 */
public Builder setProperties(Properties properties) {
  props = properties;
  return this;
}

public HoodieTableMetaClient build() {
ValidationUtils.checkArgument(conf != null, "Configuration needs to be set to init HoodieTableMetaClient");
ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init HoodieTableMetaClient");
return new HoodieTableMetaClient(conf, basePath,
loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName, fileSystemRetryConfig);
return newMetaClient(conf, basePath,
loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName, fileSystemRetryConfig, props);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public void deleteInstantFileIfExists(HoodieInstant instant) {
}
}

private void deleteInstantFile(HoodieInstant instant) {
protected void deleteInstantFile(HoodieInstant instant) {
LOG.info("Deleting instant " + instant);
Path inFlightCommitFilePath = getInstantFileNamePath(instant.getFileName());
try {
Expand Down Expand Up @@ -536,7 +536,7 @@ private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant,
transitionState(fromInstant, toInstant, data, false);
}

private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data,
protected void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data,
boolean allowRedundantTransitions) {
ValidationUtils.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()));
try {
Expand Down Expand Up @@ -566,7 +566,7 @@ private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant,
}
}

private void revertCompleteToInflight(HoodieInstant completed, HoodieInstant inflight) {
protected void revertCompleteToInflight(HoodieInstant completed, HoodieInstant inflight) {
ValidationUtils.checkArgument(completed.getTimestamp().equals(inflight.getTimestamp()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why make these methods protected ?

Path inFlightCommitFilePath = getInstantFileNamePath(inflight.getFileName());
Path commitFilePath = getInstantFileNamePath(completed.getFileName());
Expand Down Expand Up @@ -632,7 +632,7 @@ public void saveToCompactionRequested(HoodieInstant instant, Option<byte[]> cont
}

/**
* Saves content for inflight/requested REPLACE instant.
* Saves content for requested REPLACE instant.
*/
public void saveToPendingReplaceCommit(HoodieInstant instant, Option<byte[]> content) {
ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
Expand Down Expand Up @@ -719,7 +719,7 @@ public void saveToPendingIndexAction(HoodieInstant instant, Option<byte[]> conte
createFileInMetaPath(instant.getFileName(), content, false);
}

private void createFileInMetaPath(String filename, Option<byte[]> content, boolean allowOverwrite) {
protected void createFileInMetaPath(String filename, Option<byte[]> content, boolean allowOverwrite) {
Path fullPath = getInstantFileNamePath(filename);
if (allowOverwrite || metaClient.getTimelineLayoutVersion().isNullVersion()) {
FileIOUtils.createFileInPath(metaClient.getFs(), fullPath, content);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@

import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieMetastoreConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableSupplier;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Functions.Function2;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
import org.apache.hudi.metadata.HoodieTableMetadata;
Expand Down Expand Up @@ -59,6 +61,8 @@
public class FileSystemViewManager {
private static final Logger LOG = LogManager.getLogger(FileSystemViewManager.class);

private static final String HOODIE_METASTORE_FILE_SYSTEM_VIEW_CLASS = "org.apache.hudi.common.table.view.HoodieMetastoreFileSystemView";

private final SerializableConfiguration conf;
// The View Storage config used to store file-system views
private final FileSystemViewStorageConfig viewStorageConfig;
Expand Down Expand Up @@ -165,6 +169,11 @@ private static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieMeta
return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(),
metadataSupplier.get());
}
if (metaClient.getMetastoreConfig().enableMetastore()) {
return (HoodieTableFileSystemView) ReflectionUtils.loadClass(HOODIE_METASTORE_FILE_SYSTEM_VIEW_CLASS,
new Class<?>[] {HoodieTableMetaClient.class, HoodieTimeline.class, HoodieMetastoreConfig.class},
metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metaClient.getMetastoreConfig());
}
return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled());
}

Expand All @@ -184,6 +193,11 @@ public static HoodieTableFileSystemView createInMemoryFileSystemViewWithTimeline
if (metadataConfig.enabled()) {
return new HoodieMetadataFileSystemView(engineContext, metaClient, timeline, metadataConfig);
}
if (metaClient.getMetastoreConfig().enableMetastore()) {
  // The metastore-backed view is referenced only by class name and instantiated reflectively,
  // so the declared constructor parameter types must line up with the arguments passed below.
  // The third argument is the HoodieMetastoreConfig returned by metaClient.getMetastoreConfig(),
  // hence HoodieMetastoreConfig.class (not HoodieMetadataConfig.class) in the signature.
  return (HoodieTableFileSystemView) ReflectionUtils.loadClass(HOODIE_METASTORE_FILE_SYSTEM_VIEW_CLASS,
      new Class<?>[] {HoodieTableMetaClient.class, HoodieTimeline.class, HoodieMetastoreConfig.class},
      metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metaClient.getMetastoreConfig());
}
return new HoodieTableFileSystemView(metaClient, timeline);
}

Expand Down