This is an automated email from the ASF dual-hosted git repository.

tsato pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

The following commit(s) were added to refs/heads/master by this push:
     new 9757aa1  camel-netty - Small refactorings
9757aa1 is described below

commit 9757aa12e8a9c554cd93ebebceecf5dcc08b6a3c
Author: Tadayoshi Sato <sato.tadayo...@gmail.com>
AuthorDate: Fri Mar 5 13:13:25 2021 +0900

    camel-netty - Small refactorings
---
 .../netty/http/DefaultNettyHttpBinding.java        | 11 ++++-----
 .../http/handlers/HttpServerChannelHandler.java    |  2 +-
 .../netty/handlers/ServerChannelHandler.java       | 27 +++++++++-------------
 3 files changed, 16 insertions(+), 24 deletions(-)

diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
index 1bef299..c4fbcf8 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
@@ -309,8 +309,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
     }
 
     @Override
-    public Message toCamelMessage(FullHttpResponse response, Exchange exchange, NettyHttpConfiguration configuration)
-            throws Exception {
+    public Message toCamelMessage(FullHttpResponse response, Exchange exchange, NettyHttpConfiguration configuration) {
         LOG.trace("toCamelMessage: {}", response);
 
         NettyHttpMessage answer = new NettyHttpMessage(exchange.getContext(), null, response);
@@ -336,8 +335,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
     }
 
     @Override
-    public Message toCamelMessage(InboundStreamHttpResponse response, Exchange exchange, NettyHttpConfiguration configuration)
-            throws Exception {
+    public Message toCamelMessage(InboundStreamHttpResponse response, Exchange exchange, NettyHttpConfiguration configuration) {
         LOG.trace("toCamelMessage: {}", response);
 
         NettyHttpMessage answer = new NettyHttpMessage(exchange.getContext(), null, null);
@@ -352,8 +350,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
 
     @Override
     public void populateCamelHeaders(
-            HttpResponse response, Map<String, Object> headers, Exchange exchange, NettyHttpConfiguration configuration)
-            throws Exception {
+            HttpResponse response, Map<String, Object> headers, Exchange exchange, NettyHttpConfiguration configuration) {
         LOG.trace("populateCamelHeaders: {}", response);
 
         headers.put(Exchange.HTTP_RESPONSE_CODE, response.status().code());
@@ -445,7 +442,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
 
         HttpResponse response = null;
 
-        if (response == null && body instanceof InputStream && configuration.isDisableStreamCache()) {
+        if (body instanceof InputStream && configuration.isDisableStreamCache()) {
             response = new OutboundStreamHttpResponse(
                     (InputStream) body, new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(code)));
             response.headers().set(TRANSFER_ENCODING, CHUNKED);
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
index ded58a9..1831d64 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
@@ -321,7 +321,7 @@ public class HttpServerChannelHandler extends ServerChannelHandler {
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
 
         // only close if we are still allowed to run
         if (consumer.isRunAllowed()) {
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
index b40e0c6..7c0eaed 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
@@ -21,7 +21,6 @@ import java.net.SocketAddress;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.component.netty.NettyConstants;
@@ -82,9 +81,8 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> {
 
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
-        Object in = msg;
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Channel: {} received body: {}", ctx.channel(), in);
+            LOG.debug("Channel: {} received body: {}", ctx.channel(), msg);
         }
 
         // create Exchange and let the consumer process it
@@ -147,20 +145,17 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> {
     }
 
     private void processAsynchronously(final Exchange exchange, final ChannelHandlerContext ctx, final Object message) {
-        consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
-            @Override
-            public void done(boolean doneSync) {
-                // send back response if the communication is synchronous
-                try {
-                    if (consumer.getConfiguration().isSync()) {
-                        sendResponse(message, ctx, exchange);
-                    }
-                } catch (Throwable e) {
-                    consumer.getExceptionHandler().handleException(e);
-                } finally {
-                    consumer.doneUoW(exchange);
-                    consumer.releaseExchange(exchange, false);
+        consumer.getAsyncProcessor().process(exchange, doneSync -> {
+            // send back response if the communication is synchronous
+            try {
+                if (consumer.getConfiguration().isSync()) {
+                    sendResponse(message, ctx, exchange);
                 }
+            } catch (Throwable e) {
+                consumer.getExceptionHandler().handleException(e);
+            } finally {
+                consumer.doneUoW(exchange);
+                consumer.releaseExchange(exchange, false);
             }
         });
     }