Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.execution.OMExecutionFlow;
import org.apache.hadoop.ozone.om.ha.OMHAMetrics;
import org.apache.hadoop.ozone.om.ha.OMHANodeDetails;
import org.apache.hadoop.ozone.om.helpers.BasicOmKeyInfo;
Expand Down Expand Up @@ -422,6 +423,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private OzoneManagerProtocolServerSideTranslatorPB omServerProtocol;

private OzoneManagerRatisServer omRatisServer;
private OMExecutionFlow omExecutionFlow;
private OmRatisSnapshotProvider omRatisSnapshotProvider;
private OMNodeDetails omNodeDetails;
private final Map<String, OMNodeDetails> peerNodesMap;
Expand Down Expand Up @@ -714,6 +716,10 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
if (isOmGrpcServerEnabled) {
omS3gGrpcServer = getOmS3gGrpcServer(configuration);
}

// init om gateway for request
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gateway -> execution flow

omExecutionFlow = new OMExecutionFlow(this);

ShutdownHookManager.get().addShutdownHook(this::saveOmMetrics,
SHUTDOWN_HOOK_PRIORITY);

Expand Down Expand Up @@ -5031,4 +5037,8 @@ public void checkFeatureEnabled(OzoneManagerVersion feature) throws OMException
throw new OMException("Feature disabled: " + feature, OMException.ResultCodes.NOT_SUPPORTED_OPERATION);
}
}

