diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index c9c9c6357312..062555e73c2c 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -29,6 +29,7 @@ import "rpc/RPC.proto"; import "server/Snapshot.proto"; import "server/master/Replication.proto"; import "server/master/RegionServerStatus.proto"; +import "server/ErrorHandling.proto"; // ============================================================================ // WARNING - Compatibility rules @@ -254,6 +255,8 @@ message SnapshotVerifyProcedureStateData { required SnapshotDescription snapshot = 1; required RegionInfo region = 2; optional ServerName target_server = 3; + optional ServerRemoteProcedureState state = 4; + optional ForeignExceptionMessage error = 5; } message SnapshotVerifyParameter { @@ -522,6 +525,8 @@ message RefreshPeerStateData { required ServerName target_server = 3; /** We need multiple stages for sync replication state transition **/ optional uint32 stage = 4 [default = 0]; + optional ServerRemoteProcedureState state = 5; + optional ForeignExceptionMessage error = 6; } message RefreshPeerParameter { @@ -613,6 +618,8 @@ message SyncReplicationReplayWALRemoteStateData { required string peer_id = 1; repeated string wal = 2; required ServerName target_server = 3; + optional ServerRemoteProcedureState state = 4; + optional ForeignExceptionMessage error = 5; } message ReplaySyncReplicationWALParameter { @@ -650,6 +657,14 @@ enum RegionRemoteProcedureBaseState { REGION_REMOTE_PROCEDURE_SERVER_CRASH = 4; } +enum ServerRemoteProcedureState { + SERVER_REMOTE_PROCEDURE_DISPATCH = 1; + SERVER_REMOTE_PROCEDURE_DISPATCH_FAIL = 2; + SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED = 3; + SERVER_REMOTE_PROCEDURE_REPORT_FAILED = 4; + SERVER_REMOTE_PROCEDURE_SERVER_CRASH = 5; +} + message RegionRemoteProcedureBaseStateData { required RegionInfo region = 1; required ServerName target_server = 2; @@ -681,6 +696,8 @@ message SwitchRpcThrottleStateData { message SwitchRpcThrottleRemoteStateData { required ServerName target_server = 1; required bool rpc_throttle_enabled = 2; + optional ServerRemoteProcedureState state = 3; + optional ForeignExceptionMessage error = 4; } message SplitWALParameter { @@ -698,6 +715,8 @@ message SplitWALRemoteData{ required string wal_path = 1; required ServerName crashed_server=2; required ServerName worker = 3; + optional ServerRemoteProcedureState state = 4; + optional ForeignExceptionMessage error = 5; } enum SplitWALState{ @@ -715,6 +734,8 @@ message ClaimReplicationQueueRemoteStateData { required string queue = 2; required ServerName target_server = 3; optional ServerName source_server = 4; + optional ServerRemoteProcedureState state = 5; + optional ForeignExceptionMessage error = 6; } message ClaimReplicationQueueRemoteParameter { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java index d24471b938e2..776eae27e429 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java @@ -30,6 +30,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; + @InterfaceAudience.Private /** * The base class for Procedures that run {@link java.util.concurrent.Callable}s on a (remote) @@ -80,17 +82,25 @@ public abstract class ServerRemoteProcedure extends Procedure[] execute(MasterProcedureEnv env) throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { - if (dispatched) { + if ( + state != MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH + ) { + complete(env, this.remoteError); if (succ) { return null; } - dispatched = false; + state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH; + ; } try { env.getRemoteDispatcher().addOperationToNode(targetServer, this); @@ -113,17 +123,20 @@ protected synchronized void completionCleanup(MasterProcedureEnv env) { @Override public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, IOException exception) { + state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH_FAIL; remoteOperationDone(env, exception); } @Override public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { + state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED; remoteOperationDone(env, null); } @Override public synchronized void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { + state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_FAILED; remoteOperationDone(env, error); } @@ -137,7 +150,9 @@ synchronized void remoteOperationDone(MasterProcedureEnv env, Throwable error) { getProcId()); return; } - complete(env, error); + this.remoteError = error; + // below persistence is added so that if report goes to last active master, it throws exception + env.getMasterServices().getMasterProcedureExecutor().getStore().update(this); event.wake(env.getProcedureScheduler()); event = null; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java index 651822ff5b2a..d6ab01d20452 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java @@ -32,12 +32,14 @@ import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; import org.apache.hadoop.hbase.regionserver.SnapshotVerifyCallable; import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyParameter; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyProcedureStateData; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; @@ -177,10 +179,15 @@ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { SnapshotVerifyProcedureStateData.Builder builder = SnapshotVerifyProcedureStateData.newBuilder(); - builder.setSnapshot(snapshot).setRegion(ProtobufUtil.toRegionInfo(region)); + builder.setSnapshot(snapshot).setRegion(ProtobufUtil.toRegionInfo(region)).setState(state); if (targetServer != null) { builder.setTargetServer(ProtobufUtil.toServerName(targetServer)); } + if (this.remoteError != null) { + ErrorHandlingProtos.ForeignExceptionMessage fem = + ForeignExceptionUtil.toProtoForeignException(remoteError); + builder.setError(fem); + } serializer.serialize(builder.build()); } @@ -190,9 +197,13 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws serializer.deserialize(SnapshotVerifyProcedureStateData.class); this.snapshot = data.getSnapshot(); this.region = ProtobufUtil.toRegionInfo(data.getRegion()); + this.state = data.getState(); if (data.hasTargetServer()) { this.targetServer = ProtobufUtil.toServerName(data.getTargetServer()); } + if (data.hasError()) { + this.remoteError = ForeignExceptionUtil.toException(data.getError()); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java index 3dc49073c720..d074bcfa077b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java @@ -25,12 +25,14 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; import org.apache.hadoop.hbase.regionserver.SplitWALCallable; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; /** @@ -70,7 +72,12 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IO MasterProcedureProtos.SplitWALRemoteData.Builder builder = MasterProcedureProtos.SplitWALRemoteData.newBuilder(); builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(targetServer)) - .setCrashedServer(ProtobufUtil.toServerName(crashedServer)); + .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setState(state); + if (this.remoteError != null) { + ErrorHandlingProtos.ForeignExceptionMessage fem = + ForeignExceptionUtil.toProtoForeignException(remoteError); + builder.setError(fem); + } serializer.serialize(builder.build()); } @@ -81,6 +88,10 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws walPath = data.getWalPath(); targetServer = ProtobufUtil.toServerName(data.getWorker()); crashedServer = ProtobufUtil.toServerName(data.getCrashedServer()); + state = data.getState(); + if (data.hasError()) { + this.remoteError = ForeignExceptionUtil.toException(data.getError()); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java index 53a4af1b5104..37270cf52139 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java @@ -23,11 +23,13 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; import org.apache.hadoop.hbase.replication.regionserver.SwitchRpcThrottleRemoteCallable; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleRemoteStateData; /** @@ -59,9 +61,16 @@ protected boolean abort(MasterProcedureEnv env) { @Override protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { - SwitchRpcThrottleRemoteStateData.newBuilder() - .setTargetServer(ProtobufUtil.toServerName(targetServer)) - .setRpcThrottleEnabled(rpcThrottleEnabled).build(); + SwitchRpcThrottleRemoteStateData.Builder builder = + SwitchRpcThrottleRemoteStateData.newBuilder(); + builder.setTargetServer(ProtobufUtil.toServerName(targetServer)) + .setRpcThrottleEnabled(rpcThrottleEnabled).setState(state).build(); + if (this.remoteError != null) { + ErrorHandlingProtos.ForeignExceptionMessage fem = + ForeignExceptionUtil.toProtoForeignException(remoteError); + builder.setError(fem); + } + serializer.serialize(builder.build()); } @Override @@ -70,6 +79,10 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws serializer.deserialize(SwitchRpcThrottleRemoteStateData.class); targetServer = ProtobufUtil.toServerName(data.getTargetServer()); rpcThrottleEnabled = data.getRpcThrottleEnabled(); + state = data.getState(); + if (data.hasError()) { + this.remoteError = ForeignExceptionUtil.toException(data.getError()); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java index d3aeeba541a2..83618de58e6e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java @@ -35,11 +35,13 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteParameter; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteStateData; @@ -144,7 +146,13 @@ protected boolean waitInitialized(MasterProcedureEnv env) { protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { ClaimReplicationQueueRemoteStateData.Builder builder = ClaimReplicationQueueRemoteStateData .newBuilder().setCrashedServer(ProtobufUtil.toServerName(queueId.getServerName())) - .setQueue(queueId.getPeerId()).setTargetServer(ProtobufUtil.toServerName(targetServer)); + .setQueue(queueId.getPeerId()).setTargetServer(ProtobufUtil.toServerName(targetServer)) + .setState(state); + if (this.remoteError != null) { + ErrorHandlingProtos.ForeignExceptionMessage fem = + ForeignExceptionUtil.toProtoForeignException(remoteError); + builder.setError(fem); + } queueId.getSourceServerName() .ifPresent(sourceServer -> builder.setSourceServer(ProtobufUtil.toServerName(sourceServer))); serializer.serialize(builder.build()); @@ -157,11 +165,15 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws targetServer = ProtobufUtil.toServerName(data.getTargetServer()); ServerName crashedServer = ProtobufUtil.toServerName(data.getCrashedServer()); String queue = data.getQueue(); + state = data.getState(); if (data.hasSourceServer()) { queueId = new ReplicationQueueId(crashedServer, queue, ProtobufUtil.toServerName(data.getSourceServer())); } else { queueId = new ReplicationQueueId(crashedServer, queue); } + if (data.hasError()) { + this.remoteError = ForeignExceptionUtil.toException(data.getError()); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java index 0e1b9a3b3810..4196f537fe14 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java @@ -28,11 +28,13 @@ import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; import org.apache.hadoop.hbase.replication.regionserver.RefreshPeerCallable; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerStateData; @@ -149,9 +151,15 @@ protected boolean waitInitialized(MasterProcedureEnv env) { @Override protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { - serializer.serialize( - RefreshPeerStateData.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type)) - .setTargetServer(ProtobufUtil.toServerName(targetServer)).setStage(stage).build()); + RefreshPeerStateData.Builder builder = RefreshPeerStateData.newBuilder(); + if (this.remoteError != null) { + ErrorHandlingProtos.ForeignExceptionMessage fem = + ForeignExceptionUtil.toProtoForeignException(remoteError); + builder.setError(fem); + } + serializer.serialize(builder.setPeerId(peerId).setType(toPeerModificationType(type)) + .setTargetServer(ProtobufUtil.toServerName(targetServer)).setStage(stage).setState(state) + .build()); } @Override @@ -161,5 +169,9 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws type = toPeerOperationType(data.getType()); targetServer = ProtobufUtil.toServerName(data.getTargetServer()); stage = data.getStage(); + state = data.getState(); + if (data.hasError()) { + this.remoteError = ForeignExceptionUtil.toException(data.getError()); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java index 4d3bf236716f..356c9b1e963a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java @@ -29,11 +29,13 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; import org.apache.hadoop.hbase.replication.regionserver.ReplaySyncReplicationWALCallable; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALRemoteStateData; @@ -125,8 +127,13 @@ protected boolean abort(MasterProcedureEnv env) { protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { SyncReplicationReplayWALRemoteStateData.Builder builder = SyncReplicationReplayWALRemoteStateData.newBuilder().setPeerId(peerId) - .setTargetServer(ProtobufUtil.toServerName(targetServer)); + .setTargetServer(ProtobufUtil.toServerName(targetServer)).setState(state); wals.stream().forEach(builder::addWal); + if (this.remoteError != null) { + ErrorHandlingProtos.ForeignExceptionMessage fem = + ForeignExceptionUtil.toProtoForeignException(remoteError); + builder.setError(fem); + } serializer.serialize(builder.build()); } @@ -138,6 +145,10 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws wals = new ArrayList<>(); data.getWalList().forEach(wals::add); targetServer = ProtobufUtil.toServerName(data.getTargetServer()); + state = data.getState(); + if (data.hasError()) { + this.remoteError = ForeignExceptionUtil.toException(data.getError()); + } } @Override