|
39 | 39 | import org.apache.seata.core.rpc.netty.NettyServerConfig; |
40 | 40 | import org.slf4j.Logger; |
41 | 41 | import org.slf4j.LoggerFactory; |
42 | | - |
43 | | -import java.io.IOException; |
44 | | - |
45 | 42 | import org.apache.seata.common.rpc.http.HttpContext; |
46 | | -import org.slf4j.Logger; |
47 | | -import org.slf4j.LoggerFactory; |
48 | 43 |
|
49 | 44 | import java.lang.reflect.Method; |
50 | 45 | import java.util.concurrent.ExecutorService; |
@@ -76,45 +71,36 @@ public class HttpDispatchHandler extends SimpleChannelInboundHandler<HttpRequest |
76 | 71 | } |
77 | 72 |
|
78 | 73 | @Override |
79 | | - protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) throws Exception { |
| 74 | + protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) { |
80 | 75 | boolean keepAlive = HttpUtil.isKeepAlive(httpRequest) && httpRequest.protocolVersion().isKeepAliveDefault(); |
81 | 76 | try { |
82 | 77 | httpHandlerThreads.execute(() -> { |
83 | 78 | try { |
84 | 79 | processHttpRequest(ctx, httpRequest, keepAlive); |
| 80 | + } catch (IllegalArgumentException e) { |
| 81 | + LOGGER.error("Illegal argument exception: {}", e.getMessage(), e); |
| 82 | + sendErrorResponse(ctx, HttpResponseStatus.BAD_REQUEST, false); |
85 | 83 | } catch (Exception e) { |
86 | | - LOGGER.error("HTTP request processing error", e); |
87 | | - sendResponse(ctx, null, keepAlive); |
| 84 | + LOGGER.error("Exception occurred while processing HTTP request: {}", e.getMessage(), e); |
| 85 | + sendErrorResponse(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR, false); |
88 | 86 | } |
89 | 87 | }); |
90 | 88 | } catch (RejectedExecutionException e) { |
91 | | - sendUnavailable(ctx, keepAlive); |
92 | | - LOGGER.warn("HTTP thread pool is full, return 503 status code", e); |
| 89 | + sendErrorResponse(ctx, HttpResponseStatus.SERVICE_UNAVAILABLE, false); |
| 90 | + LOGGER.error("HTTP thread pool is full: {}", e.getMessage(), e); |
93 | 91 | } |
94 | 92 | } |
95 | 93 |
|
96 | | - |
97 | | - /* |
98 | | - catch (IllegalArgumentException e) { |
99 | | - keepAlive = false; |
100 | | - LOGGER.error("Illegal argument exception: {}", e.getMessage(), e); |
101 | | - response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST, |
102 | | - Unpooled.wrappedBuffer(Unpooled.EMPTY_BUFFER)); |
103 | | - } catch (Exception e) { |
104 | | - keepAlive = false; |
105 | | - LOGGER.error("Exception occurred while processing HTTP request: {}", e.getMessage(), e); |
106 | | - response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR, |
107 | | - Unpooled.wrappedBuffer(Unpooled.EMPTY_BUFFER)); |
108 | | - } |
109 | | - */ |
110 | 94 | private void processHttpRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, boolean keepAlive) throws Exception { |
111 | 95 | QueryStringDecoder queryStringDecoder = new QueryStringDecoder(httpRequest.uri()); |
112 | 96 | String path = queryStringDecoder.path(); |
113 | 97 | HttpInvocation httpInvocation = ControllerManager.getHttpInvocation(path); |
| 98 | + |
114 | 99 | if (httpInvocation == null) { |
115 | | - sendNotFound(ctx, keepAlive); |
| 100 | + sendErrorResponse(ctx, HttpResponseStatus.NOT_FOUND, keepAlive); |
116 | 101 | return; |
117 | 102 | } |
| 103 | + |
118 | 104 | HttpContext httpContext = new HttpContext(httpRequest, ctx, keepAlive); |
119 | 105 | ObjectNode requestDataNode = OBJECT_MAPPER.createObjectNode(); |
120 | 106 | requestDataNode.putIfAbsent("param", ParameterParser.convertParamMap(queryStringDecoder.parameters())); |
@@ -149,55 +135,30 @@ private void processHttpRequest(ChannelHandlerContext ctx, HttpRequest httpReque |
149 | 135 | return; |
150 | 136 | } |
151 | 137 |
|
152 | | - if (requestDataNode.get("channel") == null) { |
153 | | - return; |
154 | | - } |
155 | | - |
156 | 138 | sendResponse(ctx, result, keepAlive); |
157 | 139 | } |
158 | 140 |
|
159 | | - private void sendResponse(ChannelHandlerContext ctx, Object result, boolean keepAlive) { |
160 | | - try { |
161 | | - FullHttpResponse response; |
162 | | - if (result != null) { |
163 | | - byte[] body = OBJECT_MAPPER.writeValueAsBytes(result); |
164 | | - response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, |
165 | | - Unpooled.wrappedBuffer(body)); |
166 | | - response.headers().set(HttpHeaderNames.CONTENT_LENGTH, body.length); |
167 | | - } else { |
168 | | - response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, |
169 | | - Unpooled.wrappedBuffer(Unpooled.EMPTY_BUFFER)); |
170 | | - } |
171 | | - if (!keepAlive) { |
172 | | - ctx.writeAndFlush(response).addListeners(ChannelFutureListener.CLOSE); |
173 | | - } else { |
174 | | - ctx.writeAndFlush(response); |
175 | | - } |
176 | | - } catch (JsonProcessingException e) { |
177 | | - LOGGER.error("Failed to serialize response", e); |
178 | | - FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR, |
| 141 | + private void sendResponse(ChannelHandlerContext ctx, Object result, boolean keepAlive) throws JsonProcessingException { |
| 142 | + FullHttpResponse response; |
| 143 | + if (result != null) { |
| 144 | + byte[] body = OBJECT_MAPPER.writeValueAsBytes(result); |
| 145 | + response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, |
| 146 | + Unpooled.wrappedBuffer(body)); |
| 147 | + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, body.length); |
| 148 | + } else { |
| 149 | + response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, |
179 | 150 | Unpooled.wrappedBuffer(Unpooled.EMPTY_BUFFER)); |
180 | | - if (!keepAlive) { |
181 | | - ctx.writeAndFlush(response).addListeners(ChannelFutureListener.CLOSE); |
182 | | - } else { |
183 | | - ctx.writeAndFlush(response); |
184 | | - } |
185 | 151 | } |
186 | | - } |
187 | | - |
188 | | - private void sendNotFound(ChannelHandlerContext ctx, boolean keepAlive) { |
189 | | - FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, |
190 | | - Unpooled.wrappedBuffer(Unpooled.EMPTY_BUFFER)); |
191 | 152 | if (!keepAlive) { |
192 | 153 | ctx.writeAndFlush(response).addListeners(ChannelFutureListener.CLOSE); |
193 | 154 | } else { |
194 | 155 | ctx.writeAndFlush(response); |
195 | 156 | } |
196 | 157 | } |
197 | 158 |
|
198 | | - private void sendUnavailable(ChannelHandlerContext ctx, boolean keepAlive) { |
199 | | - FullHttpResponse response = new DefaultFullHttpResponse( |
200 | | - HttpVersion.HTTP_1_1, HttpResponseStatus.SERVICE_UNAVAILABLE, Unpooled.wrappedBuffer(Unpooled.EMPTY_BUFFER)); |
| 159 | + private void sendErrorResponse(ChannelHandlerContext ctx, HttpResponseStatus status, boolean keepAlive) { |
| 160 | + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, |
| 161 | + Unpooled.wrappedBuffer(Unpooled.EMPTY_BUFFER)); |
201 | 162 | if (!keepAlive) { |
202 | 163 | ctx.writeAndFlush(response).addListeners(ChannelFutureListener.CLOSE); |
203 | 164 | } else { |
|
0 commit comments