Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -771,3 +771,12 @@ enum MigrateReplicationQueueFromZkToTableState {
message MigrateReplicationQueueFromZkToTableStateData {
repeated string disabled_peer_id = 1;
}

enum MigrateNamespaceTableProcedureState {
MIGRATE_NAMESPACE_TABLE_ADD_FAMILY = 1;
MIGRATE_NAMESPACE_TABLE_MIGRATE_DATA = 2;
MIGRATE_NAMESPACE_TABLE_DISABLE_TABLE = 3;
}

message MigrateNamespaceTableProcedureStateData {
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ protected synchronized void doStart() {
try {
notifyStarted();
this.tableNamespaceManager.start();
} catch (IOException ioe) {
notifyFailed(ioe);
} catch (IOException | InterruptedException e) {
notifyFailed(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,29 @@

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.MigrateNamespaceTableProcedure;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

Expand All @@ -66,61 +66,112 @@ public class TableNamespaceManager {

private final MasterServices masterServices;

private volatile boolean migrationDone;

TableNamespaceManager(MasterServices masterServices) {
this.masterServices = masterServices;
}

private void migrateNamespaceTable() throws IOException {
try (Table nsTable = masterServices.getConnection().getTable(TableName.NAMESPACE_TABLE_NAME);
ResultScanner scanner = nsTable.getScanner(
new Scan().addFamily(TableDescriptorBuilder.NAMESPACE_FAMILY_INFO_BYTES).readAllVersions());
BufferedMutator mutator =
masterServices.getConnection().getBufferedMutator(TableName.META_TABLE_NAME)) {
private void tryMigrateNamespaceTable() throws IOException, InterruptedException {
Optional<MigrateNamespaceTableProcedure> opt = masterServices.getProcedures().stream()
.filter(p -> p instanceof MigrateNamespaceTableProcedure)
.map(p -> (MigrateNamespaceTableProcedure) p).findAny();
if (!opt.isPresent()) {
// the procedure is not present, check whether have the ns family in meta table
TableDescriptor metaTableDesc =
masterServices.getTableDescriptors().get(TableName.META_TABLE_NAME);
if (metaTableDesc.hasColumnFamily(HConstants.NAMESPACE_FAMILY)) {
// normal case, upgrading is done or the cluster is created with 3.x code
migrationDone = true;
} else {
// submit the migration procedure
MigrateNamespaceTableProcedure proc = new MigrateNamespaceTableProcedure();
masterServices.getMasterProcedureExecutor().submitProcedure(proc);
}
} else {
if (opt.get().isFinished()) {
// the procedure is already done
migrationDone = true;
}
// we have already submitted the procedure, continue
}
}

private void addToCache(Result result, byte[] family, byte[] qualifier) throws IOException {
Cell cell = result.getColumnLatestCell(family, qualifier);
NamespaceDescriptor ns =
ProtobufUtil.toNamespaceDescriptor(HBaseProtos.NamespaceDescriptor.parseFrom(CodedInputStream
.newInstance(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())));
cache.put(ns.getName(), ns);
}

private void loadFromMeta() throws IOException {
try (Table table = masterServices.getConnection().getTable(TableName.META_TABLE_NAME);
ResultScanner scanner = table.getScanner(HConstants.NAMESPACE_FAMILY)) {
for (Result result;;) {
result = scanner.next();
if (result == null) {
break;
}
Put put = new Put(result.getRow());
result
.getColumnCells(TableDescriptorBuilder.NAMESPACE_FAMILY_INFO_BYTES,
TableDescriptorBuilder.NAMESPACE_COL_DESC_BYTES)
.forEach(c -> put.addColumn(HConstants.NAMESPACE_FAMILY,
HConstants.NAMESPACE_COL_DESC_QUALIFIER, c.getTimestamp(), CellUtil.cloneValue(c)));
mutator.mutate(put);
addToCache(result, HConstants.NAMESPACE_FAMILY, HConstants.NAMESPACE_COL_DESC_QUALIFIER);
}
}
// schedule a disable procedure instead of block waiting here, as when disabling a table we will
// wait until master is initialized, but we are part of the initialization...
masterServices.getMasterProcedureExecutor().submitProcedure(
new DisableTableProcedure(masterServices.getMasterProcedureExecutor().getEnvironment(),
TableName.NAMESPACE_TABLE_NAME, false));
}

private void loadNamespaceIntoCache() throws IOException {
try (Table table = masterServices.getConnection().getTable(TableName.META_TABLE_NAME);
ResultScanner scanner = table.getScanner(HConstants.NAMESPACE_FAMILY)) {
private void loadFromNamespace() throws IOException {
try (Table table = masterServices.getConnection().getTable(TableName.NAMESPACE_TABLE_NAME);
ResultScanner scanner =
table.getScanner(TableDescriptorBuilder.NAMESPACE_FAMILY_INFO_BYTES)) {
for (Result result;;) {
result = scanner.next();
if (result == null) {
break;
}
Cell cell = result.getColumnLatestCell(HConstants.NAMESPACE_FAMILY,
HConstants.NAMESPACE_COL_DESC_QUALIFIER);
NamespaceDescriptor ns = ProtobufUtil
.toNamespaceDescriptor(HBaseProtos.NamespaceDescriptor.parseFrom(CodedInputStream
.newInstance(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())));
cache.put(ns.getName(), ns);
addToCache(result, TableDescriptorBuilder.NAMESPACE_FAMILY_INFO_BYTES,
TableDescriptorBuilder.NAMESPACE_COL_DESC_BYTES);
}
}
}

public void start() throws IOException {
TableState nsTableState = MetaTableAccessor.getTableState(masterServices.getConnection(),
TableName.NAMESPACE_TABLE_NAME);
if (nsTableState != null && nsTableState.isEnabled()) {
migrateNamespaceTable();
private boolean shouldLoadFromMeta() throws IOException {
if (migrationDone) {
return true;
}
// the implementation is bit tricky
// if there is already a disable namespace table procedure or the namespace table is already
// disabled, we are safe to read from meta table as the migration is already done. If not, since
// we are part of the master initialization work, so we can make sure that when reaching here,
// the master has not been marked as initialize yet. And DisableTableProcedure can only be
// executed after master is initialized, so here we are safe to read from namespace table,
// without worrying about that the namespace table is disabled while we are reading and crash
// the master startup.
if (
masterServices.getTableStateManager().isTableState(TableName.NAMESPACE_TABLE_NAME,
TableState.State.DISABLED)
) {
return true;
}
if (
masterServices.getProcedures().stream().filter(p -> p instanceof DisableTableProcedure)
.anyMatch(
p -> ((DisableTableProcedure) p).getTableName().equals(TableName.NAMESPACE_TABLE_NAME))
) {
return true;
}
return false;
}

private void loadNamespaceIntoCache() throws IOException {
if (shouldLoadFromMeta()) {
loadFromMeta();
} else {
loadFromNamespace();
}

}

public void start() throws IOException, InterruptedException {
tryMigrateNamespaceTable();
loadNamespaceIntoCache();
}

Expand All @@ -135,7 +186,14 @@ public NamespaceDescriptor get(String name) throws IOException {
return cache.get(name);
}

private void checkMigrationDone() throws IOException {
if (!migrationDone) {
throw new HBaseIOException("namespace migration is ongoing, modification is disallowed");
}
}

public void addOrUpdateNamespace(NamespaceDescriptor ns) throws IOException {
checkMigrationDone();
insertNamespaceToMeta(masterServices.getConnection(), ns);
cache.put(ns.getName(), ns);
}
Expand All @@ -152,6 +210,7 @@ public static void insertNamespaceToMeta(Connection conn, NamespaceDescriptor ns
}

public void deleteNamespace(String namespaceName) throws IOException {
checkMigrationDone();
Delete d = new Delete(Bytes.toBytes(namespaceName));
try (Table table = masterServices.getConnection().getTable(TableName.META_TABLE_NAME)) {
table.delete(d);
Expand All @@ -174,6 +233,10 @@ public void validateTableAndRegionCount(NamespaceDescriptor desc) throws IOExcep
}
}

public void setMigrationDone() {
migrationDone = true;
}

public static long getMaxTables(NamespaceDescriptor ns) throws IOException {
String value = ns.getConfigurationValue(KEY_MAX_TABLES);
long maxTables = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateNamespaceTableProcedureState;

/**
* Migrate the namespace data to meta table's namespace family while upgrading
*/
@InterfaceAudience.Private
public class MigrateNamespaceTableProcedure
extends StateMachineProcedure<MasterProcedureEnv, MigrateNamespaceTableProcedureState>
implements GlobalProcedureInterface {

private static final Logger LOG = LoggerFactory.getLogger(MigrateNamespaceTableProcedure.class);

private RetryCounter retryCounter;

@Override
public String getGlobalId() {
return getClass().getSimpleName();
}

private void migrate(MasterProcedureEnv env) throws IOException {
Connection conn = env.getMasterServices().getConnection();
try (Table nsTable = conn.getTable(TableName.NAMESPACE_TABLE_NAME);
ResultScanner scanner = nsTable.getScanner(
new Scan().addFamily(TableDescriptorBuilder.NAMESPACE_FAMILY_INFO_BYTES).readAllVersions());
BufferedMutator mutator = conn.getBufferedMutator(TableName.META_TABLE_NAME)) {
for (Result result;;) {
result = scanner.next();
if (result == null) {
break;
}
Put put = new Put(result.getRow());
result
.getColumnCells(TableDescriptorBuilder.NAMESPACE_FAMILY_INFO_BYTES,
TableDescriptorBuilder.NAMESPACE_COL_DESC_BYTES)
.forEach(c -> put.addColumn(HConstants.NAMESPACE_FAMILY,
HConstants.NAMESPACE_COL_DESC_QUALIFIER, c.getTimestamp(), CellUtil.cloneValue(c)));
mutator.mutate(put);
}
}
}

@Override
protected Flow executeFromState(MasterProcedureEnv env, MigrateNamespaceTableProcedureState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
try {
switch (state) {
case MIGRATE_NAMESPACE_TABLE_ADD_FAMILY:
TableDescriptor metaTableDesc =
env.getMasterServices().getTableDescriptors().get(TableName.META_TABLE_NAME);
if (!metaTableDesc.hasColumnFamily(HConstants.NAMESPACE_FAMILY)) {
TableDescriptor newMetaTableDesc = TableDescriptorBuilder.newBuilder(metaTableDesc)
.setColumnFamily(
FSTableDescriptors.getNamespaceFamilyDescForMeta(env.getMasterConfiguration()))
.build();
addChildProcedure(new ModifyTableProcedure(env, newMetaTableDesc));
}
setNextState(MigrateNamespaceTableProcedureState.MIGRATE_NAMESPACE_TABLE_MIGRATE_DATA);
return Flow.HAS_MORE_STATE;
case MIGRATE_NAMESPACE_TABLE_MIGRATE_DATA:
migrate(env);
setNextState(MigrateNamespaceTableProcedureState.MIGRATE_NAMESPACE_TABLE_DISABLE_TABLE);
return Flow.HAS_MORE_STATE;
case MIGRATE_NAMESPACE_TABLE_DISABLE_TABLE:
addChildProcedure(new DisableTableProcedure(env, TableName.NAMESPACE_TABLE_NAME, false));
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("Unhandled state=" + state);
}
} catch (IOException e) {
if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn("Failed migrating namespace data, suspend {}secs {}", backoff / 1000, this, e);
throw suspend(Math.toIntExact(backoff), true);
}
}

@Override
protected void rollbackState(MasterProcedureEnv env, MigrateNamespaceTableProcedureState state)
throws IOException, InterruptedException {
}

@Override
protected MigrateNamespaceTableProcedureState getState(int stateId) {
return MigrateNamespaceTableProcedureState.forNumber(stateId);
}

@Override
protected int getStateId(MigrateNamespaceTableProcedureState state) {
return state.getNumber();
}

@Override
protected MigrateNamespaceTableProcedureState getInitialState() {
return MigrateNamespaceTableProcedureState.MIGRATE_NAMESPACE_TABLE_ADD_FAMILY;
}

@Override
protected void completionCleanup(MasterProcedureEnv env) {
env.getMasterServices().getClusterSchema().getTableNamespaceManager().setMigrationDone();
}
}
Loading