This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/exchange-factory by this push:
     new 401890f  CAMEL-16222: PooledExchangeFactory experiment
401890f is described below

commit 401890f0696e15b0646206f5dc19e387651152aa
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Feb 22 09:38:50 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../apache/camel/component/nats/NatsConsumer.java  | 40 ++++++++++++----------
 .../camel/component/nitrite/NitriteConsumer.java   |  3 +-
 .../apache/camel/component/nsq/NsqConsumer.java    | 13 ++++---
 .../camel/oaipmh/handler/AbstractHandler.java      |  8 +++--
 .../org/apache/camel/oaipmh/handler/Harvester.java |  8 ++---
 .../component/optaplanner/OptaPlannerConsumer.java |  8 ++---
 6 files changed, 44 insertions(+), 36 deletions(-)

diff --git 
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
 
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index 11b15d7..0c78677 100644
--- 
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ 
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -153,30 +153,32 @@ public class NatsConsumer extends DefaultConsumer {
             @Override
             public void onMessage(Message msg) throws InterruptedException {
                 LOG.debug("Received Message: {}", msg);
-                Exchange exchange = getEndpoint().createExchange();
-                exchange.getIn().setBody(msg.getData());
-                exchange.getIn().setHeader(NatsConstants.NATS_REPLY_TO, 
msg.getReplyTo());
-                exchange.getIn().setHeader(NatsConstants.NATS_SID, 
msg.getSID());
-                exchange.getIn().setHeader(NatsConstants.NATS_SUBJECT, 
msg.getSubject());
-                exchange.getIn().setHeader(NatsConstants.NATS_QUEUE_NAME, 
msg.getSubscription().getQueueName());
-                
exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, 
System.currentTimeMillis());
+                Exchange exchange = createExchange(false);
                 try {
+                    exchange.getIn().setBody(msg.getData());
+                    exchange.getIn().setHeader(NatsConstants.NATS_REPLY_TO, 
msg.getReplyTo());
+                    exchange.getIn().setHeader(NatsConstants.NATS_SID, 
msg.getSID());
+                    exchange.getIn().setHeader(NatsConstants.NATS_SUBJECT, 
msg.getSubject());
+                    exchange.getIn().setHeader(NatsConstants.NATS_QUEUE_NAME, 
msg.getSubscription().getQueueName());
+                    
exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, 
System.currentTimeMillis());
+
                     processor.process(exchange);
-                } catch (Exception e) {
-                    getExceptionHandler().handleException("Error during 
processing", exchange, e);
-                }
 
-                // is there a reply?
-                if (!configuration.isReplyToDisabled()
-                        && msg.getReplyTo() != null && msg.getConnection() != 
null) {
-                    Connection con = msg.getConnection();
-                    byte[] data = exchange.getMessage().getBody(byte[].class);
-                    if (data != null) {
-                        LOG.debug("Publishing replyTo: {} message", 
msg.getReplyTo());
-                        con.publish(msg.getReplyTo(), data);
+                    // is there a reply?
+                    if (!configuration.isReplyToDisabled()
+                            && msg.getReplyTo() != null && msg.getConnection() 
!= null) {
+                        Connection con = msg.getConnection();
+                        byte[] data = 
exchange.getMessage().getBody(byte[].class);
+                        if (data != null) {
+                            LOG.debug("Publishing replyTo: {} message", 
msg.getReplyTo());
+                            con.publish(msg.getReplyTo(), data);
+                        }
                     }
+                } catch (Exception e){
+                    getExceptionHandler().handleException("Error during 
processing", exchange, e);
+                } finally {
+                    releaseExchange(exchange, false);
                 }
-
             }
         }
     }
diff --git 
a/components/camel-nitrite/src/main/java/org/apache/camel/component/nitrite/NitriteConsumer.java
 
b/components/camel-nitrite/src/main/java/org/apache/camel/component/nitrite/NitriteConsumer.java
index a9fd003..8742460 100644
--- 
a/components/camel-nitrite/src/main/java/org/apache/camel/component/nitrite/NitriteConsumer.java
+++ 
b/components/camel-nitrite/src/main/java/org/apache/camel/component/nitrite/NitriteConsumer.java
@@ -61,7 +61,7 @@ public class NitriteConsumer extends DefaultConsumer {
         @Override
         public void onChange(ChangeInfo changeInfo) {
             for (ChangedItem changedItem : changeInfo.getChangedItems()) {
-                Exchange exchange = endpoint.createExchange();
+                Exchange exchange = createExchange(false);
                 Message message = exchange.getMessage();
                 message.setHeader(NitriteConstants.CHANGE_TIMESTAMP, 
changedItem.getChangeTimestamp());
                 message.setHeader(NitriteConstants.CHANGE_TYPE, 
changedItem.getChangeType());
@@ -75,6 +75,7 @@ public class NitriteConsumer extends DefaultConsumer {
                     if (exchange.getException() != null) {
                         getExceptionHandler().handleException("Error 
processing exchange", exchange, exchange.getException());
                     }
+                    releaseExchange(exchange, false);
                 }
             }
         }
diff --git 
a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
 
b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
index 10c44fb..04b9514 100644
--- 
a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
+++ 
b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
@@ -111,12 +111,13 @@ public class NsqConsumer extends DefaultConsumer {
         @Override
         public void message(NSQMessage msg) {
             LOG.debug("Received Message: {}", msg);
-            Exchange exchange = 
getEndpoint().createExchange(ExchangePattern.InOnly);
-            exchange.getIn().setBody(msg.getMessage());
-            exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_ID, 
msg.getId());
-            exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_ATTEMPTS, 
msg.getAttempts());
-            exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_TIMESTAMP, 
msg.getTimestamp());
+            Exchange exchange = createExchange(false);
             try {
+                exchange.setPattern(ExchangePattern.InOnly);
+                exchange.getIn().setBody(msg.getMessage());
+                exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_ID, 
msg.getId());
+                exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_ATTEMPTS, 
msg.getAttempts());
+                exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_TIMESTAMP, 
msg.getTimestamp());
                 if (configuration.getAutoFinish()) {
                     msg.finished();
                 } else {
@@ -129,6 +130,8 @@ public class NsqConsumer extends DefaultConsumer {
                     msg.requeue((int) configuration.getRequeueInterval());
                 }
                 getExceptionHandler().handleException("Error during 
processing", exchange, e);
+            } finally {
+                releaseExchange(exchange, false);
             }
         }
     }
