This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/exchange-factory by this push:
     new 01c91e2  CAMEL-16222: PooledExchangeFactory experiment
01c91e2 is described below

commit 01c91e20e82fc4b7e4f27b53f89874eb10d67bdb
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Feb 22 07:22:53 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../apache/camel/component/jbpm/JBPMConsumer.java  |  7 ++--
 .../camel/component/jcache/JCacheConsumer.java     |  7 +---
 .../jclouds/JcloudsBlobStoreConsumer.java          |  2 +-
 .../camel/component/jclouds/JcloudsConsumer.java   | 46 ----------------------
 .../camel/component/jcr/EndpointEventListener.java | 12 ++++--
 .../apache/camel/component/jcr/JcrConsumer.java    |  2 +-
 .../jgroups/raft/CamelRoleChangeListener.java      | 17 +++++---
 .../jgroups/raft/JGroupsRaftConsumer.java          |  2 +-
 .../jgroups/raft/JGroupsRaftEndpoint.java          |  7 ----
 .../component/jgroups/CamelJGroupsReceiver.java    | 19 ++++++---
 .../camel/component/jgroups/JGroupsConsumer.java   |  2 +-
 .../apache/camel/component/jira/JiraConstants.java |  2 +
 .../jira/consumer/NewCommentsConsumer.java         |  2 +-
 .../component/jira/consumer/NewIssuesConsumer.java |  2 +-
 .../jira/consumer/WatchUpdatesConsumer.java        |  9 +++--
 .../apache/camel/component/jmx/JMXConsumer.java    |  4 +-
 .../apache/camel/component/jooq/JooqConsumer.java  |  2 +-
 .../apache/camel/component/jpa/JpaConsumer.java    | 25 ++++++++----
 .../component/jt400/Jt400DataQueueConsumer.java    |  2 +-
 .../component/jt400/Jt400MsgQueueConsumer.java     |  2 +-
 20 files changed, 72 insertions(+), 101 deletions(-)

diff --git 
a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java
 
b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java
index 343bfc7..a8d6c59 100644
--- 
a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java
+++ 
b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.jbpm;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.component.jbpm.emitters.CamelEventEmitter;
 import org.apache.camel.component.jbpm.listeners.CamelCaseEventListener;
@@ -98,7 +97,7 @@ public class JBPMConsumer extends DefaultConsumer implements 
DeploymentEventList
     }
 
     public void sendMessage(String eventType, Object body) {
-        Exchange exchange = 
getEndpoint().createExchange(ExchangePattern.InOnly);
+        Exchange exchange = createExchange(false);
         exchange.getIn().setHeader("EventType", eventType);
 
         exchange.getIn().setBody(body);
@@ -111,6 +110,7 @@ public class JBPMConsumer extends DefaultConsumer 
implements DeploymentEventList
                     if (exchange.getException() != null) {
                         getExceptionHandler().handleException("Error 
processing exchange", exchange, exchange.getException());
                     }
+                    releaseExchange(exchange, false);
                 }
             });
         } else {
@@ -124,6 +124,7 @@ public class JBPMConsumer extends DefaultConsumer 
implements DeploymentEventList
             if (exchange.getException() != null) {
                 getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
             }
+            releaseExchange(exchange, false);
         }
     }
 
@@ -131,7 +132,6 @@ public class JBPMConsumer extends DefaultConsumer 
implements DeploymentEventList
     public void onDeploy(DeploymentEvent event) {
         InternalRuntimeManager manager = (InternalRuntimeManager) 
event.getDeployedUnit().getRuntimeManager();
         configure(manager, this);
-
     }
 
     @Override
@@ -156,7 +156,6 @@ public class JBPMConsumer extends DefaultConsumer 
implements DeploymentEventList
         }
 
         configureConsumer(eventListenerType, manager, consumer);
