diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 614b120cf887..640ef17d4994 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -83,6 +83,7 @@
 import org.apache.hadoop.hbase.quotas.QuotaSettings;
 import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
 import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
+import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
@@ -965,6 +966,8 @@ public CompletableFuture<Void> flush(TableName tableName) {
   @Override
   public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
+    Preconditions.checkNotNull(columnFamily,
+      "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead.");
     return flush(tableName, Collections.singletonList(columnFamily));
   }
@@ -974,6 +977,8 @@ public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFam
     // If the server version is lower than the client version, it's possible that the
     // flushTable method is not present in the server side, if so, we need to fall back
     // to the old implementation.
+    Preconditions.checkNotNull(columnFamilyList,
+      "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead.");
     List<byte[]> columnFamilies = columnFamilyList.stream()
       .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList());
     FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies,
@@ -984,7 +989,10 @@ public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFam
     CompletableFuture<Void> future = new CompletableFuture<>();
     addListener(procFuture, (ret, error) -> {
       if (error != null) {
-        if (error instanceof TableNotFoundException || error instanceof TableNotEnabledException) {
+        if (
+          error instanceof TableNotFoundException || error instanceof TableNotEnabledException
+            || error instanceof NoSuchColumnFamilyException
+        ) {
           future.completeExceptionally(error);
         } else if (error instanceof DoNotRetryIOException) {
           // usually this is caused by the method is not present on the server or
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java
index 892d4d13b5ee..bf05dfe54c45 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java
@@ -24,10 +24,13 @@
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Strings;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -111,6 +114,30 @@ protected Flow executeFromState(MasterProcedureEnv env, FlushTableState state)
     return Flow.HAS_MORE_STATE;
   }
 
+  @Override
+  protected void preflightChecks(MasterProcedureEnv env, Boolean enabled) throws HBaseIOException {
+    super.preflightChecks(env, enabled);
+    if (columnFamilies == null) {
+      return;
+    }
+    MasterServices master = env.getMasterServices();
+    try {
+      TableDescriptor tableDescriptor = master.getTableDescriptors().get(tableName);
+      List<String> noSuchFamilies =
+        columnFamilies.stream().filter(cf -> !tableDescriptor.hasColumnFamily(cf))
+          .map(Bytes::toString).collect(Collectors.toList());
+      if (!noSuchFamilies.isEmpty()) {
+        throw new NoSuchColumnFamilyException("Column families " + noSuchFamilies
+          + " don't exist in table " + tableName.getNameAsString());
+      }
+    } catch (IOException ioe) {
+      if (ioe instanceof HBaseIOException) {
+        throw (HBaseIOException) ioe;
+      }
+      throw new HBaseIOException(ioe);
+    }
+  }
+
   @Override
   protected void rollbackState(MasterProcedureEnv env, FlushTableState state)
     throws IOException, InterruptedException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
index a4ef8cf0a1f8..12a3809fb749 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
@@ -24,12 +24,15 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
@@ -40,9 +43,12 @@
 import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
 import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
 import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
+import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Strings;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -146,13 +152,28 @@ public void execProcedure(ProcedureDescription desc) throws IOException {
     ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
 
-    HBaseProtos.NameStringPair family = null;
+    HBaseProtos.NameStringPair families = null;
     for (HBaseProtos.NameStringPair nsp : desc.getConfigurationList()) {
       if (HConstants.FAMILY_KEY_STR.equals(nsp.getName())) {
-        family = nsp;
+        families = nsp;
       }
     }
-    byte[] procArgs = family != null ? family.toByteArray() : new byte[0];
+
+    byte[] procArgs;
+    if (families != null) {
+      TableDescriptor tableDescriptor = master.getTableDescriptors().get(tableName);
+      List<String> noSuchFamilies =
+        StreamSupport.stream(Strings.SPLITTER.split(families.getValue()).spliterator(), false)
+          .filter(cf -> !tableDescriptor.hasColumnFamily(Bytes.toBytes(cf)))
+          .collect(Collectors.toList());
+      if (!noSuchFamilies.isEmpty()) {
+        throw new NoSuchColumnFamilyException("Column families " + noSuchFamilies
+          + " don't exist in table " + tableName.getNameAsString());
+      }
+      procArgs = families.toByteArray();
+    } else {
+      procArgs = new byte[0];
+    }
 
     // Kick of the global procedure from the master coordinator to the region servers.
     // We rely on the existing Distributed Procedure framework to prevent any concurrent
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 06abad1a4966..b77fcf338a50 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -44,6 +44,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -1776,8 +1777,16 @@ public FlushRegionResponse flushRegion(final RpcController controller,
       // Go behind the curtain so we can manage writing of the flush WAL marker
       HRegion.FlushResultImpl flushResult = null;
       if (request.hasFamily()) {
-        List<byte[]> families = new ArrayList();
+        List<byte[]> families = new ArrayList<>();
         families.add(request.getFamily().toByteArray());
+        TableDescriptor tableDescriptor = region.getTableDescriptor();
+        List<String> noSuchFamilies =
+          families.stream().filter(f -> !tableDescriptor.hasColumnFamily(f)).map(Bytes::toString)
+            .collect(Collectors.toList());
+        if (!noSuchFamilies.isEmpty()) {
+          throw new NoSuchColumnFamilyException("Column families " + noSuchFamilies
+            + " don't exist in table " + tableDescriptor.getTableName().getNameAsString());
+        }
         flushResult =
           region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
       } else {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java
index 15023def30f3..a93603cd5461 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java
@@ -19,10 +19,15 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -30,9 +35,11 @@
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -185,6 +192,30 @@ public void testAsyncFlushRegionFamily() throws Exception {
     }
   }
 
+  @Test
+  public void testAsyncFlushTableWithNonExistingFamilies() throws IOException {
+    AsyncAdmin admin = asyncConn.getAdmin();
+    List<byte[]> families = new ArrayList<>();
+    families.add(FAMILY_1);
+    families.add(FAMILY_2);
+    families.add(Bytes.toBytes("non_family01"));
+    families.add(Bytes.toBytes("non_family02"));
+    CompletableFuture<Void> future = CompletableFuture.allOf(admin.flush(tableName, families));
+    assertThrows(NoSuchColumnFamilyException.class, () -> FutureUtils.get(future));
+  }
+
+  @Test
+  public void testAsyncFlushRegionWithNonExistingFamily() throws IOException {
+    AsyncAdmin admin = asyncConn.getAdmin();
+    List<HRegion> regions = getRegionInfo();
+    assertNotNull(regions);
+    assertTrue(regions.size() > 0);
+    HRegion region = regions.get(0);
+    CompletableFuture<Void> future = CompletableFuture.allOf(admin
+      .flushRegion(region.getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes("non_family")));
+    assertThrows(NoSuchColumnFamilyException.class, () -> FutureUtils.get(future));
+  }
+
   @Test
   public void testFlushRegionServer() throws Exception {
     try (Admin admin = TEST_UTIL.getAdmin()) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClientWithDisabledFlushProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClientWithDisabledFlushProcedure.java
new file mode 100644
index 000000000000..453f1223b047
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClientWithDisabledFlushProcedure.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestFlushFromClientWithDisabledFlushProcedure {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestFlushFromClientWithDisabledFlushProcedure.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestFlushFromClientWithDisabledFlushProcedure.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static AsyncConnection asyncConn;
+  private static final byte[] FAMILY = Bytes.toBytes("info");
+  private static final byte[] QUALIFIER = Bytes.toBytes("name");
+
+  @Rule
+  public TestName name = new TestName();
+
+  private TableName tableName;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration configuration = TEST_UTIL.getConfiguration();
+    configuration.setBoolean(MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED, false);
+    TEST_UTIL.startMiniCluster(1);
+    asyncConn = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    Closeables.close(asyncConn, true);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    tableName = TableName.valueOf(name.getMethodName());
+    try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
+      List<Put> puts = new ArrayList<>();
+      for (int i = 0; i <= 10; ++i) {
+        Put put = new Put(Bytes.toBytes(i));
+        put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i));
+        puts.add(put);
+      }
+      t.put(puts);
+    }
+    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
+    assertFalse(regions.isEmpty());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
+      LOG.info("Tear down, remove table=" + htd.getTableName());
+      TEST_UTIL.deleteTable(htd.getTableName());
+    }
+  }
+
+  @Test
+  public void flushTableWithNonExistingFamily() {
+    AsyncAdmin admin = asyncConn.getAdmin();
+    List<byte[]> families = new ArrayList<>();
+    families.add(FAMILY);
+    families.add(Bytes.toBytes("non_family01"));
+    families.add(Bytes.toBytes("non_family02"));
+    assertFalse(TEST_UTIL.getConfiguration().getBoolean(
+      MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED,
+      MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT));
+    CompletableFuture<Void> future = CompletableFuture.allOf(admin.flush(tableName, families));
+    assertThrows(NoSuchColumnFamilyException.class, () -> FutureUtils.get(future));
+  }
+}
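
For context, a minimal client-side sketch of the behavior this patch enforces, modeled on the new tests. The running cluster, the table name "test", and the family names below are assumptions for illustration, not part of the change:

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;

public class FlushMissingFamilyExample {
  public static void main(String[] args) throws Exception {
    // Assumes a reachable cluster with a table "test" that has a family "info"
    // but no family named "non_family".
    try (AsyncConnection conn =
      ConnectionFactory.createAsyncConnection(HBaseConfiguration.create()).get()) {
      AsyncAdmin admin = conn.getAdmin();
      List<byte[]> families = Arrays.asList(Bytes.toBytes("info"), Bytes.toBytes("non_family"));
      try {
        // With this patch the request is rejected up front instead of quietly flushing nothing.
        FutureUtils.get(admin.flush(TableName.valueOf("test"), families));
      } catch (NoSuchColumnFamilyException e) {
        // Expected: "Column families [non_family] don't exist in table test"
        System.out.println("Rejected as expected: " + e.getMessage());
      }
    }
  }
}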