diff --git 
a/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/AbstractHandler.java
 
b/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/AbstractHandler.java
index 88cfd7d..c204028 100644
--- 
a/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/AbstractHandler.java
+++ 
b/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/AbstractHandler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.oaipmh.handler;
 
+import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -30,18 +31,20 @@ public abstract class AbstractHandler {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractHandler.class);
 
+    protected final Consumer consumer;
     protected final Endpoint endpoint;
     protected final Processor processor;
     protected final ExceptionHandler exceptionHandler;
 
     public AbstractHandler(OAIPMHConsumer consumer) {
+        this.consumer = consumer;
         this.endpoint = consumer.getEndpoint();
         this.processor = consumer.getAsyncProcessor();
         this.exceptionHandler = consumer.getExceptionHandler();
     }
 
     protected void send(OAIPMHResponse message) {
-        Exchange exchange = endpoint.createExchange();
+        Exchange exchange = consumer.createExchange(false);
         String xml = message.getRawResponse();
         exchange.getIn().setBody(xml);
         try {
@@ -49,12 +52,13 @@ public abstract class AbstractHandler {
             LOG.trace("sending exchange: {}", exchange);
             processor.process(exchange);
         } catch (Exception e) {
-            throw new RuntimeCamelException("Error sending exchange: " + 
exchange, e);
+            exchange.setException(e);
         } finally {
             // log exception if an exception occurred and was not handled
             if (exchange.getException() != null) {
                 exceptionHandler.handleException("Error processing exchange", 
exchange, exchange.getException());
             }
+            consumer.releaseExchange(exchange, false);
         }
     }
 
diff --git 
a/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/Harvester.java
 
b/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/Harvester.java
index 494b426..84a795b 100644
--- 
a/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/Harvester.java
+++ 
b/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/Harvester.java
@@ -69,7 +69,7 @@ public class Harvester {
 
     }
 
-    private boolean harvest() throws IOException, URISyntaxException, 
ParserConfigurationException, SAXException, Exception {
+    private boolean harvest() throws Exception {
         boolean hasNext = false;
         if (!this.empty) {
             String responseXML = httpClient.doRequest(this.baseURI, this.verb, 
this.set, this.from, this.until, this.metadata,
@@ -88,13 +88,11 @@ public class Harvester {
         return hasNext;
     }
 
-    public void asynHarvest() throws IOException, URISyntaxException, 
ParserConfigurationException, SAXException, Exception {
+    public void asynHarvest() throws Exception {
         this.harvest();
-
     }
 
-    public List<String> synHarvest(boolean onlyFirst)
-            throws IOException, URISyntaxException, 
ParserConfigurationException, SAXException, Exception {
+    public List<String> synHarvest(boolean onlyFirst) throws Exception {
         while (this.harvest()) {
             if (onlyFirst) {
                 break;
diff --git 
a/components/camel-optaplanner/src/main/java/org/apache/camel/component/optaplanner/OptaPlannerConsumer.java
 
b/components/camel-optaplanner/src/main/java/org/apache/camel/component/optaplanner/OptaPlannerConsumer.java
index 3ea6ccf..a4dc33f 100644
--- 
a/components/camel-optaplanner/src/main/java/org/apache/camel/component/optaplanner/OptaPlannerConsumer.java
+++ 
b/components/camel-optaplanner/src/main/java/org/apache/camel/component/optaplanner/OptaPlannerConsumer.java
@@ -59,22 +59,22 @@ public class OptaPlannerConsumer extends DefaultConsumer {
     }
 
     public void processEvent(BestSolutionChangedEvent<Object> event) {
-        Exchange exchange = getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getMessage().setHeader(OptaPlannerConstants.BEST_SOLUTION, 
event.getNewBestSolution());
         try {
             getProcessor().process(exchange);
         } catch (Exception e) {
-            LOG.error("Error processing event ", e);
+            getExceptionHandler().handleException(e);
         }
     }
 
     public void processSolverJobEvent(OptaplannerSolutionEvent event) {
-        Exchange exchange = getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getMessage().setHeader(OptaPlannerConstants.BEST_SOLUTION, 
event.getBestSolution());
         try {
             getProcessor().process(exchange);
         } catch (Exception e) {
-            LOG.error("Error processing event ", e);
+            getExceptionHandler().handleException(e);
         }
     }
 

Reply via email to