This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit daa879729bd000402297f52950f33ec27164bce4
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Feb 23 16:20:23 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../component/netty/http/NettyHttpEndpoint.java    | 32 -----------
 .../component/netty/http/NettyHttpMessage.java     | 19 ++++++-
 .../http/handlers/HttpServerChannelHandler.java    | 30 +++++++++++
 .../netty/DefaultServerInitializerFactory.java     |  1 -
 .../camel/component/netty/NettyEndpoint.java       | 10 +---
 .../netty/handlers/ServerChannelHandler.java       | 11 +++-
 .../NettyTextlineInOnlyPooledExchangeTest.java     | 63 ++++++++++++++++++++++
 7 files changed, 121 insertions(+), 45 deletions(-)

diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
index 476aa86..1d5e8e2 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
@@ -18,13 +18,9 @@ package org.apache.camel.component.netty.http;
 
 import java.util.Map;
 
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.http.FullHttpRequest;
 import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -121,34 +117,6 @@ public class NettyHttpEndpoint extends NettyEndpoint 
implements AsyncEndpoint, H
     }
 
     @Override
-    public Exchange createExchange(ChannelHandlerContext ctx, Object message) 
throws Exception {
-        Exchange exchange = createExchange();
-
-        Message in;
-        if (message instanceof FullHttpRequest) {
-            FullHttpRequest request = (FullHttpRequest) message;
-            in = getNettyHttpBinding().toCamelMessage(request, exchange, 
getConfiguration());
-        } else {
-            InboundStreamHttpRequest request = (InboundStreamHttpRequest) 
message;
-            in = getNettyHttpBinding().toCamelMessage(request, exchange, 
getConfiguration());
-        }
-        exchange.setIn(in);
-
-        // setup the common message headers
-        updateMessageHeader(in, ctx);
-
-        // honor the character encoding
-        String contentType = in.getHeader(Exchange.CONTENT_TYPE, String.class);
-        String charset = 
NettyHttpHelper.getCharsetFromContentType(contentType);
-        if (charset != null) {
-            exchange.setProperty(Exchange.CHARSET_NAME, charset);
-            in.setHeader(Exchange.HTTP_CHARACTER_ENCODING, charset);
-        }
-
-        return exchange;
-    }
-
-    @Override
     public boolean isLenientProperties() {
         // true to allow dynamic URI options to be configured and passed to 
external system for eg. the HttpProducer
         return true;
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpMessage.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpMessage.java
index e1a50c5..bbe3331 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpMessage.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpMessage.java
@@ -29,8 +29,8 @@ import org.apache.camel.support.DefaultMessage;
  */
 public class NettyHttpMessage extends DefaultMessage {
 
-    private final transient FullHttpRequest httpRequest;
-    private final transient FullHttpResponse httpResponse;
+    private FullHttpRequest httpRequest;
+    private FullHttpResponse httpResponse;
 
     public NettyHttpMessage(CamelContext camelContext, FullHttpRequest 
httpRequest, FullHttpResponse httpResponse) {
         super(camelContext);
@@ -38,14 +38,29 @@ public class NettyHttpMessage extends DefaultMessage {
         this.httpResponse = httpResponse;
     }
 
+    @Override
+    public void reset() {
+        super.reset();
+        httpRequest = null;
+        httpResponse = null;
+    }
+
     public FullHttpRequest getHttpRequest() {
         return httpRequest;
     }
 
+    public void setHttpRequest(FullHttpRequest httpRequest) {
+        this.httpRequest = httpRequest;
+    }
+
     public FullHttpResponse getHttpResponse() {
         return httpResponse;
     }
 
+    public void setHttpResponse(FullHttpResponse httpResponse) {
+        this.httpResponse = httpResponse;
+    }
+
     @Override
     public DefaultMessage newInstance() {
         return new NettyHttpMessage(getCamelContext(), httpRequest, 
httpResponse);
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
index 69ac775..c3409d8 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
@@ -29,6 +29,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.DecoderResult;
 import io.netty.handler.codec.base64.Base64;
 import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpHeaderValues;
 import io.netty.handler.codec.http.HttpRequest;
@@ -44,6 +45,7 @@ import org.apache.camel.component.netty.http.HttpPrincipal;
 import org.apache.camel.component.netty.http.InboundStreamHttpRequest;
 import org.apache.camel.component.netty.http.NettyHttpConfiguration;
 import org.apache.camel.component.netty.http.NettyHttpConsumer;
+import org.apache.camel.component.netty.http.NettyHttpHelper;
 import org.apache.camel.component.netty.http.NettyHttpSecurityConfiguration;
 import org.apache.camel.component.netty.http.SecurityAuthenticator;
 import org.apache.camel.spi.CamelLogger;
@@ -339,4 +341,32 @@ public class HttpServerChannelHandler extends 
ServerChannelHandler {
         return 
consumer.getEndpoint().getNettyHttpBinding().toNettyResponse(exchange.getMessage(),
 consumer.getConfiguration());
     }
 
+    @Override
+    protected Exchange createExchange(ChannelHandlerContext ctx, Object 
message) throws Exception {
+        Exchange exchange = consumer.createExchange(false);
+
+        // TODO: optimize to avoid creating new NettyHttpMessage
+        Message in;
+        if (message instanceof FullHttpRequest) {
+            FullHttpRequest request = (FullHttpRequest) message;
+            in = 
consumer.getEndpoint().getNettyHttpBinding().toCamelMessage(request, exchange, 
consumer.getConfiguration());
+        } else {
+            InboundStreamHttpRequest request = (InboundStreamHttpRequest) 
message;
+            in = 
consumer.getEndpoint().getNettyHttpBinding().toCamelMessage(request, exchange, 
consumer.getConfiguration());
+        }
+        exchange.setIn(in);
+
+        // setup the common message headers
+        consumer.getEndpoint().updateMessageHeader(in, ctx);
+
+        // honor the character encoding
+        String contentType = in.getHeader(Exchange.CONTENT_TYPE, String.class);
+        String charset = 
NettyHttpHelper.getCharsetFromContentType(contentType);
+        if (charset != null) {
+            exchange.setProperty(Exchange.CHARSET_NAME, charset);
+            in.setHeader(Exchange.HTTP_CHARACTER_ENCODING, charset);
+        }
+
+        return exchange;
+    }
 }
diff --git 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerInitializerFactory.java
 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerInitializerFactory.java
index 2075bad..dfb605d 100644
--- 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerInitializerFactory.java
+++ 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerInitializerFactory.java
@@ -189,5 +189,4 @@ public class DefaultServerInitializerFactory extends 
ServerInitializerFactory {
     public ServerInitializerFactory createPipelineFactory(NettyConsumer 
consumer) {
         return new DefaultServerInitializerFactory(consumer);
     }
-
 }
diff --git 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
index 6ced9ba..bf9449d 100644
--- 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
+++ 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
@@ -28,7 +28,6 @@ import io.netty.handler.ssl.SslHandler;
 import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -83,13 +82,6 @@ public class NettyEndpoint extends DefaultEndpoint 
implements AsyncEndpoint {
         }
     }
 
-    public Exchange createExchange(ChannelHandlerContext ctx, Object message) 
throws Exception {
-        Exchange exchange = createExchange();
-        updateMessageHeader(exchange.getIn(), ctx);
-        NettyPayloadHelper.setIn(exchange, message);
-        return exchange;
-    }
-
     @Override
     public NettyComponent getComponent() {
         return (NettyComponent) super.getComponent();
@@ -119,7 +111,7 @@ public class NettyEndpoint extends DefaultEndpoint 
implements AsyncEndpoint {
         return sslSession;
     }
 
-    protected void updateMessageHeader(Message in, ChannelHandlerContext ctx) {
+    public void updateMessageHeader(Message in, ChannelHandlerContext ctx) {
         in.setHeader(NettyConstants.NETTY_CHANNEL_HANDLER_CONTEXT, ctx);
         in.setHeader(NettyConstants.NETTY_REMOTE_ADDRESS, 
ctx.channel().remoteAddress());
         in.setHeader(NettyConstants.NETTY_LOCAL_ADDRESS, 
ctx.channel().localAddress());
diff --git 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
index dd5b0a4..b40e0c6 100644
--- 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
+++ 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
@@ -88,7 +88,7 @@ public class ServerChannelHandler extends 
SimpleChannelInboundHandler<Object> {
         }
 
         // create Exchange and let the consumer process it
-        final Exchange exchange = consumer.getEndpoint().createExchange(ctx, 
msg);
+        final Exchange exchange = createExchange(ctx, msg);
         if (consumer.getConfiguration().isSync()) {
             exchange.setPattern(ExchangePattern.InOut);
         }
@@ -114,6 +114,13 @@ public class ServerChannelHandler extends 
SimpleChannelInboundHandler<Object> {
         }
     }
 
+    protected Exchange createExchange(ChannelHandlerContext ctx, Object 
message) throws Exception {
+        Exchange exchange = consumer.createExchange(false);
+        consumer.getEndpoint().updateMessageHeader(exchange.getIn(), ctx);
+        NettyPayloadHelper.setIn(exchange, message);
+        return exchange;
+    }
+
     /**
      * Allows any custom logic before the {@link Exchange} is processed by the 
routing engine.
      *
@@ -135,6 +142,7 @@ public class ServerChannelHandler extends 
SimpleChannelInboundHandler<Object> {
             consumer.getExceptionHandler().handleException(e);
         } finally {
             consumer.doneUoW(exchange);
+            consumer.releaseExchange(exchange, false);
         }
     }
 
@@ -151,6 +159,7 @@ public class ServerChannelHandler extends 
SimpleChannelInboundHandler<Object> {
                     consumer.getExceptionHandler().handleException(e);
                 } finally {
                     consumer.doneUoW(exchange);
+                    consumer.releaseExchange(exchange, false);
                 }
             }
         });
diff --git 
a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOnlyPooledExchangeTest.java
 
b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOnlyPooledExchangeTest.java
new file mode 100644
index 0000000..f66ad38
--- /dev/null
+++ 
b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOnlyPooledExchangeTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.engine.PooledExchangeFactory;
+import org.junit.jupiter.api.Test;
+
+public class NettyTextlineInOnlyPooledExchangeTest extends BaseNettyTest {
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        camelContext.adapt(ExtendedCamelContext.class).setExchangeFactory(new 
PooledExchangeFactory());
+        return camelContext;
+    }
+
+    @Test
+    public void testOne() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+        
template.sendBody("netty:tcp://localhost:{{port}}?textline=true&sync=false", 
"Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testTwo() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", 
"Bye World");
+
+        
template.sendBody("netty:tcp://localhost:{{port}}?textline=true&sync=false", 
"Hello World");
+        
template.sendBody("netty:tcp://localhost:{{port}}?textline=true&sync=false", 
"Bye World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("netty:tcp://localhost:{{port}}?textline=true&sync=false")
+                        .to("mock:result");
+            }
+        };
+    }
+}

Reply via email to