diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupConfigurationException.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupConfigurationException.java
new file mode 100644
index 000000000000..4922edcd0d7d
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupConfigurationException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class BackupConfigurationException extends Exception {
+ public BackupConfigurationException(String message) {
+ super(message);
+ }
+
+ public BackupConfigurationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
new file mode 100644
index 000000000000..943d61e7c8cc
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class BackupFileSystemManager {
+ private static final Logger LOG = LoggerFactory.getLogger(BackupFileSystemManager.class);
+
+ public static final String WALS_DIR = "WALs";
+ public static final String BULKLOAD_FILES_DIR = "bulk-load-files";
+ private final String peerId;
+ private final FileSystem backupFs;
+ private final Path backupRootDir;
+ private Path walsDir;
+ private Path bulkLoadFilesDir;
+
+ public BackupFileSystemManager(String peerId, Configuration conf, String backupRootDirStr)
+ throws IOException {
+ this.peerId = peerId;
+ this.backupRootDir = new Path(backupRootDirStr);
+ this.backupFs = FileSystem.get(backupRootDir.toUri(), conf);
+ initBackupDirectories();
+ }
+
+ private void initBackupDirectories() throws IOException {
+ LOG.info("{} Initializing backup directories under root: {}", Utils.logPeerId(peerId),
+ backupRootDir);
+ try {
+ walsDir = createDirectoryIfNotExists(WALS_DIR);
+ bulkLoadFilesDir = createDirectoryIfNotExists(BULKLOAD_FILES_DIR);
+ } catch (IOException e) {
+ LOG.error("{} Failed to initialize backup directories: {}", Utils.logPeerId(peerId),
+ e.getMessage(), e);
+ throw e;
+ }
+ }
+
+ private Path createDirectoryIfNotExists(String dirName) throws IOException {
+ Path dirPath = new Path(backupRootDir, dirName);
+ if (backupFs.exists(dirPath)) {
+ LOG.info("{} Directory already exists: {}", Utils.logPeerId(peerId), dirPath);
+ } else {
+ backupFs.mkdirs(dirPath);
+ LOG.info("{} Successfully created directory: {}", Utils.logPeerId(peerId), dirPath);
+ }
+ return dirPath;
+ }
+
+ public Path getWalsDir() {
+ return walsDir;
+ }
+
+ public Path getBulkLoadFilesDir() {
+ return bulkLoadFilesDir;
+ }
+
+ public FileSystem getBackupFs() {
+ return backupFs;
+ }
+}
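
For orientation, a minimal sketch of how this manager might be instantiated and what it lays out on disk; the peer id and backup root URI below are made-up values, not part of this patch:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager;

  public class BackupLayoutSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // Hypothetical backup root; any FileSystem URI reachable from the cluster would do.
      BackupFileSystemManager manager =
        new BackupFileSystemManager("peer-1", conf, "hdfs://backup-cluster:8020/hbase-backups");
      Path wals = manager.getWalsDir();               // <backup-root>/WALs
      Path bulkLoads = manager.getBulkLoadFilesDir(); // <backup-root>/bulk-load-files
      System.out.println("WALs -> " + wals + ", bulk loads -> " + bulkLoads);
    }
  }
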
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
new file mode 100644
index 000000000000..f8c8ff37efae
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
+/**
+ * Processes bulk load files from Write-Ahead Log (WAL) entries for HBase replication.
+ *
+ * This utility class extracts and constructs the file paths of bulk-loaded files based on WAL
+ * entries. It processes bulk load descriptors and their associated store descriptors to generate
+ * the paths for each bulk-loaded file.
+ *
+ * The class is designed for scenarios where replicable bulk load operations need to be parsed and
+ * their file paths need to be determined programmatically.
+ *
+ */
+@InterfaceAudience.Private
+public final class BulkLoadProcessor {
+ private BulkLoadProcessor() {
+ }
+
+ public static List<Path> processBulkLoadFiles(TableName tableName, List<WAL.Entry> walEntries)
+ throws IOException {
+ List<Path> bulkLoadFilePaths = new ArrayList<>();
+ String namespace = tableName.getNamespaceAsString();
+ String table = tableName.getQualifierAsString();
+
+ for (WAL.Entry entry : walEntries) {
+ WALEdit edit = entry.getEdit();
+ for (Cell cell : edit.getCells()) {
+ if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+ bulkLoadFilePaths.addAll(processBulkLoadDescriptor(cell, namespace, table));
+ }
+ }
+ }
+ return bulkLoadFilePaths;
+ }
+
+ private static List<Path> processBulkLoadDescriptor(Cell cell, String namespace, String table)
+ throws IOException {
+ List<Path> bulkLoadFilePaths = new ArrayList<>();
+ WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+
+ if (bld == null || !bld.getReplicate() || bld.getEncodedRegionName() == null) {
+ return bulkLoadFilePaths; // Skip if not replicable
+ }
+
+ String regionName = bld.getEncodedRegionName().toStringUtf8();
+ for (WALProtos.StoreDescriptor storeDescriptor : bld.getStoresList()) {
+ bulkLoadFilePaths
+ .addAll(processStoreDescriptor(storeDescriptor, namespace, table, regionName));
+ }
+
+ return bulkLoadFilePaths;
+ }
+
+ private static List<Path> processStoreDescriptor(WALProtos.StoreDescriptor storeDescriptor,
+ String namespace, String table, String regionName) {
+ List<Path> paths = new ArrayList<>();
+ String columnFamily = storeDescriptor.getFamilyName().toStringUtf8();
+
+ for (String storeFile : storeDescriptor.getStoreFileList()) {
+ paths.add(new Path(namespace,
+ new Path(table, new Path(regionName, new Path(columnFamily, storeFile)))));
+ }
+
+ return paths;
+ }
+}
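
The returned paths are relative, of the form <namespace>/<table>/<encoded-region>/<column-family>/<store-file>. A minimal caller sketch, assuming the WAL entries for one table have already been collected (the table name is illustrative):

  import java.io.IOException;
  import java.util.List;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.backup.replication.BulkLoadProcessor;
  import org.apache.hadoop.hbase.wal.WAL;

  final class BulkLoadPathsSketch {
    // Collects the relative store-file paths referenced by BULK_LOAD marker cells in the
    // given WAL entries; descriptors that are not marked replicable are skipped.
    static List<Path> bulkLoadPathsFor(List<WAL.Entry> walEntries) throws IOException {
      return BulkLoadProcessor.processBulkLoadFiles(
        TableName.valueOf("default", "usertable"), walEntries);
    }
  }
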
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupManager.java
new file mode 100644
index 000000000000..7a1c1bbfc5f5
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupManager.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the continuous backup process for HBase WAL entries and bulk load files.
+ *
+ * This class is responsible for initializing backup components, processing WAL entries, staging
+ * them for backup, and committing the backup to the configured storage. It uses
+ * {@link BackupFileSystemManager} for handling file system operations and
+ * {@link ContinuousBackupStagingManager} for managing staging.
+ *
+ */
+@InterfaceAudience.Private
+public class ContinuousBackupManager {
+ private static final Logger LOG = LoggerFactory.getLogger(ContinuousBackupManager.class);
+ public static final String CONF_BACKUP_ROOT_DIR = "hbase.backup.root.dir";
+ public static final String CONF_BACKUP_MAX_WAL_SIZE = "hbase.backup.max.wal.size";
+ public static final long DEFAULT_MAX_WAL_SIZE = 128 * 1024 * 1024;
+ private final Configuration conf;
+ private final String peerId;
+ private final BackupFileSystemManager backupFileSystemManager;
+ private final ContinuousBackupStagingManager stagingManager;
+
+ /**
+ * Constructs a {@code ContinuousBackupManager} instance with the specified peer ID and
+ * configuration.
+ * @param peerId the unique identifier of the replication peer
+ * @param conf the HBase configuration object
+ * @throws BackupConfigurationException if the backup configuration is invalid
+ */
+ public ContinuousBackupManager(String peerId, Configuration conf)
+ throws BackupConfigurationException {
+ this.peerId = peerId;
+ this.conf = conf;
+ String backupRootDirStr = conf.get(CONF_BACKUP_ROOT_DIR);
+ if (backupRootDirStr == null || backupRootDirStr.isEmpty()) {
+ String errorMsg = Utils.logPeerId(peerId)
+ + " Backup root directory not specified. Set it using " + CONF_BACKUP_ROOT_DIR;
+ LOG.error(errorMsg);
+ throw new BackupConfigurationException(errorMsg);
+ }
+ LOG.debug("{} Backup root directory: {}", Utils.logPeerId(peerId), backupRootDirStr);
+
+ try {
+ this.backupFileSystemManager = new BackupFileSystemManager(peerId, conf, backupRootDirStr);
+ LOG.info("{} BackupFileSystemManager initialized successfully.", Utils.logPeerId(peerId));
+ } catch (IOException e) {
+ String errorMsg = Utils.logPeerId(peerId) + " Failed to initialize BackupFileSystemManager";
+ LOG.error(errorMsg, e);
+ throw new BackupConfigurationException(errorMsg, e);
+ }
+
+ try {
+ this.stagingManager = new ContinuousBackupStagingManager(conf, this);
+ LOG.info("{} ContinuousBackupStagingManager initialized successfully.",
+ Utils.logPeerId(peerId));
+ } catch (IOException e) {
+ String errorMsg = "Failed to initialize ContinuousBackupStagingManager";
+ LOG.error(errorMsg, e);
+ throw new BackupConfigurationException(errorMsg, e);
+ }
+ }
+
+ /**
+ * Backs up the provided WAL entries grouped by table.
+ *
+ * The method processes WAL entries, identifies bulk load files, stages them, and prepares them
+ * for backup.
+ *
+ * @param tableToEntriesMap a map of table names to WAL entries
+ * @throws IOException if an error occurs during the backup process
+ */
+ public void backup(Map<TableName, List<WAL.Entry>> tableToEntriesMap) throws IOException {
+ LOG.debug("{} Starting backup process for {} table(s)", Utils.logPeerId(peerId),
+ tableToEntriesMap.size());
+
+ for (Map.Entry<TableName, List<WAL.Entry>> entry : tableToEntriesMap.entrySet()) {
+ TableName tableName = entry.getKey();
+ List<WAL.Entry> walEntries = entry.getValue();
+
+ LOG.debug("{} Processing {} WAL entries for table: {}", Utils.logPeerId(peerId),
+ walEntries.size(), tableName);
+
+ List<Path> bulkLoadFiles = BulkLoadProcessor.processBulkLoadFiles(tableName, walEntries);
+ LOG.debug("{} Identified {} bulk load file(s) for table: {}", Utils.logPeerId(peerId),
+ bulkLoadFiles.size(), tableName);
+
+ stagingManager.stageEntries(tableName, walEntries, bulkLoadFiles);
+ LOG.debug("{} Staged WAL entries and bulk load files for table: {}", Utils.logPeerId(peerId),
+ tableName);
+ }
+
+ LOG.debug("{} Backup process completed for all tables.", Utils.logPeerId(peerId));
+ }
+
+ /**
+ * Commits the backup for a given WAL file and its associated bulk load files.
+ *
+ * This method copies the WAL file and bulk load files from the staging area to the configured
+ * backup directory.
+ *
+ * @param sourceFs the source file system where the files are currently staged
+ * @param walFile the WAL file to back up
+ * @param bulkLoadFiles a list of bulk load files associated with the WAL file
+ * @throws IOException if an error occurs while committing the backup
+ */
+ public void commitBackup(FileSystem sourceFs, Path walFile, List<Path> bulkLoadFiles)
+ throws IOException {
+ LOG.debug("{} Starting commit for WAL file: {}", Utils.logPeerId(peerId), walFile);
+
+ Path sourcePath = stagingManager.getWalFileStagingPath(walFile);
+ Path backupWalPath = new Path(backupFileSystemManager.getWalsDir(), walFile);
+
+ try {
+ FileUtil.copy(sourceFs, sourcePath, backupFileSystemManager.getBackupFs(), backupWalPath,
+ false, conf);
+ LOG.info("{} WAL file {} successfully backed up to {}", Utils.logPeerId(peerId), walFile,
+ backupWalPath);
+ } catch (IOException e) {
+ LOG.error("{} Failed to back up WAL file: {}", Utils.logPeerId(peerId), walFile, e);
+ throw e;
+ }
+
+ uploadBulkLoadFiles(sourceFs, bulkLoadFiles);
+ LOG.debug("{} Commit completed for WAL file: {}", Utils.logPeerId(peerId), walFile);
+ }
+
+ private void uploadBulkLoadFiles(FileSystem sourceFs, List<Path> bulkLoadFiles)
+ throws IOException {
+ for (Path file : bulkLoadFiles) {
+ Path sourcePath = stagingManager.getBulkloadFileStagingPath(file);
+ Path destPath = new Path(backupFileSystemManager.getBulkLoadFilesDir(), file);
+
+ try {
+ FileUtil.copy(sourceFs, sourcePath, backupFileSystemManager.getBackupFs(), destPath, false,
+ conf);
+ LOG.info("{} Bulk load file {} successfully backed up to {}", Utils.logPeerId(peerId), file,
+ destPath);
+ } catch (IOException e) {
+ LOG.error("{} Failed to back up bulk load file: {}", Utils.logPeerId(peerId), file, e);
+ throw e;
+ }
+ }
+ }
+
+ public void close() {
+ stagingManager.close();
+ }
+
+ public String getPeerId() {
+ return peerId;
+ }
+}
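
A minimal sketch of the configuration this manager expects; the values are illustrative, in practice the properties come from the replication peer's configuration, and the staging manager additionally requires the HFile cleaner plugin introduced later in this patch to be registered:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.backup.replication.BackupConfigurationException;
  import org.apache.hadoop.hbase.backup.replication.ContinuousBackupManager;

  final class ContinuousBackupConfigSketch {
    static ContinuousBackupManager create() throws BackupConfigurationException {
      Configuration conf = HBaseConfiguration.create();
      // Required: root under which WALs/ and bulk-load-files/ are written (illustrative URI).
      conf.set(ContinuousBackupManager.CONF_BACKUP_ROOT_DIR,
        "hdfs://backup-cluster:8020/hbase-backups");
      // Optional: roll staged WAL files once they reach this size (defaults to 128 MB).
      conf.setLong(ContinuousBackupManager.CONF_BACKUP_MAX_WAL_SIZE, 256L * 1024 * 1024);
      return new ContinuousBackupManager("peer-1", conf);
    }
  }
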
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
new file mode 100644
index 000000000000..9d7ad19aec81
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@code ContinuousBackupReplicationEndpoint} is a custom implementation of
+ * {@link BaseReplicationEndpoint} designed to integrate with a continuous backup system for HBase's
+ * Write-Ahead Log (WAL) entries.
+ *
+ * This endpoint enables replication of WAL entries to a backup system for disaster recovery or
+ * archival purposes. It is initialized and managed within the HBase replication framework. The
+ * class handles the following tasks:
+ *
+ * - Initialization of the {@link ContinuousBackupManager} responsible for managing backup
+ * operations.
+ * - Processing WAL entries grouped by table and triggering backups via the
+ * {@code ContinuousBackupManager}.
+ * - Graceful startup and shutdown of the replication endpoint, ensuring proper resource
+ * management.
+ *
+ *
+ *
+ * Configuration:
+ * The following configuration property is required for this endpoint:
+ *
+ * - {@code hbase.backup.wal.replication.peerUUID}: Specifies the UUID of the replication peer for
+ * this endpoint.
+ *
+ * @see BaseReplicationEndpoint
+ * @see ContinuousBackupManager
+ */
+@InterfaceAudience.Private
+public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ContinuousBackupReplicationEndpoint.class);
+ public static final String CONF_PEER_UUID = "hbase.backup.wal.replication.peerUUID";
+ private ContinuousBackupManager continuousBackupManager;
+ private UUID peerUUID;
+
+ @Override
+ public void init(Context context) throws IOException {
+ super.init(context);
+ LOG.info("{} Initializing ContinuousBackupReplicationEndpoint.",
+ Utils.logPeerId(ctx.getPeerId()));
+ Configuration peerConf = this.ctx.getConfiguration();
+
+ setPeerUUID(peerConf);
+
+ Configuration conf = HBaseConfiguration.create(peerConf);
+
+ try {
+ continuousBackupManager = new ContinuousBackupManager(this.ctx.getPeerId(), conf);
+ LOG.info("{} ContinuousBackupManager initialized successfully.",
+ Utils.logPeerId(ctx.getPeerId()));
+ } catch (BackupConfigurationException e) {
+ LOG.error("{} Failed to initialize ContinuousBackupManager due to configuration issues.",
+ Utils.logPeerId(ctx.getPeerId()), e);
+ throw new IOException("Failed to initialize ContinuousBackupManager", e);
+ }
+ }
+
+ @Override
+ public UUID getPeerUUID() {
+ return peerUUID;
+ }
+
+ @Override
+ public void start() {
+ LOG.info("{} Starting ContinuousBackupReplicationEndpoint...",
+ Utils.logPeerId(ctx.getPeerId()));
+ startAsync();
+ }
+
+ @Override
+ protected void doStart() {
+ LOG.info("{} ContinuousBackupReplicationEndpoint started successfully.",
+ Utils.logPeerId(ctx.getPeerId()));
+ notifyStarted();
+ }
+
+ @Override
+ public boolean replicate(ReplicateContext replicateContext) {
+ final List<WAL.Entry> entries = replicateContext.getEntries();
+ if (entries.isEmpty()) {
+ LOG.debug("{} No WAL entries to backup.", Utils.logPeerId(ctx.getPeerId()));
+ return true;
+ }
+
+ LOG.info("{} Received {} WAL entries for backup.", Utils.logPeerId(ctx.getPeerId()),
+ entries.size());
+
+ Map<TableName, List<WAL.Entry>> tableToEntriesMap = new HashMap<>();
+ for (WAL.Entry entry : entries) {
+ TableName tableName = entry.getKey().getTableName();
+ tableToEntriesMap.computeIfAbsent(tableName, key -> new ArrayList<>()).add(entry);
+ }
+ LOG.debug("{} WAL entries grouped by table: {}", Utils.logPeerId(ctx.getPeerId()),
+ tableToEntriesMap.keySet());
+
+ try {
+ LOG.debug("{} Starting backup for {} tables.", Utils.logPeerId(ctx.getPeerId()),
+ tableToEntriesMap.size());
+ continuousBackupManager.backup(tableToEntriesMap);
+ LOG.info("{} Backup completed successfully for all tables.",
+ Utils.logPeerId(ctx.getPeerId()));
+ } catch (IOException e) {
+ LOG.error("{} Backup failed for tables: {}. Error details: {}",
+ Utils.logPeerId(ctx.getPeerId()), tableToEntriesMap.keySet(), e.getMessage(), e);
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public void stop() {
+ LOG.info("{} Stopping ContinuousBackupReplicationEndpoint...",
+ Utils.logPeerId(ctx.getPeerId()));
+ stopAsync();
+ }
+
+ @Override
+ protected void doStop() {
+ if (continuousBackupManager != null) {
+ LOG.info("{} Closing ContinuousBackupManager.", Utils.logPeerId(ctx.getPeerId()));
+ continuousBackupManager.close();
+ }
+ LOG.info("{} ContinuousBackupReplicationEndpoint stopped successfully.",
+ Utils.logPeerId(ctx.getPeerId()));
+ notifyStopped();
+ }
+
+ private void setPeerUUID(Configuration conf) throws IOException {
+ String peerUUIDStr = conf.get(CONF_PEER_UUID);
+ if (peerUUIDStr == null || peerUUIDStr.isEmpty()) {
+ LOG.error("{} Peer UUID is missing. Please specify it with the {} configuration.",
+ Utils.logPeerId(ctx.getPeerId()), CONF_PEER_UUID);
+ throw new IOException("Peer UUID not specified in configuration");
+ }
+ try {
+ peerUUID = UUID.fromString(peerUUIDStr);
+ LOG.info("{} Peer UUID set to {}", Utils.logPeerId(ctx.getPeerId()), peerUUID);
+ } catch (IllegalArgumentException e) {
+ LOG.error("{} Invalid Peer UUID format: {}", Utils.logPeerId(ctx.getPeerId()), peerUUIDStr,
+ e);
+ throw new IOException("Invalid Peer UUID format", e);
+ }
+ }
+}
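
For context, one plausible way to attach this endpoint to a replication peer through the standard Admin API; the peer id, UUID handling, and backup root below are illustrative and not part of this patch:

  import java.util.UUID;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.backup.replication.ContinuousBackupManager;
  import org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint;
  import org.apache.hadoop.hbase.client.Admin;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

  final class RegisterContinuousBackupPeerSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
        ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
          .setReplicationEndpointImpl(ContinuousBackupReplicationEndpoint.class.getName())
          // Required by the endpoint: a stable UUID identifying the backup "peer cluster".
          .putConfiguration(ContinuousBackupReplicationEndpoint.CONF_PEER_UUID,
            UUID.randomUUID().toString())
          // Required by ContinuousBackupManager: where backups are written (illustrative value).
          .putConfiguration(ContinuousBackupManager.CONF_BACKUP_ROOT_DIR,
            "hdfs://backup-cluster:8020/hbase-backups")
          .build();
        admin.addReplicationPeer("continuous-backup-peer", peerConfig);
      }
    }
  }
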
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupStagedHFileCleaner.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupStagedHFileCleaner.java
new file mode 100644
index 000000000000..3ecb597b7f10
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupStagedHFileCleaner.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+
+/**
+ * A custom HFile cleaner delegate for continuous backup scenarios in HBase. This cleaner prevents
+ * the deletion of HFiles that are staged for bulk loading as part of the continuous backup process.
+ * It interacts with the HBase `StagedBulkloadFileRegistry` to determine which files should be
+ * retained.
+ *
+ * Implements the {@link BaseHFileCleanerDelegate} for integrating with the HBase cleaner framework
+ * and the {@link Abortable} interface to handle error scenarios gracefully.
+ *
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class ContinuousBackupStagedHFileCleaner extends BaseHFileCleanerDelegate
+ implements Abortable {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ContinuousBackupStagedHFileCleaner.class);
+
+ private boolean stopped = false;
+ private boolean aborted;
+ private Configuration conf;
+ private Connection connection;
+
+ @Override
+ public void setConf(Configuration config) {
+ this.conf = config;
+ this.connection = null;
+ try {
+ // Establishing HBase connection
+ this.connection = ConnectionFactory.createConnection(conf);
+ LOG.info("HBase connection established successfully.");
+ } catch (IOException ioe) {
+ LOG.error("Couldn't establish connection to HBase", ioe);
+ }
+ }
+
+ @Override
+ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+ if (conf == null) {
+ LOG.warn("Configuration is not set. Returning original list of files.");
+ return files;
+ }
+
+ if (connection == null) {
+ try {
+ connection = ConnectionFactory.createConnection(conf);
+ LOG.info("HBase connection re-established in getDeletableFiles.");
+ } catch (IOException e) {
+ LOG.error("Failed to re-establish HBase connection. Returning no deletable files.", e);
+ return Collections.emptyList();
+ }
+ }
+
+ try {
+ // Fetch staged files from HBase
+ Set<String> stagedFiles = StagedBulkloadFileRegistry.listAllBulkloadFiles(connection);
+ LOG.debug("Fetched {} staged files from HBase.", stagedFiles.size());
+
+ // Extract file names from staged files
+ Set<String> stagedFileNames =
+ stagedFiles.stream().map(path -> new Path(path).getName()).collect(Collectors.toSet());
+
+ // Filter files by checking their file names against staged file names
+ return Iterables.filter(files, file -> !stagedFileNames.contains(file.getPath().getName()));
+ } catch (IOException e) {
+ LOG.error("Failed to fetch staged bulkload files from HBase. Returning no deletable files.",
+ e);
+ return Collections.emptyList();
+ }
+ }
+
+ @Override
+ public boolean isFileDeletable(FileStatus fStat) {
+ // The actual deletion decision is made in getDeletableFiles, so returning true
+ return true;
+ }
+
+ @Override
+ public void stop(String why) {
+ if (stopped) {
+ LOG.debug("Stop method called but the cleaner is already stopped.");
+ return;
+ }
+
+ if (this.connection != null) {
+ try {
+ this.connection.close();
+ LOG.info("HBase connection closed.");
+ } catch (IOException ioe) {
+ LOG.debug("Error closing HBase connection", ioe);
+ }
+ }
+ stopped = true;
+ LOG.info("ContinuousBackupStagedHFileCleaner stopped. Reason: {}", why);
+ }
+
+ @Override
+ public boolean isStopped() {
+ return stopped;
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+ LOG.warn("Aborting ContinuousBackupStagedHFileCleaner because: " + why, e);
+ this.aborted = true;
+ stop(why);
+ }
+
+ @Override
+ public boolean isAborted() {
+ return this.aborted;
+ }
+}
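
This delegate only takes effect if it is listed in the master's HFile cleaner chain. A sketch of the required setting, shown programmatically for illustration; in a real deployment it belongs in the master's hbase-site.xml:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.backup.replication.ContinuousBackupStagedHFileCleaner;

  final class CleanerPluginConfigSketch {
    static void appendCleaner(Configuration conf) {
      String key = "hbase.master.hfilecleaner.plugins";
      String cleaner = ContinuousBackupStagedHFileCleaner.class.getName();
      String existing = conf.get(key, "");
      // Append to the existing cleaner chain rather than replacing it.
      conf.set(key, existing.isEmpty() ? cleaner : existing + "," + cleaner);
    }

    public static void main(String[] args) {
      Configuration conf = HBaseConfiguration.create();
      appendCleaner(conf);
      System.out.println(conf.get("hbase.master.hfilecleaner.plugins"));
    }
  }
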
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupStagingManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupStagingManager.java
new file mode 100644
index 000000000000..c540abf81d51
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupStagingManager.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import static org.apache.hadoop.hbase.master.cleaner.HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the staging and backup of Write-Ahead Logs (WALs) and bulk-loaded files as part of the
+ * continuous backup process in HBase. This class ensures that WALs are staged, flushed, and backed
+ * up safely to support backup and recovery workflows.
+ */
+@InterfaceAudience.Private
+public class ContinuousBackupStagingManager {
+ private static final Logger LOG = LoggerFactory.getLogger(ContinuousBackupStagingManager.class);
+
+ public static final String WALS_BACKUP_STAGING_DIR = "wal-backup-staging";
+ public static final String CONF_STAGED_WAL_FLUSH_INITIAL_DELAY =
+ "hbase.backup.staged.wal.flush.initial.delay.seconds";
+ public static final int DEFAULT_STAGED_WAL_FLUSH_INITIAL_DELAY_SECONDS = 5 * 60; // 5 minutes
+ public static final String CONF_STAGED_WAL_FLUSH_INTERVAL =
+ "hbase.backup.staged.wal.flush.interval.seconds";
+ public static final int DEFAULT_STAGED_WAL_FLUSH_INTERVAL_SECONDS = 5 * 60; // 5 minutes
+ public static final int EXECUTOR_TERMINATION_TIMEOUT_SECONDS = 60; // TODO: configurable??
+ public static final String CONF_STAGED_FILES_BACKUP_THREADS =
+ "hbase.backup.staged.files.backup.threads";
+ public static final int DEFAULT_STAGED_FILES_BACKUP_THREADS = 3;
+
+ private final Configuration conf;
+ private final FileSystem walStagingFs;
+ private final Path walStagingDir;
+ private final ConcurrentHashMap<Path, ContinuousBackupWalWriter> walWriterMap =
+ new ConcurrentHashMap<>();
+ private final ContinuousBackupManager continuousBackupManager;
+ private ScheduledExecutorService flushExecutor;
+ private ExecutorService backupExecutor;
+ private final Set<Path> filesCurrentlyBeingBackedUp = ConcurrentHashMap.newKeySet();
+ private final ReentrantLock lock = new ReentrantLock();
+ private final StagedBulkloadFileRegistry stagedBulkloadFileRegistry;
+
+ /**
+ * Constructs a ContinuousBackupStagingManager with the specified configuration and backup
+ * manager.
+ * @param conf the HBase configuration
+ * @param continuousBackupManager the backup manager for continuous backup
+ * @throws IOException if there is an error initializing the WAL staging directory or related
+ * resources
+ */
+ public ContinuousBackupStagingManager(Configuration conf,
+ ContinuousBackupManager continuousBackupManager) throws IOException {
+ this.conf = conf;
+ this.continuousBackupManager = continuousBackupManager;
+ // TODO: configurable??
+ this.walStagingFs = CommonFSUtils.getRootDirFileSystem(conf);
+ this.walStagingDir = new Path(CommonFSUtils.getRootDir(conf),
+ new Path(WALS_BACKUP_STAGING_DIR, continuousBackupManager.getPeerId()));
+
+ ensureHFileCleanerPluginConfigured();
+ initWalStagingDir();
+ startWalFlushExecutor();
+ startBackupExecutor();
+
+ Connection conn = ConnectionFactory.createConnection(conf);
+ this.stagedBulkloadFileRegistry =
+ new StagedBulkloadFileRegistry(conn, continuousBackupManager.getPeerId());
+ }
+
+ private void ensureHFileCleanerPluginConfigured() throws IOException {
+ String plugins = conf.get(MASTER_HFILE_CLEANER_PLUGINS);
+ String cleanerClass = ContinuousBackupStagedHFileCleaner.class.getCanonicalName();
+ if (plugins == null || !plugins.contains(cleanerClass)) {
+ String errorMsg = Utils.logPeerId(continuousBackupManager.getPeerId())
+ + " Required HFile cleaner plugin for continuous backup bulk-loaded files is missing: "
+ + cleanerClass;
+ LOG.error(errorMsg);
+ throw new IOException(errorMsg);
+ }
+ }
+
+ private void initWalStagingDir() throws IOException {
+ if (walStagingFs.exists(walStagingDir)) {
+ LOG.debug("{} WALs staging directory already exists: {}",
+ Utils.logPeerId(continuousBackupManager.getPeerId()), walStagingDir);
+ } else {
+ walStagingFs.mkdirs(walStagingDir);
+ LOG.debug("{} WALs staging directory created: {}",
+ Utils.logPeerId(continuousBackupManager.getPeerId()), walStagingDir);
+ }
+ }
+
+ private void startWalFlushExecutor() {
+ int initialDelay = conf.getInt(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY,
+ DEFAULT_STAGED_WAL_FLUSH_INITIAL_DELAY_SECONDS);
+ int flushInterval =
+ conf.getInt(CONF_STAGED_WAL_FLUSH_INTERVAL, DEFAULT_STAGED_WAL_FLUSH_INTERVAL_SECONDS);
+
+ flushExecutor = Executors.newSingleThreadScheduledExecutor();
+ flushExecutor.scheduleAtFixedRate(this::flushAndBackupSafely, initialDelay, flushInterval,
+ TimeUnit.SECONDS);
+ }
+
+ private void flushAndBackupSafely() {
+ try {
+ LOG.info("{} Periodic WAL flush triggered...",
+ Utils.logPeerId(continuousBackupManager.getPeerId()));
+ flushWalFiles();
+ backupWalFiles();
+ } catch (IOException e) {
+ LOG.error("{} Error during periodic WAL flush: {}",
+ Utils.logPeerId(continuousBackupManager.getPeerId()), e.getMessage(), e);
+ }
+ }
+
+ private void flushWalFiles() {
+ lock.lock();
+ try {
+ for (Map.Entry<Path, ContinuousBackupWalWriter> entry : walWriterMap.entrySet()) {
+ flushWalData(entry.getKey(), entry.getValue());
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void flushWalData(Path walDir, ContinuousBackupWalWriter writer) {
+ if (writer.hasAnyEntry()) {
+ LOG.debug("{} Flushing WAL data for {}", Utils.logPeerId(continuousBackupManager.getPeerId()),
+ walDir);
+ closeWriter(writer);
+ walWriterMap.put(walDir, createNewContinuousBackupWalWriter(walDir));
+ } else {
+ LOG.debug("{} No WAL data to flush for {}",
+ Utils.logPeerId(continuousBackupManager.getPeerId()), walDir);
+ }
+ }
+
+ private void closeWriter(ContinuousBackupWalWriter writer) {
+ try {
+ writer.close();
+ } catch (IOException e) {
+ LOG.error("{} Error occurred while closing WAL writer: ",
+ Utils.logPeerId(continuousBackupManager.getPeerId()), e);
+ }
+ }
+
+ private ContinuousBackupWalWriter createNewContinuousBackupWalWriter(Path walDir) {
+ try {
+ return new ContinuousBackupWalWriter(walStagingFs, walStagingDir, walDir, conf);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to create WAL Writer for " + walDir, e);
+ }
+ }
+
+ private void startBackupExecutor() {
+ int threads =
+ conf.getInt(CONF_STAGED_FILES_BACKUP_THREADS, DEFAULT_STAGED_FILES_BACKUP_THREADS);
+ backupExecutor = Executors.newFixedThreadPool(threads);
+ }
+
+ /**
+ * Stages WAL entries and bulk-load files for a specific table. Ensures that WAL entries are
+ * written and bulk-loaded files are registered to prevent deletion by the HFileCleaner thread.
+ * @param tableName the name of the table
+ * @param walEntries the list of WAL entries to stage
+ * @param bulkLoadFiles the list of bulk-load files to stage
+ * @throws IOException if there is an error staging the entries or files
+ */
+ public void stageEntries(TableName tableName, List<WAL.Entry> walEntries,
+ List<Path> bulkLoadFiles) throws IOException {
+ lock.lock();
+ try {
+ String namespace = tableName.getNamespaceAsString();
+ String table = tableName.getQualifierAsString();
+
+ Path walDir = WALUtils.getWalDir(namespace, table);
+ ContinuousBackupWalWriter continuousBackupWalWriter = getContinuousBackupWalWriter(walDir);
+
+ continuousBackupWalWriter.write(walEntries, bulkLoadFiles);
+
+ // prevent bulk-loaded files from being deleted by the HFileCleaner thread
+ stagedBulkloadFileRegistry.addStagedFiles(bulkLoadFiles);
+
+ LOG.info("{} {} WAL entries staged for table {}:{}",
+ Utils.logPeerId(continuousBackupManager.getPeerId()), walEntries.size(), namespace, table);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private ContinuousBackupWalWriter getContinuousBackupWalWriter(Path walDir) throws IOException {
+ try {
+ ContinuousBackupWalWriter writer =
+ walWriterMap.computeIfAbsent(walDir, this::createNewContinuousBackupWalWriter);
+ if (shouldRollOver(writer)) {
+ LOG.debug("{} WAL Writer for {} is being rolled over.",
+ Utils.logPeerId(continuousBackupManager.getPeerId()), walDir);
+ closeWriter(writer);
+ writer = createNewContinuousBackupWalWriter(walDir);
+ walWriterMap.put(walDir, writer);
+ }
+ return writer;
+ } catch (UncheckedIOException e) {
+ String errorMsg = Utils.logPeerId(continuousBackupManager.getPeerId())
+ + " Failed to get or create WAL Writer for " + walDir;
+ throw new IOException(errorMsg, e);
+ }
+ }
+
+ private boolean shouldRollOver(ContinuousBackupWalWriter writer) {
+ long maxWalSize = conf.getLong(ContinuousBackupManager.CONF_BACKUP_MAX_WAL_SIZE,
+ ContinuousBackupManager.DEFAULT_MAX_WAL_SIZE);
+ return writer.getLength() >= maxWalSize;
+ }
+
+ /**
+ * Returns the staging path for a bulk-load file, given its relative path from the namespace.
+ * @param relativePathFromNamespace the relative path of the bulk-load file
+ * @return the resolved staging path for the bulk-load file
+ * @throws IOException if there is an error resolving the staging path
+ */
+ public Path getBulkloadFileStagingPath(Path relativePathFromNamespace) throws IOException {
+ return WALUtils.getBulkloadFileStagingPath(conf, relativePathFromNamespace);
+ }
+
+ /**
+ * Returns the staging path for a WAL file, given its relative path.
+ * @param relativeWalPath the relative path of the WAL file
+ * @return the resolved staging path for the WAL file
+ */
+ public Path getWalFileStagingPath(Path relativeWalPath) {
+ return WALUtils.getWalFileStagingPath(walStagingDir, relativeWalPath);
+ }
+
+ private void backupWalFiles() throws IOException {
+ LOG.info("{} Starting backup of WAL files from staging directory: {}",
+ Utils.logPeerId(continuousBackupManager.getPeerId()), walStagingDir);
+
+ RemoteIterator<LocatedFileStatus> fileStatusIterator =
+ walStagingFs.listFiles(walStagingDir, true);
+ while (fileStatusIterator.hasNext()) {
+ LocatedFileStatus fileStatus = fileStatusIterator.next();
+ Path filePath = fileStatus.getPath();
+ LOG.trace("{} Processing file: {}", Utils.logPeerId(continuousBackupManager.getPeerId()),
+ filePath);
+
+ // Skip directories, WAL writer context files, and files already being backed up
+ if (
+ fileStatus.isDirectory()
+ || ContinuousBackupWalWriter.isWalWriterContextFile(filePath.getName())
+ || isFileCurrentlyBeingBackedUp(filePath)
+ ) {
+ LOG.trace("{} Skipping file (directory/context/backed-up): {}",
+ Utils.logPeerId(continuousBackupManager.getPeerId()), filePath);
+ continue;
+ }
+
+ // Check if the file is currently being written
+ if (isFileOpenForWriting(filePath)) {
+ LOG.info("{} Skipping file as it is currently being written: {}",
+ Utils.logPeerId(continuousBackupManager.getPeerId()), filePath);
+ continue;
+ }
+
+ // Backup file asynchronously
+ backupFileAsync(filePath);
+ }
+
+ LOG.info("{} Completed backup process for WAL files from staging directory: {}",
+ Utils.logPeerId(continuousBackupManager.getPeerId()), walStagingDir);
+ }
+
+ private boolean isFileCurrentlyBeingBackedUp(Path filePath) {
+ return filesCurrentlyBeingBackedUp.contains(filePath);
+ }
+
+ private boolean isFileOpenForWriting(Path filePath) {
+ for (ContinuousBackupWalWriter context : walWriterMap.values()) {
+ if (context.isWritingToFile(filePath)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void backupFileAsync(Path filePath) {
+ // Mark the file as currently being backed up
+ filesCurrentlyBeingBackedUp.add(filePath);
+
+ backupExecutor.submit(() -> {
+ try {
+ backupFile(filePath);
+ } catch (IOException e) {
+ LOG.error("Backup failed for {}", filePath, e);
+ } finally {
+ // Remove the file from the "being backed up" set once processing is done
+ filesCurrentlyBeingBackedUp.remove(filePath);
+ }
+ });
+ }
+
+ // Backs up a single file
+ private void backupFile(Path filePath) throws IOException {
+ LOG.info("{} Starting backup for WAL file: {}",
+ Utils.logPeerId(continuousBackupManager.getPeerId()), filePath);
+
+ String tableName = filePath.getParent().getName();
+ String namespace = filePath.getParent().getParent().getName();
+ Path walDir = WALUtils.getWalDir(namespace, tableName);
+ String walFileName = filePath.getName();
+ Path walFilePath = new Path(walDir, walFileName);
+ String walWriterContextFileName =
+ walFileName + ContinuousBackupWalWriter.WAL_WRITER_CONTEXT_FILE_SUFFIX;
+ Path walWriterContextFileFullPath =
+ new Path(walStagingDir, new Path(walDir, walWriterContextFileName));
+
+ LOG.debug("{} Resolving bulkload files for WAL writer context: {}",
+ Utils.logPeerId(continuousBackupManager.getPeerId()), walWriterContextFileFullPath);
+ List<Path> bulkLoadFiles = ContinuousBackupWalWriter.getBulkloadFilesFromProto(walStagingFs,
+ walWriterContextFileFullPath);
+
+ continuousBackupManager.commitBackup(walStagingFs, walFilePath, bulkLoadFiles);
+
+ stagedBulkloadFileRegistry.removeStagedFiles(bulkLoadFiles);
+
+ LOG.debug("{} Cleaning up WAL and metadata files for WAL: {}",
+ Utils.logPeerId(continuousBackupManager.getPeerId()), filePath);
+ deleteFile(filePath);
+ deleteFile(walWriterContextFileFullPath);
+
+ LOG.info("{} Backup completed successfully for WAL: {}",
+ Utils.logPeerId(continuousBackupManager.getPeerId()), filePath);
+ }
+
+ private void deleteFile(Path filePath) {
+ try {
+ if (walStagingFs.exists(filePath)) {
+ if (walStagingFs.delete(filePath, false)) {
+ LOG.debug("{} Deleted file: {}", Utils.logPeerId(continuousBackupManager.getPeerId()),
+ filePath);
+ } else {
+ LOG.warn("{} Failed to delete file: {}",
+ Utils.logPeerId(continuousBackupManager.getPeerId()), filePath);
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("{} Error while deleting file: {}",
+ Utils.logPeerId(continuousBackupManager.getPeerId()), filePath, e);
+ }
+ }
+
+ /**
+ * Closes the manager, ensuring that all executors are properly terminated and resources are
+ * cleaned up.
+ */
+ public void close() {
+ // Shutdown the flush executor
+ if (flushExecutor != null) {
+ flushExecutor.shutdown();
+ try {
+ if (
+ !flushExecutor.awaitTermination(EXECUTOR_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+ ) {
+ flushExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ flushExecutor.shutdownNow();
+ LOG.warn("Flush executor shutdown was interrupted.", e);
+ }
+ LOG.info("{} WAL flush thread stopped.",
+ Utils.logPeerId(continuousBackupManager.getPeerId()));
+ }
+
+ // Shutdown the backup executor
+ if (backupExecutor != null) {
+ backupExecutor.shutdown();
+ try {
+ if (
+ !backupExecutor.awaitTermination(EXECUTOR_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+ ) {
+ backupExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ backupExecutor.shutdownNow();
+ LOG.warn("Backup executor shutdown was interrupted.", e);
+ }
+ LOG.info("{} Backup executor stopped.", Utils.logPeerId(continuousBackupManager.getPeerId()));
+ }
+
+ // Flush remaining writers safely
+ for (Map.Entry<Path, ContinuousBackupWalWriter> entry : walWriterMap.entrySet()) {
+ Path walDir = entry.getKey();
+ ContinuousBackupWalWriter writer = entry.getValue();
+
+ if (writer.hasAnyEntry()) {
+ LOG.debug("{} Flushing WAL data for {}",
+ Utils.logPeerId(continuousBackupManager.getPeerId()), walDir);
+ closeWriter(writer);
+ } else {
+ LOG.debug("{} No WAL data to flush for {}",
+ Utils.logPeerId(continuousBackupManager.getPeerId()), walDir);
+
+ // Remove the empty writer and delete associated files
+ closeWriter(writer);
+ deleteEmptyWalFile(writer, walDir);
+ }
+ }
+ }
+
+ private void deleteEmptyWalFile(ContinuousBackupWalWriter writer, Path walDir) {
+ Path walFilePath = writer.getWalFullPath();
+ String walFileName = walFilePath.getName();
+ String walWriterContextFileName =
+ walFileName + ContinuousBackupWalWriter.WAL_WRITER_CONTEXT_FILE_SUFFIX;
+ Path walWriterContextFileFullPath =
+ new Path(walStagingDir, new Path(walDir, walWriterContextFileName));
+
+ try {
+ deleteFile(walFilePath);
+ } catch (Exception e) {
+ LOG.warn("Failed to delete WAL file: {}", walFilePath, e);
+ }
+
+ try {
+ deleteFile(walWriterContextFileFullPath);
+ } catch (Exception e) {
+ LOG.warn("Failed to delete WAL writer context file: {}", walWriterContextFileFullPath, e);
+ }
+ }
+
+ public static class WALUtils {
+ public static Path getWalDir(String namespace, String table) {
+ return new Path(namespace, table);
+ }
+
+ public static Path getWalFileStagingPath(Path walsStagingDir, Path relativeWalPath) {
+ return new Path(walsStagingDir, relativeWalPath);
+ }
+
+ public static Path getBulkloadFileStagingPath(Configuration conf,
+ Path relativePathFromNamespace) throws IOException {
+ FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
+ Path rootDir = CommonFSUtils.getRootDir(conf);
+ Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
+ Path baseNamespaceDir = new Path(rootDir, baseNSDir);
+ Path hFileArchiveDir =
+ new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
+
+ Path result =
+ findExistingPath(rootFs, baseNamespaceDir, hFileArchiveDir, relativePathFromNamespace);
+ if (result == null) {
+ throw new IOException(
+ "No Bulk loaded file found in relative path: " + relativePathFromNamespace);
+ }
+ return result;
+ }
+
+ private static Path findExistingPath(FileSystem rootFs, Path baseNamespaceDir,
+ Path hFileArchiveDir, Path filePath) throws IOException {
+ for (Path candidate : new Path[] { new Path(baseNamespaceDir, filePath),
+ new Path(hFileArchiveDir, filePath) }) {
+ if (rootFs.exists(candidate)) {
+ return candidate;
+ }
+ }
+ return null;
+ }
+ }
+
+}
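
A sketch of how the staging knobs defined above might be tuned; the numbers are arbitrary examples, not recommendations:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.backup.replication.ContinuousBackupStagingManager;

  final class StagingTuningSketch {
    static Configuration tuned() {
      Configuration conf = HBaseConfiguration.create();
      // Wait 10 minutes before the first flush, then flush staged WALs every 2 minutes.
      conf.setInt(ContinuousBackupStagingManager.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, 600);
      conf.setInt(ContinuousBackupStagingManager.CONF_STAGED_WAL_FLUSH_INTERVAL, 120);
      // Copy up to five staged files to the backup location in parallel.
      conf.setInt(ContinuousBackupStagingManager.CONF_STAGED_FILES_BACKUP_THREADS, 5);
      return conf;
    }
  }
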
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupWalWriter.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupWalWriter.java
new file mode 100644
index 000000000000..a1a9ff153f2d
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupWalWriter.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
+
+/**
+ * Handles writing of Write-Ahead Log (WAL) entries and tracking bulk-load files for continuous
+ * backup.
+ *
+ * This class is responsible for managing the WAL files and their associated context files. It
+ * provides functionality to write WAL entries, persist backup-related metadata, and retrieve
+ * bulk-load files from context files.
+ *
+ */
+@InterfaceAudience.Private
+public class ContinuousBackupWalWriter {
+ private static final Logger LOG = LoggerFactory.getLogger(ContinuousBackupWalWriter.class);
+ public static final String WAL_FILE_PREFIX = "wal_file.";
+ public static final String WAL_WRITER_CONTEXT_FILE_SUFFIX = ".context";
+ private final WALProvider.Writer writer;
+ private final FileSystem fileSystem;
+ private final Path rootDir;
+ private final Path walPath;
+ private final Path walWriterContextFilePath;
+ private final long initialWalFileSize;
+ private final List<Path> bulkLoadFiles = new ArrayList<>();
+
+ /**
+ * Constructs a ContinuousBackupWalWriter for a specific WAL directory.
+ * @param fs the file system instance to use for file operations
+ * @param rootDir the root directory for WAL files
+ * @param walDir the WAL directory path
+ * @param conf the HBase configuration object
+ * @throws IOException if an error occurs during initialization
+ */
+ public ContinuousBackupWalWriter(FileSystem fs, Path rootDir, Path walDir, Configuration conf)
+ throws IOException {
+ LOG.info("Initializing ContinuousBackupWalWriter for WAL directory: {}", walDir);
+ this.fileSystem = fs;
+ this.rootDir = rootDir;
+
+ // Create WAL file
+ long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
+ Path walDirFullPath = new Path(rootDir, walDir);
+ if (!fileSystem.exists(walDirFullPath)) {
+ LOG.info("WAL directory {} does not exist. Creating it.", walDirFullPath);
+ fileSystem.mkdirs(walDirFullPath);
+ }
+ String walFileName = WAL_FILE_PREFIX + currentTime;
+ this.walPath = new Path(walDir, walFileName);
+ Path walFileFullPath = new Path(walDirFullPath, walFileName);
+ LOG.debug("Creating WAL file at path: {}", walFileFullPath);
+ this.writer = WALFactory.createWALWriter(fileSystem, walFileFullPath, conf);
+ this.initialWalFileSize = writer.getLength();
+
+ // Create WAL Writer Context file
+ String walWriterContextFileName = walPath.getName() + WAL_WRITER_CONTEXT_FILE_SUFFIX;
+ this.walWriterContextFilePath = new Path(walPath.getParent(), walWriterContextFileName);
+ persistWalWriterContext();
+
+ LOG.info("ContinuousBackupWalWriter initialized successfully with WAL file: {}", walPath);
+ }
+
+ /**
+ * Writes WAL entries to the WAL file and tracks associated bulk-load files.
+ * @param walEntries the list of WAL entries to write
+ * @param bulkLoadFiles the list of bulk-load files to track
+ * @throws IOException if an error occurs during writing
+ */
+ public void write(List<WAL.Entry> walEntries, List<Path> bulkLoadFiles) throws IOException {
+ LOG.debug("Writing {} WAL entries to WAL file: {}", walEntries.size(), walPath);
+ for (WAL.Entry entry : walEntries) {
+ writer.append(entry);
+ }
+
+ writer.sync(true); // Ensure data is flushed to disk
+ this.bulkLoadFiles.addAll(bulkLoadFiles);
+ persistWalWriterContext();
+ }
+
+ /**
+ * Returns the full path of the WAL file.
+ */
+ public Path getWalFullPath() {
+ return new Path(rootDir, walPath);
+ }
+
+ /**
+ * Returns the current size of the WAL file.
+ */
+ public long getLength() {
+ return writer.getLength();
+ }
+
+ /**
+ * Closes the WAL writer, ensuring all resources are released.
+ * @throws IOException if an error occurs during closure
+ */
+ public void close() throws IOException {
+ if (writer != null) {
+ writer.close();
+ }
+ }
+
+ /**
+ * Checks if the WAL file has any entries written to it.
+ * @return {@code true} if the WAL file contains entries; {@code false} otherwise
+ */
+ public boolean hasAnyEntry() {
+ return writer.getLength() > initialWalFileSize;
+ }
+
+ private void persistWalWriterContext() throws IOException {
+ LOG.debug("Persisting WAL writer context for file: {}", walWriterContextFilePath);
+ BackupProtos.ContinuousBackupWalWriterContext.Builder protoBuilder =
+ BackupProtos.ContinuousBackupWalWriterContext.newBuilder().setWalPath(walPath.toString())
+ .setInitialWalFileSize(this.initialWalFileSize);
+
+ for (Path bulkLoadFile : bulkLoadFiles) {
+ protoBuilder.addBulkLoadFiles(bulkLoadFile.toString());
+ }
+
+ Path walWriterContextFileFullPath = new Path(rootDir, walWriterContextFilePath);
+ try (FSDataOutputStream outputStream = fileSystem.create(walWriterContextFileFullPath, true)) {
+ if (!fileSystem.exists(walWriterContextFileFullPath)) {
+ LOG.error("Failed to create context file: {}", walWriterContextFileFullPath);
+ throw new IOException("Context file creation failed.");
+ }
+ protoBuilder.build().writeTo(outputStream);
+ outputStream.flush();
+ LOG.info("Successfully persisted WAL writer context for file: {}",
+ walWriterContextFileFullPath);
+ }
+ }
+
+ /**
+ * Checks if the specified file is the WAL file being written by this writer.
+ * @param filePath the path of the file to check
+ * @return {@code true} if the specified file is being written by this writer; {@code false}
+ * otherwise
+ */
+ public boolean isWritingToFile(Path filePath) {
+ return filePath.equals(new Path(rootDir, walPath));
+ }
+
+ /**
+ * Determines if a file name corresponds to a WAL writer context file.
+ * @param fileName the name of the file to check
+ * @return {@code true} if the file is a WAL writer context file; {@code false} otherwise
+ */
+ public static boolean isWalWriterContextFile(String fileName) {
+ return fileName.contains(WAL_WRITER_CONTEXT_FILE_SUFFIX);
+ }
+
+ /**
+ * Retrieves bulk-load files from a WAL writer context proto file.
+ * @param fs the file system instance
+ * @param protoFilePath the path to the proto file
+ * @return a list of paths for the bulk-load files
+ * @throws IOException if an error occurs during retrieval
+ */
+ public static List<Path> getBulkloadFilesFromProto(FileSystem fs, Path protoFilePath)
+ throws IOException {
+ LOG.debug("Retrieving bulk load files from proto file: {}", protoFilePath);
+ List<Path> bulkloadFiles = new ArrayList<>();
+ try (FSDataInputStream inputStream = fs.open(protoFilePath)) {
+ BackupProtos.ContinuousBackupWalWriterContext proto =
+ BackupProtos.ContinuousBackupWalWriterContext.parseFrom(inputStream);
+ for (String bulkLoadFile : proto.getBulkLoadFilesList()) {
+ bulkloadFiles.add(new Path(bulkLoadFile));
+ }
+ LOG.info("Retrieved {} bulk load files from proto file: {}", bulkloadFiles.size(),
+ protoFilePath);
+ }
+ return bulkloadFiles;
+ }
+}
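
For orientation, staged WALs are named wal_file.<timestamp> and each has a companion wal_file.<timestamp>.context metadata file next to it. A sketch of resolving the bulk-load files recorded for a staged WAL; the helper class and method names are made up for illustration:

  import java.io.IOException;
  import java.util.List;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.backup.replication.ContinuousBackupWalWriter;
  import org.apache.hadoop.hbase.util.CommonFSUtils;

  final class WalContextSketch {
    // Given the full path of a staged WAL file, load the bulk-load file list from its
    // companion context file, which sits beside it with the ".context" suffix.
    static List<Path> bulkLoadFilesFor(Configuration conf, Path stagedWalFile) throws IOException {
      FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);
      Path contextFile = new Path(stagedWalFile.getParent(),
        stagedWalFile.getName() + ContinuousBackupWalWriter.WAL_WRITER_CONTEXT_FILE_SUFFIX);
      return ContinuousBackupWalWriter.getBulkloadFilesFromProto(fs, contextFile);
    }
  }
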
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/StagedBulkloadFileRegistry.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/StagedBulkloadFileRegistry.java
new file mode 100644
index 000000000000..37b7be9d6c89
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/StagedBulkloadFileRegistry.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
+
+/**
+ * A registry for managing staged bulk load files associated with a replication peer in HBase. This
+ * class ensures the required table for storing bulk load file metadata exists and provides methods
+ * to add, remove, and retrieve staged files for a given peer.
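+ * <p>
+ * Minimal usage sketch (the peer id and file paths below are illustrative only):
+ *
+ * <pre>
+ * StagedBulkloadFileRegistry registry = new StagedBulkloadFileRegistry(connection, "1");
+ * registry.addStagedFiles(List.of(new Path("/staging/ns/table/cf/hfile1")));
+ * List&lt;String&gt; staged = registry.getStagedFiles();
+ * registry.removeStagedFiles(List.of(new Path("/staging/ns/table/cf/hfile1")));
+ * </pre>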
+ */
+@InterfaceAudience.Private
+public class StagedBulkloadFileRegistry {
+ private static final Logger LOG = LoggerFactory.getLogger(StagedBulkloadFileRegistry.class);
+ private static final String TABLE_NAME = "staged_bulkload_files";
+ private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
+ private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("files");
+
+ private final Connection connection;
+ private final String peerId;
+
+ /**
+ * Constructs a registry for managing staged bulk load files. Ensures the required table exists in
+ * the HBase cluster.
+ * @param connection the HBase connection
+ * @param peerId the replication peer ID associated with this registry
+ * @throws IOException if an error occurs while ensuring the table exists
+ */
+ public StagedBulkloadFileRegistry(Connection connection, String peerId) throws IOException {
+ this.connection = connection;
+ this.peerId = peerId;
+
+ // Ensure the table exists; use try-with-resources so the Admin is closed even if creation fails
+ TableName tableName = TableName.valueOf(TABLE_NAME);
+ try (Admin admin = connection.getAdmin()) {
+ if (!admin.tableExists(tableName)) {
+ LOG.info("Table '{}' does not exist. Creating it now.", TABLE_NAME);
+ TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_FAMILY).build()).build();
+ admin.createTable(tableDescriptor);
+ LOG.info("Table '{}' created successfully.", TABLE_NAME);
+ }
+ }
+ }
+
+ /**
+ * Fetches the list of staged bulk load files for the current replication peer.
+ * @return a list of file paths as strings
+ * @throws IOException if an error occurs while fetching data from HBase
+ */
+ public List<String> getStagedFiles() throws IOException {
+ LOG.debug("{} Fetching staged files.", Utils.logPeerId(peerId));
+ List<String> stagedFiles = new ArrayList<>();
+ try (Table table = connection.getTable(TableName.valueOf(TABLE_NAME))) {
+ Get get = new Get(Bytes.toBytes(peerId));
+ get.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER);
+
+ Result result = table.get(get);
+ byte[] filesData = result.getValue(COLUMN_FAMILY, COLUMN_QUALIFIER);
+
+ if (filesData != null) {
+ stagedFiles = deserializeFiles(filesData);
+ LOG.debug("{} Fetched {} staged files.", Utils.logPeerId(peerId), stagedFiles.size());
+ } else {
+ LOG.debug("{} No staged files found.", Utils.logPeerId(peerId));
+ }
+ }
+ return stagedFiles;
+ }
+
+ /**
+ * Adds new staged bulk load files for the current replication peer. Existing files are preserved,
+ * and the new files are appended to the list.
+ * @param newFiles a list of file paths to add
+ * @throws IOException if an error occurs while updating HBase
+ */
+ public void addStagedFiles(List<Path> newFiles) throws IOException {
+ LOG.debug("{} Adding {} new staged files.", Utils.logPeerId(peerId), newFiles.size());
+ List<String> existingFiles = getStagedFiles();
+ existingFiles.addAll(newFiles.stream().map(Path::toString).toList());
+
+ try (Table table = connection.getTable(TableName.valueOf(TABLE_NAME))) {
+ Put put = new Put(Bytes.toBytes(peerId));
+ put.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, serializeFiles(existingFiles));
+ table.put(put);
+ LOG.debug("{} Successfully added {} files.", Utils.logPeerId(peerId), newFiles.size());
+ }
+ }
+
+ /**
+ * Removes specified bulk load files from the staged files for the current replication peer.
+ * @param filesToRemove a list of file paths to remove
+ * @throws IOException if an error occurs while updating HBase
+ */
+ public void removeStagedFiles(List<Path> filesToRemove) throws IOException {
+ LOG.debug("{} Removing {} staged files.", Utils.logPeerId(peerId), filesToRemove.size());
+ List<String> existingFiles = getStagedFiles();
+ existingFiles.removeAll(filesToRemove.stream().map(Path::toString).toList());
+
+ try (Table table = connection.getTable(TableName.valueOf(TABLE_NAME))) {
+ Put put = new Put(Bytes.toBytes(peerId));
+ put.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, serializeFiles(existingFiles));
+ table.put(put);
+ LOG.debug("{} Successfully removed {} files.", Utils.logPeerId(peerId), filesToRemove.size());
+ }
+ }
+
+ private byte[] serializeFiles(List<String> files) {
+ LOG.trace("{} Serializing {} files.", Utils.logPeerId(peerId), files.size());
+ BackupProtos.StagedBulkloadFilesInfo.Builder protoBuilder =
+ BackupProtos.StagedBulkloadFilesInfo.newBuilder();
+ protoBuilder.addAllFiles(files);
+ return protoBuilder.build().toByteArray();
+ }
+
+ private List<String> deserializeFiles(byte[] data) throws IOException {
+ LOG.trace("{} Deserializing staged bulkload files.", Utils.logPeerId(peerId));
+ BackupProtos.StagedBulkloadFilesInfo proto =
+ BackupProtos.StagedBulkloadFilesInfo.parseFrom(data);
+ return new ArrayList<>(proto.getFilesList());
+ }
+
+ /**
+ * Lists all staged bulk load files across all peers in the HBase cluster.
+ * @param connection the HBase connection
+ * @return a set of file paths as strings representing all staged bulk load files
+ * @throws IOException if an error occurs while scanning the table
+ */
+ public static Set<String> listAllBulkloadFiles(Connection connection) throws IOException {
+ LOG.debug("Listing all staged bulkload files from table '{}'.", TABLE_NAME);
+ Set<String> allFiles = new HashSet<>();
+
+ try (Admin admin = connection.getAdmin()) {
+ if (!admin.tableExists(TableName.valueOf(TABLE_NAME))) {
+ LOG.debug("Table '{}' does not exist. Returning empty set.", TABLE_NAME);
+ return allFiles;
+ }
+
+ try (Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
+ ResultScanner scanner = table.getScanner(new Scan())) {
+
+ for (Result result : scanner) {
+ byte[] filesData = result.getValue(COLUMN_FAMILY, COLUMN_QUALIFIER);
+ if (filesData != null) {
+ List<String> files =
+ BackupProtos.StagedBulkloadFilesInfo.parseFrom(filesData).getFilesList();
+ allFiles.addAll(files);
+ }
+ }
+ LOG.debug("Listed a total of {} staged bulkload files.", allFiles.size());
+ }
+ }
+
+ return allFiles;
+ }
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/Utils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/Utils.java
new file mode 100644
index 000000000000..69365674acca
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/Utils.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class Utils {
+ private Utils() {
+ }
+
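+ /**
+ * Returns a log-message prefix identifying the replication source for the given peer, for
+ * example {@code "[Source for peer 1]:"}.
+ */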
+ public static String logPeerId(String peerId) {
+ return "[Source for peer " + peerId + "]:";
+ }
+}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
new file mode 100644
index 000000000000..6e14880fc15b
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY;
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupManager.CONF_BACKUP_MAX_WAL_SIZE;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupManager.CONF_BACKUP_ROOT_DIR;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_PEER_UUID;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupStagingManager.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupStagingManager.CONF_STAGED_WAL_FLUSH_INTERVAL;
+import static org.apache.hadoop.hbase.master.cleaner.HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
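+/**
+ * Verifies that the continuous backup replication endpoint copies both WAL entries and
+ * bulk-loaded HFiles for a replicated table into the configured backup root directory.
+ */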
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestContinuousBackupReplicationEndpoint {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestContinuousBackupReplicationEndpoint.class);
+
+ private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+ private static final Configuration conf = TEST_UTIL.getConfiguration();
+ private static final byte[] QUALIFIER = Bytes.toBytes("my-qualifier");
+ static FileSystem fs = null;
+ Path root;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // Set the configuration properties as required
+ conf.set(MASTER_HFILE_CLEANER_PLUGINS,
+ "org.apache.hadoop.hbase.backup.replication.ContinuousBackupStagedHFileCleaner");
+ conf.setLong(CONF_BACKUP_MAX_WAL_SIZE, 10240);
+ conf.setInt(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, 10);
+ conf.setInt(CONF_STAGED_WAL_FLUSH_INTERVAL, 10);
+ conf.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true);
+ conf.set(REPLICATION_CLUSTER_ID, "clusterId1");
+
+ TEST_UTIL.startMiniZKCluster();
+ TEST_UTIL.startMiniCluster(1);
+ fs = FileSystem.get(conf);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ if (fs != null) {
+ fs.close();
+ }
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setup() throws IOException {
+ root = TEST_UTIL.getDataTestDirOnTestFS();
+ }
+
+ @After
+ public void cleanup() throws IOException {
+ fs.delete(root, true);
+ }
+
+ @Test
+ public void testWALAndBulkLoadFileBackup() throws IOException {
+ String tableName = "usertable";
+ String cfName = "cf";
+
+ ColumnFamilyDescriptor columnFamilyDescriptor =
+ ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cfName)).setScope(1).build();
+ TableDescriptor tableDescriptor = TableDescriptorBuilder
+ .newBuilder(TableName.valueOf(tableName)).setColumnFamily(columnFamilyDescriptor).build();
+
+ Admin admin = TEST_UTIL.getAdmin();
+ if (!admin.tableExists(TableName.valueOf(tableName))) {
+ admin.createTable(tableDescriptor);
+ }
+
+ String peerId = "peerId";
+ Map<TableName, List<String>> tableMap = new HashMap<>();
+ tableMap.put(TableName.valueOf(tableName), new ArrayList<>());
+ String continuousBackupReplicationEndpoint =
+ "org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint";
+
+ Path backupRootDir = new Path(root, "testWALBackup");
+ fs.mkdirs(backupRootDir);
+
+ Map<String, String> additionalArgs = new HashMap<>();
+ additionalArgs.put(CONF_PEER_UUID, "0c5672e3-0f96-4f5d-83d2-cceccebdfd42");
+ additionalArgs.put(CONF_BACKUP_ROOT_DIR, backupRootDir.toString());
+
+ ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+ .setReplicationEndpointImpl(continuousBackupReplicationEndpoint)
+ .setReplicateAllUserTables(false).setTableCFsMap(tableMap).putAllConfiguration(additionalArgs)
+ .build();
+
+ admin.addReplicationPeer(peerId, peerConfig);
+
+ try (Table table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName))) {
+ int rowSize = 32;
+ int totalRows = 100;
+ TEST_UTIL.loadRandomRows(table, Bytes.toBytes(cfName), rowSize, totalRows);
+
+ Path dir = TEST_UTIL.getDataTestDirOnTestFS("testBulkLoadByFamily");
+ dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
+ byte[] from = Bytes.toBytes(cfName + "begin");
+ byte[] to = Bytes.toBytes(cfName + "end");
+
+ Path familyDir = new Path(dir, cfName);
+ HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, new Path(familyDir, "MyHFile"),
+ Bytes.toBytes(cfName), QUALIFIER, from, to, 1000);
+
+ BulkLoadHFiles loader = new BulkLoadHFilesTool(TEST_UTIL.getConfiguration());
+ loader.bulkLoad(table.getName(), dir);
+
+ assertEquals(1100, HBaseTestingUtil.countRows(table));
+
+ // Give the background staged WAL flush (configured above with a short delay/interval) time to
+ // copy the WALs and bulk-load files into the backup root
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Thread was interrupted while waiting", e);
+ }
+
+ Path walDir = new Path(backupRootDir, "WALs/default/usertable");
+ assertTrue("WAL directory does not exist!", fs.exists(walDir));
+ FileStatus[] walFiles = fs.listStatus(walDir);
+ assertNotNull("No WAL files found!", walFiles);
+ assertTrue("Expected some WAL files but found none!", walFiles.length > 0);
+
+ Path bulkLoadFilesDir = new Path(backupRootDir, "bulk-load-files/default/usertable");
+ assertTrue("Bulk load files directory does not exist!", fs.exists(bulkLoadFilesDir));
+ FileStatus[] bulkLoadFiles = fs.listStatus(bulkLoadFilesDir);
+ assertNotNull("No Bulk load files found!", bulkLoadFiles);
+ assertTrue("Expected some Bulk load files but found none!", bulkLoadFiles.length > 0);
+ } finally {
+ TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
+ admin.disableTable(TableName.valueOf(tableName));
+ admin.deleteTable(TableName.valueOf(tableName));
+ }
+ }
+}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupStagedHFileCleaner.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupStagedHFileCleaner.java
new file mode 100644
index 000000000000..c2c1920ec12a
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupStagedHFileCleaner.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
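+/**
+ * Verifies that ContinuousBackupStagedHFileCleaner reports a file as deletable only while it is
+ * not registered as a staged bulk-load file.
+ */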
+@Category({ MasterTests.class, SmallTests.class })
+public class TestContinuousBackupStagedHFileCleaner {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestContinuousBackupStagedHFileCleaner.class);
+
+ private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+ private static final Configuration conf = TEST_UTIL.getConfiguration();
+ static FileSystem fs = null;
+ Path root;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniCluster(1);
+ fs = FileSystem.get(conf);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ if (fs != null) {
+ fs.close();
+ }
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setup() throws IOException {
+ root = TEST_UTIL.getDataTestDirOnTestFS();
+ }
+
+ @After
+ public void cleanup() throws IOException {
+ fs.delete(root, true);
+ }
+
+ @Test
+ public void testGetDeletableFiles() throws IOException {
+ // 1. Create a file
+ Path file = new Path(root, "testIsFileDeletableWithNoHFileRefs");
+ fs.createNewFile(file);
+
+ // 2. Assert file is successfully created
+ assertTrue("Test file not created!", fs.exists(file));
+
+ ContinuousBackupStagedHFileCleaner cleaner = new ContinuousBackupStagedHFileCleaner();
+ cleaner.setConf(conf);
+
+ List<FileStatus> stats = new ArrayList<>();
+ // Prime the cleaner
+ stats.add(fs.getFileStatus(file));
+ Iterable<FileStatus> deletable = cleaner.getDeletableFiles(stats);
+
+ // 3. Assert that file as is, should be deletable
+ boolean found = false;
+ for (FileStatus stat1 : deletable) {
+ if (stat1.equals(fs.getFileStatus(file))) {
+ found = true;
+ break;
+ }
+ }
+ assertTrue("File should be deletable as it has no HFile references.", found);
+
+ // 4. Add the file as bulk load
+ List<Path> list = new ArrayList<>(1);
+ list.add(file);
+ String peerId = "peerId";
+ try (Connection conn = ConnectionFactory.createConnection(conf)) {
+ StagedBulkloadFileRegistry stagedBulkloadFileRegistry =
+ new StagedBulkloadFileRegistry(conn, peerId);
+ stagedBulkloadFileRegistry.addStagedFiles(list);
+ }
+
+ // 5. Assert file should not be deletable
+ deletable = cleaner.getDeletableFiles(stats);
+ found = false;
+ for (FileStatus stat1 : deletable) {
+ if (stat1.equals(fs.getFileStatus(file))) {
+ found = true;
+ }
+ }
+
+ assertFalse("File should not be deletable as it has an HFile reference.", found);
+ }
+}
diff --git a/hbase-protocol-shaded/src/main/protobuf/Backup.proto b/hbase-protocol-shaded/src/main/protobuf/Backup.proto
index afe43122f848..b51b8ce6a2e6 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Backup.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Backup.proto
@@ -119,3 +119,14 @@ message BackupInfo {
STORE_MANIFEST = 5;
}
}
+
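+// State of an in-progress continuous-backup WAL writer: the staged WAL path, the WAL file size
+// when the writer was opened, and any bulk-load files associated with the WAL's entries.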
+message ContinuousBackupWalWriterContext {
+ required string wal_path = 1;
+ required int64 initial_wal_file_size = 2;
+ repeated string bulk_load_files = 3;
+}
+
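+// Bulk-load file paths currently staged for a single replication peer; stored as one row per
+// peer in the staged_bulkload_files table.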
+message StagedBulkloadFilesInfo {
+ repeated string files = 1;
+}
+