Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions hbase-asyncfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -47,6 +48,7 @@
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
Expand All @@ -57,6 +59,7 @@
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.DummyDFSOutputStream;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
Expand Down Expand Up @@ -122,7 +125,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {

private final String src;

private HdfsFileStatus stat;
private final HdfsFileStatus stat;

private final ExtendedBlock block;

Expand All @@ -138,6 +141,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {

private final ByteBufAllocator alloc;

// a dummy DFSOutputStream used for lease renewal
private final DummyDFSOutputStream dummyStream;

private static final class Callback {

private final CompletableFuture<Long> future;
Expand Down Expand Up @@ -356,8 +362,9 @@ private void setupReceiver(int timeoutMs) {

FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs, DFSClient client,
ClientProtocol namenode, String clientName, String src, HdfsFileStatus stat,
LocatedBlock locatedBlock, Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap,
DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
EnumSet<CreateFlag> createFlags, LocatedBlock locatedBlock, Encryptor encryptor,
Map<Channel, DatanodeInfo> datanodeInfoMap, DataChecksum summer, ByteBufAllocator alloc,
StreamSlowMonitor streamSlowMonitor) {
this.conf = conf;
this.dfs = dfs;
this.client = client;
Expand All @@ -376,6 +383,7 @@ private void setupReceiver(int timeoutMs) {
this.state = State.STREAMING;
setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
this.streamSlowMonitor = streamSlowMonitor;
this.dummyStream = new DummyDFSOutputStream(this, client, src, stat, createFlags, summer);
}

@Override
Expand Down Expand Up @@ -593,7 +601,7 @@ public void recoverAndClose(CancelableProgressable reporter) throws IOException
buf = null;
}
closeDataNodeChannelsAndAwait();
endFileLease(client, stat);
endFileLease(this);
RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
reporter == null ? new CancelOnClose(client) : reporter);
}
Expand All @@ -608,7 +616,7 @@ public void close() throws IOException {
state = State.CLOSED;
closeDataNodeChannelsAndAwait();
block.setNumBytes(ackedBlockLength);
completeFile(client, namenode, src, clientName, block, stat);
completeFile(this, client, namenode, src, clientName, block, stat);
}

@Override
Expand All @@ -626,4 +634,20 @@ public long getSyncedLength() {
Map<Channel, DatanodeInfo> getDatanodeInfoMap() {
return this.datanodeInfoMap;
}

DFSClient getClient() {
return client;
}

DummyDFSOutputStream getDummyStream() {
return dummyStream;
}

boolean isClosed() {
return state == State.CLOSED;
}

HdfsFileStatus getStat() {
return stat;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ private FanOutOneBlockAsyncDFSOutputHelper() {

private interface LeaseManager {

void begin(DFSClient client, HdfsFileStatus stat);
void begin(FanOutOneBlockAsyncDFSOutput output);

void end(DFSClient client, HdfsFileStatus stat);
void end(FanOutOneBlockAsyncDFSOutput output);
}

private static final LeaseManager LEASE_MANAGER;
Expand Down Expand Up @@ -178,44 +178,28 @@ private static LeaseManager createLeaseManager3_4() throws NoSuchMethodException
beginFileLeaseMethod.setAccessible(true);
Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", String.class);
endFileLeaseMethod.setAccessible(true);
Method getConfigurationMethod = DFSClient.class.getDeclaredMethod("getConfiguration");
getConfigurationMethod.setAccessible(true);
Method getNamespaceMehtod = HdfsFileStatus.class.getDeclaredMethod("getNamespace");

Method getUniqKeyMethod = DFSOutputStream.class.getMethod("getUniqKey");
return new LeaseManager() {

private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY =
"dfs.client.output.stream.uniq.default.key";
private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT";

private String getUniqId(DFSClient client, HdfsFileStatus stat)
throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
// Copied from DFSClient in Hadoop 3.4.0
long fileId = stat.getFileId();
String namespace = (String) getNamespaceMehtod.invoke(stat);
if (namespace == null) {
Configuration conf = (Configuration) getConfigurationMethod.invoke(client);
String defaultKey = conf.get(DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY,
DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT);
return defaultKey + "_" + fileId;
} else {
return namespace + "_" + fileId;
}
private String getUniqKey(FanOutOneBlockAsyncDFSOutput output)
throws IllegalAccessException, InvocationTargetException {
return (String) getUniqKeyMethod.invoke(output.getDummyStream());
}

@Override
public void begin(DFSClient client, HdfsFileStatus stat) {
public void begin(FanOutOneBlockAsyncDFSOutput output) {
try {
beginFileLeaseMethod.invoke(client, getUniqId(client, stat), null);
beginFileLeaseMethod.invoke(output.getClient(), getUniqKey(output),
output.getDummyStream());
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

@Override
public void end(DFSClient client, HdfsFileStatus stat) {
public void end(FanOutOneBlockAsyncDFSOutput output) {
try {
endFileLeaseMethod.invoke(client, getUniqId(client, stat));
endFileLeaseMethod.invoke(output.getClient(), getUniqKey(output));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
Expand All @@ -232,18 +216,19 @@ private static LeaseManager createLeaseManager3() throws NoSuchMethodException {
return new LeaseManager() {

@Override
public void begin(DFSClient client, HdfsFileStatus stat) {
public void begin(FanOutOneBlockAsyncDFSOutput output) {
try {
beginFileLeaseMethod.invoke(client, stat.getFileId(), null);
beginFileLeaseMethod.invoke(output.getClient(), output.getDummyStream().getFileId(),
output.getDummyStream());
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

@Override
public void end(DFSClient client, HdfsFileStatus stat) {
public void end(FanOutOneBlockAsyncDFSOutput output) {
try {
endFileLeaseMethod.invoke(client, stat.getFileId());
endFileLeaseMethod.invoke(output.getClient(), output.getDummyStream().getFileId());
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -323,12 +308,12 @@ public boolean progress() {
}
}

static void beginFileLease(DFSClient client, HdfsFileStatus stat) {
LEASE_MANAGER.begin(client, stat);
private static void beginFileLease(FanOutOneBlockAsyncDFSOutput output) {
LEASE_MANAGER.begin(output);
}

static void endFileLease(DFSClient client, HdfsFileStatus stat) {
LEASE_MANAGER.end(client, stat);
static void endFileLease(FanOutOneBlockAsyncDFSOutput output) {
LEASE_MANAGER.end(output);
}

static DataChecksum createChecksum(DFSClient client) {
Expand Down Expand Up @@ -540,20 +525,19 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
LOG.debug("When create output stream for {}, exclude list is {}, retry={}", src,
getDataNodeInfo(toExcludeNodes), retry);
}
EnumSetWritable<CreateFlag> createFlags = getCreateFlags(overwrite, noLocalWrite);
HdfsFileStatus stat;
try {
stat = FILE_CREATOR.create(namenode, src,
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
getCreateFlags(overwrite, noLocalWrite), createParent, replication, blockSize,
CryptoProtocolVersion.supported());
createFlags, createParent, replication, blockSize, CryptoProtocolVersion.supported());
} catch (Exception e) {
if (e instanceof RemoteException) {
throw (RemoteException) e;
} else {
throw new NameNodeException(e);
}
}
beginFileLease(client, stat);
boolean succ = false;
LocatedBlock locatedBlock = null;
List<Future<Channel>> futureList = null;
Expand All @@ -578,7 +562,8 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
Encryptor encryptor = createEncryptor(conf, stat, client);
FanOutOneBlockAsyncDFSOutput output =
new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, stat,
locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
createFlags.get(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
beginFileLease(output);
succ = true;
return output;
} catch (RemoteException e) {
Expand Down Expand Up @@ -617,7 +602,6 @@ public void operationComplete(Future<Channel> future) throws Exception {
});
}
}
endFileLease(client, stat);
}
}
}
Expand Down Expand Up @@ -654,12 +638,13 @@ public static boolean shouldRetryCreate(RemoteException e) {
return e.getClassName().endsWith("RetryStartFileException");
}

static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
ExtendedBlock block, HdfsFileStatus stat) {
static void completeFile(FanOutOneBlockAsyncDFSOutput output, DFSClient client,
ClientProtocol namenode, String src, String clientName, ExtendedBlock block,
HdfsFileStatus stat) {
for (int retry = 0;; retry++) {
try {
if (namenode.complete(src, clientName, block, stat.getFileId())) {
endFileLease(client, stat);
endFileLease(output);
return;
} else {
LOG.warn("complete file " + src + " not finished, retry = " + retry);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;

/**
* A dummy DFSOutputStream which is mainly used for lease renewal.
* <p>
* We have to put it under this package as we want to override a package private method.
*/
@InterfaceAudience.Private
public final class DummyDFSOutputStream extends DFSOutputStream {

private final AsyncFSOutput delegate;

public DummyDFSOutputStream(AsyncFSOutput output, DFSClient dfsClient, String src,
HdfsFileStatus stat, EnumSet<CreateFlag> flag, DataChecksum checksum) {
super(dfsClient, src, stat, flag, null, checksum, null, false);
this.delegate = output;
}

// public for testing
@Override
public void abort() throws IOException {
delegate.close();
}

@Override
public void close() throws IOException {
delegate.close();
}
}
Loading