diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index 48893ccbcf4f..7ddca1f61d1b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -1542,6 +1542,66 @@ default void snapshot(String snapshotName, TableName tableName, void snapshot(SnapshotDescription snapshot) throws IOException, SnapshotCreationException, IllegalArgumentException; + /** + * Take a snapshot and wait for the server to complete that snapshot (blocking). To users it + * behaves like {@link org.apache.hadoop.hbase.client.Admin#snapshot(String, TableName)}; the + * difference between the two methods is that + * {@link org.apache.hadoop.hbase.client.Admin#snapshotTable(String, TableName)} is based on + * proc-v2. This overload always requests a {@link SnapshotType#FLUSH} snapshot. + * @param snapshotName name to give the snapshot on the filesystem. Must be unique from all other + * snapshots stored on the cluster + * @param tableName name of the table to snapshot + * @throws IOException if we fail to reach the master + * @throws SnapshotCreationException if snapshot creation failed + * @throws IllegalArgumentException if the snapshot request is formatted incorrectly + */ + default void snapshotTable(String snapshotName, TableName tableName) + throws IOException, SnapshotCreationException, IllegalArgumentException{ + snapshotTable(snapshotName, tableName, SnapshotType.FLUSH); + } + + /** + * Create a typed snapshot of the table. + * @param snapshotName name to give the snapshot on the filesystem.
Must be unique from all other + * snapshots stored on the cluster + * @param tableName name of the table to snapshot + * @param type type of snapshot to take + * @throws IOException if we fail to reach the master + * @throws SnapshotCreationException if snapshot creation failed + * @throws IllegalArgumentException if the snapshot request is formatted incorrectly + */ + default void snapshotTable(String snapshotName, TableName tableName, SnapshotType type) + throws IOException, SnapshotCreationException, IllegalArgumentException { + snapshotTable(new SnapshotDescription(snapshotName, tableName, type)); + } + + /** + * Create typed snapshot of the table with extra properties, using the snapshotTable path. + * @param snapshotName name to give the snapshot on the filesystem. Must be unique from all other + * snapshots stored on the cluster + * @param tableName name of the table to snapshot + * @param type type of snapshot to take + * @param snapshotProps snapshot additional properties e.g. TTL + * @throws IOException if we fail to reach the master + * @throws SnapshotCreationException if snapshot creation failed + * @throws IllegalArgumentException if the snapshot request is formatted incorrectly + */ + default void snapshotTable(String snapshotName, TableName tableName, + SnapshotType type, Map snapshotProps) + throws IOException, SnapshotCreationException, IllegalArgumentException { + snapshotTable(new SnapshotDescription(snapshotName, tableName, type, snapshotProps)); + } + + /** + * Take a snapshot and wait for the server to complete that snapshot (blocking). + * @param snapshot snapshot to take + * @throws IOException if we fail to reach the master + * @throws SnapshotCreationException if snapshot creation failed + * @throws IllegalArgumentException if the snapshot request is formatted incorrectly + */ + void snapshotTable(SnapshotDescription snapshot) + throws IOException, SnapshotCreationException, IllegalArgumentException; + /** * Take a snapshot without waiting for the server to complete that snapshot (asynchronous).
* Snapshots are considered unique based on the name of the snapshot. Snapshots are taken diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java index 08de9797a16a..b4618c5a6eef 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java @@ -625,6 +625,12 @@ public void snapshot(SnapshotDescription snapshot) get(admin.snapshot(snapshot)); } + @Override + public void snapshotTable(SnapshotDescription snapshot) + throws IOException, SnapshotCreationException, IllegalArgumentException { + get(admin.snapshotTable(snapshot)); + } + @Override public Future snapshotAsync(SnapshotDescription snapshot) throws IOException, SnapshotCreationException { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index c366d3e6a7cc..ea25c9737384 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -825,6 +825,20 @@ default CompletableFuture snapshot(String snapshotName, TableName tableNam */ CompletableFuture snapshot(SnapshotDescription snapshot); + /** + * Take a snapshot and wait for the server to complete that snapshot asynchronously. 
+ */ + default CompletableFuture snapshotTable(String snapshotName, TableName tableName) { + return snapshotTable(new SnapshotDescription(snapshotName, tableName, SnapshotType.FLUSH)); + } + + /** + * Take a snapshot of the given type; delegates to {@link #snapshotTable(SnapshotDescription)}. + * @param snapshotName name to give the snapshot + * @param tableName name of the table to snapshot + * @param type type of snapshot to take + * @return the future returned by {@link #snapshotTable(SnapshotDescription)} + */ + default CompletableFuture snapshotTable(String snapshotName, TableName tableName, + SnapshotType type) { + return snapshotTable(new SnapshotDescription(snapshotName, tableName, type)); + } + + /** + * Take the snapshot described by {@code snapshot}. + * @param snapshot snapshot to take + */ + CompletableFuture snapshotTable(SnapshotDescription snapshot); + /** * Check the current state of the passed snapshot. There are three possible states: *
    diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index f0f564be1281..d36a8cb3c925 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -476,6 +476,11 @@ public CompletableFuture snapshot(SnapshotDescription snapshot) { return wrap(rawAdmin.snapshot(snapshot)); } + @Override + public CompletableFuture snapshotTable(SnapshotDescription snapshot) { + return wrap(rawAdmin.snapshotTable(snapshot)); + } + @Override public CompletableFuture isSnapshotFinished(SnapshotDescription snapshot) { return wrap(rawAdmin.isSnapshotFinished(snapshot)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 292c9cb19bbd..52eed18ec6b0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -265,6 +265,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; @@ -1870,11 +1872,12 @@ public CompletableFuture 
snapshot(SnapshotDescription snapshotDesc) { } catch (IllegalArgumentException e) { return failedFuture(e); } + CompletableFuture future = new CompletableFuture<>(); final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build(); addListener(this. newMasterCaller() - .action((controller, stub) -> this. call(controller, - stub, request, (s, c, req, done) -> s.snapshot(c, req, done), + .action((controller, stub) -> this. call( + controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp.getExpectedTimeout())) .call(), (expectedTimeout, err) -> { if (err != null) { @@ -1916,6 +1919,23 @@ public void run(Timeout timeout) throws Exception { return future; } + @Override + public CompletableFuture snapshotTable(SnapshotDescription snapshotDesc) { + SnapshotProtos.SnapshotDescription snapshot = + ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); + try { + ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); + } catch (IllegalArgumentException e) { + return failedFuture(e); + } + + SnapshotTableRequest snapshotTableRequest = SnapshotTableRequest.newBuilder() + .setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(); + return this. 
procedureCall(snapshotTableRequest, + (s, c, req, done) -> s.snapshotTable(c, req, done), (resp) -> resp.getProcId(), + new SnapshotTableProcedureBiConsumer(TableName.valueOf(snapshot.getTable()))); + } + @Override public CompletableFuture isSnapshotFinished(SnapshotDescription snapshot) { return this @@ -2738,6 +2758,17 @@ String getOperationType() { } } + private static class SnapshotTableProcedureBiConsumer extends TableProcedureBiConsumer { + SnapshotTableProcedureBiConsumer(TableName tableName) { + super(tableName); + } + + @Override + String getOperationType() { + return "SNAPSHOT"; + } + } + private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer { private final String peerId; private final Supplier getOperation; diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto index ea09e116cc71..9340b87518e7 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto @@ -443,6 +443,16 @@ message SnapshotResponse { required int64 expected_timeout = 1; } +message SnapshotTableRequest { + required SnapshotDescription snapshot = 1; + optional uint64 nonce_group = 2 [default = 0]; + optional uint64 nonce = 3 [default = 0]; +} + +message SnapshotTableResponse { + optional int64 proc_id = 1; +} + message GetCompletedSnapshotsRequest { } @@ -932,6 +942,11 @@ service MasterService { */ rpc Snapshot(SnapshotRequest) returns(SnapshotResponse); + /** + * Create a snapshot for the given table. + */ + rpc SnapshotTable(SnapshotTableRequest) returns(SnapshotTableResponse); + /** * Get completed snapshots. 
* Returns a list of snapshot descriptors for completed snapshots diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index 4f1a3cf1635f..c166a9256fa3 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -191,6 +191,46 @@ message RestoreParentToChildRegionsPair { required string child2_region_name = 3; } +enum SnapshotState { + SNAPSHOT_PREPARE = 1; + SNAPSHOT_PRE_OPERATION = 2; + SNAPSHOT_WRITE_SNAPSHOT_INFO = 3; + SNAPSHOT_SNAPSHOT_ONLINE_REGIONS = 4; + SNAPSHOT_SNAPSHOT_OFFLINE_REGIONS = 5; + SNAPSHOT_SNAPSHOT_MOB_REGION = 6; + SNAPSHOT_CONSOLIDATE_SNAPSHOT = 7; + SNAPSHOT_VERIFIER_SNAPSHOT = 8; + SNAPSHOT_COMPLETE_SNAPSHOT = 9; + SNAPSHOT_POST_OPERATION = 10; +} + +message SnapshotProcedureStateData { + required SnapshotDescription snapshot = 1; +} + +message SnapshotRegionProcedureStateData { + required RegionInfo region = 1; + required SnapshotDescription snapshot = 2; +} + +message SnapshotRegionParameter { + required RegionInfo region = 1; + required SnapshotDescription snapshot = 2; +} + +message SnapshotVerifyProcedureStateData { + required SnapshotDescription snapshot = 1; + repeated RegionInfo region = 2; + required ServerName target_server = 3; + required uint32 expected_num_region = 4; +} + +message SnapshotVerifyParameter { + required SnapshotDescription snapshot = 1; + repeated RegionInfo region = 2; + required uint32 expected_num_region = 3; +} + enum CloneSnapshotState { CLONE_SNAPSHOT_PRE_OPERATION = 1; CLONE_SNAPSHOT_WRITE_FS_LAYOUT = 2; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index a39493cc2628..0b608be369a3 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -301,7 +301,21 @@ public enum EventType { * * RS_CLAIM_REPLICATION_QUEUE */ - RS_CLAIM_REPLICATION_QUEUE(86, ExecutorType.RS_CLAIM_REPLICATION_QUEUE); + RS_CLAIM_REPLICATION_QUEUE(86, ExecutorType.RS_CLAIM_REPLICATION_QUEUE), + + /** + * RS snapshot regions.
    + * + * RS_SNAPSHOT_REGIONS + */ + RS_SNAPSHOT_REGIONS(87, ExecutorType.RS_SNAPSHOT_OPERATIONS), + + /** + * RS verify snapshot.
    + * + * RS_VERIFY_SNAPSHOT + */ + RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS); private final int code; private final ExecutorType executor; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index 120f9bea5b7b..cbecb3e8619f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -53,7 +53,8 @@ public enum ExecutorType { RS_REPLAY_SYNC_REPLICATION_WAL(32), RS_SWITCH_RPC_THROTTLE(33), RS_IN_MEMORY_COMPACTION(34), - RS_CLAIM_REPLICATION_QUEUE(35); + RS_CLAIM_REPLICATION_QUEUE(35), + RS_SNAPSHOT_OPERATIONS(36); ExecutorType(int value) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 46bc8c2158eb..c92a439fb362 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -329,6 +329,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; @@ -1719,6 +1721,27 @@ public SnapshotResponse 
snapshot(RpcController controller, } } + @Override + public SnapshotTableResponse snapshotTable(RpcController controller, + SnapshotTableRequest request) throws ServiceException { + try { + server.checkInitialized(); + server.snapshotManager.checkSnapshotSupport(); + LOG.info(server.getClientIdAuditPrefix() + " snapshot request for:" + + ClientSnapshotDescriptionUtils.toString(request.getSnapshot())); + + SnapshotDescription snapshot = SnapshotDescriptionUtils.validate( + request.getSnapshot(), server.getConfiguration()); + long procId = server.snapshotManager + .takeSnapshot(snapshot, request.getNonceGroup(), request.getNonce()); + return SnapshotTableResponse.newBuilder().setProcId(procId).build(); + } catch (ForeignException e) { + throw new ServiceException(e.getCause()); + } catch (IOException e) { + throw new ServiceException(e); + } + } + @Override public StopMasterResponse stopMaster(RpcController controller, StopMasterRequest request) throws ServiceException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java index fb57cb92dc2f..c3b66f2d831c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java @@ -446,7 +446,7 @@ protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) { private boolean prepareMergeRegion(final MasterProcedureEnv env) throws IOException { // Fail if we are taking snapshot for the given table TableName tn = regionsToMerge[0].getTable(); - if (env.getMasterServices().getSnapshotManager().isTakingSnapshot(tn)) { + if (env.getMasterServices().getSnapshotManager().isTakingSnapshot(tn, true)) { throw new MergeRegionException("Skip merging regions " + RegionInfo.getShortNameToLog(regionsToMerge) + ", 
because we are snapshotting " + tn); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java index 0a15e36a16af..5394a5569940 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java @@ -500,7 +500,7 @@ private byte[] getSplitRow() { public boolean prepareSplitRegion(final MasterProcedureEnv env) throws IOException { // Fail if we are taking snapshot for the given table if (env.getMasterServices().getSnapshotManager() - .isTakingSnapshot(getParentRegion().getTable())) { + .isTakingSnapshot(getParentRegion().getTable(), true)) { setFailure(new IOException("Skip splitting region " + getParentRegion().getShortNameToLog() + ", because we are taking snapshot for the table " + getParentRegion().getTable())); return false; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java index a7abfdc13f5b..16d45f2e3f9e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java @@ -50,7 +50,12 @@ public enum ServerOperationType { /** * send the claim replication queue request to region server to actually assign it */ - CLAIM_REPLICATION_QUEUE_REMOTE + CLAIM_REPLICATION_QUEUE_REMOTE, + + /** + * send verify snapshot request to region server and handle the response + */ + VERIFY_SNAPSHOT } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java index 726ee14c979f..2cbceb22e514 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java @@ -40,6 +40,7 @@ public boolean requireExclusiveLock(Procedure proc) { case SPLIT_WAL_REMOTE: case CLAIM_REPLICATION_QUEUES: case CLAIM_REPLICATION_QUEUE_REMOTE: + case VERIFY_SNAPSHOT: return false; default: break; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotProcedure.java new file mode 100644 index 000000000000..7798010e04ed --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotProcedure.java @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.client.TableState; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.MetricsSnapshot; +import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.ModifyRegionUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotProcedureStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; + +/** + * A procedure used to take snapshot on tables. + */ +@InterfaceAudience.Private +public class SnapshotProcedure + extends AbstractStateMachineTableProcedure { + private static final Logger LOG = LoggerFactory.getLogger(SnapshotProcedure.class); + private final MetricsSnapshot metricsSnapshot = new MetricsSnapshot(); + + private Configuration conf; + private SnapshotDescription snapshot; + private Path rootDir; + private Path snapshotDir; + private Path workingDir; + private FileSystem workingDirFS; + private TableName snapshotTable; + private MonitoredTask status; + private SnapshotManifest snapshotManifest; + private TableDescriptor htd; + private ForeignExceptionDispatcher monitor; + + public SnapshotProcedure() { } + + public SnapshotProcedure(final MasterProcedureEnv env, final SnapshotDescription snapshot) { + super(env); + this.snapshot = snapshot; + } + + @Override + public TableName getTableName() { + return TableName.valueOf(snapshot.getTable()); + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.SNAPSHOT; + } + + @Override + protected LockState acquireLock(MasterProcedureEnv env) { + // AbstractStateMachineTableProcedure acquires exclusive table lock by default, + // but we may need to downgrade it to shared lock for some reasons: + // a. exclusive lock has a negative effect on assigning region. See HBASE-21480 for details. + // b. we want to support taking multiple different snapshots on same table on the same time. 
+ if (env.getProcedureScheduler().waitTableSharedLock(this, getTableName())) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(MasterProcedureEnv env) { + env.getProcedureScheduler().wakeTableSharedLock(this, getTableName()); + } + + @Override + protected boolean holdLock(MasterProcedureEnv env) { + // In order to avoid enabling/disabling/modifying/deleting table during snapshot, + // we don't release lock during suspend + return true; + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, SnapshotState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + LOG.info("{} execute state={}", this, state); + + try { + switch (state) { + case SNAPSHOT_PREPARE: + prepareSnapshot(env); + setNextState(SnapshotState.SNAPSHOT_PRE_OPERATION); + return Flow.HAS_MORE_STATE; + case SNAPSHOT_PRE_OPERATION: + preSnapshot(env); + setNextState(SnapshotState.SNAPSHOT_WRITE_SNAPSHOT_INFO); + return Flow.HAS_MORE_STATE; + case SNAPSHOT_WRITE_SNAPSHOT_INFO: + SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, workingDirFS); + TableState tableState = + env.getMasterServices().getTableStateManager().getTableState(snapshotTable); + if (tableState.isEnabled()) { + setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS); + } else if (tableState.isDisabled()) { + setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_OFFLINE_REGIONS); + } + return Flow.HAS_MORE_STATE; + case SNAPSHOT_SNAPSHOT_ONLINE_REGIONS: + addChildProcedure(createRemoteSnapshotProcedures(env)); + setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_OFFLINE_REGIONS); + return Flow.HAS_MORE_STATE; + case SNAPSHOT_SNAPSHOT_OFFLINE_REGIONS: + snapshotOfflineRegions(env); + if (MobUtils.hasMobColumns(htd)) { + setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION); + } else { + setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT); + } + return Flow.HAS_MORE_STATE; + case 
SNAPSHOT_SNAPSHOT_MOB_REGION: + snapshotMobRegion(); + setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT); + return Flow.HAS_MORE_STATE; + case SNAPSHOT_CONSOLIDATE_SNAPSHOT: + // flush the in-memory state, and write the single manifest + status.setStatus("Consolidate snapshot: " + snapshot.getName()); + snapshotManifest.consolidate(); + setNextState(SnapshotState.SNAPSHOT_VERIFIER_SNAPSHOT); + return Flow.HAS_MORE_STATE; + case SNAPSHOT_VERIFIER_SNAPSHOT: + status.setStatus("Verifying snapshot: " + snapshot.getName()); + verifySnapshot(env); + setNextState(SnapshotState.SNAPSHOT_COMPLETE_SNAPSHOT); + return Flow.HAS_MORE_STATE; + case SNAPSHOT_COMPLETE_SNAPSHOT: + completeSnapshot(env); + setNextState(SnapshotState.SNAPSHOT_POST_OPERATION); + return Flow.HAS_MORE_STATE; + case SNAPSHOT_POST_OPERATION: + postSnapshot(env); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } catch (Exception e) { + setFailure("master-snapshot", e); + LOG.warn("unexpected exception while execute {}. 
Mark procedure Failed.", this, e); + status.abort("Abort Snapshot " + snapshot.getName() + " on Table " + snapshotTable); + return Flow.NO_MORE_STATE; + } + } + + @Override + protected void rollbackState(MasterProcedureEnv env, SnapshotState state) + throws IOException, InterruptedException { + if (state == SnapshotState.SNAPSHOT_PRE_OPERATION) { + try { + if (!workingDirFS.delete(workingDir, true)) { + LOG.error("Couldn't delete snapshot working directory {}", workingDir); + } + } catch (IOException e) { + LOG.error("Couldn't delete snapshot working directory {}", workingDir, e); + } + } + } + + @Override + protected boolean isRollbackSupported(SnapshotState state) { + return true; + } + + @Override + protected SnapshotState getState(final int stateId) { + return SnapshotState.forNumber(stateId); + } + + @Override + protected int getStateId(SnapshotState state) { + return state.getNumber(); + } + + @Override + protected SnapshotState getInitialState() { + return SnapshotState.SNAPSHOT_PREPARE; + } + + private void prepareSnapshot(MasterProcedureEnv env) throws IOException { + this.conf = env.getMasterConfiguration(); + this.snapshotTable = TableName.valueOf(snapshot.getTable()); + this.htd = loadTableDescriptorSnapshot(env); + this.rootDir = CommonFSUtils.getRootDir(conf); + this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir); + this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, conf); + this.workingDirFS = workingDir.getFileSystem(conf); + this.status = TaskMonitor.get() + .createStatus("Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable); + this.monitor = new ForeignExceptionDispatcher(snapshot.getName()); + this.snapshotManifest = SnapshotManifest.create(conf, + workingDirFS, workingDir, snapshot, monitor, status); + this.snapshotManifest.addTableDescriptor(htd); + } + + private TableDescriptor loadTableDescriptorSnapshot(MasterProcedureEnv env) throws IOException { + 
TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(snapshotTable); + if (htd == null) { + throw new IOException("TableDescriptor missing for " + snapshotTable); + } + if (htd.getMaxFileSize() == -1 && this.snapshot.getMaxFileSize() > 0) { + return TableDescriptorBuilder.newBuilder(htd).setValue(TableDescriptorBuilder.MAX_FILESIZE, + Long.toString(this.snapshot.getMaxFileSize())).build(); + } + return htd; + } + + private void preSnapshot(MasterProcedureEnv env) throws IOException { + env.getMasterServices().getSnapshotManager().prepareWorkingDirectory(snapshot); + + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preSnapshot(ProtobufUtil.createSnapshotDesc(snapshot), htd); + } + } + + private void postSnapshot(MasterProcedureEnv env) throws IOException { + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postSnapshot(ProtobufUtil.createSnapshotDesc(snapshot), htd); + } + } + + private void verifySnapshot(MasterProcedureEnv env) throws IOException { + int verifyThreshold = env.getMasterConfiguration() + .getInt("hbase.snapshot.remote.verify.threshold", 10000); + int numRegions = (int) env.getAssignmentManager() + .getTableRegions(snapshotTable, false) + .stream().filter(r -> RegionReplicaUtil.isDefaultReplica(r)).count(); + + if (numRegions >= verifyThreshold) { + addChildProcedure(createRemoteVerifyProcedures(env)); + } else { + MasterSnapshotVerifier verifier = + new MasterSnapshotVerifier(env.getMasterServices(), snapshot, workingDirFS); + verifier.verifySnapshot(); + } + } + + private void completeSnapshot(MasterProcedureEnv env) throws IOException { + // complete the snapshot, atomically moving from tmp to .snapshot dir. + SnapshotDescriptionUtils.completeSnapshot(snapshotDir, workingDir, + env.getMasterFileSystem().getFileSystem(), workingDirFS, conf); + // update metric. 
when master restarts, the metric value is wrong
+    metricsSnapshot.addSnapshot(status.getCompletionTimestamp() - status.getStartTime());
+    if (env.getMasterCoprocessorHost() != null) {
+      env.getMasterCoprocessorHost()
+        .postCompletedSnapshotAction(ProtobufUtil.createSnapshotDesc(snapshot), htd);
+    }
+    status.markComplete("Snapshot " + snapshot.getName() + " completed");
+  }
+
+  /**
+   * Adds the offline regions of the snapshot table to the snapshot manifest. Only default
+   * replicas whose {@code isSplit()} flag is set are selected. The work runs on a dedicated
+   * executor which is shut down even when editing the regions fails.
+   * @param env master procedure environment
+   * @throws IOException propagated from {@code ModifyRegionUtils.editRegions}
+   */
+  private void snapshotOfflineRegions(MasterProcedureEnv env) throws IOException {
+    List<RegionInfo> regions = env.getAssignmentManager()
+      .getTableRegions(snapshotTable, false).stream()
+      .filter(r -> RegionReplicaUtil.isDefaultReplica(r))
+      .filter(RegionInfo::isSplit).collect(Collectors.toList());
+
+    ThreadPoolExecutor exec = SnapshotManifest
+      .createExecutor(env.getMasterConfiguration(), "OfflineRegionsSnapshotPool");
+    try {
+      ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
+        @Override
+        public void editRegion(final RegionInfo regionInfo) throws IOException {
+          snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), regionInfo);
+        }
+      });
+    } finally {
+      exec.shutdown();
+    }
+    status.setStatus("Completed referencing offline regions of table: " + snapshotTable);
+  }
+
+  /**
+   * Adds the mob region of the table to the snapshot manifest and updates the task status.
+   * @throws IOException propagated from {@code snapshotManifest.addMobRegion}
+   */
+  private void snapshotMobRegion() throws IOException {
+    RegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(htd.getTableName());
+    snapshotManifest.addMobRegion(mobRegionInfo);
+    status.setStatus("Completed referencing HFiles for the mob region of table: " + snapshotTable);
+  }
+
+  /**
+   * Serializes the parent state first, then a {@code SnapshotProcedureStateData} message that
+   * carries the snapshot description.
+   */
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.serializeStateData(serializer);
+    serializer.serialize(SnapshotProcedureStateData
+      .newBuilder().setSnapshot(this.snapshot).build());
+  }
+
+  /**
+   * Reads back what {@link #serializeStateData} wrote, in the same order: parent state first,
+   * then the snapshot description.
+   */
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.deserializeStateData(serializer);
+    SnapshotProcedureStateData data = serializer.deserialize(SnapshotProcedureStateData.class);
+    this.snapshot = data.getSnapshot();
+  }
+
+  /**
+   * Creates one {@link SnapshotRegionProcedure} for every default-replica region returned by
+   * {@code getTableRegions(snapshotTable, true)}.
+   * @param env master procedure environment
+   * @return the sub-procedures, one per region
+   */
+  private Procedure<MasterProcedureEnv>[] createRemoteSnapshotProcedures(MasterProcedureEnv env) {
+    return env.getAssignmentManager().getTableRegions(snapshotTable, true)
+      .stream().filter(r -> RegionReplicaUtil.isDefaultReplica(r))
+      .map(r -> new SnapshotRegionProcedure(snapshot, r))
+      .toArray(SnapshotRegionProcedure[]::new);
+  }
+
+  /**
+   * Assigns the region snapshot manifest verify tasks to all online region servers in the
+   * cluster with the help of the master load balancer: the balancer's round-robin plan maps
+   * each server to a list of regions, and one {@link SnapshotVerifyProcedure} is created per
+   * plan entry. Every sub-procedure is given the total region count as the expected number.
+   * @param env master procedure environment
+   * @return the verify sub-procedures, one per server in the round-robin plan
+   * @throws IOException propagated from the load balancer
+   */
+  private Procedure<MasterProcedureEnv>[] createRemoteVerifyProcedures(MasterProcedureEnv env)
+    throws IOException {
+    List<RegionInfo> regions = env
+      .getAssignmentManager().getTableRegions(snapshotTable, false);
+    List<ServerName> servers = env
+      .getMasterServices().getServerManager().getOnlineServersList();
+    return env.getMasterServices().getLoadBalancer()
+      .roundRobinAssignment(regions, servers).entrySet().stream()
+      .map(e -> new SnapshotVerifyProcedure(snapshot, e.getValue(), e.getKey(), regions.size()))
+      .toArray(SnapshotVerifyProcedure[]::new);
+  }
+
+  /** Appends the class name, procedure id and a printable form of the snapshot description. */
+  @Override
+  public void toStringClassDetails(StringBuilder builder) {
+    builder.append(getClass().getName())
+      .append(", id=").append(getProcId())
+      .append(", snapshot=").append(ClientSnapshotDescriptionUtils.toString(snapshot));
+  }
+
+  /**
+   * Returns the snapshot description; same value as {@link #getSnapshot()}.
+   * @return the description of the snapshot this procedure takes
+   */
+  public SnapshotDescription getSnapshotDesc() {
+    return snapshot;
+  }
+
+  /**
+   * Runs {@code prepareSnapshot} again after the procedure has been replayed. An
+   * {@link IOException} is logged and turns the procedure into a failed one instead of being
+   * rethrown.
+   */
+  @Override
+  protected void afterReplay(MasterProcedureEnv env) {
+    try {
+      prepareSnapshot(env);
+    } catch (IOException e) {
+      LOG.error("Failed replaying {}, mark procedure as failed", this, e);
+      setFailure("master-snapshot", e);
+    }
+  }
+
+  /**
+   * Returns the snapshot description; same value as {@link #getSnapshotDesc()}.
+   * @return the description of the snapshot this procedure takes
+   */
+  public SnapshotDescription getSnapshot() {
+    return snapshot;
+  }
+
+  /**
+   * Callback for a verify sub-procedure that found the snapshot corrupted. If this procedure
+   * is already failed the report is only logged; otherwise the procedure is marked as failed
+   * with the reported exception.
+   * @param subProc the reporting sub-procedure
+   * @param e       the corruption found by the sub-procedure
+   */
+  synchronized void reportSnapshotCorrupted(SnapshotVerifyProcedure subProc,
+    CorruptedSnapshotException e) {
+    if (isFailed()) {
+      LOG.warn("Sub procedure {} reports snapshot corrupted while Parent procedure {} has been"
+        + " marked as failed before.", subProc, this, e);
+    } else {
+      LOG.warn("Sub procedure {} reports snapshot corrupted. Mark {} as failed.", subProc, this, e);
+      setFailure("master-snapshot", e);
+    }
+  }
+}
+
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java
new file mode 100644
index 000000000000..d6c3ee9e81b7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hbase.master.procedure; + +import com.google.errorprone.annotations.RestrictedApi; +import java.io.IOException; +import java.util.Optional; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.master.assignment.RegionStateNode; +import org.apache.hadoop.hbase.master.assignment.RegionStates; +import org.apache.hadoop.hbase.master.assignment.ServerState; +import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; +import org.apache.hadoop.hbase.regionserver.SnapshotRegionCallable; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotRegionProcedureStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; + +/** + * A remote procedure which is used to send region snapshot 
request to region server.
+ * The basic logic of SnapshotRegionProcedure is similar to {@link ServerRemoteProcedure},
+ * with one difference: when {@link FailedRemoteDispatchException} is thrown,
+ * SnapshotRegionProcedure will sleep some time and continue retrying until success.
+ */
+@InterfaceAudience.Private
+public class SnapshotRegionProcedure extends Procedure<MasterProcedureEnv>
+  implements TableProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> {
+  private static final Logger LOG = LoggerFactory.getLogger(SnapshotRegionProcedure.class);
+
+  // description of the snapshot being taken; persisted in the procedure state
+  private SnapshotDescription snapshot;
+  // event this procedure is suspended on while a remote request is in flight
+  private ProcedureEvent<?> event;
+  // region to snapshot; persisted in the procedure state
+  private RegionInfo region;
+  // true once the request has been handed to the remote dispatcher
+  private boolean dispatched;
+  // true once the remote side has reported success
+  private boolean succ;
+  // created lazily on the first back-off, see setTimeoutForSuspend
+  private RetryCounter retryCounter;
+
+  /**
+   * No-arg constructor; leaves snapshot and region unset until
+   * {@link #deserializeStateData(ProcedureStateSerializer)} assigns them.
+   */
+  public SnapshotRegionProcedure() {
+  }
+
+  /**
+   * @param snapshot description of the snapshot to take
+   * @param region   region that should be snapshotted
+   */
+  public SnapshotRegionProcedure(SnapshotDescription snapshot, RegionInfo region) {
+    this.snapshot = snapshot;
+    this.region = region;
+  }
+
+  /**
+   * Tries to take the region lock through the procedure scheduler.
+   * @return {@code LOCK_EVENT_WAIT} when the scheduler asks this procedure to wait for the
+   *         region, {@code LOCK_ACQUIRED} otherwise
+   */
+  @Override
+  protected LockState acquireLock(final MasterProcedureEnv env) {
+    if (env.getProcedureScheduler().waitRegions(this, getTableName(), region)) {
+      return LockState.LOCK_EVENT_WAIT;
+    }
+    return LockState.LOCK_ACQUIRED;
+  }
+
+  /** Releases the region lock taken in {@link #acquireLock(MasterProcedureEnv)}. */
+  @Override
+  protected void releaseLock(final MasterProcedureEnv env) {
+    env.getProcedureScheduler().wakeRegions(this, getTableName(), region);
+  }
+
+  /** @return always {@code false} */
+  @Override
+  protected boolean holdLock(MasterProcedureEnv env) {
+    return false;
+  }
+
+  /**
+   * Builds the operation sent to the region server: a {@link SnapshotRegionCallable} request
+   * whose payload is a serialized {@code SnapshotRegionParameter} holding the region and the
+   * snapshot description.
+   */
+  @Override
+  public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env,
+    ServerName serverName) {
+    return Optional.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(),
+      SnapshotRegionCallable.class, MasterProcedureProtos.SnapshotRegionParameter.newBuilder()
+        .setRegion(ProtobufUtil.toRegionInfo(region)).setSnapshot(snapshot).build()
+        .toByteArray()));
+  }
+
+  @Override
+  public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, IOException e) {
+    complete(env, e);
+  }
+
+  @Override
+  public void remoteOperationCompleted(MasterProcedureEnv env) {
+    complete(env, null);
+  }
+
+  @Override
+  public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException e) {
+    complete(env, e);
+  }
+
+  /**
+   * Common handler for all remote callbacks. Records success when {@code error} is null, then
+   * wakes the suspended procedure so that {@code execute} runs again; on failure {@code succ}
+   * stays false, which makes {@code execute} dispatch the request again (keep retrying until
+   * success).
+   * @param env   master procedure environment
+   * @param error failure reported by the remote side, or {@code null} on success
+   */
+  private void complete(MasterProcedureEnv env, Throwable error) {
+    if (isFinished()) {
+      LOG.info("This procedure {} is already finished, skip the rest processes", this.getProcId());
+      return;
+    }
+    if (event == null) {
+      LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
+        getProcId());
+      return;
+    }
+    if (error == null) {
+      LOG.info("finish snapshot {} on region {}", snapshot.getName(), region.getEncodedName());
+      succ = true;
+    } else {
+      // do not drop the failure silently: it is the only trace of why a retry happens
+      LOG.warn("failed snapshot {} on region {}, will retry", snapshot.getName(),
+        region.getEncodedName(), error);
+    }
+
+    event.wake(env.getProcedureScheduler());
+    event = null;
+  }
+
+  @Override
+  public TableName getTableName() {
+    return region.getTable();
+  }
+
+  @Override
+  public TableOperationType getTableOperationType() {
+    return TableOperationType.REGION_SNAPSHOT;
+  }
+
+  /**
+   * Dispatches the snapshot request to the server that hosts the region and suspends until a
+   * remote callback wakes the procedure. When the region has a procedure attached, is not in
+   * state OPEN, has no location, or its server is not ONLINE, or when dispatching fails, the
+   * procedure backs off through {@link #setTimeoutForSuspend(MasterProcedureEnv, String)} and
+   * tries again later.
+   * @return {@code null} once a dispatched request has been reported successful
+   * @throws ProcedureSuspendedException whenever the procedure has to wait, either for the
+   *                                     remote result or for a back-off timeout
+   */
+  @Override
+  protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+    if (dispatched) {
+      if (succ) {
+        return null;
+      }
+      // the previous attempt failed, fall through and dispatch again
+      dispatched = false;
+    }
+
+    RegionStates regionStates = env.getAssignmentManager().getRegionStates();
+    RegionStateNode regionNode = regionStates.getRegionStateNode(region);
+    regionNode.lock();
+    try {
+      if (regionNode.getProcedure() != null) {
+        setTimeoutForSuspend(env, String.format("region %s has a TRSP attached %s",
+          region.getRegionNameAsString(), regionNode.getProcedure()));
+        throw new ProcedureSuspendedException();
+      }
+      if (!regionNode.getState().matches(RegionState.State.OPEN)) {
+        setTimeoutForSuspend(env, String.format("region state of %s is %s",
+          region.getRegionNameAsString(), regionNode.getState()));
+        throw new ProcedureSuspendedException();
+      }
+      ServerName targetServer = regionNode.getRegionLocation();
+      if (targetServer == null) {
+        setTimeoutForSuspend(env, String.format("target server of region %s is null",
+          region.getRegionNameAsString()));
+        throw new ProcedureSuspendedException();
+      }
+      ServerState serverState = regionStates.getServerNode(targetServer).getState();
+      if (serverState != ServerState.ONLINE) {
+        setTimeoutForSuspend(env, String.format("target server of region %s %s is in state %s",
+          region.getRegionNameAsString(), targetServer, serverState));
+        throw new ProcedureSuspendedException();
+      }
+      try {
+        env.getRemoteDispatcher().addOperationToNode(targetServer, this);
+        dispatched = true;
+        event = new ProcedureEvent<>(this);
+        event.suspendIfNotReady(this);
+        throw new ProcedureSuspendedException();
+      } catch (FailedRemoteDispatchException e) {
+        setTimeoutForSuspend(env, "Failed send request to " + targetServer);
+        throw new ProcedureSuspendedException();
+      }
+    } finally {
+      regionNode.unlock();
+    }
+  }
+
+  @Override
+  protected void rollback(MasterProcedureEnv env) {
+    // nothing to rollback
+  }
+
+  /**
+   * Puts the procedure into WAITING_TIMEOUT for the next back-off interval of the retry
+   * counter, which is created from the master configuration on first use. The state change is
+   * not persisted.
+   * @param env    master procedure environment
+   * @param reason why the procedure cannot run now, used for logging only
+   */
+  private void setTimeoutForSuspend(MasterProcedureEnv env, String reason) {
+    if (retryCounter == null) {
+      retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
+    }
+    long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+    LOG.warn("{} can not run currently because {}, wait {} ms to retry", this, reason, backoff);
+    setTimeout(Math.toIntExact(backoff));
+    setState(ProcedureState.WAITING_TIMEOUT);
+    skipPersistence();
+  }
+
+  /**
+   * Invoked when the back-off timeout expires: makes the procedure RUNNABLE again and puts it
+   * at the front of the scheduler queue.
+   * @return always {@code false}
+   */
+  @Override
+  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+    setState(ProcedureState.RUNNABLE);
+    env.getProcedureScheduler().addFront(this);
+    return false;
+  }
+
+  /** @return always {@code false} */
+  @Override
+  protected boolean abort(MasterProcedureEnv env) {
+    return false;
+  }
+
+  /** Persists the snapshot description and the region. */
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    SnapshotRegionProcedureStateData.Builder builder =
+      SnapshotRegionProcedureStateData.newBuilder();
+    builder.setSnapshot(snapshot);
+    builder.setRegion(ProtobufUtil.toRegionInfo(region));
+    serializer.serialize(builder.build());
+  }
+
+  /** Restores the snapshot description and the region written by serializeStateData. */
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    SnapshotRegionProcedureStateData data = serializer.deserialize(
+      SnapshotRegionProcedureStateData.class);
+    this.snapshot = data.getSnapshot();
+    this.region = ProtobufUtil.toRegionInfo(data.getRegion());
+  }
+
+  /** @return the simple class name followed by the encoded region name */
+  @Override
+  public String getProcName() {
+    return getClass().getSimpleName() + " " + region.getEncodedName();
+  }
+
+  @Override
+  protected void toStringClassDetails(StringBuilder builder) {
+    builder.append(getProcName());
+  }
+
+  @Override
+  protected boolean waitInitialized(MasterProcedureEnv env) {
+    return env.waitInitialized(this);
+  }
+
+  /** @return the region this procedure snapshots */
+  public RegionInfo getRegion() {
+    return region;
+  }
+
+  /** @return {@code true} once at least one back-off has been scheduled */
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+    allowedOnPath = ".*(/src/test/.*|TestSnapshotProcedure).java")
+  boolean inRetrying() {
+    return retryCounter != null;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java
new file mode 100644
index 000000000000..6e28dbd9e2b1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java
@@ -0,0 +1,206 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and + limitations under the License. + */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.regionserver.SnapshotVerifyCallable; +import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyParameter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyProcedureStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; + +/** + * A remote procedure which is used to send verify snapshot request to region server. 
+ */
+@InterfaceAudience.Private
+public class SnapshotVerifyProcedure
+  extends ServerRemoteProcedure implements ServerProcedureInterface {
+  // log under this class, not under the parent SnapshotProcedure
+  private static final Logger LOG = LoggerFactory.getLogger(SnapshotVerifyProcedure.class);
+  // description of the snapshot to verify; persisted in the procedure state
+  private SnapshotDescription snapshot;
+  // regions whose manifests the target server is asked to verify
+  private List<RegionInfo> regions;
+  // region count forwarded to the remote verification
+  private int expectedNumRegion;
+
+  /**
+   * No-arg constructor; the fields stay unset until
+   * {@link #deserializeStateData(ProcedureStateSerializer)} assigns them.
+   */
+  public SnapshotVerifyProcedure() {}
+
+  /**
+   * @param snapshot          description of the snapshot to verify
+   * @param regions           regions to verify on the target server
+   * @param targetServer      server the verify request is sent to first
+   * @param expectedNumRegion region count passed along with the request
+   */
+  public SnapshotVerifyProcedure(SnapshotDescription snapshot, List<RegionInfo> regions,
+    ServerName targetServer, int expectedNumRegion) {
+    this.targetServer = targetServer;
+    this.snapshot = snapshot;
+    this.regions = regions;
+    this.expectedNumRegion = expectedNumRegion;
+  }
+
+  /**
+   * Evaluates the result of the remote call. The decision is based on the cause of
+   * {@code error}:
+   * <ul>
+   * <li>no error: the task succeeded</li>
+   * <li>{@link CorruptedSnapshotException}: the corruption is reported to the parent
+   * {@link SnapshotProcedure} and this task is treated as done</li>
+   * <li>{@link DoNotRetryIOException}: another online server is chosen, if there is one, and
+   * the task is marked as not dispatched and not successful</li>
+   * <li>anything else: the task is marked as not successful</li>
+   * </ul>
+   */
+  @Override
+  protected void complete(MasterProcedureEnv env, Throwable error) {
+    if (error != null) {
+      Throwable realError = error.getCause();
+      if (realError instanceof CorruptedSnapshotException) {
+        getParent(env).ifPresent(p -> p.reportSnapshotCorrupted(
+          this, (CorruptedSnapshotException) realError));
+        this.succ = true;
+      } else {
+        if (realError instanceof DoNotRetryIOException) {
+          newTargetServer(env).ifPresent(s -> {
+            LOG.warn("Failed send request to {}, try new target server {}",
+              targetServer, s, realError);
+            this.targetServer = s;
+            this.dispatched = false;
+          });
+        }
+        this.succ = false;
+      }
+    } else {
+      this.succ = true;
+    }
+  }
+
+  /**
+   * Delegates to {@link ServerRemoteProcedure} and, when that returns without success, picks
+   * a new target server and yields so that the request is sent again.
+   * @return {@code null} when the remote task has finished, otherwise whatever the super
+   *         implementation returned
+   * @throws ProcedureYieldException when the task has to be retried
+   */
+  @Override
+  protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+    // Regardless of success or failure, ServerRemoteProcedure returns and leaves the parent
+    // procedure to find out and handle failures. In this case, SnapshotProcedure doesn't
+    // care which region server the task is assigned to, so we push down the choice of
+    // new target server to SnapshotVerifyProcedure.
+    Procedure<MasterProcedureEnv>[] res = super.execute(env);
+    if (res == null) {
+      if (succ) {
+        // remote task has finished
+        return null;
+      } else {
+        // can not send request to remote server, we will try another server
+        newTargetServer(env).ifPresent(s -> {
+          LOG.warn("retry {} on new target server {}", this, s);
+          this.targetServer = s;
+          dispatched = false;
+        });
+        throw new ProcedureYieldException();
+      }
+    } else {
+      return res;
+    }
+  }
+
+  @Override
+  protected void rollback(MasterProcedureEnv env) {
+    // nothing to rollback
+  }
+
+  /** @return always {@code false} */
+  @Override
+  protected boolean abort(MasterProcedureEnv env) {
+    return false;
+  }
+
+  /** Persists the target server, the snapshot, the regions and the expected region count. */
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    SnapshotVerifyProcedureStateData.Builder builder =
+      SnapshotVerifyProcedureStateData.newBuilder();
+    builder.setTargetServer(ProtobufUtil.toServerName(targetServer));
+    builder.setSnapshot(snapshot);
+    for (RegionInfo region : regions) {
+      builder.addRegion(ProtobufUtil.toRegionInfo(region));
+    }
+    builder.setExpectedNumRegion(expectedNumRegion);
+    serializer.serialize(builder.build());
+  }
+
+  /** Restores the fields written by serializeStateData. */
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    SnapshotVerifyProcedureStateData data =
+      serializer.deserialize(SnapshotVerifyProcedureStateData.class);
+    this.targetServer = ProtobufUtil.toServerName(data.getTargetServer());
+    this.snapshot = data.getSnapshot();
+    this.regions = data.getRegionList().stream()
+      .map(ProtobufUtil::toRegionInfo).collect(Collectors.toList());
+    this.expectedNumRegion = data.getExpectedNumRegion();
+  }
+
+  /**
+   * Builds the operation sent to the region server: a {@link SnapshotVerifyCallable} request
+   * whose payload is a serialized {@code SnapshotVerifyParameter} holding the snapshot, the
+   * regions and the expected region count.
+   */
+  @Override
+  public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(
+    MasterProcedureEnv env, ServerName serverName) {
+    SnapshotVerifyParameter.Builder builder = SnapshotVerifyParameter.newBuilder();
+    builder.setSnapshot(snapshot);
+    for (RegionInfo region : regions) {
+      builder.addRegion(ProtobufUtil.toRegionInfo(region));
+    }
+    builder.setExpectedNumRegion(expectedNumRegion);
+    return Optional.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(),
+      SnapshotVerifyCallable.class, builder.build().toByteArray()));
+  }
+
+  /** @return the server the verify request is currently targeted at */
+  @Override
+  public ServerName getServerName() {
+    return targetServer;
+  }
+
+  /** @return always {@code false} */
+  @Override
+  public boolean hasMetaTableRegion() {
+    return false;
+  }
+
+  @Override
+  public ServerOperationType getServerOperationType() {
+    return ServerOperationType.VERIFY_SNAPSHOT;
+  }
+
+  /**
+   * Looks up the parent {@link SnapshotProcedure} in the master procedure executor.
+   * @return the parent, or empty (with a warning logged) when the executor returns none
+   */
+  private Optional<SnapshotProcedure> getParent(MasterProcedureEnv env) {
+    SnapshotProcedure parent = env.getMasterServices().getMasterProcedureExecutor()
+      .getProcedure(SnapshotProcedure.class, getParentProcId());
+    if (parent == null) {
+      LOG.warn("Parent procedure of {} does not exist. Is it created without "
+        + "a SnapshotProcedure as parent?", this);
+      return Optional.empty();
+    } else {
+      return Optional.of(parent);
+    }
+  }
+
+  /**
+   * Picks a random online server other than the current target.
+   * @return the chosen server, or empty (with a warning logged) when no other server is online
+   */
+  private Optional<ServerName> newTargetServer(MasterProcedureEnv env) {
+    List<ServerName> onlineServers =
+      env.getMasterServices().getServerManager().getOnlineServersList();
+    onlineServers.remove(targetServer);
+    Collections.shuffle(onlineServers);
+    if (onlineServers.isEmpty()) {
+      LOG.warn("There is no online server, maybe the cluster is shutting down.");
+      return Optional.empty();
+    } else {
+      return Optional.of(onlineServers.get(0));
+    }
+  }
+
+  @Override
+  protected void toStringClassDetails(StringBuilder builder) {
+    builder.append(getClass().getSimpleName())
+      .append(", snapshot=").append(snapshot.getName())
+      .append(", targetServer=").append(targetServer);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
index 7e47586ffd92..d7d8d380b1f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
@@ -34,7 +34,7 @@ public interface
TableProcedureInterface { public static final TableName DUMMY_NAMESPACE_TABLE_NAME = TableName.NAMESPACE_TABLE_NAME; public enum TableOperationType { - CREATE, DELETE, DISABLE, EDIT, ENABLE, READ, + CREATE, DELETE, DISABLE, EDIT, ENABLE, READ, SNAPSHOT, REGION_SNAPSHOT, REGION_EDIT, REGION_SPLIT, REGION_MERGE, REGION_ASSIGN, REGION_UNASSIGN, REGION_GC, MERGED_REGIONS_GC/* region operations */ } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java index 6fb147e1d6a7..ece22ec6e1e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java @@ -56,6 +56,8 @@ private static boolean requireTableExclusiveLock(TableProcedureInterface proc) { // we allow concurrent edit on the ns family in meta table return !proc.getTableName().equals(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME); case READ: + case SNAPSHOT: + case REGION_SNAPSHOT: return false; // region operations are using the shared-lock on the table // and then they will grab an xlock on the region. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java index 90602edc5a09..0a7f1f16d81c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java @@ -19,59 +19,20 @@ import java.io.IOException; import java.util.List; -import java.util.Map; -import java.util.Set; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionReplicaUtil; -import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.mob.MobUtils; -import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException; -import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; -import org.apache.hadoop.hbase.snapshot.SnapshotManifest; -import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil; -import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotVerifyUtil; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; -import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest; -/** - * General snapshot verification on the master. - *

- * This is a light-weight verification mechanism for all the files in a snapshot. It doesn't
- * attempt to verify that the files are exact copies (that would be paramount to taking the
- * snapshot again!), but instead just attempts to ensure that the files match the expected
- * files and are the same length.
- * <p>
- * Taking an online snapshots can race against other operations and this is an last line of
- * defense. For example, if meta changes between when snapshots are taken not all regions of a
- * table may be present. This can be caused by a region split (daughters present on this scan,
- * but snapshot took parent), or move (snapshots only checks lists of region servers, a move could
- * have caused a region to be skipped or done twice).
- * <p>
- * Current snapshot files checked:
- * <ol>
- * <li>SnapshotDescription is readable</li>
- * <li>Table info is readable</li>
- * <li>Regions</li>
- * </ol>
- * <ul>
- * <li>Matching regions in the snapshot as currently in the table</li>
- * <li>{@link RegionInfo} matches the current and stored regions</li>
- * <li>All referenced hfiles have valid names</li>
- * <li>All the hfiles are present (either in .archive directory in the region)</li>
- * <li>All recovered.edits files are present (by name) and have the correct file size</li>
- * </ul>
    - */ + @InterfaceAudience.Private @InterfaceStability.Unstable public final class MasterSnapshotVerifier { @@ -88,7 +49,7 @@ public final class MasterSnapshotVerifier { * @param workingDirFs the file system containing the temporary snapshot information */ public MasterSnapshotVerifier(MasterServices services, - SnapshotDescription snapshot, FileSystem workingDirFs) { + SnapshotDescription snapshot, FileSystem workingDirFs) { this.workingDirFs = workingDirFs; this.services = services; this.snapshot = snapshot; @@ -103,120 +64,13 @@ public MasterSnapshotVerifier(MasterServices services, * @throws CorruptedSnapshotException if the snapshot is invalid * @throws IOException if there is an unexpected connection issue to the filesystem */ - public void verifySnapshot(Path snapshotDir, Set snapshotServers) - throws CorruptedSnapshotException, IOException { - SnapshotManifest manifest = SnapshotManifest.open(services.getConfiguration(), workingDirFs, - snapshotDir, snapshot); - // verify snapshot info matches - verifySnapshotDescription(snapshotDir); - - // check that tableinfo is a valid table description - verifyTableInfo(manifest); - - // check that each region is valid - verifyRegions(manifest); - } - - /** - * Check that the snapshot description written in the filesystem matches the current snapshot - * @param snapshotDir snapshot directory to check - */ - private void verifySnapshotDescription(Path snapshotDir) throws CorruptedSnapshotException { - SnapshotDescription found = SnapshotDescriptionUtils.readSnapshotInfo(workingDirFs, - snapshotDir); - if (!this.snapshot.equals(found)) { - throw new CorruptedSnapshotException( - "Snapshot read (" + found + ") doesn't equal snapshot we ran (" + snapshot + ").", - ProtobufUtil.createSnapshotDesc(snapshot)); - } - } - - /** - * Check that the table descriptor for the snapshot is a valid table descriptor - * @param manifest snapshot manifest to inspect - */ - private void verifyTableInfo(final SnapshotManifest manifest) 
throws IOException { - TableDescriptor htd = manifest.getTableDescriptor(); - if (htd == null) { - throw new CorruptedSnapshotException("Missing Table Descriptor", - ProtobufUtil.createSnapshotDesc(snapshot)); - } - - if (!htd.getTableName().getNameAsString().equals(snapshot.getTable())) { - throw new CorruptedSnapshotException( - "Invalid Table Descriptor. Expected " + snapshot.getTable() + " name, got " - + htd.getTableName().getNameAsString(), ProtobufUtil.createSnapshotDesc(snapshot)); - } - } - - /** - * Check that all the regions in the snapshot are valid, and accounted for. - * @param manifest snapshot manifest to inspect - * @throws IOException if we can't reach hbase:meta or read the files from the FS - */ - private void verifyRegions(final SnapshotManifest manifest) throws IOException { + public void verifySnapshot() throws CorruptedSnapshotException, IOException { List regions = services.getAssignmentManager().getTableRegions(tableName, false); + // Remove the non-default regions RegionReplicaUtil.removeNonDefaultRegions(regions); - Map regionManifests = manifest.getRegionManifestsMap(); - if (regionManifests == null) { - String msg = "Snapshot " + ClientSnapshotDescriptionUtils.toString(snapshot) + " looks empty"; - LOG.error(msg); - throw new CorruptedSnapshotException(msg); - } - - String errorMsg = ""; - boolean hasMobStore = false; - // the mob region is a dummy region, it's not a real region in HBase. - // the mob region has a special name, it could be found by the region name. - if (regionManifests.get(MobUtils.getMobRegionInfo(tableName).getEncodedName()) != null) { - hasMobStore = true; - } - int realRegionCount = hasMobStore ? regionManifests.size() - 1 : regionManifests.size(); - if (realRegionCount != regions.size()) { - errorMsg = "Regions moved during the snapshot '" + - ClientSnapshotDescriptionUtils.toString(snapshot) + "'. 
expected=" + - regions.size() + " snapshotted=" + realRegionCount + "."; - LOG.error(errorMsg); - } - - // Verify RegionInfo - for (RegionInfo region : regions) { - SnapshotRegionManifest regionManifest = regionManifests.get(region.getEncodedName()); - if (regionManifest == null) { - // could happen due to a move or split race. - String mesg = " No snapshot region directory found for region:" + region; - if (errorMsg.isEmpty()) errorMsg = mesg; - LOG.error(mesg); - continue; - } - - verifyRegionInfo(region, regionManifest); - } - - if (!errorMsg.isEmpty()) { - throw new CorruptedSnapshotException(errorMsg); - } - - // Verify Snapshot HFiles - // Requires the root directory file system as HFiles are stored in the root directory - SnapshotReferenceUtil.verifySnapshot(services.getConfiguration(), - CommonFSUtils.getRootDirFileSystem(services.getConfiguration()), manifest); - } - - /** - * Verify that the regionInfo is valid - * @param region the region to check - * @param manifest snapshot manifest to inspect - */ - private void verifyRegionInfo(final RegionInfo region, - final SnapshotRegionManifest manifest) throws IOException { - RegionInfo manifestRegionInfo = ProtobufUtil.toRegionInfo(manifest.getRegionInfo()); - if (RegionInfo.COMPARATOR.compare(region, manifestRegionInfo) != 0) { - String msg = "Manifest region info " + manifestRegionInfo + - "doesn't match expected region:" + region; - throw new CorruptedSnapshotException(msg, ProtobufUtil.createSnapshotDesc(snapshot)); - } + SnapshotVerifyUtil.verifySnapshot(services.getConfiguration(), snapshot, tableName, regions, + regions.size()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java index cdacff7607aa..21b89f534016 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java @@ -35,6 +35,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; @@ -49,7 +51,6 @@ import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.executor.ExecutorService; -import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.MasterServices; @@ -59,7 +60,9 @@ import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner; import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure; +import org.apache.hadoop.hbase.master.procedure.SnapshotProcedure; import org.apache.hadoop.hbase.procedure.MasterProcedureManager; import org.apache.hadoop.hbase.procedure.Procedure; import org.apache.hadoop.hbase.procedure.ProcedureCoordinator; @@ -180,6 +183,9 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable // snapshot using Procedure-V2. private Map restoreTableToProcIdMap = new HashMap<>(); + // SnapshotDescription -> SnapshotProcId + private final Map snapshotToProcIdMap = new HashMap<>(); + private Path rootDir; private ExecutorService executorService; @@ -291,18 +297,41 @@ private List getCompletedSnapshots(Path snapshotDir, boolea } /** - * Cleans up any snapshots in the snapshot/.tmp directory that were left from failed - * snapshot attempts. 
+ * Cleans up any zk-coordinated snapshots in the snapshot/.tmp directory that were left from + * failed snapshot attempts. For unfinished procedure2-coordinated snapshots, keep the working + * directory. * * @throws IOException if we can't reach the filesystem */ private void resetTempDir() throws IOException { - // cleanup any existing snapshots. + Set workingProcedureCoordinatedSnapshotNames = + snapshotToProcIdMap.keySet().stream().map(s -> s.getName()).collect(Collectors.toSet()); + Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, master.getConfiguration()); FileSystem tmpFs = tmpdir.getFileSystem(master.getConfiguration()); - if (!tmpFs.delete(tmpdir, true)) { - LOG.warn("Couldn't delete working snapshot directory: " + tmpdir); + FileStatus[] workingSnapshotDirs = CommonFSUtils.listStatus(tmpFs, tmpdir); + if (workingSnapshotDirs == null) { + return; + } + for (FileStatus workingSnapshotDir : workingSnapshotDirs) { + String workingSnapshotName = workingSnapshotDir.getPath().getName(); + if (!workingProcedureCoordinatedSnapshotNames.contains(workingSnapshotName)) { + try { + if (tmpFs.delete(workingSnapshotDir.getPath(), true)) { + LOG.info("delete unfinished zk-coordinated snapshot working directory {}", + workingSnapshotDir.getPath()); + } else { + LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}", + workingSnapshotDir.getPath()); + } + } catch (IOException e) { + LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}", + workingSnapshotDir.getPath(), e); + } + } else { + LOG.debug("find working directory of unfinished procedure {}", workingSnapshotName); + } } } @@ -420,10 +449,12 @@ public boolean isSnapshotDone(SnapshotDescription expected) throws IOException { * @return true if there is a snapshot in progress with the same name or on the same * table. 
*/ - synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot) { - TableName snapshotTable = TableName.valueOf(snapshot.getTable()); - if (isTakingSnapshot(snapshotTable)) { - return true; + synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot, boolean checkTable) { + if (checkTable) { + TableName snapshotTable = TableName.valueOf(snapshot.getTable()); + if (isTakingSnapshot(snapshotTable, false)) { + return true; + } } Iterator> it = this.snapshotHandlers.entrySet().iterator(); while (it.hasNext()) { @@ -433,6 +464,14 @@ synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot) { return true; } } + Iterator> spIt = snapshotToProcIdMap.entrySet().iterator(); + while (spIt.hasNext()) { + Map.Entry entry = spIt.next(); + if (snapshot.getName().equals(entry.getKey().getName()) + && !master.getMasterProcedureExecutor().isFinished(entry.getValue())) { + return true; + } + } return false; } @@ -442,9 +481,35 @@ synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot) { * @param tableName name of the table being snapshotted. * @return true if there is a snapshot in progress on the specified table. */ - public boolean isTakingSnapshot(final TableName tableName) { + public synchronized boolean isTakingSnapshot(final TableName tableName) { + return isTakingSnapshot(tableName, false); + } + + /** + * Check to see if the specified table has a snapshot in progress. Since we introduce + * SnapshotProcedure, it is a little bit different from before. For zk-coordinated + * snapshot, we can just consider tables in snapshotHandlers only, but for + * {@link org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure} and + * {@link org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure} we need + * to consider tables in snapshotToProcIdMap also. + * @param tableName name of the table being snapshotted. 
+ * @param checkProcedure true if we should check tables in snapshotToProcIdMap + * @return true if there is a snapshot in progress on the specified table. + */ + public synchronized boolean isTakingSnapshot(TableName tableName, boolean checkProcedure) { SnapshotSentinel handler = this.snapshotHandlers.get(tableName); - return handler != null && !handler.isFinished(); + if (handler != null && !handler.isFinished()) { + return true; + } + if (checkProcedure) { + for (Map.Entry entry : snapshotToProcIdMap.entrySet()) { + if (TableName.valueOf(entry.getKey().getTable()).equals(tableName) + && !master.getMasterProcedureExecutor().isFinished(entry.getValue())) { + return true; + } + } + } + return false; } /** @@ -453,31 +518,10 @@ public boolean isTakingSnapshot(final TableName tableName) { * @param snapshot description of the snapshot we want to start * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot */ - private synchronized void prepareToTakeSnapshot(SnapshotDescription snapshot) + public synchronized void prepareWorkingDirectory(SnapshotDescription snapshot) throws HBaseSnapshotException { Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, master.getConfiguration()); - TableName snapshotTable = - TableName.valueOf(snapshot.getTable()); - - // make sure we aren't already running a snapshot - if (isTakingSnapshot(snapshot)) { - SnapshotSentinel handler = this.snapshotHandlers.get(snapshotTable); - throw new SnapshotCreationException("Rejected taking " - + ClientSnapshotDescriptionUtils.toString(snapshot) - + " because we are already running another snapshot " - + (handler != null ? 
("on the same table " + - ClientSnapshotDescriptionUtils.toString(handler.getSnapshot())) - : "with the same name"), ProtobufUtil.createSnapshotDesc(snapshot)); - } - - // make sure we aren't running a restore on the same table - if (isRestoringTable(snapshotTable)) { - throw new SnapshotCreationException("Rejected taking " - + ClientSnapshotDescriptionUtils.toString(snapshot) - + " because we are already have a restore in progress on the same snapshot."); - } - try { FileSystem workingDirFS = workingDir.getFileSystem(master.getConfiguration()); // delete the working directory, since we aren't running the snapshot. Likely leftovers @@ -507,7 +551,7 @@ private synchronized void prepareToTakeSnapshot(SnapshotDescription snapshot) private synchronized void snapshotDisabledTable(SnapshotDescription snapshot) throws IOException { // setup the snapshot - prepareToTakeSnapshot(snapshot); + prepareWorkingDirectory(snapshot); // set the snapshot to be a disabled snapshot, since the client doesn't know about that snapshot = snapshot.toBuilder().setType(Type.DISABLED).build(); @@ -527,7 +571,7 @@ private synchronized void snapshotDisabledTable(SnapshotDescription snapshot) private synchronized void snapshotEnabledTable(SnapshotDescription snapshot) throws IOException { // setup the snapshot - prepareToTakeSnapshot(snapshot); + prepareWorkingDirectory(snapshot); // Take the snapshot of the enabled table EnabledTableSnapshotHandler handler = @@ -582,7 +626,9 @@ public ReadWriteLock getTakingSnapshotLock() { * @return true to indicate that there're some running snapshots. 
*/ public synchronized boolean isTakingAnySnapshot() { - return this.takingSnapshotLock.getReadHoldCount() > 0 || this.snapshotHandlers.size() > 0; + return this.takingSnapshotLock.getReadHoldCount() > 0 + || this.snapshotHandlers.size() > 0 + || this.snapshotToProcIdMap.size() > 0; } /** @@ -601,48 +647,7 @@ public void takeSnapshot(SnapshotDescription snapshot) throws IOException { } private void takeSnapshotInternal(SnapshotDescription snapshot) throws IOException { - // check to see if we already completed the snapshot - if (isSnapshotCompleted(snapshot)) { - throw new SnapshotExistsException( - "Snapshot '" + snapshot.getName() + "' already stored on the filesystem.", - ProtobufUtil.createSnapshotDesc(snapshot)); - } - - LOG.debug("No existing snapshot, attempting snapshot..."); - - // stop tracking "abandoned" handlers - cleanupSentinels(); - - // check to see if the table exists - TableDescriptor desc = null; - try { - desc = master.getTableDescriptors().get( - TableName.valueOf(snapshot.getTable())); - } catch (FileNotFoundException e) { - String msg = "Table:" + snapshot.getTable() + " info doesn't exist!"; - LOG.error(msg); - throw new SnapshotCreationException(msg, e, ProtobufUtil.createSnapshotDesc(snapshot)); - } catch (IOException e) { - throw new SnapshotCreationException( - "Error while geting table description for table " + snapshot.getTable(), e, - ProtobufUtil.createSnapshotDesc(snapshot)); - } - if (desc == null) { - throw new SnapshotCreationException( - "Table '" + snapshot.getTable() + "' doesn't exist, can't take snapshot.", - ProtobufUtil.createSnapshotDesc(snapshot)); - } - SnapshotDescription.Builder builder = snapshot.toBuilder(); - // if not specified, set the snapshot format - if (!snapshot.hasVersion()) { - builder.setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION); - } - RpcServer.getRequestUser().ifPresent(user -> { - if (AccessChecker.isAuthorizationSupported(master.getConfiguration())) { - 
builder.setOwner(user.getShortName()); - } - }); - snapshot = builder.build(); + TableDescriptor desc = sanityCheckBeforeSnapshot(snapshot, true); // call pre coproc hook MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost(); @@ -691,6 +696,66 @@ else if (master.getTableStateManager().isTableState(snapshotTable, } } + /** + * Check if the snapshot can be taken. Currently we have some limitations, for zk-coordinated + * snapshot, we don't allow snapshot with same name or taking multiple snapshots of a table at + * the same time, for procedure-coordinated snapshot, we don't allow snapshot with same name. + * @param snapshot description of the snapshot being checked. + * @param checkTable check if the table is already taking a snapshot + * @return the table descriptor of the table + */ + private synchronized TableDescriptor sanityCheckBeforeSnapshot(SnapshotDescription snapshot, + boolean checkTable) throws IOException { + // check to see if we already completed the snapshot + if (isSnapshotCompleted(snapshot)) { + throw new SnapshotExistsException( + "Snapshot '" + snapshot.getName() + "' already stored on the filesystem.", + ProtobufUtil.createSnapshotDesc(snapshot)); + } + LOG.debug("No existing snapshot, attempting snapshot..."); + + // stop tracking "abandoned" handlers + cleanupSentinels(); + + TableName snapshotTable = + TableName.valueOf(snapshot.getTable()); + // make sure we aren't already running a snapshot + if (isTakingSnapshot(snapshot, checkTable)) { + throw new SnapshotCreationException("Rejected taking " + + ClientSnapshotDescriptionUtils.toString(snapshot) + + " because we are already running another snapshot" + + " on the same table or with the same name"); + } + + // make sure we aren't running a restore on the same table + if (isRestoringTable(snapshotTable)) { + throw new SnapshotCreationException("Rejected taking " + + ClientSnapshotDescriptionUtils.toString(snapshot) + + " because we are already have a restore in progress on the same 
snapshot."); + } + + // check to see if the table exists + TableDescriptor desc = null; + try { + desc = master.getTableDescriptors().get( + TableName.valueOf(snapshot.getTable())); + } catch (FileNotFoundException e) { + String msg = "Table:" + snapshot.getTable() + " info doesn't exist!"; + LOG.error(msg); + throw new SnapshotCreationException(msg, e, ProtobufUtil.createSnapshotDesc(snapshot)); + } catch (IOException e) { + throw new SnapshotCreationException( + "Error while geting table description for table " + snapshot.getTable(), e, + ProtobufUtil.createSnapshotDesc(snapshot)); + } + if (desc == null) { + throw new SnapshotCreationException( + "Table '" + snapshot.getTable() + "' doesn't exist, can't take snapshot.", + ProtobufUtil.createSnapshotDesc(snapshot)); + } + return desc; + } + /** * Set the handler for the current snapshot *

    @@ -738,6 +803,32 @@ private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOExcep } } + public synchronized long takeSnapshot(SnapshotDescription snapshot, + long nonceGroup, long nonce) throws IOException { + this.takingSnapshotLock.readLock().lock(); + try { + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(master, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + sanityCheckBeforeSnapshot(snapshot, false); + + long procId = submitProcedure(new SnapshotProcedure( + master.getMasterProcedureExecutor().getEnvironment(), snapshot)); + + getMaster().getSnapshotManager().registerSnapshotProcedure(snapshot, procId); + } + + @Override + protected String getDescription() { + return "SnapshotTableProcedure"; + } + }); + } finally { + this.takingSnapshotLock.readLock().unlock(); + } + } + /** * Clone the specified snapshot. * The clone will fail if the destination table has a snapshot or restore in progress. 
@@ -791,7 +882,7 @@ synchronized long cloneSnapshot(final SnapshotDescription snapshot, TableName tableName = tableDescriptor.getTableName(); // make sure we aren't running a snapshot on the same table - if (isTakingSnapshot(tableName)) { + if (isTakingSnapshot(tableName, true)) { throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName); } @@ -926,7 +1017,7 @@ private synchronized long restoreSnapshot(final SnapshotDescription snapshot, final TableName tableName = tableDescriptor.getTableName(); // make sure we aren't running a snapshot on the same table - if (isTakingSnapshot(tableName)) { + if (isTakingSnapshot(tableName, true)) { throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName); } @@ -1014,6 +1105,7 @@ private synchronized SnapshotSentinel removeSentinelIfFinished( private void cleanupSentinels() { cleanupSentinels(this.snapshotHandlers); cleanupCompletedRestoreInMap(); + cleanupCompletedSnapshotInMap(); } /** @@ -1052,6 +1144,21 @@ private synchronized void cleanupCompletedRestoreInMap() { } } + /** + * Remove the procedures that are marked as finished + */ + private synchronized void cleanupCompletedSnapshotInMap() { + ProcedureExecutor procExec = master.getMasterProcedureExecutor(); + Iterator> it = snapshotToProcIdMap.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + Long procId = entry.getValue(); + if (procExec.isRunning() && procExec.isFinished(procId)) { + it.remove(); + } + } + } + // // Implementing Stoppable interface // @@ -1207,11 +1314,23 @@ public void initialize(MasterServices master, MetricsMaster metricsMaster) throw this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); this.executorService = master.getExecutorService(); + restoreUnfinishedSnapshotProcedure(); resetTempDir(); snapshotHandlerChoreCleanerTask = scheduleThreadPool.scheduleAtFixedRate(this::cleanupSentinels, 10, 10, TimeUnit.SECONDS); } + 
private void restoreUnfinishedSnapshotProcedure() { + master.getMasterProcedureExecutor() + .getActiveProceduresNoCopy() + .stream().filter(p -> p instanceof SnapshotProcedure) + .filter(p -> !p.isFinished()).map(p -> (SnapshotProcedure) p) + .forEach(p -> { + registerSnapshotProcedure(p.getSnapshot(), p.getProcId()); + LOG.info("restore unfinished snapshot procedure {}", p); + }); + } + @Override public String getProcedureSignature() { return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION; @@ -1257,4 +1376,10 @@ private SnapshotDescription toSnapshotDescription(ProcedureDescription desc) builder.setType(SnapshotDescription.Type.FLUSH); return builder.build(); } + + public synchronized void registerSnapshotProcedure(SnapshotDescription snapshot, long procId) { + snapshotToProcIdMap.put(snapshot, procId); + LOG.debug("register snapshot={}, snapshot procedure id = {}", + ClientSnapshotDescriptionUtils.toString(snapshot), procId); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java index a6dd3f8e7bb7..a5a08ad9fced 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java @@ -217,7 +217,7 @@ public void process() { // verify the snapshot is valid status.setStatus("Verifying snapshot: " + snapshot.getName()); - verifier.verifySnapshot(this.workingDir, serverNames); + verifier.verifySnapshot(); // complete the snapshot, atomically moving from tmp to .snapshot dir. 
SnapshotDescriptionUtils.completeSnapshot(this.snapshotDir, this.workingDir, this.rootFs, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 7af717bff49e..14879014ded8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1783,6 +1783,10 @@ private void startServices() throws IOException { conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1); executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( ExecutorType.RS_CLAIM_REPLICATION_QUEUE).setCorePoolSize(claimReplicationQueueThreads)); + final int rsSnapshotOperationThreads = + conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.RS_SNAPSHOT_OPERATIONS).setCorePoolSize(rsSnapshotOperationThreads)); Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", uncaughtExceptionHandler); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index b13ff64d5b2d..862ef25c29bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -3833,6 +3833,7 @@ public ExecuteProceduresResponse executeProcedures(RpcController controller, if (request.getCloseRegionCount() > 0) { request.getCloseRegionList().forEach(this::executeCloseRegionProcedures); } + if (request.getProcCount() > 0) { request.getProcList().forEach(this::executeProcedures); } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotRegionCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotRegionCallable.java new file mode 100644 index 000000000000..4bfaa67afec2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotRegionCallable.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.client.IsolationLevel; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotRegionParameter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; + +@InterfaceAudience.Private +public class SnapshotRegionCallable extends BaseRSProcedureCallable { + private static final Logger LOG = LoggerFactory.getLogger(SnapshotRegionCallable.class); + + private SnapshotDescription snapshot; + private RegionInfo regionInfo; + private ForeignExceptionDispatcher monitor; + + @Override + protected void doCall() throws Exception { + HRegion region = rs.getRegion(regionInfo.getEncodedName()); + if (region == null) { + throw new NotServingRegionException( + "snapshot=" + snapshot.getName() + ", region=" + regionInfo.getRegionNameAsString()); + } + LOG.debug("Starting snapshot operation on {}", region); + region.startRegionOperation(Region.Operation.SNAPSHOT); + try { + if (snapshot.getType() == SnapshotDescription.Type.FLUSH) { + boolean succeeded = false; + long readPt = region.getReadPoint(IsolationLevel.READ_COMMITTED); + int retryTimes = rs.getConfiguration().getInt("hbase.snapshot.flush.retryTimes", 3); + for (int i = 0; i < retryTimes; i++) { + HRegion.FlushResult res = region.flush(true); + if (res.getResult() == HRegion.FlushResult.Result.CANNOT_FLUSH) { + region.waitForFlushes(); + if (region.getMaxFlushedSeqId() >= 
readPt) { + succeeded = true; + break; + } + } else { + succeeded = true; + break; + } + } + if (!succeeded) { + throw new IOException( + "Unable to complete flush " + regionInfo.getRegionNameAsString() + + " after " + retryTimes + " attempts"); + } + LOG.debug("Snapshotting region {} for {} completed.", region, snapshot.getName()); + region.addRegionToSnapshot(snapshot, monitor); + } + } finally { + LOG.debug("Closing snapshot operation on {}", region); + region.closeRegionOperation(Region.Operation.SNAPSHOT); + } + } + + @Override + protected void initParameter(byte[] parameter) throws Exception { + SnapshotRegionParameter param = SnapshotRegionParameter.parseFrom(parameter); + this.snapshot = param.getSnapshot(); + this.regionInfo = ProtobufUtil.toRegionInfo(param.getRegion()); + this.monitor = new ForeignExceptionDispatcher(snapshot.getName()); + } + + @Override + public EventType getEventType() { + return EventType.RS_SNAPSHOT_REGIONS; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotVerifyCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotVerifyCallable.java new file mode 100644 index 000000000000..f080560bf28d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotVerifyCallable.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable; +import org.apache.hadoop.hbase.snapshot.SnapshotVerifyUtil; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyParameter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; + +@InterfaceAudience.Private +public class SnapshotVerifyCallable extends BaseRSProcedureCallable { + private SnapshotDescription snapshot; + private List regions; + private int expectedNumRegion; + + @Override + protected void doCall() throws Exception { + Configuration conf = rs.getConfiguration(); + TableName tableName = TableName.valueOf(snapshot.getTable()); + SnapshotVerifyUtil.verifySnapshot(conf, snapshot, tableName, regions, expectedNumRegion); + } + + @Override + protected void initParameter(byte[] parameter) throws Exception { + SnapshotVerifyParameter param = SnapshotVerifyParameter.parseFrom(parameter); + this.snapshot = param.getSnapshot(); + this.regions = param.getRegionList().stream() + .map(ProtobufUtil::toRegionInfo).collect(Collectors.toList()); + this.expectedNumRegion = param.getExpectedNumRegion(); 
+ } + + @Override + public EventType getEventType() { + return EventType.RS_VERIFY_SNAPSHOT; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java index c059792ca68e..e285c689f847 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java @@ -34,7 +34,9 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.PermissionStorage; import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil; import org.apache.hadoop.hbase.security.access.UserPermission; @@ -47,7 +49,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; /** @@ -296,15 +297,15 @@ public static SnapshotDescription validate(SnapshotDescription snapshot, Configu "Descriptor doesn't apply to a table, so we can't build it."); } + SnapshotDescription.Builder builder = snapshot.toBuilder(); + // set the creation time, if one hasn't been set long time = snapshot.getCreationTime(); if (time == SnapshotDescriptionUtils.NO_SNAPSHOT_START_TIME_SPECIFIED) { time = EnvironmentEdgeManager.currentTime(); LOG.debug("Creation time not specified, setting to:" + time + " (current time:" + EnvironmentEdgeManager.currentTime() + ")."); - SnapshotDescription.Builder builder = snapshot.toBuilder(); builder.setCreationTime(time); - snapshot = builder.build(); } 
long ttl = snapshot.getTtl(); @@ -318,9 +319,22 @@ public static SnapshotDescription validate(SnapshotDescription snapshot, Configu defaultSnapshotTtl); } ttl = defaultSnapshotTtl; + builder.setTtl(ttl); } - SnapshotDescription.Builder builder = snapshot.toBuilder(); - builder.setTtl(ttl); + + if (!snapshot.hasVersion()) { + builder.setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION); + LOG.debug("Snapshot {} VERSION not specified, setting to {}", snapshot.getName(), + SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION); + } + + RpcServer.getRequestUser().ifPresent(user -> { + if (AccessChecker.isAuthorizationSupported(conf)) { + builder.setOwner(user.getShortName()); + LOG.debug("Set {} as owner of Snapshot", user.getShortName()); + } + }); + snapshot = builder.build(); // set the acl to snapshot if security feature is enabled. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java index b6d3c4893660..d097f38e2e30 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java @@ -178,6 +178,17 @@ public void storeFile(final RegionInfo regionInfo, final String family, }); } + /** + * Verify the validity of the snapshot. 
+ * + * @param visitor user-specified store file visitor + */ + public static void verifySnapshot(final Configuration conf, final FileSystem fs, + final SnapshotManifest manifest, final StoreFileVisitor visitor) throws IOException { + concurrentVisitReferencedFiles(conf, fs, manifest, "VerifySnapshot", visitor); + } + + public static void concurrentVisitReferencedFiles(final Configuration conf, final FileSystem fs, final SnapshotManifest manifest, final String desc, final StoreFileVisitor visitor) throws IOException { @@ -249,7 +260,7 @@ public static void concurrentVisitReferencedFiles(final Configuration conf, fina * @throws CorruptedSnapshotException if the snapshot is corrupted * @throws IOException if an error occurred while scanning the directory */ - private static void verifyStoreFile(final Configuration conf, final FileSystem fs, + public static void verifyStoreFile(final Configuration conf, final FileSystem fs, final Path snapshotDir, final SnapshotDescription snapshot, final RegionInfo regionInfo, final String family, final SnapshotRegionManifest.StoreFile storeFile) throws IOException { TableName table = TableName.valueOf(snapshot.getTable()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotVerifyUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotVerifyUtil.java new file mode 100644 index 000000000000..8ee3158b51c7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotVerifyUtil.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.snapshot; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest; + +/** + * General snapshot verification. + *

    + * This is a light-weight verification mechanism for all the files in a snapshot. It doesn't + * attempt to verify that the files are exact copies (that would be tantamount to taking the + * snapshot again!), but instead just attempts to ensure that the files match the expected + * files and are the same length. + *

    + * Taking an online snapshot can race against other operations and this is a last line of + * defense. For example, if meta changes between when snapshots are taken not all regions of a + * table may be present. This can be caused by a region split (daughters present on this scan, + * but snapshot took parent), or move (snapshot only checks lists of region servers, a move could + * have caused a region to be skipped or done twice). + *

    + * Current snapshot files checked: + *

      + *
    1. SnapshotDescription is readable
    2. + *
    3. Table info is readable
    4. + *
    5. Regions
    6. + *
    + *
      + *
    • Matching regions in the snapshot as currently in the table
    • + *
    • {@link RegionInfo} matches the current and stored regions
    • + *
    • All referenced hfiles have valid names
    • + *
    • All the hfiles are present (either in the .archive directory or in the region)
    • + *
    • All recovered.edits files are present (by name) and have the correct file size
    • + *
    + */ + +@InterfaceAudience.Private +public final class SnapshotVerifyUtil { + private static final Logger LOG = LoggerFactory.getLogger(SnapshotVerifyUtil.class); + + private SnapshotVerifyUtil() {} + + /** + * Check that the snapshot description written in the filesystem matches the current snapshot + * @param conf configuration of service + * @param snapshot the snapshot need to be verified + * @param tableName the table of snapshot + * @param regions the regions whose region info and store files need to be verified. If we use + * master to verify snapshot, this will be the whole regions of table. If we use + * SnapshotVerifyProcedure to verify snapshot, this will be part of the whole + * regions. + * @param expectedNumRegion total num region of table taking snapshot, both include online + * regions and offline regions + */ + public static void verifySnapshot(Configuration conf, SnapshotDescription snapshot, + TableName tableName, List regions, int expectedNumRegion) throws IOException { + Path rootDir = CommonFSUtils.getRootDir(conf); + Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, conf); + FileSystem workingDirFS = workingDir.getFileSystem(conf); + SnapshotManifest manifest = SnapshotManifest.open(conf, workingDirFS, workingDir, snapshot); + + // verify snapshot info matches + verifySnapshotDescription(workingDirFS, workingDir, snapshot); + + // check that tableinfo is a valid table description + verifyTableInfo(manifest, snapshot); + + // check that each region is valid + verifyRegions(manifest, regions, snapshot, tableName, expectedNumRegion); + + // check that each store file is valid + verifyStoreFiles(conf, manifest, regions, CommonFSUtils.getRootDirFileSystem(conf), + snapshot, workingDir); + } + + /** + * Check that the snapshot description written in the filesystem matches the current snapshot + * @param snapshotDir snapshot directory to check + */ + private static void verifySnapshotDescription(FileSystem fs, 
Path snapshotDir, + SnapshotDescription snapshot) throws CorruptedSnapshotException { + SnapshotDescription found = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); + if (!snapshot.equals(found)) { + throw new CorruptedSnapshotException( + "Snapshot read (" + found + ") doesn't equal snapshot we ran (" + snapshot + ").", + ProtobufUtil.createSnapshotDesc(snapshot)); + } + } + + /** + * Check that the table descriptor for the snapshot is a valid table descriptor + * @param manifest snapshot manifest to inspect + */ + private static void verifyTableInfo(SnapshotManifest manifest, + SnapshotDescription snapshot) throws IOException { + TableDescriptor htd = manifest.getTableDescriptor(); + if (htd == null) { + throw new CorruptedSnapshotException("Missing Table Descriptor", + ProtobufUtil.createSnapshotDesc(snapshot)); + } + + if (!htd.getTableName().getNameAsString().equals(snapshot.getTable())) { + throw new CorruptedSnapshotException( + "Invalid Table Descriptor. Expected " + snapshot.getTable() + " name, got " + + htd.getTableName().getNameAsString(), ProtobufUtil.createSnapshotDesc(snapshot)); + } + } + + /** + * Check that all the regions in the snapshot are valid, and accounted for. + * @param manifest snapshot manifest to inspect + */ + private static void verifyRegions(SnapshotManifest manifest, List regions, + SnapshotDescription snapshot, TableName tableName, int expectedNumRegion) + throws CorruptedSnapshotException { + Map regionManifests = manifest.getRegionManifestsMap(); + if (regionManifests == null) { + throw new CorruptedSnapshotException("Snapshot " + + ClientSnapshotDescriptionUtils.toString(snapshot) + " looks empty"); + } + + // Verify Region Count + int realRegionCount = regionManifests.size(); + if (regionManifests.get(MobUtils.getMobRegionInfo(tableName).getEncodedName()) != null) { + // the mob region is a dummy region, it's not a real region in HBase. + // the mob region has a special name, it could be found by the region name. 
+ realRegionCount --; + } + if (realRegionCount != expectedNumRegion) { + throw new CorruptedSnapshotException("number of region didn't match for snapshot '" + + ClientSnapshotDescriptionUtils.toString(snapshot) + "', expected=" + + expectedNumRegion + ", snapshotted=" + realRegionCount); + } + + // Verify RegionInfo + for (RegionInfo region : regions) { + SnapshotRegionManifest regionManifest = regionManifests.get(region.getEncodedName()); + if (regionManifest == null) { + LOG.warn("No snapshot region directory found for {}", region.getRegionNameAsString()); + continue; + } + verifyRegionInfo(region, snapshot, regionManifest); + } + } + + /** + * Verify that the regionInfo is valid + * @param region the region to check + * @param manifest snapshot manifest to inspect + */ + private static void verifyRegionInfo(final RegionInfo region, final SnapshotDescription snapshot, + final SnapshotRegionManifest manifest) throws CorruptedSnapshotException { + RegionInfo manifestRegionInfo = ProtobufUtil.toRegionInfo(manifest.getRegionInfo()); + if (RegionInfo.COMPARATOR.compare(region, manifestRegionInfo) != 0) { + String msg = "Manifest region info " + manifestRegionInfo + + "doesn't match expected region:" + region; + throw new CorruptedSnapshotException(msg, ProtobufUtil.createSnapshotDesc(snapshot)); + } + } + + /** + * Verify that store files are valid + */ + private static void verifyStoreFiles(final Configuration conf, final SnapshotManifest manifest, + final List regions, final FileSystem fs, final SnapshotDescription snapshot, + final Path snapshotDir) throws IOException { + // Verify Snapshot HFiles + // Requires the root directory file system as HFiles are stored in the root directory + SnapshotReferenceUtil.verifySnapshot(conf, CommonFSUtils.getRootDirFileSystem(conf), manifest, + new SnapshotReferenceUtil.StoreFileVisitor() { + @Override + public void storeFile(RegionInfo regionInfo, String familyName, + SnapshotRegionManifest.StoreFile storeFile) throws 
IOException { + if (regions.contains(regionInfo)) { + SnapshotReferenceUtil.verifyStoreFile(conf, fs, snapshotDir, snapshot, + regionInfo, familyName, storeFile); + } + } + }); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedure.java new file mode 100644 index 000000000000..5889c0983c1b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedure.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.procedure; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.SnapshotDescription; +import org.apache.hadoop.hbase.client.SnapshotType; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.RegionSplitter; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import 
org.mockito.internal.stubbing.answers.AnswersWithDelay; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestSnapshotProcedure { + private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotProcedure.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSnapshotProcedure.class); + + private static HBaseTestingUtil TEST_UTIL; + private HMaster master; + private TableName TABLE_NAME; + private byte[] CF; + private String SNAPSHOT_NAME; + private SnapshotDescription snapshot; + private SnapshotProtos.SnapshotDescription snapshotProto; + + @Before + public void setup() throws Exception { + TEST_UTIL = new HBaseTestingUtil(); + Configuration config = TEST_UTIL.getConfiguration(); + config.setBoolean("hbase.snapshot.zk.coordinated", false); + // using SnapshotVerifyProcedure to verify snapshot + config.setInt("hbase.snapshot.remote.verify.threshold", 1); + config.setInt(HConstants.MASTER_INFO_PORT, 8080); + // delay dispatch so that we can do something, for example kill a target server + config.setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 10000); + config.setInt(RemoteProcedureDispatcher.DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, 128); + TEST_UTIL.startMiniCluster(3); + master = TEST_UTIL.getHBaseCluster().getMaster(); + TABLE_NAME = TableName.valueOf(Bytes.toBytes("SPTestTable")); + CF = Bytes.toBytes("cf"); + SNAPSHOT_NAME = "SnapshotProcedureTest"; + snapshot = new SnapshotDescription(SNAPSHOT_NAME, TABLE_NAME, SnapshotType.FLUSH); + snapshotProto = ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot); + 
snapshotProto = SnapshotDescriptionUtils.validate(snapshotProto, master.getConfiguration()); + final byte[][] splitKeys = new RegionSplitter.HexStringSplit().split(10); + Table table = TEST_UTIL.createTable(TABLE_NAME, CF, splitKeys); + TEST_UTIL.loadTable(table, CF, false); + } + + @Test + public void testSimpleSnapshotTable() throws Exception { + TEST_UTIL.getAdmin().snapshotTable(snapshot); + SnapshotTestingUtils.assertOneSnapshotThatMatches(TEST_UTIL.getAdmin(), snapshotProto); + SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto, TABLE_NAME, CF); + } + + @Test + public void testMasterRestart() throws Exception { + ProcedureExecutor procExec = master.getMasterProcedureExecutor(); + MasterProcedureEnv env = procExec.getEnvironment(); + SnapshotProcedure sp = new SnapshotProcedure(env, snapshotProto); + SnapshotProcedure spySp = getDelayedOnSpecificStateSnapshotProcedure(sp, + procExec.getEnvironment(), SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS); + + long procId = procExec.submitProcedure(spySp); + + TEST_UTIL.waitFor(2000, () -> env.getMasterServices().getProcedures() + .stream().map(Procedure::getProcId).collect(Collectors.toList()).contains(procId)); + TEST_UTIL.getHBaseCluster().killMaster(master.getServerName()); + TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 30000); + TEST_UTIL.getHBaseCluster().startMaster(); + TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); + + master = TEST_UTIL.getHBaseCluster().getMaster(); + assertTrue(master.getSnapshotManager().isTakingAnySnapshot()); + assertTrue(master.getSnapshotManager().isTakingSnapshot(TABLE_NAME, true)); + + List unfinishedProcedures = master + .getMasterProcedureExecutor().getProcedures().stream() + .filter(p -> p instanceof SnapshotProcedure) + .filter(p -> !p.isFinished()).map(p -> (SnapshotProcedure) p) + .collect(Collectors.toList()); + assertEquals(unfinishedProcedures.size(), 1); + long newProcId = unfinishedProcedures.get(0).getProcId(); + 
assertEquals(procId, newProcId); + + ProcedureTestingUtility.waitProcedure(master.getMasterProcedureExecutor(), newProcId); + assertFalse(master.getSnapshotManager().isTakingSnapshot(TABLE_NAME, true)); + + List snapshots + = master.getSnapshotManager().getCompletedSnapshots(); + assertEquals(1, snapshots.size()); + assertEquals(SNAPSHOT_NAME, snapshots.get(0).getName()); + assertEquals(TABLE_NAME, TableName.valueOf(snapshots.get(0).getTable())); + SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto, TABLE_NAME, CF); + } + + @Test + public void testRegionServerCrashWhileTakingSnapshot() throws Exception { + ProcedureExecutor procExec = master.getMasterProcedureExecutor(); + MasterProcedureEnv env = procExec.getEnvironment(); + SnapshotProcedure sp = new SnapshotProcedure(env, snapshotProto); + long procId = procExec.submitProcedure(sp); + + SnapshotRegionProcedure snp = waitProcedureRunnableAndGetFirst( + SnapshotRegionProcedure.class, 60000); + ServerName targetServer = env.getAssignmentManager().getRegionStates() + .getRegionStateNode(snp.getRegion()).getRegionLocation(); + TEST_UTIL.getHBaseCluster().killRegionServer(targetServer); + + TEST_UTIL.waitFor(60000, () -> snp.inRetrying()); + ProcedureTestingUtility.waitProcedure(procExec, procId); + + SnapshotTestingUtils.assertOneSnapshotThatMatches(TEST_UTIL.getAdmin(), snapshotProto); + SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto, TABLE_NAME, CF); + } + + @Test + public void testRegionServerCrashWhileVerifyingSnapshot() throws Exception { + ProcedureExecutor procExec = master.getMasterProcedureExecutor(); + MasterProcedureEnv env = procExec.getEnvironment(); + SnapshotProcedure sp = new SnapshotProcedure(env, snapshotProto); + long procId = procExec.submitProcedure(sp); + + SnapshotVerifyProcedure svp = waitProcedureRunnableAndGetFirst( + SnapshotVerifyProcedure.class, 60000); + ServerName previousTargetServer = svp.getServerName(); + + HRegionServer rs = 
TEST_UTIL.getHBaseCluster().getRegionServer(previousTargetServer); + TEST_UTIL.getHBaseCluster().killRegionServer(rs.getServerName()); + TEST_UTIL.waitFor(60000, () -> svp.getServerName() != previousTargetServer); + ProcedureTestingUtility.waitProcedure(procExec, procId); + + SnapshotTestingUtils.assertOneSnapshotThatMatches(TEST_UTIL.getAdmin(), snapshotProto); + SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto, TABLE_NAME, CF); + } + + public > T waitProcedureRunnableAndGetFirst( + Class clazz, long timeout) throws IOException { + TEST_UTIL.waitFor(timeout, () -> master.getProcedures().stream() + .anyMatch(clazz::isInstance)); + Optional procOpt = master.getMasterProcedureExecutor().getProcedures().stream() + .filter(clazz::isInstance).map(clazz::cast).findFirst(); + assertTrue(procOpt.isPresent()); + return procOpt.get(); + } + + @Test(expected = org.apache.hadoop.hbase.snapshot.SnapshotCreationException.class) + public void testClientTakingTwoSnapshotOnSameTable() throws Exception { + Thread first = new Thread("first-client") { + @Override + public void run() { + try { + TEST_UTIL.getAdmin().snapshotTable(snapshot); + } catch (IOException e) { + LOG.error("first client failed taking snapshot", e); + fail("first client failed taking snapshot"); + } + } + }; + first.start(); + Thread.sleep(1000); + // we don't allow different snapshot with same name + SnapshotDescription snapshotWithSameName = + new SnapshotDescription(SNAPSHOT_NAME, TABLE_NAME, SnapshotType.SKIPFLUSH); + TEST_UTIL.getAdmin().snapshotTable(snapshotWithSameName); + } + + @Test(expected = org.apache.hadoop.hbase.snapshot.SnapshotCreationException.class) + public void testClientTakeSameSnapshotTwice() throws IOException, InterruptedException { + Thread first = new Thread("first-client") { + @Override + public void run() { + try { + TEST_UTIL.getAdmin().snapshotTable(snapshot); + } catch (IOException e) { + LOG.error("first client failed taking snapshot", e); + fail("first client 
failed taking snapshot"); + } + } + }; + first.start(); + Thread.sleep(1000); + TEST_UTIL.getAdmin().snapshotTable(snapshot); + } + + @Test + public void testTakeZkCoordinatedSnapshotAndProcedureCoordinatedSnapshotBoth() throws Exception { + String newSnapshotName = SNAPSHOT_NAME + "_2"; + Thread first = new Thread("procedure-snapshot") { + @Override + public void run() { + try { + TEST_UTIL.getAdmin().snapshotTable(snapshot); + } catch (IOException e) { + LOG.error("procedure snapshot failed", e); + fail("procedure snapshot failed"); + } + } + }; + first.start(); + Thread.sleep(1000); + + SnapshotManager sm = master.getSnapshotManager(); + TEST_UTIL.waitFor(2000,50, () -> !sm.isTakingSnapshot(TABLE_NAME) + && sm.isTakingSnapshot(TABLE_NAME, true)); + + TEST_UTIL.getConfiguration().setBoolean("hbase.snapshot.zk.coordinated", true); + SnapshotDescription snapshotOnSameTable = + new SnapshotDescription(newSnapshotName, TABLE_NAME, SnapshotType.SKIPFLUSH); + SnapshotProtos.SnapshotDescription snapshotOnSameTableProto = ProtobufUtil + .createHBaseProtosSnapshotDesc(snapshotOnSameTable); + Thread second = new Thread("zk-snapshot") { + @Override + public void run() { + try { + TEST_UTIL.getAdmin().snapshot(snapshotOnSameTable); + } catch (IOException e) { + LOG.error("zk snapshot failed", e); + fail("zk snapshot failed"); + } + } + }; + second.start(); + + TEST_UTIL.waitFor(2000, () -> sm.isTakingSnapshot(TABLE_NAME)); + TEST_UTIL.waitFor(60000, () -> sm.isSnapshotDone(snapshotOnSameTableProto) + && !sm.isTakingAnySnapshot()); + SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto, TABLE_NAME, CF); + SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotOnSameTableProto, TABLE_NAME, CF); + } + + @Test + public void testRunningTowSnapshotProcedureOnSameTable() throws Exception { + String newSnapshotName = SNAPSHOT_NAME + "_2"; + SnapshotProtos.SnapshotDescription snapshotProto2 = SnapshotProtos.SnapshotDescription + 
.newBuilder(snapshotProto).setName(newSnapshotName).build(); + + ProcedureExecutor procExec = master.getMasterProcedureExecutor(); + MasterProcedureEnv env = procExec.getEnvironment(); + + SnapshotProcedure sp1 = new SnapshotProcedure(env, snapshotProto); + SnapshotProcedure sp2 = new SnapshotProcedure(env, snapshotProto2); + SnapshotProcedure spySp1 = getDelayedOnSpecificStateSnapshotProcedure(sp1, + procExec.getEnvironment(), SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS); + SnapshotProcedure spySp2 = getDelayedOnSpecificStateSnapshotProcedure(sp2, + procExec.getEnvironment(), SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS); + + long procId1 = procExec.submitProcedure(spySp1); + long procId2 = procExec.submitProcedure(spySp2); + TEST_UTIL.waitFor(2000, () -> env.getMasterServices().getProcedures() + .stream().map(Procedure::getProcId).collect(Collectors.toList()) + .containsAll(Arrays.asList(procId1, procId2))); + + assertFalse(procExec.isFinished(procId1)); + assertFalse(procExec.isFinished(procId2)); + + ProcedureTestingUtility.waitProcedure(master.getMasterProcedureExecutor(), procId1); + ProcedureTestingUtility.waitProcedure(master.getMasterProcedureExecutor(), procId2); + + List snapshots = + master.getSnapshotManager().getCompletedSnapshots(); + assertEquals(2, snapshots.size()); + snapshots.sort(Comparator.comparing(SnapshotProtos.SnapshotDescription::getName)); + assertEquals(SNAPSHOT_NAME, snapshots.get(0).getName()); + assertEquals(newSnapshotName, snapshots.get(1).getName()); + SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto, TABLE_NAME, CF); + SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto2, TABLE_NAME, CF); + } + + + private SnapshotProcedure getDelayedOnSpecificStateSnapshotProcedure( + SnapshotProcedure sp, MasterProcedureEnv env, SnapshotState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + SnapshotProcedure spySp = Mockito.spy(sp); + Mockito.doAnswer(new 
AnswersWithDelay(60000, new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        return invocation.callRealMethod();
+      }
+    })).when(spySp).executeFromState(env, state);
+    return spySp;
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (this.master != null) {
+      ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(
+        master.getMasterProcedureExecutor(), false);
+    }
+    TEST_UTIL.shutdownMiniCluster();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotRegionProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotRegionProcedure.java
new file mode 100644
index 000000000000..98084c7d2e80
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotRegionProcedure.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.SnapshotDescription;
+import org.apache.hadoop.hbase.client.SnapshotType;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifestV2;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
+
+/**
+ * Tests {@link SnapshotRegionProcedure} on a mini cluster. Each test submits the procedure for one
+ * region of a pre-split table and then checks that a region manifest file exists in the snapshot
+ * working directory.
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestSnapshotRegionProcedure {
+  private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotRegionProcedure.class);
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSnapshotRegionProcedure.class);
+
+  private static HBaseTestingUtil TEST_UTIL;
+  private HMaster master;
+  private TableName TABLE_NAME;
+  private byte[] CF;
+  private String SNAPSHOT_NAME;
+  private SnapshotDescription snapshot;
+  private SnapshotProtos.SnapshotDescription snapshotProto;
+  private Path workingDir;
+  private FileSystem workingDirFs;
+
+  /**
+   * Starts the mini cluster, builds the snapshot description used by the tests, creates and loads
+   * the pre-split test table, and creates the snapshot working directory if it is missing.
+   */
+  @Before
+  public void setup() throws Exception {
+    TEST_UTIL = new HBaseTestingUtil();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt(HConstants.MASTER_INFO_PORT, 8080);
+    // delay dispatch so that we can do something, for example kill a target server
+    conf.setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 10000);
+    conf.setInt(RemoteProcedureDispatcher.DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, 128);
+    TEST_UTIL.startMiniCluster(3);
+    master = TEST_UTIL.getHBaseCluster().getMaster();
+    TABLE_NAME = TableName.valueOf(Bytes.toBytes("SRPTestTable"));
+    CF = Bytes.toBytes("cf");
+    SNAPSHOT_NAME = "SnapshotRegionProcedureTest";
+    snapshot = new SnapshotDescription(SNAPSHOT_NAME, TABLE_NAME, SnapshotType.FLUSH);
+    snapshotProto = ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot);
+    snapshotProto = SnapshotDescriptionUtils.validate(snapshotProto, master.getConfiguration());
+    final byte[][] splitKeys = new RegionSplitter.HexStringSplit().split(10);
+    // Nothing below uses the Table handle, so release it as soon as the rows are loaded.
+    try (Table table = TEST_UTIL.createTable(TABLE_NAME, CF, splitKeys)) {
+      TEST_UTIL.loadTable(table, CF, false);
+    }
+    Path rootDir = CommonFSUtils.getRootDir(conf);
+    this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshotProto, rootDir, conf);
+    this.workingDirFs = workingDir.getFileSystem(conf);
+    if (!workingDirFs.exists(workingDir)) {
+      workingDirFs.mkdirs(workingDir);
+    }
+  }
+
+  /**
+   * Reports whether the manifest file for {@code region} exists in the snapshot working directory.
+   * Despite the name this does not assert anything itself; callers wrap it in assertTrue.
+   * @param region region whose encoded name identifies the manifest file
+   * @return true if the manifest file exists
+   */
+  private boolean assertRegionManifestGenerated(RegionInfo region) throws Exception {
+    // path: <snapshot working dir>/<SNAPSHOT_MANIFEST_PREFIX><encoded region name>
+    String regionManifest = SnapshotManifestV2.SNAPSHOT_MANIFEST_PREFIX + region.getEncodedName();
+    Path targetPath = new Path(workingDir, regionManifest);
+    return workingDirFs.exists(targetPath);
+  }
+
+  /**
+   * Runs the procedure for one region and expects its manifest once the procedure has finished.
+   */
+  @Test
+  public void testSimpleSnapshotRegion() throws Exception {
+    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
+    List<Pair<RegionInfo, ServerName>> regions =
+      master.getAssignmentManager().getTableRegionsAndLocations(TABLE_NAME, true);
+    assertEquals(10, regions.size());
+    Pair<RegionInfo, ServerName> region = regions.get(0);
+    SnapshotRegionProcedure srp = new SnapshotRegionProcedure(snapshotProto, region.getFirst());
+    long procId = procExec.submitProcedure(srp);
+    ProcedureTestingUtility.waitProcedure(procExec, procId);
+    assertTrue(assertRegionManifestGenerated(region.getFirst()));
+  }
+
+  /**
+   * Kills the region server hosting the target region right after submitting the procedure, waits
+   * until the region is reported on a different server and the procedure reports
+   * {@code inRetrying()}, and then expects the manifest once the procedure has finished.
+   */
+  @Test
+  public void testRegionServerCrashWhileTakingSnapshotRegion() throws Exception {
+    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
+    List<Pair<RegionInfo, ServerName>> regions =
+      master.getAssignmentManager().getTableRegionsAndLocations(TABLE_NAME, true);
+    assertEquals(10, regions.size());
+    Pair<RegionInfo, ServerName> pair = regions.get(0);
+    SnapshotRegionProcedure srp = new SnapshotRegionProcedure(snapshotProto, pair.getFirst());
+    long procId = procExec.submitProcedure(srp);
+    TEST_UTIL.getHBaseCluster().killRegionServer(pair.getSecond());
+    TEST_UTIL.waitFor(60000, () -> !pair.getSecond().equals(master.getAssignmentManager()
+      .getRegionStates().getRegionStateNode(pair.getFirst()).getRegionLocation()));
+    TEST_UTIL.waitFor(60000, () -> srp.inRetrying());
+    ProcedureTestingUtility.waitProcedure(procExec, procId);
+    assertTrue(assertRegionManifestGenerated(pair.getFirst()));
+  }
+
+  /**
+   * Calls {@code setKillAndToggleBeforeStoreUpdate(..., false)} on the master's procedure executor
+   * (when a master was obtained in setup) and shuts the mini cluster down.
+   */
+  @After
+  public void teardown() throws Exception {
+    if (this.master != null) {
+      ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(
+        master.getMasterProcedureExecutor(), false);
+    }
+    TEST_UTIL.shutdownMiniCluster();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
index c5911f2a4a87..ce80b66948dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
@@ -526,6 +526,13 @@ public void snapshot(SnapshotDescription snapshot)
     admin.snapshot(snapshot);
   }
 
