Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.tracing.GrpcClientInterceptor;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.security.UserGroupInformation;
Expand Down Expand Up @@ -136,7 +138,8 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken)
NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(dn
.getIpAddress(), port).usePlaintext()
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
.intercept(new ClientCredentialInterceptor(userName, encodedToken));
.intercept(new ClientCredentialInterceptor(userName, encodedToken),
new GrpcClientInterceptor());
if (secConfig.isGrpcTlsEnabled()) {
File trustCertCollectionFile = secConfig.getTrustStoreFile();
File privateKeyFile = secConfig.getClientPrivateKeyFile();
Expand Down Expand Up @@ -204,7 +207,7 @@ public ContainerCommandResponseProto sendCommand(
ContainerCommandRequestProto request) throws IOException {
try {
XceiverClientReply reply;
reply = sendCommandWithRetry(request, null);
reply = sendCommandWithTraceIDAndRetry(request, null);
ContainerCommandResponseProto responseProto = reply.getResponse().get();
return responseProto;
} catch (ExecutionException | InterruptedException e) {
Expand All @@ -217,7 +220,21 @@ public XceiverClientReply sendCommand(
ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
throws IOException {
Preconditions.checkState(HddsUtils.isReadOnly(request));
return sendCommandWithRetry(request, excludeDns);
return sendCommandWithTraceIDAndRetry(request, excludeDns);
}

private XceiverClientReply sendCommandWithTraceIDAndRetry(
ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
throws IOException {
try (Scope scope = GlobalTracer.get()
.buildSpan("XceiverClientGrpc." + request.getCmdType().name())
.startActive(true)) {
ContainerCommandRequestProto finalPayload =
ContainerCommandRequestProto.newBuilder(request)
.setTraceID(TracingUtil.exportCurrentSpan())
.build();
return sendCommandWithRetry(finalPayload, excludeDns);
}
}

private XceiverClientReply sendCommandWithRetry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,27 @@
import io.jaegertracing.internal.exceptions.TraceIdOutOfBoundException;
import io.jaegertracing.spi.Codec;
import io.opentracing.propagation.Format;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A jaeger codec to save the current tracing context t a string.
* A jaeger codec to save the current tracing context as a string.
*/
public class StringCodec implements Codec<StringBuilder> {

public static final Logger LOG = LoggerFactory.getLogger(StringCodec.class);
public static final StringFormat FORMAT = new StringFormat();

@Override
public JaegerSpanContext extract(StringBuilder s) {
if (s == null) {
throw new EmptyTracerStateStringException();
}
String value = s.toString();
if (value != null && !value.equals("")) {
String[] parts = value.split(":");
if (parts.length != 4) {
LOG.trace("MalformedTracerStateString: {}", value);
throw new MalformedTracerStateStringException(value);
} else {
String traceId = parts[0];
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.tracing;

import io.jaegertracing.internal.JaegerSpanContext;
import io.jaegertracing.internal.exceptions.EmptyTracerStateStringException;
import io.jaegertracing.internal.exceptions.MalformedTracerStateStringException;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertTrue;

class TestStringCodec {

@Test
void testExtract() throws Exception {
StringCodec codec = new StringCodec();

LambdaTestUtils.intercept(EmptyTracerStateStringException.class,
() -> codec.extract(null));

StringBuilder sb = new StringBuilder().append("123");
LambdaTestUtils.intercept(MalformedTracerStateStringException.class,
"String does not match tracer state format",
() -> codec.extract(sb));

sb.append(":456:789");
LambdaTestUtils.intercept(MalformedTracerStateStringException.class,
"String does not match tracer state format",
() -> codec.extract(sb));
sb.append(":66");
JaegerSpanContext context = codec.extract(sb);
String expectedContextString = new String("123:456:789:66");
assertTrue(context.getTraceId().equals("123"));
assertTrue(context.toString().equals(expectedContextString));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.tracing;
/**
Test cases for ozone tracing.
*/
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,6 @@ public static ContainerCommandRequestProto getBlockRequest(
*/
public static void verifyGetBlock(ContainerCommandRequestProto request,
ContainerCommandResponseProto response, int expectedChunksCount) {
Assert.assertEquals(request.getTraceID(), response.getTraceID());
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertEquals(expectedChunksCount,
response.getGetBlock().getBlockData().getChunksCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ public void testContainerReplication() throws Exception {

Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(
putBlockRequest.getTraceID().equals(response.getTraceID()));

HddsDatanodeService destinationDatanode =
chooseDatanodeWithoutContainer(sourcePipelines,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ public void testContainerMetrics() throws Exception {
ContainerCommandRequestProto request = ContainerTestHelper
.getCreateContainerRequest(containerID, pipeline);
ContainerCommandResponseProto response = client.sendCommand(request);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ static void runTestOzoneContainerViaDataNode(
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));

// Put Block
putBlockRequest = ContainerTestHelper.getPutBlockRequest(
Expand All @@ -168,8 +167,6 @@ static void runTestOzoneContainerViaDataNode(
response = client.sendCommand(putBlockRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(putBlockRequest.getTraceID()
.equals(response.getTraceID()));

// Get Block
request = ContainerTestHelper.
Expand All @@ -187,7 +184,6 @@ static void runTestOzoneContainerViaDataNode(
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));

//Delete Chunk
request = ContainerTestHelper.getDeleteChunkRequest(
Expand All @@ -196,7 +192,6 @@ static void runTestOzoneContainerViaDataNode(
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));

//Update an existing container
Map<String, String> containerUpdate = new HashMap<String, String>();
Expand Down Expand Up @@ -259,8 +254,6 @@ static void runTestBothGetandPutSmallFile(
ContainerProtos.ContainerCommandResponseProto response
= client.sendCommand(smallFileRequest);
Assert.assertNotNull(response);
Assert.assertTrue(smallFileRequest.getTraceID()
.equals(response.getTraceID()));

final ContainerProtos.ContainerCommandRequestProto getSmallFileRequest
= ContainerTestHelper.getReadSmallFileRequest(client.getPipeline(),
Expand Down Expand Up @@ -310,16 +303,13 @@ public void testCloseContainer() throws Exception {
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());
Assert.assertTrue(
putBlockRequest.getTraceID().equals(response.getTraceID()));

// Close the contianer.
request = ContainerTestHelper.getCloseContainer(
client.getPipeline(), containerID);
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));


// Assert that none of the write operations are working after close.
Expand All @@ -330,25 +320,19 @@ public void testCloseContainer() throws Exception {
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.CLOSED_CONTAINER_IO,
response.getResult());
Assert.assertTrue(
writeChunkRequest.getTraceID().equals(response.getTraceID()));

// Read chunk must work on a closed container.
request = ContainerTestHelper.getReadChunkRequest(client.getPipeline(),
writeChunkRequest.getWriteChunk());
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));


// Put block will fail on a closed container.
response = client.sendCommand(putBlockRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.CLOSED_CONTAINER_IO,
response.getResult());
Assert.assertTrue(putBlockRequest.getTraceID()
.equals(response.getTraceID()));

// Get block must work on the closed container.
request = ContainerTestHelper.getBlockRequest(client.getPipeline(),
Expand All @@ -366,7 +350,6 @@ public void testCloseContainer() throws Exception {
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.CLOSED_CONTAINER_IO,
response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
} finally {
if (client != null) {
client.close();
Expand Down Expand Up @@ -407,8 +390,6 @@ public void testDeleteContainer() throws Exception {
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());
Assert.assertTrue(
putBlockRequest.getTraceID().equals(response.getTraceID()));

// Container cannot be deleted because force flag is set to false and
// the container is still open
Expand All @@ -419,7 +400,6 @@ public void testDeleteContainer() throws Exception {
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.DELETE_ON_OPEN_CONTAINER,
response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));

// Container can be deleted, by setting force flag, even with out closing
request = ContainerTestHelper.getDeleteContainer(
Expand All @@ -429,7 +409,6 @@ public void testDeleteContainer() throws Exception {
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));

} finally {
if (client != null) {
Expand Down Expand Up @@ -524,7 +503,6 @@ public static void createContainerForTesting(XceiverClientSpi client,
ContainerProtos.ContainerCommandResponseProto response =
client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
}

public static ContainerProtos.ContainerCommandRequestProto
Expand All @@ -539,30 +517,6 @@ public static void createContainerForTesting(XceiverClientSpi client,
client.sendCommand(writeChunkRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(response.getTraceID().equals(response.getTraceID()));
return writeChunkRequest;
}

static void runRequestWithoutTraceId(
long containerID, XceiverClientSpi client) throws Exception {
try {
client.connect();
createContainerForTesting(client, containerID);
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
final ContainerProtos.ContainerCommandRequestProto smallFileRequest
= ContainerTestHelper.getWriteSmallFileRequest(
client.getPipeline(), blockID, 1024);

ContainerProtos.ContainerCommandResponseProto response
= client.sendCommand(smallFileRequest);
Assert.assertNotNull(response);
Assert.assertTrue(smallFileRequest.getTraceID()
.equals(response.getTraceID()));
} finally {
if (client != null) {
client.close();
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ public static void createContainerForTesting(XceiverClientSpi client,
ContainerProtos.ContainerCommandResponseProto response =
client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
}

private StateContext getContext(DatanodeDetails datanodeDetails) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ public static void createContainerForTesting(XceiverClientSpi client,
ContainerProtos.ContainerCommandResponseProto response =
client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
}

private StateContext getContext(DatanodeDetails datanodeDetails) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ static void runTestClientServer(
Assert.assertNotNull(request.getTraceID());

ContainerCommandResponseProto response = client.sendCommand(request);
Assert.assertEquals(request.getTraceID(), response.getTraceID());
} finally {
if (client != null) {
client.close();
Expand Down Expand Up @@ -245,7 +244,6 @@ caClient, createReplicationService(
ContainerTestHelper.getCreateContainerRequest(
ContainerTestHelper.getTestContainerID(), pipeline);
ContainerCommandResponseProto response = client.sendCommand(request);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
} finally {
if (client != null) {
Expand Down
Loading