diff --git a/hbase-asyncfs/pom.xml b/hbase-asyncfs/pom.xml
index d0df0ba9e65f..4e729906e86c 100644
--- a/hbase-asyncfs/pom.xml
+++ b/hbase-asyncfs/pom.xml
@@ -99,6 +99,11 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-inline</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>jcl-over-slf4j</artifactId>
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 2a67d6e27426..2dcbf56ade8f 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -36,6 +36,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -47,6 +48,7 @@
 import java.util.function.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
 import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
@@ -57,6 +59,7 @@
 import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DummyDFSOutputStream;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -122,7 +125,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
   private final String src;
 
-  private HdfsFileStatus stat;
+  private final HdfsFileStatus stat;
 
   private final ExtendedBlock block;
 
@@ -138,6 +141,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
   private final ByteBufAllocator alloc;
 
+  // a dummy DFSOutputStream used for lease renewal
+  private final DummyDFSOutputStream dummyStream;
+
   private static final class Callback {
 
     private final CompletableFuture<Long> future;
@@ -356,8 +362,9 @@ private void setupReceiver(int timeoutMs) {
 
   FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs, DFSClient client,
     ClientProtocol namenode, String clientName, String src, HdfsFileStatus stat,
-    LocatedBlock locatedBlock, Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap,
-    DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
+    EnumSet<CreateFlag> createFlags, LocatedBlock locatedBlock, Encryptor encryptor,
+    Map<Channel, DatanodeInfo> datanodeInfoMap, DataChecksum summer, ByteBufAllocator alloc,
+    StreamSlowMonitor streamSlowMonitor) {
     this.conf = conf;
     this.dfs = dfs;
     this.client = client;
@@ -376,6 +383,7 @@ private void setupReceiver(int timeoutMs) {
     this.state = State.STREAMING;
     setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
     this.streamSlowMonitor = streamSlowMonitor;
+    this.dummyStream = new DummyDFSOutputStream(this, client, src, stat, createFlags, summer);
   }
 
   @Override
@@ -593,7 +601,7 @@ public void recoverAndClose(CancelableProgressable reporter) throws IOException
       buf = null;
     }
     closeDataNodeChannelsAndAwait();
-    endFileLease(client, stat);
+    endFileLease(this);
     RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
       reporter == null ? new CancelOnClose(client) : reporter);
   }
 
@@ -608,7 +616,7 @@ public void close() throws IOException {
     state = State.CLOSED;
     closeDataNodeChannelsAndAwait();
     block.setNumBytes(ackedBlockLength);
-    completeFile(client, namenode, src, clientName, block, stat);
+    completeFile(this, client, namenode, src, clientName, block, stat);
   }
 
   @Override