+  /** Delegates to {@code admin.snapshotTable(snapshot)} without any additional checks. */
+  @Override
+  public void snapshotTable(SnapshotDescription snapshot)
+    throws IOException, SnapshotCreationException, IllegalArgumentException {
+    admin.snapshotTable(snapshot);
+  }
+
   public Future<Void> snapshotAsync(SnapshotDescription snapshot)
     throws IOException, SnapshotCreationException {
     return admin.snapshotAsync(snapshot);
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index 0d02dbaaa1d8..ec2aea87b38e 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -1238,6 +1238,46 @@ def snapshot(table, snapshot_name, *args)
       end
     end
 
+    #----------------------------------------------------------------------------------------------
+    # Take a snapshot of specified table
+    #
+    # table         - String table name
+    # snapshot_name - String name for the snapshot
+    # args          - optional option hashes; TTL, MAX_FILESIZE and SKIP_FLUSH are read from each,
+    #                 and one snapshotTable call is issued per hash
+    # Raises ArgumentError when table or snapshot_name is not a String.
+    def snapshot_table(table, snapshot_name, *args)
+      # Table name should be a string
+      raise(ArgumentError, 'Table name must be of type String') unless table.is_a?(String)
+
+      # Snapshot name should be a string
+      raise(ArgumentError, 'Snapshot name must be of type String') unless
+          snapshot_name.is_a?(String)
+
+      table_name = TableName.valueOf(table)
+      if args.empty?
+        @admin.snapshotTable(snapshot_name, table_name)
+      else
+        args.each do |arg|
+          ttl = arg[TTL]
+          ttl = ttl ? ttl.to_java(:long) : -1
+          snapshot_props = java.util.HashMap.new
+          snapshot_props.put("TTL", ttl)
+          max_filesize = arg[MAX_FILESIZE]
+          max_filesize = max_filesize ? max_filesize.to_java(:long) : -1
+          snapshot_props.put("MAX_FILESIZE", max_filesize)
+          # Admin#snapshotTable has no (name, table, props) overload, so the snapshot type is
+          # always passed explicitly; FLUSH matches the default of the two-argument form.
+          snapshot_type = if arg[SKIP_FLUSH] == true
+                            org.apache.hadoop.hbase.client.SnapshotType::SKIPFLUSH
+                          else
+                            org.apache.hadoop.hbase.client.SnapshotType::FLUSH
+                          end
+          @admin.snapshotTable(snapshot_name, table_name, snapshot_type, snapshot_props)
+        end
+      end
+    end
+
     #----------------------------------------------------------------------------------------------
     # Restore specified snapshot
     def restore_snapshot(snapshot_name, restore_acl = false)
diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb
index d4da726e617f..b77ff1e3fc52 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -537,6 +537,7 @@ def self.exception_handler(hide_traceback)
   full_name: 'CLUSTER SNAPSHOT TOOLS',
   commands: %w[
     snapshot
+    snapshot_table
     clone_snapshot
     restore_snapshot
     delete_snapshot
diff --git a/hbase-shell/src/main/ruby/shell/commands/snapshot_table.rb b/hbase-shell/src/main/ruby/shell/commands/snapshot_table.rb
new file mode 100644
index 000000000000..72398b750af3
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/snapshot_table.rb
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class SnapshotTable < Command
+      def help
+        <<-EOF
+Take a snapshot of specified table. Examples:
+
+  hbase> snapshot_table 'sourceTable', 'snapshotName'
+  hbase> snapshot_table 'namespace:sourceTable', 'snapshotName', {SKIP_FLUSH => true, MAX_FILESIZE => 21474836480}
+EOF
+      end
+
+      def command(table, snapshot_name, *args)
+        admin.snapshot_table(table, snapshot_name, *args)
+      end
+    end
+  end
+end
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
index 2b5452528283..c61a4af267c6 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
@@ -843,7 +843,11 @@ public void snapshot(String snapshotName, TableName tableName, SnapshotType type
   @Override
   public void snapshot(SnapshotDescription snapshot) {
     throw new NotImplementedException("snapshot not supported in ThriftAdmin");
+  }
 
+  @Override
+  public void snapshotTable(SnapshotDescription snapshot) {
+    throw new NotImplementedException("snapshotTable not supported in ThriftAdmin");
   }
 
   @Override