public OMExecutionFlow getOmExecutionFlow() {
return omExecutionFlow;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.execution;

import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;

import com.google.protobuf.ServiceException;
import java.io.IOException;
import org.apache.hadoop.ozone.om.OMPerformanceMetrics;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OMAuditLogger;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.ratis.protocol.ClientId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* entry for execution flow for write request.
*/
public class OMExecutionFlow {
private static final Logger LOG = LoggerFactory.getLogger(OMExecutionFlow.class);

private final OzoneManager ozoneManager;
private final OMPerformanceMetrics perfMetrics;

public OMExecutionFlow(OzoneManager om) {
this.ozoneManager = om;
this.perfMetrics = ozoneManager.getPerfMetrics();
}

/**
* External request handling.
*
* @param omRequest the request
* @return OMResponse the response of execution
* @throws ServiceException the exception on execution
*/
public OMResponse submit(OMRequest omRequest) throws ServiceException {
// TODO: currently have only execution after ratis submission, but with new flow can have switch later
return submitExecutionToRatis(omRequest);
}

/**
* Internal request to be directly executed.
*
* @param omRequest the request
* @param clientId clientId of request
* @param callId callId of request
* @return the response of execution
* @throws ServiceException the exception on execution
*/
public OMResponse submitInternal(OMRequest omRequest, ClientId clientId, long callId) throws ServiceException {
return ozoneManager.getOmRatisServer().submitRequest(omRequest, clientId, callId);
}

private OMResponse submitExecutionToRatis(OMRequest request) throws ServiceException {
// 1. create client request and preExecute
OMClientRequest omClientRequest = null;
final OMRequest requestToSubmit;
try {
omClientRequest = OzoneManagerRatisUtils.createClientRequest(request, ozoneManager);
assert (omClientRequest != null);
final OMClientRequest finalOmClientRequest = omClientRequest;
requestToSubmit = captureLatencyNs(perfMetrics.getPreExecuteLatencyNs(),
() -> finalOmClientRequest.preExecute(ozoneManager));
} catch (IOException ex) {
if (omClientRequest != null) {
OMAuditLogger.log(omClientRequest.getAuditBuilder());
omClientRequest.handleRequestFailure(ozoneManager);
}
return OzoneManagerRatisUtils.createErrorResponse(request, ex);
}

// 2. submit request to ratis
OMResponse response = ozoneManager.getOmRatisServer().submitRequest(requestToSubmit);
if (!response.getSuccess()) {
omClientRequest.handleRequestFailure(ozoneManager);
}
return response;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* This package contains classes for the OM execution implementation.
*/
package org.apache.hadoop.ozone.om.execution;
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,22 @@ public static GrpcTlsConfig createServerTlsConfig(SecurityConfig conf,

public static OzoneManagerProtocolProtos.OMResponse submitRequest(
OzoneManager om, OMRequest omRequest, ClientId clientId, long callId) throws ServiceException {
return om.getOmRatisServer().submitRequest(omRequest, clientId, callId);
return om.getOmExecutionFlow().submitInternal(omRequest, clientId, callId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not clear why we need om -> executionFlow -> om in the changes here . Let's change this line and add submitInternal later when they are needed. We can see if there is a better way to arrange the code at that time.

Copy link
Contributor Author

@sumitagrawl sumitagrawl Mar 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need have call to ratis from common place, as we may need append the request with passing index, for,
HDDS-11901. use om managed index for handling concern - #7722 (comment)

-- lock may be required for this flow also.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will change that line as part of PR where we need handling for internal request.

  • OBS-locking do not need, as trash / delete recursive handling as part of FSO. will add later as part of that.
  • Will add as part of "HDDS-11901. use om managed index for handling request" to this PR, here we need.

}

public static OzoneManagerProtocolProtos.OMResponse createErrorResponse(
OMRequest omRequest, IOException exception) {
// Added all write command types here, because in future if any of the
// preExecute is changed to return IOException, we can return the error
// OMResponse to the client.
OzoneManagerProtocolProtos.OMResponse.Builder omResponse = OzoneManagerProtocolProtos.OMResponse.newBuilder()
.setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception))
.setCmdType(omRequest.getCmdType())
.setTraceID(omRequest.getTraceID())
.setSuccess(false);
if (exception.getMessage() != null) {
omResponse.setMessage(exception.getMessage());
}
return omResponse.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.LEADER_AND_READY;
import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER;
import static org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils.createClientRequest;
import static org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils.createErrorResponse;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.PrepareStatus;
import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;

Expand All @@ -38,12 +38,10 @@
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.helpers.OMAuditLogger;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.om.request.validation.RequestValidations;
import org.apache.hadoop.ozone.om.request.validation.ValidationContext;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
Expand Down Expand Up @@ -173,43 +171,13 @@ private OMResponse internalProcessRequest(OMRequest request) throws ServiceExcep
return cached;
}

// process new request
OMClientRequest omClientRequest = null;
final OMRequest requestToSubmit;
try {
omClientRequest = createClientRequest(request, ozoneManager);
// TODO: Note: Due to HDDS-6055, createClientRequest() could now
// return null, which triggered the findbugs warning.
// Added the assertion.
assert (omClientRequest != null);
OMClientRequest finalOmClientRequest = omClientRequest;

requestToSubmit = preExecute(finalOmClientRequest);
this.lastRequestToSubmit = requestToSubmit;
} catch (IOException ex) {
if (omClientRequest != null) {
OMAuditLogger.log(omClientRequest.getAuditBuilder());
omClientRequest.handleRequestFailure(ozoneManager);
}
return createErrorResponse(request, ex);
}

final OMResponse response = omRatisServer.submitRequest(requestToSubmit);
if (!response.getSuccess()) {
omClientRequest.handleRequestFailure(ozoneManager);
}
return response;
this.lastRequestToSubmit = request;
return ozoneManager.getOmExecutionFlow().submit(request);
} finally {
OzoneManager.setS3Auth(null);
}
}

private OMRequest preExecute(OMClientRequest finalOmClientRequest)
throws IOException {
return captureLatencyNs(perfMetrics.getPreExecuteLatencyNs(),
() -> finalOmClientRequest.preExecute(ozoneManager));
}

@VisibleForTesting
public OMRequest getLastRequestToSubmit() {
return lastRequestToSubmit;
Expand Down Expand Up @@ -248,23 +216,6 @@ private ServiceException createLeaderNotReadyException() {
return new ServiceException(leaderNotReadyException);
}

/** @return an {@link OMResponse} from the given {@link OMRequest} and the given exception. */
private OMResponse createErrorResponse(
OMRequest omRequest, IOException exception) {
// Added all write command types here, because in future if any of the
// preExecute is changed to return IOException, we can return the error
// OMResponse to the client.
OMResponse.Builder omResponse = OMResponse.newBuilder()
.setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception))
.setCmdType(omRequest.getCmdType())
.setTraceID(omRequest.getTraceID())
.setSuccess(false);
if (exception.getMessage() != null) {
omResponse.setMessage(exception.getMessage());
}
return omResponse.build();
}

public static Logger getLog() {
return LOG;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.execution.OMExecutionFlow;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
Expand Down Expand Up @@ -109,6 +110,8 @@ public void testUnknownRequestHandling()
omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration,
ozoneManager);
when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
OMExecutionFlow omGateway = new OMExecutionFlow(ozoneManager);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gateway -> execution flow

when(ozoneManager.getOmExecutionFlow()).thenReturn(omGateway);
when(ozoneManager.getConfiguration()).thenReturn(ozoneConfiguration);
final OmConfig omConfig = ozoneConfiguration.getObject(OmConfig.class);
when(ozoneManager.getConfig()).thenReturn(omConfig);
Expand Down