@@ -626,4 +634,20 @@ public long getSyncedLength() {
   Map<Channel, DatanodeInfo> getDatanodeInfoMap() {
     return this.datanodeInfoMap;
   }
+
+  DFSClient getClient() {
+    return client;
+  }
+
+  DummyDFSOutputStream getDummyStream() {
+    return dummyStream;
+  }
+
+  boolean isClosed() {
+    return state == State.CLOSED;
+  }
+
+  HdfsFileStatus getStat() {
+    return stat;
+  }
 }
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 99359f765507..7d74adb44b86 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -141,9 +141,9 @@ private FanOutOneBlockAsyncDFSOutputHelper() {
 
   private interface LeaseManager {
 
-    void begin(DFSClient client, HdfsFileStatus stat);
+    void begin(FanOutOneBlockAsyncDFSOutput output);
 
-    void end(DFSClient client, HdfsFileStatus stat);
+    void end(FanOutOneBlockAsyncDFSOutput output);
   }
 
   private static final LeaseManager LEASE_MANAGER;
@@ -178,44 +178,28 @@ private static LeaseManager createLeaseManager3_4() throws NoSuchMethodException
     beginFileLeaseMethod.setAccessible(true);
     Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", String.class);
     endFileLeaseMethod.setAccessible(true);
-    Method getConfigurationMethod = DFSClient.class.getDeclaredMethod("getConfiguration");
-    getConfigurationMethod.setAccessible(true);
-    Method getNamespaceMehtod = HdfsFileStatus.class.getDeclaredMethod("getNamespace");
-
+    Method getUniqKeyMethod = DFSOutputStream.class.getMethod("getUniqKey");
     return new LeaseManager() {
 
-      private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY =
-        "dfs.client.output.stream.uniq.default.key";
-      private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT";
-
-      private String getUniqId(DFSClient client, HdfsFileStatus stat)
-        throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
-        // Copied from DFSClient in Hadoop 3.4.0
-        long fileId = stat.getFileId();
-        String namespace = (String) getNamespaceMehtod.invoke(stat);
-        if (namespace == null) {
-          Configuration conf = (Configuration) getConfigurationMethod.invoke(client);
-          String defaultKey = conf.get(DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY,
-            DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT);
-          return defaultKey + "_" + fileId;
-        } else {
-          return namespace + "_" + fileId;
-        }
+      private String getUniqKey(FanOutOneBlockAsyncDFSOutput output)
+        throws IllegalAccessException, InvocationTargetException {
+        return (String) getUniqKeyMethod.invoke(output.getDummyStream());
       }
 
       @Override
-      public void begin(DFSClient client, HdfsFileStatus stat) {
+      public void begin(FanOutOneBlockAsyncDFSOutput output) {
         try {
-          beginFileLeaseMethod.invoke(client, getUniqId(client, stat), null);
+          beginFileLeaseMethod.invoke(output.getClient(), getUniqKey(output),
+            output.getDummyStream());
         } catch (IllegalAccessException | InvocationTargetException e) {
           throw new RuntimeException(e);
         }
       }
 
       @Override
-      public void end(DFSClient client, HdfsFileStatus stat) {
+      public void end(FanOutOneBlockAsyncDFSOutput output) {
         try {
-          endFileLeaseMethod.invoke(client, getUniqId(client, stat));
+          endFileLeaseMethod.invoke(output.getClient(), getUniqKey(output));
         } catch (IllegalAccessException | InvocationTargetException e) {
           throw new RuntimeException(e);
         }
@@ -232,18 +216,19 @@ private static LeaseManager createLeaseManager3() throws NoSuchMethodException {
     return new LeaseManager() {
 
       @Override
-      public void begin(DFSClient client, HdfsFileStatus stat) {
+      public void begin(FanOutOneBlockAsyncDFSOutput output) {
         try {
-          beginFileLeaseMethod.invoke(client, stat.getFileId(), null);
+          beginFileLeaseMethod.invoke(output.getClient(), output.getDummyStream().getFileId(),
+            output.getDummyStream());
         } catch (IllegalAccessException | InvocationTargetException e) {
           throw new RuntimeException(e);
         }
       }
 
       @Override
-      public void end(DFSClient client, HdfsFileStatus stat) {
+      public void end(FanOutOneBlockAsyncDFSOutput output) {
         try {
-          endFileLeaseMethod.invoke(client, stat.getFileId());
+          endFileLeaseMethod.invoke(output.getClient(), output.getDummyStream().getFileId());
         } catch (IllegalAccessException | InvocationTargetException e) {
           throw new RuntimeException(e);
         }
@@ -323,12 +308,12 @@ public boolean progress() {
     }
   }
 
-  static void beginFileLease(DFSClient client, HdfsFileStatus stat) {
-    LEASE_MANAGER.begin(client, stat);
+  private static void beginFileLease(FanOutOneBlockAsyncDFSOutput output) {
+    LEASE_MANAGER.begin(output);
   }
 
-  static void endFileLease(DFSClient client, HdfsFileStatus stat) {
-    LEASE_MANAGER.end(client, stat);
+  static void endFileLease(FanOutOneBlockAsyncDFSOutput output) {
+    LEASE_MANAGER.end(output);
   }
 
   static DataChecksum createChecksum(DFSClient client) {
@@ -540,12 +525,12 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
       LOG.debug("When create output stream for {}, exclude list is {}, retry={}", src,
         getDataNodeInfo(toExcludeNodes), retry);
     }
+    EnumSetWritable<CreateFlag> createFlags = getCreateFlags(overwrite, noLocalWrite);
     HdfsFileStatus stat;
     try {
       stat = FILE_CREATOR.create(namenode, src,
         FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
-        getCreateFlags(overwrite, noLocalWrite), createParent, replication, blockSize,
-        CryptoProtocolVersion.supported());
+        createFlags, createParent, replication, blockSize, CryptoProtocolVersion.supported());
     } catch (Exception e) {
       if (e instanceof RemoteException) {
         throw (RemoteException) e;
@@ -553,7 +538,6 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
         throw new NameNodeException(e);
       }
     }
-    beginFileLease(client, stat);
     boolean succ = false;
     LocatedBlock locatedBlock = null;
     List<Future<Channel>> futureList = null;
@@ -578,7 +562,8 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
       Encryptor encryptor = createEncryptor(conf, stat, client);
       FanOutOneBlockAsyncDFSOutput output =
         new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, stat,
-          locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
+          createFlags.get(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
+      beginFileLease(output);
       succ = true;
       return output;
     } catch (RemoteException e) {
@@ -617,7 +602,6 @@ public void operationComplete(Future<Channel> future) throws Exception {
           });
         }
       }
-      endFileLease(client, stat);
     }
   }
 
@@ -654,12 +638,13 @@ public static boolean shouldRetryCreate(RemoteException e) {
     return e.getClassName().endsWith("RetryStartFileException");
   }
 
-  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
-    ExtendedBlock block, HdfsFileStatus stat) {
+  static void completeFile(FanOutOneBlockAsyncDFSOutput output, DFSClient client,
+    ClientProtocol namenode, String src, String clientName, ExtendedBlock block,
+    HdfsFileStatus stat) {
     for (int retry = 0;; retry++) {
       try {
         if (namenode.complete(src, clientName, block, stat.getFileId())) {
-          endFileLease(client, stat);
+          endFileLease(output);
           return;
         } else {
           LOG.warn("complete file " + src + " not finished, retry = " + retry);
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hdfs/DummyDFSOutputStream.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hdfs/DummyDFSOutputStream.java
new file mode 100644
index 000000000000..c92ff416b0cb
--- /dev/null
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hdfs/DummyDFSOutputStream.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A dummy DFSOutputStream which is mainly used for lease renewal.
+ * <p>
+ * We have to put it under this package as we want to override a package private method.
+ */
+@InterfaceAudience.Private
+public final class DummyDFSOutputStream extends DFSOutputStream {
+
+  private final AsyncFSOutput delegate;
+
+  public DummyDFSOutputStream(AsyncFSOutput output, DFSClient dfsClient, String src,
+    HdfsFileStatus stat, EnumSet<CreateFlag> flag, DataChecksum checksum) {
+    super(dfsClient, src, stat, flag, null, checksum, null, false);
+    this.delegate = output;
+  }
+
+  // public for testing
+  @Override
+  public void abort() throws IOException {
+    delegate.close();
+  }
+
+  @Override
+  public void close() throws IOException {
+    delegate.close();
+  }
+}
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLeaseRenewal.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLeaseRenewal.java
new file mode 100644
index 000000000000..e8f7188518d0
--- /dev/null
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLeaseRenewal.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Optional;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DummyDFSOutputStream;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.MockedConstruction;
+
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
+
+/**
+ * Make sure lease renewal works. Since it is in a background thread, a normal read/write test
+ * cannot verify it.
+ * <p>
+ * See HBASE-28955 for more details.
+ */
+@Category({ MiscTests.class, MediumTests.class })
+public class TestLeaseRenewal extends AsyncFSTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestLeaseRenewal.class);
+
+  private static DistributedFileSystem FS;
+  private static EventLoopGroup EVENT_LOOP_GROUP;
+  private static Class<? extends Channel> CHANNEL_CLASS;
+  private static StreamSlowMonitor MONITOR;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    startMiniDFSCluster(3);
+    FS = CLUSTER.getFileSystem();
+    EVENT_LOOP_GROUP = new NioEventLoopGroup();
+    CHANNEL_CLASS = NioSocketChannel.class;
+    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (EVENT_LOOP_GROUP != null) {
+      EVENT_LOOP_GROUP.shutdownGracefully().get();
+    }
+    shutdownMiniDFSCluster();
+  }
+
+  private FanOutOneBlockAsyncDFSOutput create(String file)
+    throws IllegalArgumentException, IOException {
+    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+    return FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path(file), true, false,
+      (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
+  }
+
+  @Test
+  public void testLeaseRenew() throws IOException {
+    DFSClient client = FS.getClient();
+    assertFalse(client.renewLease());
+
+    FanOutOneBlockAsyncDFSOutput out = create("/test_lease_renew");
+    assertTrue(client.renewLease());
+    client.closeAllFilesBeingWritten(false);
+    assertTrue(out.isClosed());
+
+    assertFalse(client.renewLease());
+
+    out = create("/test_lease_renew");
+    assertTrue(client.renewLease());
+    client.closeAllFilesBeingWritten(true);
+    assertTrue(out.isClosed());
+  }
+
+  private Optional<Method> getUniqKeyMethod() {
+    try {
+      return Optional.of(DFSOutputStream.class.getMethod("getUniqKey"));
+    } catch (NoSuchMethodException e) {
+      // should be Hadoop 3.3 or below
+      return Optional.empty();
+    }
+  }
+
+  @Test
+  public void testEnsureMethodsCalledWhenLeaseRenewal() throws Exception {
+    try (MockedConstruction<DummyDFSOutputStream> mocked =
+      mockConstruction(DummyDFSOutputStream.class)) {
+      try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_lease_renewal")) {
+        DummyDFSOutputStream dummy = mocked.constructed().get(0);
+        assertTrue(FS.getClient().renewLease());
+        Optional<Method> getUniqKeyMethod = getUniqKeyMethod();
+        if (getUniqKeyMethod.isPresent()) {
+          getUniqKeyMethod.get().invoke(verify(dummy));
+          Method getNamespaceMethod = DFSOutputStream.class.getMethod("getNamespace");
+          getNamespaceMethod.invoke(verify(dummy));
+        } else {
+          verify(dummy).getFileId();
+        }
+        verifyNoMoreInteractions(dummy);
+      }
+    }
+  }
+
+  private void verifyGetUniqKey(DummyDFSOutputStream dummy) throws Exception {
+    Optional<Method> getUniqKeyMethod = getUniqKeyMethod();
+    if (getUniqKeyMethod.isPresent()) {
+      getUniqKeyMethod.get().invoke(verify(dummy));
+    } else {
+      verify(dummy).getFileId();
+    }
+  }
+
+  @Test
+  public void testEnsureMethodsCalledWhenClosing() throws Exception {
+    try (MockedConstruction<DummyDFSOutputStream> mocked =
+      mockConstruction(DummyDFSOutputStream.class)) {
+      try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_closing")) {
+        DummyDFSOutputStream dummy = mocked.constructed().get(0);
+        verifyGetUniqKey(dummy);
+        FS.getClient().closeAllFilesBeingWritten(false);
+        verify(dummy).close();
+
+        verifyNoMoreInteractions(dummy);
+      }
+    }
+  }
+
+  @Test
+  public void testEnsureMethodsCalledWhenAborting() throws Exception {
+    try (MockedConstruction<DummyDFSOutputStream> mocked =
+      mockConstruction(DummyDFSOutputStream.class)) {
+      try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_aborting")) {
+        DummyDFSOutputStream dummy = mocked.constructed().get(0);
+        verifyGetUniqKey(dummy);
+        FS.getClient().closeAllFilesBeingWritten(true);
+        verify(dummy).abort();
+        verifyNoMoreInteractions(dummy);
+      }
+    }
+  }
+}
diff --git a/hbase-shaded/hbase-shaded-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh b/hbase-shaded/hbase-shaded-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
index 0ca7ecc58175..c6a923fb9022 100644
--- a/hbase-shaded/hbase-shaded-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
+++ b/hbase-shaded/hbase-shaded-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
@@ -75,6 +75,8 @@ allowed_expr="(^org/$|^org/apache/$|^org/apache/hadoop/$"
 allowed_expr+="|^org/apache/hadoop/hbase"
 # * classes in packages that start with org.apache.hbase
 allowed_expr+="|^org/apache/hbase/"
+# We have a dummy DFSOutputStream implementation in hbase
+allowed_expr+="|^org/apache/hadoop/hdfs/$|^org/apache/hadoop/hdfs/DummyDFSOutputStream.class"
 # * whatever in the "META-INF" directory
 allowed_expr+="|^META-INF/"
 # * the folding tables from jcodings
diff --git a/hbase-shaded/hbase-shaded-with-hadoop-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh b/hbase-shaded/hbase-shaded-with-hadoop-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
index 0ca7ecc58175..c6a923fb9022 100644
--- a/hbase-shaded/hbase-shaded-with-hadoop-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
+++ b/hbase-shaded/hbase-shaded-with-hadoop-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
@@ -75,6 +75,8 @@ allowed_expr="(^org/$|^org/apache/$|^org/apache/hadoop/$"
 allowed_expr+="|^org/apache/hadoop/hbase"
 # * classes in packages that start with org.apache.hbase
 allowed_expr+="|^org/apache/hbase/"
+# We have a dummy DFSOutputStream implementation in hbase
+allowed_expr+="|^org/apache/hadoop/hdfs/$|^org/apache/hadoop/hdfs/DummyDFSOutputStream.class"
 # * whatever in the "META-INF" directory
 allowed_expr+="|^META-INF/"
 # * the folding tables from jcodings