-
     }
 
     protected void configureConsumer(String eventListenerType, 
InternalRuntimeManager manager, JBPMConsumer consumer) {
diff --git 
a/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheConsumer.java
 
b/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheConsumer.java
index 327e242..544a562 100644
--- 
a/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheConsumer.java
+++ 
b/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheConsumer.java
@@ -28,14 +28,11 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.support.DefaultConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * The JCache consumer.
  */
 public class JCacheConsumer extends DefaultConsumer {
-    private static final Logger LOG = 
LoggerFactory.getLogger(JCacheConsumer.class);
 
     private CacheEntryListenerConfiguration<Object, Object> 
entryListenerConfiguration;
 
@@ -83,7 +80,7 @@ public class JCacheConsumer extends DefaultConsumer {
                             @Override
                             protected void 
onEvents(Iterable<CacheEntryEvent<?, ?>> events) {
                                 for (CacheEntryEvent<?, ?> event : events) {
-                                    Exchange exchange = 
getEndpoint().createExchange();
+                                    Exchange exchange = createExchange(true);
                                     Message message = exchange.getIn();
                                     
message.setHeader(JCacheConstants.EVENT_TYPE, event.getEventType().name());
                                     message.setHeader(JCacheConstants.KEY, 
event.getKey());
@@ -96,7 +93,7 @@ public class JCacheConsumer extends DefaultConsumer {
                                     try {
                                         getProcessor().process(exchange);
                                     } catch (Exception e) {
-                                        LOG.error("Error processing event ", 
e);
+                                        
getExceptionHandler().handleException(e);
                                     }
                                 }
                             }
diff --git 
a/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
 
b/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
index 899d323..7812704 100644
--- 
a/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
+++ 
b/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
@@ -75,7 +75,7 @@ public class JcloudsBlobStoreConsumer extends 
ScheduledBatchPollingConsumer {
                 if (!Strings.isNullOrEmpty(blobName)) {
                     InputStream body = 
JcloudsBlobStoreHelper.readBlob(blobStore, container, blobName);
                     if (body != null) {
-                        Exchange exchange = endpoint.createExchange();
+                        Exchange exchange = createExchange(true);
                         CachedOutputStream cos = new 
CachedOutputStream(exchange);
                         IOHelper.copy(body, cos);
                         exchange.getIn().setBody(cos.newStreamCache());
diff --git 
a/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsConsumer.java
 
b/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsConsumer.java
deleted file mode 100644
index fdcef9c..0000000
--- 
a/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsConsumer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.jclouds;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.support.ScheduledPollConsumer;
-
-public class JcloudsConsumer extends ScheduledPollConsumer {
-    private final JcloudsEndpoint endpoint;
-
-    public JcloudsConsumer(JcloudsEndpoint endpoint, Processor processor) {
-        super(endpoint, processor);
-        this.endpoint = endpoint;
-    }
-
-    @Override
-    protected int poll() throws Exception {
-        Exchange exchange = endpoint.createExchange();
-
-        try {
-            // send message to next processor in the route
-            getProcessor().process(exchange);
-            return 1; // number of messages polled
-        } finally {
-            // log exception if an exception occurred and was not handled
-            if (exchange.getException() != null) {
-                getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
-            }
-        }
-    }
-}
diff --git 
a/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java
 
b/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java
index 4602722..abc06f6 100644
--- 
a/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java
+++ 
b/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java
@@ -38,10 +38,12 @@ public class EndpointEventListener implements EventListener 
{
 
     private static final Logger LOG = 
LoggerFactory.getLogger(EndpointEventListener.class);
 
+    private final JcrConsumer consumer;
     private final JcrEndpoint endpoint;
     private final Processor processor;
 
-    public EndpointEventListener(JcrEndpoint endpoint, Processor processor) {
+    public EndpointEventListener(JcrConsumer consumer, JcrEndpoint endpoint, 
Processor processor) {
+        this.consumer = consumer;
         this.endpoint = endpoint;
         this.processor = processor;
     }
@@ -50,10 +52,10 @@ public class EndpointEventListener implements EventListener 
{
     public void onEvent(EventIterator events) {
         LOG.trace("onEvent START");
         LOG.debug("{} consumer received JCR events: {}", endpoint, events);
-        RuntimeCamelException rce = null;
+        RuntimeCamelException rce;
 
+        final Exchange exchange = createExchange(events);
         try {
-            final Exchange exchange = createExchange(events);
 
             try {
                 LOG.debug("Processor, {}, is processing exchange, {}", 
processor, exchange);
@@ -65,6 +67,8 @@ public class EndpointEventListener implements EventListener {
             rce = exchange.getException(RuntimeCamelException.class);
         } catch (Exception e) {
             rce = wrapRuntimeCamelException(e);
+        } finally {
+            consumer.releaseExchange(exchange, false);
         }
 
         if (rce != null) {
@@ -76,7 +80,7 @@ public class EndpointEventListener implements EventListener {
     }
 
     private Exchange createExchange(EventIterator events) {
-        Exchange exchange = endpoint.createExchange();
+        Exchange exchange = consumer.createExchange(false);
 
         List<Event> eventList = new LinkedList<>();
         if (events != null) {
diff --git 
a/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
 
b/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
index ae7fd5d..9a39485 100644
--- 
a/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
+++ 
b/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
@@ -108,7 +108,7 @@ public class JcrConsumer extends DefaultConsumer {
 
         boolean noLocal = getJcrEndpoint().isNoLocal();
 
-        eventListener = new EndpointEventListener(getJcrEndpoint(), 
getProcessor());
+        eventListener = new EndpointEventListener(this, getJcrEndpoint(), 
getProcessor());
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Adding JCR Event Listener, {}, on {}. eventTypes={}, 
isDeep={}, uuid={}, nodeTypeName={}, noLocal={}",
diff --git 
a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/CamelRoleChangeListener.java
 
b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/CamelRoleChangeListener.java
index 917e052..baf3325 100644
--- 
a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/CamelRoleChangeListener.java
+++ 
b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/CamelRoleChangeListener.java
@@ -21,7 +21,6 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
-import org.apache.camel.util.ObjectHelper;
 import org.jgroups.protocols.raft.RAFT;
 import org.jgroups.protocols.raft.Role;
 import org.slf4j.Logger;
@@ -30,13 +29,12 @@ import org.slf4j.LoggerFactory;
 public class CamelRoleChangeListener implements RAFT.RoleChange {
     private static final transient Logger LOG = 
LoggerFactory.getLogger(CamelRoleChangeListener.class);
 
+    private final JGroupsRaftConsumer consumer;
     private final JGroupsRaftEndpoint endpoint;
     private final AsyncProcessor processor;
 
-    public CamelRoleChangeListener(JGroupsRaftEndpoint endpoint, Processor 
processor) {
-        ObjectHelper.notNull(endpoint, "endpoint");
-        ObjectHelper.notNull(processor, "processor");
-
+    public CamelRoleChangeListener(JGroupsRaftConsumer consumer, 
JGroupsRaftEndpoint endpoint, Processor processor) {
+        this.consumer = consumer;
         this.endpoint = endpoint;
         this.processor = AsyncProcessorConverterHelper.convert(processor);
     }
@@ -44,7 +42,7 @@ public class CamelRoleChangeListener implements 
RAFT.RoleChange {
     @Override
     public void roleChanged(Role role) {
         LOG.trace("New Role {} received.", role);
-        Exchange exchange = endpoint.createExchange();
+        Exchange exchange = createExchange();
         switch (role) {
             case Leader:
                 
exchange.getIn().setHeader(JGroupsRaftConstants.HEADER_JGROUPSRAFT_EVENT_TYPE, 
JGroupsRaftEventType.LEADER);
@@ -76,4 +74,11 @@ public class CamelRoleChangeListener implements 
RAFT.RoleChange {
             throw new JGroupsRaftException("Error in consumer while 
dispatching exchange containing role " + role, e);
         }
     }
+
+    private Exchange createExchange() {
+        Exchange exchange = consumer.createExchange(true);
+        endpoint.populateJGroupsRaftHeaders(exchange);
+        return exchange;
+    }
+
 }
diff --git 
a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftConsumer.java
 
b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftConsumer.java
index cf275c7..623b4d9 100644
--- 
a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftConsumer.java
+++ 
b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftConsumer.java
@@ -42,7 +42,7 @@ public class JGroupsRaftConsumer extends DefaultConsumer {
         this.clusterName = clusterName;
         this.enableRoleChangeEvents = enableRoleChangeEvents;
 
-        this.roleListener = new CamelRoleChangeListener(endpoint, processor);
+        this.roleListener = new CamelRoleChangeListener(this, endpoint, 
processor);
     }
 
     @Override
diff --git 
a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftEndpoint.java
 
b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftEndpoint.java
index 3e0dfac..075f97e 100644
--- 
a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftEndpoint.java
+++ 
b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftEndpoint.java
@@ -82,13 +82,6 @@ public class JGroupsRaftEndpoint extends DefaultEndpoint {
         return consumer;
     }
 
-    @Override
-    public Exchange createExchange() {
-        Exchange exchange = super.createExchange();
-        populateJGroupsRaftHeaders(exchange);
-        return exchange;
-    }
-
     public void populateJGroupsRaftHeaders(Exchange exchange) {
         
exchange.getIn().setHeader(JGroupsRaftConstants.HEADER_JGROUPSRAFT_COMMIT_INDEX,
 resolvedRaftHandle.commitIndex());
         
exchange.getIn().setHeader(JGroupsRaftConstants.HEADER_JGROUPSRAFT_CURRENT_TERM,
 resolvedRaftHandle.currentTerm());
diff --git 
a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/CamelJGroupsReceiver.java
 
b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/CamelJGroupsReceiver.java
index 9fcdef8..4383903 100644
--- 
a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/CamelJGroupsReceiver.java
+++ 
b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/CamelJGroupsReceiver.java
@@ -21,13 +21,14 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
-import org.apache.camel.util.ObjectHelper;
 import org.jgroups.Message;
 import org.jgroups.ReceiverAdapter;
 import org.jgroups.View;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.camel.component.jgroups.JGroupsEndpoint.HEADER_JGROUPS_CHANNEL_ADDRESS;
+
 /**
  * Implementation of JGroups message receiver ({@code org.jgroups.Receiver}) 
wrapping incoming messages into Camel
  * exchanges. Used by {@link JGroupsConsumer}.
@@ -36,13 +37,12 @@ public class CamelJGroupsReceiver extends ReceiverAdapter {
 
     private static final transient Logger LOG = 
LoggerFactory.getLogger(CamelJGroupsReceiver.class);
 
+    private final JGroupsConsumer consumer;
     private final JGroupsEndpoint endpoint;
     private final AsyncProcessor processor;
 
-    public CamelJGroupsReceiver(JGroupsEndpoint endpoint, Processor processor) 
{
-        ObjectHelper.notNull(endpoint, "endpoint");
-        ObjectHelper.notNull(processor, "processor");
-
+    public CamelJGroupsReceiver(JGroupsConsumer consumer, JGroupsEndpoint 
endpoint, Processor processor) {
+        this.consumer = consumer;
         this.endpoint = endpoint;
         this.processor = AsyncProcessorConverterHelper.convert(processor);
     }
@@ -50,7 +50,7 @@ public class CamelJGroupsReceiver extends ReceiverAdapter {
     @Override
     public void viewAccepted(View view) {
         if (endpoint.isEnableViewMessages()) {
-            Exchange exchange = endpoint.createExchange(view);
+            Exchange exchange = createExchange(view);
             try {
                 LOG.debug("Processing view: {}", view);
                 processor.process(exchange, new AsyncCallback() {
@@ -80,4 +80,11 @@ public class CamelJGroupsReceiver extends ReceiverAdapter {
         }
     }
 
+    public Exchange createExchange(View view) {
+        Exchange exchange = consumer.createExchange(true);
+        exchange.getIn().setHeader(HEADER_JGROUPS_CHANNEL_ADDRESS, 
endpoint.getResolvedChannel().getAddress());
+        exchange.getIn().setBody(view);
+        return exchange;
+    }
+
 }
diff --git 
a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java
 
b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java
index 7cb4e90..bb8ce3e 100644
--- 
a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java
+++ 
b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java
@@ -40,7 +40,7 @@ public class JGroupsConsumer extends DefaultConsumer {
         this.endpoint = endpoint;
         this.clusterName = clusterName;
 
-        this.receiver = new CamelJGroupsReceiver(endpoint, processor);
+        this.receiver = new CamelJGroupsReceiver(this, endpoint, processor);
     }
 
     @Override
diff --git 
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraConstants.java
 
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraConstants.java
index d53dd95..7f9ed14 100644
--- 
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraConstants.java
+++ 
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraConstants.java
@@ -27,6 +27,7 @@ public interface JiraConstants {
     String ISSUE_ASSIGNEE = "IssueAssignee";
     String ISSUE_COMPONENTS = "IssueComponents";
     String ISSUE_COMMENT = "IssueComment";
+    String ISSUE_CHANGED = "IssueChanged";
     String ISSUE_KEY = "IssueKey";
     String ISSUE_PRIORITY_ID = "IssuePriorityId";
     String ISSUE_PRIORITY_NAME = "IssuePriorityName";
@@ -35,6 +36,7 @@ public interface JiraConstants {
     String ISSUE_TRANSITION_ID = "IssueTransitionId";
     String ISSUE_TYPE_ID = "IssueTypeId";
     String ISSUE_TYPE_NAME = "IssueTypeName";
+    String ISSUE_WATCHED_ISSUES = "IssueWatchedIssues";
     String ISSUE_WATCHERS_ADD = "IssueWatchersAdd";
     String ISSUE_WATCHERS_REMOVE = "IssueWatchersRemove";
     String JIRA_REST_CLIENT_FACTORY = "JiraRestClientFactory";
diff --git 
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java
 
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java
index 0b080fb..00c0b38 100644
--- 
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java
+++ 
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java
@@ -52,7 +52,7 @@ public class NewCommentsConsumer extends AbstractJiraConsumer 
{
         // retrieve from last to first item LIFO
         for (int i = max; i > -1; i--) {
             Comment newComment = newComments.get(i);
-            Exchange e = getEndpoint().createExchange();
+            Exchange e = createExchange(true);
             e.getIn().setBody(newComment);
             getProcessor().process(e);
         }
diff --git 
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java
 
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java
index b2fd19e..b5d09ef 100644
--- 
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java
+++ 
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java
@@ -61,7 +61,7 @@ public class NewIssuesConsumer extends AbstractJiraConsumer {
             // In the end, we want only *new* issues oldest to newest.
             for (int i = newIssues.size() - 1; i > -1; i--) {
                 Issue newIssue = newIssues.get(i);
-                Exchange e = getEndpoint().createExchange();
+                Exchange e = createExchange(true);
                 e.getIn().setBody(newIssue);
                 getProcessor().process(e);
             }
diff --git 
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
 
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
index a97d5fb..6a512c5 100644
--- 
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
+++ 
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
 import com.atlassian.jira.rest.client.api.domain.Issue;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.component.jira.JiraConstants;
 import org.apache.camel.component.jira.JiraEndpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -107,11 +108,11 @@ public class WatchUpdatesConsumer extends 
AbstractJiraConsumer {
     }
 
     private void processExchange(Object body, String issueKey, String changed) 
throws Exception {
-        Exchange e = getEndpoint().createExchange();
+        Exchange e = createExchange(true);
         e.getIn().setBody(body);
-        e.getIn().setHeader("issueKey", issueKey);
-        e.getIn().setHeader("changed", changed);
-        e.getIn().setHeader("watchedIssues", watchedIssuesKeys);
+        e.getIn().setHeader(JiraConstants.ISSUE_KEY, issueKey);
+        e.getIn().setHeader(JiraConstants.ISSUE_CHANGED, changed);
+        e.getIn().setHeader(JiraConstants.ISSUE_WATCHED_ISSUES, 
watchedIssuesKeys);
         LOG.debug(" {}: {} changed to {}", issueKey, changed, body);
         getProcessor().process(e);
     }
diff --git 
a/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumer.java
 
b/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumer.java
index 18171a3..7fa7853 100644
--- 
a/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumer.java
+++ 
b/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumer.java
@@ -319,7 +319,7 @@ public class JMXConsumer extends DefaultConsumer implements 
NotificationListener
     @Override
     public void handleNotification(Notification aNotification, Object 
aHandback) {
         JMXEndpoint ep = getEndpoint();
-        Exchange exchange = getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         Message message = exchange.getIn();
         message.setHeader("jmx.handback", aHandback);
         try {
@@ -329,7 +329,7 @@ public class JMXConsumer extends DefaultConsumer implements 
NotificationListener
                 message.setBody(aNotification);
             }
 
-            // process the notification from thred pool to not block this 
notification callback thread from the JVM
+            // process the notification from thread pool to not block this 
notification callback thread from the JVM
             executorService.submit(() -> {
                 try {
                     getProcessor().process(exchange);
diff --git 
a/components/camel-jooq/src/main/java/org/apache/camel/component/jooq/JooqConsumer.java
 
b/components/camel-jooq/src/main/java/org/apache/camel/component/jooq/JooqConsumer.java
index b2868e3..b7ac8e2 100644
--- 
a/components/camel-jooq/src/main/java/org/apache/camel/component/jooq/JooqConsumer.java
+++ 
b/components/camel-jooq/src/main/java/org/apache/camel/component/jooq/JooqConsumer.java
@@ -78,7 +78,7 @@ public class JooqConsumer extends 
ScheduledBatchPollingConsumer {
     }
 
     protected Exchange createExchange(Object result) {
-        Exchange exchange = getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getIn().setBody(result);
         return exchange;
     }
diff --git 
a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
 
b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
index 6bb0774..cf99d19 100644
--- 
a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
+++ 
b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
@@ -144,7 +144,7 @@ public class JpaConsumer extends 
ScheduledBatchPollingConsumer {
                                     "Error processing last message due: {}. 
Will commit all previous successful processed message, and ignore this last 
failure.",
                                     cause.getMessage(), cause);
                         } else {
-                            // rollback all by throwning exception
+                            // rollback all by throwing exception
                             throw cause;
                         }
                     }
@@ -200,14 +200,23 @@ public class JpaConsumer extends 
ScheduledBatchPollingConsumer {
 
                 // process the current exchange
                 LOG.debug("Processing exchange: {}", exchange);
-                getProcessor().process(exchange);
-                if (exchange.getException() != null) {
-                    // if we failed then throw exception
-                    throw exchange.getException();
+                try {
+                    getProcessor().process(exchange);
+                } catch (Exception e) {
+                    exchange.setException(e);
                 }
 
-                // Run the @Consumed callback
-                getDeleteHandler().deleteObject(entityManager, result, 
exchange);
+                try {
+                    if (exchange.getException() != null) {
+                        // if we failed then throw exception
+                        throw exchange.getException();
+                    } else {
+                        // Run the @Consumed callback
+                        getDeleteHandler().deleteObject(entityManager, result, 
exchange);
+                    }
+                } finally {
+                    releaseExchange(exchange, false);
+                }
             }
         }
 
@@ -514,7 +523,7 @@ public class JpaConsumer extends 
ScheduledBatchPollingConsumer {
     }
 
     protected Exchange createExchange(Object result, EntityManager 
entityManager) {
-        Exchange exchange = getEndpoint().createExchange();
+        Exchange exchange = createExchange(false);
         exchange.getIn().setBody(result);
         exchange.getIn().setHeader(JpaConstants.ENTITY_MANAGER, entityManager);
         return exchange;
diff --git 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
index d73ae11..e7f113c 100644
--- 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
+++ 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
@@ -152,7 +152,7 @@ public class Jt400DataQueueConsumer extends 
ScheduledPollConsumer {
             entry = queue.read(key, -1, searchType);
         }
 
-        Exchange exchange = getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         if (entry != null) {
             exchange.getIn().setHeader(Jt400Endpoint.SENDER_INFORMATION, 
entry.getSenderInformation());
             if (getEndpoint().getFormat() == Jt400Configuration.Format.binary) 
{
diff --git 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
index 5dbbe93..8dac242 100755
--- 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
+++ 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
@@ -121,7 +121,7 @@ public class Jt400MsgQueueConsumer extends 
ScheduledPollConsumer {
             this.messageKey = entry.getKey();
         }
 
-        Exchange exchange = getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getIn().setHeader(Jt400Constants.SENDER_INFORMATION,
                 entry.getFromJobNumber() + "/" + entry.getUser() + "/" + 
entry.getFromJobName());
         setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_ID, 
entry.getID());

Reply via email to