@@ -645,3 +645,13 @@ enum ClaimReplicationQueuesState {
   CLAIM_REPLICATION_QUEUES_DISPATCH = 1;
   CLAIM_REPLICATION_QUEUES_FINISH = 2;
 }
+
+enum ModifyTableDescriptorState {
+  MODIFY_TABLE_DESCRIPTOR_PREPARE = 1;
+  MODIFY_TABLE_DESCRIPTOR_UPDATE = 2;
+}
+
+message ModifyTableDescriptorStateData {
+  required TableSchema unmodified_table_schema = 1;
+  optional TableSchema modified_table_schema = 2;
+}
@@ -0,0 +1,161 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ModifyTableDescriptorState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ModifyTableDescriptorStateData;

/**
 * The procedure will only update the table descriptor without reopening all the regions.
 * <p/>
 * It is usually used for migration when upgrading, where we need to add something to the table
 * descriptor, such as the rs group information.
 */
@InterfaceAudience.Private
public abstract class ModifyTableDescriptorProcedure
  extends AbstractStateMachineTableProcedure<ModifyTableDescriptorState> {

  private static final Logger LOG = LoggerFactory.getLogger(ModifyTableDescriptorProcedure.class);

  private TableDescriptor unmodifiedTableDescriptor;
  private TableDescriptor modifiedTableDescriptor;

  protected ModifyTableDescriptorProcedure() {
  }

  protected ModifyTableDescriptorProcedure(MasterProcedureEnv env, TableDescriptor unmodified) {
    super(env);
    this.unmodifiedTableDescriptor = unmodified;
  }

  @Override
  public TableName getTableName() {
    return unmodifiedTableDescriptor.getTableName();
  }

  @Override
  public TableOperationType getTableOperationType() {
    return TableOperationType.EDIT;
  }

  /**
   * Subclasses should implement this method to modify the table descriptor, for example to store
   * the rs group information.
   * <p/>
   * Since the migration is asynchronous, it is possible that users have already changed the rs
   * group for a table; in that case we do not need to modify the table descriptor any more, and
   * you can simply return {@link Optional#empty()}.
   */
  protected abstract Optional<TableDescriptor> modify(MasterProcedureEnv env,
    TableDescriptor current) throws IOException;

  @Override
  protected Flow executeFromState(MasterProcedureEnv env, ModifyTableDescriptorState state)
    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
    try {
      switch (state) {
        case MODIFY_TABLE_DESCRIPTOR_PREPARE:
          Optional<TableDescriptor> modified = modify(env, unmodifiedTableDescriptor);
          if (modified.isPresent()) {
            modifiedTableDescriptor = modified.get();
            setNextState(ModifyTableDescriptorState.MODIFY_TABLE_DESCRIPTOR_UPDATE);
            return Flow.HAS_MORE_STATE;
          } else {
            // do not need to modify
            return Flow.NO_MORE_STATE;
          }
        case MODIFY_TABLE_DESCRIPTOR_UPDATE:
          env.getMasterServices().getTableDescriptors().update(modifiedTableDescriptor);
          return Flow.NO_MORE_STATE;
        default:
          throw new UnsupportedOperationException("unhandled state=" + state);
      }
    } catch (IOException e) {
      if (isRollbackSupported(state)) {
        setFailure("master-modify-table-descriptor", e);
      } else {
        LOG.warn("Retriable error trying to modify table descriptor={} (in state={})",
          getTableName(), state, e);
      }
    }
    return Flow.HAS_MORE_STATE;
  }

  @Override
  protected void rollbackState(MasterProcedureEnv env, ModifyTableDescriptorState state)
    throws IOException, InterruptedException {
    if (state == ModifyTableDescriptorState.MODIFY_TABLE_DESCRIPTOR_PREPARE) {
      return;
    }
    throw new UnsupportedOperationException("unhandled state=" + state);
  }

  @Override
  protected boolean isRollbackSupported(ModifyTableDescriptorState state) {
    return state == ModifyTableDescriptorState.MODIFY_TABLE_DESCRIPTOR_PREPARE;
  }

  @Override
  protected ModifyTableDescriptorState getState(int stateId) {
    return ModifyTableDescriptorState.forNumber(stateId);
  }

  @Override
  protected int getStateId(ModifyTableDescriptorState state) {
    return state.getNumber();
  }

  @Override
  protected ModifyTableDescriptorState getInitialState() {
    return ModifyTableDescriptorState.MODIFY_TABLE_DESCRIPTOR_PREPARE;
  }

  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.serializeStateData(serializer);
    ModifyTableDescriptorStateData.Builder builder = ModifyTableDescriptorStateData.newBuilder()
      .setUnmodifiedTableSchema(ProtobufUtil.toTableSchema(unmodifiedTableDescriptor));
    if (modifiedTableDescriptor != null) {
      builder.setModifiedTableSchema(ProtobufUtil.toTableSchema(modifiedTableDescriptor));
    }
    serializer.serialize(builder.build());
  }

  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.deserializeStateData(serializer);
    ModifyTableDescriptorStateData data =
      serializer.deserialize(ModifyTableDescriptorStateData.class);
    unmodifiedTableDescriptor = ProtobufUtil.toTableDescriptor(data.getUnmodifiedTableSchema());
    if (data.hasModifiedTableSchema()) {
      modifiedTableDescriptor = ProtobufUtil.toTableDescriptor(data.getModifiedTableSchema());
    }
  }
}
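
To make the extension contract concrete, here is a minimal sketch of a subclass. The class name and the marker attribute key are hypothetical, invented for this example and not part of the change: it stamps an attribute onto the descriptor, and returns Optional.empty() once the table already carries it, so the procedure finishes in MODIFY_TABLE_DESCRIPTOR_PREPARE without ever scheduling the UPDATE step.

package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Hypothetical subclass, for illustration only: stamps a marker attribute on the
 * table descriptor during an upgrade, skipping tables that already have it.
 */
@InterfaceAudience.Private
public class StampUpgradeMarkerProcedure extends ModifyTableDescriptorProcedure {

  // Hypothetical attribute key, assumed for this sketch.
  private static final String MARKER_KEY = "hbase.upgrade.marker";

  public StampUpgradeMarkerProcedure() {
    // nullary constructor required for procedure deserialization
  }

  public StampUpgradeMarkerProcedure(MasterProcedureEnv env, TableDescriptor unmodified) {
    super(env, unmodified);
  }

  @Override
  protected Optional<TableDescriptor> modify(MasterProcedureEnv env, TableDescriptor current)
    throws IOException {
    if (current.getValue(MARKER_KEY) != null) {
      // Already stamped; the procedure completes in PREPARE without entering UPDATE.
      return Optional.empty();
    }
    return Optional
      .of(TableDescriptorBuilder.newBuilder(current).setValue(MARKER_KEY, "true").build());
  }
}

Note that executeFromState retries the PREPARE state on a retriable IOException, so modify() can run more than once and should be idempotent, as both this sketch and MigrateRSGroupProcedure below are.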
@@ -0,0 +1,65 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.rsgroup;

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ModifyTableDescriptorProcedure;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Procedure for migrating rs group information to table descriptor.
*/
@InterfaceAudience.Private
public class MigrateRSGroupProcedure extends ModifyTableDescriptorProcedure {

  private static final Logger LOG = LoggerFactory.getLogger(MigrateRSGroupProcedure.class);

  public MigrateRSGroupProcedure() {
  }

  public MigrateRSGroupProcedure(MasterProcedureEnv env, TableDescriptor unmodified) {
    super(env, unmodified);
  }

  @Override
  protected Optional<TableDescriptor> modify(MasterProcedureEnv env, TableDescriptor current)
    throws IOException {
    if (current.getRegionServerGroup().isPresent()) {
      // usually this means the user has already set the rs group with the new code, which sets
      // the group directly on the table descriptor, so skip.
      LOG.debug("Skip migrating {} since it is already in group {}", current.getTableName(),
        current.getRegionServerGroup().get());
      return Optional.empty();
    }
    RSGroupInfo group =
      env.getMasterServices().getRSGroupInfoManager().getRSGroupForTable(current.getTableName());
    if (group == null) {
      LOG.debug("RSGroup for table {} is empty when migrating, usually this should not happen" +
        " unless we have removed the RSGroup, ignore...", current.getTableName());
      return Optional.empty();
    }
    return Optional
      .of(TableDescriptorBuilder.newBuilder(current).setRegionServerGroup(group.getName()).build());
  }
}
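
As a usage sketch, this is how a caller drives a single migration synchronously. The helper method name is invented for illustration; masterServices is assumed to be an org.apache.hadoop.hbase.master.MasterServices reference, and the 60 second timeout matches what the RSGroupInfoManagerImpl change below uses.

  // Sketch only; mirrors the RSGroupInfoManagerImpl.migrate() wiring in the next file.
  private void migrateOneTable(MasterServices masterServices, TableDescriptor oldTd)
    throws IOException {
    ProcedureExecutor<MasterProcedureEnv> procExec = masterServices.getMasterProcedureExecutor();
    MigrateRSGroupProcedure proc = new MigrateRSGroupProcedure(procExec.getEnvironment(), oldTd);
    procExec.submitProcedure(proc);
    // Blocks until the procedure finishes; throws IOException on failure or timeout.
    ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60000);
  }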
@@ -72,10 +72,12 @@
 import org.apache.hadoop.hbase.master.TableStateManager;
 import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
 import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
 import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
 import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
 import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -492,11 +494,14 @@ private List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
 
   private void migrate(Collection<RSGroupInfo> groupList) {
     TableDescriptors tds = masterServices.getTableDescriptors();
+    ProcedureExecutor<MasterProcedureEnv> procExec =
+      masterServices.getMasterProcedureExecutor();
     for (RSGroupInfo groupInfo : groupList) {
       if (groupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
         continue;
       }
       SortedSet<TableName> failedTables = new TreeSet<>();
+      List<MigrateRSGroupProcedure> procs = new ArrayList<>();
       for (TableName tableName : groupInfo.getTables()) {
         LOG.debug("Migrating {} in group {}", tableName, groupInfo.getName());
         TableDescriptor oldTd;
@@ -517,20 +522,24 @@ private void migrate(Collection<RSGroupInfo> groupList) {
             oldTd.getRegionServerGroup().get());
           continue;
         }
-        TableDescriptor newTd = TableDescriptorBuilder.newBuilder(oldTd)
-          .setRegionServerGroup(groupInfo.getName()).build();
         // This is a bit tricky. Since we know that the region server group config in
         // TableDescriptor will only be used at master side, it is fine to just update the table
         // descriptor on file system and also the cache, without reopening all the regions. This
         // will be much faster than the normal modifyTable. And when upgrading, we will update
         // master first and then region server, so after all the region servers have been
         // reopened, the new TableDescriptor will be loaded.
+        MigrateRSGroupProcedure proc =
+          new MigrateRSGroupProcedure(procExec.getEnvironment(), oldTd);
+        procExec.submitProcedure(proc);
+        procs.add(proc);
+      }
+      for (MigrateRSGroupProcedure proc : procs) {
         try {
-          tds.update(newTd);
+          ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60000);
         } catch (IOException e) {
-          LOG.warn("Failed to migrate {} in group {}", tableName, groupInfo.getName(), e);
-          failedTables.add(tableName);
-          continue;
+          LOG.warn("Failed to migrate rs group {} for table {}", groupInfo.getName(),
+            proc.getTableName(), e);
+          failedTables.add(proc.getTableName());
         }
       }
       LOG.debug("Done migrating {}, failed tables {}", groupInfo.getName(), failedTables);