diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java index 3f208a0f86a09..b41747d83a85e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java @@ -135,7 +135,8 @@ protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoa return HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath()) .setLoadActiveTimelineOnLoad(loadActiveTimelineOnLoad).setConsistencyGuardConfig(config.getConsistencyGuardConfig()) .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))) - .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build(); + .setFileSystemRetryConfig(config.getFileSystemRetryConfig()) + .setProperties(config.getProps()).build(); } public Option getTimelineServer() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index 41bcf001a0b6d..2974cc2ef6d6f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -459,6 +459,9 @@ private Stream getCommitInstantsToArchive() { private Stream getInstantsToArchive() { Stream instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive()); + if (config.isMetastoreEnabled()) { + return Stream.empty(); + } // For archiving and cleaning instants, we need to include intermediate state files if they exist HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 3eeb99044b29d..dd5c0bfd6ded3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.HoodieMetastoreConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.fs.ConsistencyGuardConfig; @@ -495,6 +496,7 @@ public class HoodieWriteConfig extends HoodieConfig { private FileSystemViewStorageConfig viewStorageConfig; private HoodiePayloadConfig hoodiePayloadConfig; private HoodieMetadataConfig metadataConfig; + private HoodieMetastoreConfig metastoreConfig; private HoodieCommonConfig commonConfig; private EngineType engineType; @@ -886,6 +888,7 @@ protected HoodieWriteConfig(EngineType engineType, Properties props) { this.viewStorageConfig = clientSpecifiedViewStorageConfig; this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build(); this.metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(props).build(); + this.metastoreConfig = HoodieMetastoreConfig.newBuilder().fromProperties(props).build(); this.commonConfig = HoodieCommonConfig.newBuilder().fromProperties(props).build(); } @@ -2100,6 +2103,13 @@ public HoodieStorageLayout.LayoutType getLayoutType() { return HoodieStorageLayout.LayoutType.valueOf(getString(HoodieLayoutConfig.LAYOUT_TYPE)); } + /** + * Metastore configs. + */ + public boolean isMetastoreEnabled() { + return metastoreConfig.enableMetastore(); + } + public static class Builder { protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java index 71efe89a055e1..20e3bd4c14ac3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java @@ -63,7 +63,8 @@ public static HoodieSparkTable create(HoodieW HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath()) .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig()) .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))) - .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build(); + .setFileSystemRetryConfig(config.getFileSystemRetryConfig()) + .setProperties(config.getProps()).build(); return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, refreshTimeline); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetastoreConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetastoreConfig.java new file mode 100644 index 0000000000000..36e2798a4d32a --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetastoreConfig.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.config; + +import javax.annotation.concurrent.Immutable; +import java.util.Properties; + +/** + * Configurations used by the HUDI Metastore. + */ +@Immutable +@ConfigClassProperty(name = "Metastore Configs", + groupName = ConfigGroups.Names.WRITE_CLIENT, + description = "Configurations used by the Hudi Metastore.") +public class HoodieMetastoreConfig extends HoodieConfig { + + public static final String METASTORE_PREFIX = "hoodie.metastore"; + + public static final ConfigProperty METASTORE_ENABLE = ConfigProperty + .key(METASTORE_PREFIX + ".enable") + .defaultValue(false) + .withDocumentation("Use metastore server to store hoodie table metadata"); + + public static final ConfigProperty METASTORE_URLS = ConfigProperty + .key(METASTORE_PREFIX + ".uris") + .defaultValue("thrift://localhost:9090") + .withDocumentation("Metastore server uris"); + + public static final ConfigProperty METASTORE_CONNECTION_RETRIES = ConfigProperty + .key(METASTORE_PREFIX + ".connect.retries") + .defaultValue(3) + .withDocumentation("Number of retries while opening a connection to metastore"); + + public static final ConfigProperty METASTORE_CONNECTION_RETRY_DELAY = ConfigProperty + .key(METASTORE_PREFIX + ".connect.retry.delay") + .defaultValue(1) + .withDocumentation("Number of seconds for the client to wait between consecutive connection attempts"); + + public static HoodieMetastoreConfig.Builder newBuilder() { + return new HoodieMetastoreConfig.Builder(); + } + + public boolean enableMetastore() { + return getBoolean(METASTORE_ENABLE); + } + + public String getMetastoreUris() { + return getStringOrDefault(METASTORE_URLS); + } + + public int getConnectionRetryLimit() { + return getIntOrDefault(METASTORE_CONNECTION_RETRIES); + } + + public int getConnectionRetryDelay() { + return getIntOrDefault(METASTORE_CONNECTION_RETRY_DELAY); + } + + public static class Builder { + private final HoodieMetastoreConfig config = new HoodieMetastoreConfig(); + + public Builder fromProperties(Properties props) { + this.config.getProps().putAll(props); + return this; + } + + public Builder setUris(String uris) { + config.setValue(METASTORE_URLS, uris); + return this; + } + + public HoodieMetastoreConfig build() { + config.setDefaults(HoodieMetastoreConfig.class.getName()); + return config; + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 546ddf7a30671..9945eb0650feb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.table; import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.config.HoodieMetastoreConfig; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FSUtils; @@ -38,6 +39,7 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieException; @@ -98,21 +100,22 @@ public class HoodieTableMetaClient implements Serializable { // NOTE: Since those two parameters lay on the hot-path of a lot of computations, we // use tailored extension of the {@code Path} class allowing to avoid repetitive // computations secured by its immutability - private SerializablePath basePath; - private SerializablePath metaPath; + protected SerializablePath basePath; + protected SerializablePath metaPath; private transient HoodieWrapperFileSystem fs; private boolean loadActiveTimelineOnLoad; - private SerializableConfiguration hadoopConf; + protected SerializableConfiguration hadoopConf; private HoodieTableType tableType; private TimelineLayoutVersion timelineLayoutVersion; - private HoodieTableConfig tableConfig; - private HoodieActiveTimeline activeTimeline; + protected HoodieTableConfig tableConfig; + protected HoodieActiveTimeline activeTimeline; private HoodieArchivedTimeline archivedTimeline; private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build(); private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build(); + protected HoodieMetastoreConfig metastoreConfig; - private HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad, + protected HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad, ConsistencyGuardConfig consistencyGuardConfig, Option layoutVersion, String payloadClassName, FileSystemRetryConfig fileSystemRetryConfig) { LOG.info("Loading HoodieTableMetaClient from " + basePath); @@ -367,6 +370,13 @@ public synchronized HoodieArchivedTimeline getArchivedTimeline() { return archivedTimeline; } + public HoodieMetastoreConfig getMetastoreConfig() { + if (metastoreConfig == null) { + metastoreConfig = new HoodieMetastoreConfig(); + } + return metastoreConfig; + } + /** * Returns fresh new archived commits as a timeline from startTs (inclusive). * @@ -451,7 +461,8 @@ public static HoodieTableMetaClient initTableAndGetMetaClient(Configuration hado HoodieTableConfig.create(fs, metaPathDir, props); // We should not use fs.getConf as this might be different from the original configuration // used to create the fs in unit tests - HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build(); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath) + .setProperties(props).build(); LOG.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath); return metaClient; } @@ -620,6 +631,21 @@ public void initializeBootstrapDirsIfNotExists() throws IOException { initializeBootstrapDirsIfNotExists(getHadoopConf(), basePath.toString(), getFs()); } + private static HoodieTableMetaClient newMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad, + ConsistencyGuardConfig consistencyGuardConfig, Option layoutVersion, + String payloadClassName, FileSystemRetryConfig fileSystemRetryConfig, Properties props) { + HoodieMetastoreConfig metastoreConfig = null == props + ? new HoodieMetastoreConfig.Builder().build() + : new HoodieMetastoreConfig.Builder().fromProperties(props).build(); + return metastoreConfig.enableMetastore() + ? (HoodieTableMetaClient) ReflectionUtils.loadClass("org.apache.hudi.common.table.HoodieTableMetastoreClient", + new Class[]{Configuration.class, ConsistencyGuardConfig.class, FileSystemRetryConfig.class, String.class, String.class, HoodieMetastoreConfig.class}, + conf, consistencyGuardConfig, fileSystemRetryConfig, + props.getProperty(HoodieTableConfig.DATABASE_NAME.key()), props.getProperty(HoodieTableConfig.NAME.key()), metastoreConfig) + : new HoodieTableMetaClient(conf, basePath, + loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName, fileSystemRetryConfig); + } + public static Builder builder() { return new Builder(); } @@ -636,6 +662,7 @@ public static class Builder { private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build(); private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build(); private Option layoutVersion = Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION); + private Properties props; public Builder setConf(Configuration conf) { this.conf = conf; @@ -672,11 +699,16 @@ public Builder setLayoutVersion(Option layoutVersion) { return this; } + public Builder setProperties(Properties properties) { + this.props = properties; + return this; + } + public HoodieTableMetaClient build() { ValidationUtils.checkArgument(conf != null, "Configuration needs to be set to init HoodieTableMetaClient"); ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init HoodieTableMetaClient"); - return new HoodieTableMetaClient(conf, basePath, - loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName, fileSystemRetryConfig); + return newMetaClient(conf, basePath, + loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName, fileSystemRetryConfig, props); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index d912525fe9271..a62068e655e5d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -245,7 +245,7 @@ public void deleteInstantFileIfExists(HoodieInstant instant) { } } - private void deleteInstantFile(HoodieInstant instant) { + protected void deleteInstantFile(HoodieInstant instant) { LOG.info("Deleting instant " + instant); Path inFlightCommitFilePath = getInstantFileNamePath(instant.getFileName()); try { @@ -536,7 +536,7 @@ private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, transitionState(fromInstant, toInstant, data, false); } - private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option data, + protected void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option data, boolean allowRedundantTransitions) { ValidationUtils.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp())); try { @@ -566,7 +566,7 @@ private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, } } - private void revertCompleteToInflight(HoodieInstant completed, HoodieInstant inflight) { + protected void revertCompleteToInflight(HoodieInstant completed, HoodieInstant inflight) { ValidationUtils.checkArgument(completed.getTimestamp().equals(inflight.getTimestamp())); Path inFlightCommitFilePath = getInstantFileNamePath(inflight.getFileName()); Path commitFilePath = getInstantFileNamePath(completed.getFileName()); @@ -632,7 +632,7 @@ public void saveToCompactionRequested(HoodieInstant instant, Option cont } /** - * Saves content for inflight/requested REPLACE instant. + * Saves content for requested REPLACE instant. */ public void saveToPendingReplaceCommit(HoodieInstant instant, Option content) { ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)); @@ -719,7 +719,7 @@ public void saveToPendingIndexAction(HoodieInstant instant, Option conte createFileInMetaPath(instant.getFileName(), content, false); } - private void createFileInMetaPath(String filename, Option content, boolean allowOverwrite) { + protected void createFileInMetaPath(String filename, Option content, boolean allowOverwrite) { Path fullPath = getInstantFileNamePath(filename); if (allowOverwrite || metaClient.getTimelineLayoutVersion().isNullVersion()) { FileIOUtils.createFileInPath(metaClient.getFs(), fullPath, content); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java index 4683fd6919ab4..35fda6c416ac7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java @@ -20,12 +20,14 @@ import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.HoodieMetastoreConfig; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.function.SerializableSupplier; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Functions.Function2; +import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.metadata.HoodieMetadataFileSystemView; import org.apache.hudi.metadata.HoodieTableMetadata; @@ -59,6 +61,8 @@ public class FileSystemViewManager { private static final Logger LOG = LogManager.getLogger(FileSystemViewManager.class); + private static final String HOODIE_METASTORE_FILE_SYSTEM_VIEW_CLASS = "org.apache.hudi.common.table.view.HoodieMetastoreFileSystemView"; + private final SerializableConfiguration conf; // The View Storage config used to store file-system views private final FileSystemViewStorageConfig viewStorageConfig; @@ -165,6 +169,11 @@ private static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieMeta return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(), metadataSupplier.get()); } + if (metaClient.getMetastoreConfig().enableMetastore()) { + return (HoodieTableFileSystemView) ReflectionUtils.loadClass(HOODIE_METASTORE_FILE_SYSTEM_VIEW_CLASS, + new Class[] {HoodieTableMetaClient.class, HoodieTimeline.class, HoodieMetastoreConfig.class}, + metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metaClient.getMetastoreConfig()); + } return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled()); } @@ -184,6 +193,11 @@ public static HoodieTableFileSystemView createInMemoryFileSystemViewWithTimeline if (metadataConfig.enabled()) { return new HoodieMetadataFileSystemView(engineContext, metaClient, timeline, metadataConfig); } + if (metaClient.getMetastoreConfig().enableMetastore()) { + return (HoodieTableFileSystemView) ReflectionUtils.loadClass(HOODIE_METASTORE_FILE_SYSTEM_VIEW_CLASS, + new Class[] {HoodieTableMetaClient.class, HoodieTimeline.class, HoodieMetadataConfig.class}, + metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metaClient.getMetastoreConfig()); + } return new HoodieTableFileSystemView(metaClient, timeline); }