This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push: new 01c91e2 CAMEL-16222: PooledExchangeFactory experiment 01c91e2 is described below commit 01c91e20e82fc4b7e4f27b53f89874eb10d67bdb Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Feb 22 07:22:53 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../apache/camel/component/jbpm/JBPMConsumer.java | 7 ++-- .../camel/component/jcache/JCacheConsumer.java | 7 +--- .../jclouds/JcloudsBlobStoreConsumer.java | 2 +- .../camel/component/jclouds/JcloudsConsumer.java | 46 ---------------------- .../camel/component/jcr/EndpointEventListener.java | 12 ++++-- .../apache/camel/component/jcr/JcrConsumer.java | 2 +- .../jgroups/raft/CamelRoleChangeListener.java | 17 +++++--- .../jgroups/raft/JGroupsRaftConsumer.java | 2 +- .../jgroups/raft/JGroupsRaftEndpoint.java | 7 ---- .../component/jgroups/CamelJGroupsReceiver.java | 19 ++++++--- .../camel/component/jgroups/JGroupsConsumer.java | 2 +- .../apache/camel/component/jira/JiraConstants.java | 2 + .../jira/consumer/NewCommentsConsumer.java | 2 +- .../component/jira/consumer/NewIssuesConsumer.java | 2 +- .../jira/consumer/WatchUpdatesConsumer.java | 9 +++-- .../apache/camel/component/jmx/JMXConsumer.java | 4 +- .../apache/camel/component/jooq/JooqConsumer.java | 2 +- .../apache/camel/component/jpa/JpaConsumer.java | 25 ++++++++---- .../component/jt400/Jt400DataQueueConsumer.java | 2 +- .../component/jt400/Jt400MsgQueueConsumer.java | 2 +- 20 files changed, 72 insertions(+), 101 deletions(-) diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java index 343bfc7..a8d6c59 100644 --- a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java +++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java @@ -19,7 +19,6 @@ package org.apache.camel.component.jbpm; import org.apache.camel.AsyncCallback; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; import org.apache.camel.component.jbpm.emitters.CamelEventEmitter; import org.apache.camel.component.jbpm.listeners.CamelCaseEventListener; @@ -98,7 +97,7 @@ public class JBPMConsumer extends DefaultConsumer implements DeploymentEventList } public void sendMessage(String eventType, Object body) { - Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOnly); + Exchange exchange = createExchange(false); exchange.getIn().setHeader("EventType", eventType); exchange.getIn().setBody(body); @@ -111,6 +110,7 @@ public class JBPMConsumer extends DefaultConsumer implements DeploymentEventList if (exchange.getException() != null) { getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } + releaseExchange(exchange, false); } }); } else { @@ -124,6 +124,7 @@ public class JBPMConsumer extends DefaultConsumer implements DeploymentEventList if (exchange.getException() != null) { getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } + releaseExchange(exchange, false); } } @@ -131,7 +132,6 @@ public class JBPMConsumer extends DefaultConsumer implements DeploymentEventList public void onDeploy(DeploymentEvent event) { InternalRuntimeManager manager = (InternalRuntimeManager) event.getDeployedUnit().getRuntimeManager(); configure(manager, this); - } @Override @@ -156,7 +156,6 @@ public class JBPMConsumer extends DefaultConsumer implements DeploymentEventList } configureConsumer(eventListenerType, manager, consumer); - } protected void configureConsumer(String eventListenerType, InternalRuntimeManager manager, JBPMConsumer consumer) { diff --git a/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheConsumer.java b/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheConsumer.java index 327e242..544a562 100644 --- a/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheConsumer.java +++ b/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheConsumer.java @@ -28,14 +28,11 @@ import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.support.DefaultConsumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * The JCache consumer. */ public class JCacheConsumer extends DefaultConsumer { - private static final Logger LOG = LoggerFactory.getLogger(JCacheConsumer.class); private CacheEntryListenerConfiguration<Object, Object> entryListenerConfiguration; @@ -83,7 +80,7 @@ public class JCacheConsumer extends DefaultConsumer { @Override protected void onEvents(Iterable<CacheEntryEvent<?, ?>> events) { for (CacheEntryEvent<?, ?> event : events) { - Exchange exchange = getEndpoint().createExchange(); + Exchange exchange = createExchange(true); Message message = exchange.getIn(); message.setHeader(JCacheConstants.EVENT_TYPE, event.getEventType().name()); message.setHeader(JCacheConstants.KEY, event.getKey()); @@ -96,7 +93,7 @@ public class JCacheConsumer extends DefaultConsumer { try { getProcessor().process(exchange); } catch (Exception e) { - LOG.error("Error processing event ", e); + getExceptionHandler().handleException(e); } } } diff --git a/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java b/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java index 899d323..7812704 100644 --- a/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java +++ b/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java @@ -75,7 +75,7 @@ public class JcloudsBlobStoreConsumer extends ScheduledBatchPollingConsumer { if (!Strings.isNullOrEmpty(blobName)) { InputStream body = JcloudsBlobStoreHelper.readBlob(blobStore, container, blobName); if (body != null) { - Exchange exchange = endpoint.createExchange(); + Exchange exchange = createExchange(true); CachedOutputStream cos = new CachedOutputStream(exchange); IOHelper.copy(body, cos); exchange.getIn().setBody(cos.newStreamCache()); diff --git a/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsConsumer.java b/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsConsumer.java deleted file mode 100644 index fdcef9c..0000000 --- a/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsConsumer.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.jclouds; - -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.support.ScheduledPollConsumer; - -public class JcloudsConsumer extends ScheduledPollConsumer { - private final JcloudsEndpoint endpoint; - - public JcloudsConsumer(JcloudsEndpoint endpoint, Processor processor) { - super(endpoint, processor); - this.endpoint = endpoint; - } - - @Override - protected int poll() throws Exception { - Exchange exchange = endpoint.createExchange(); - - try { - // send message to next processor in the route - getProcessor().process(exchange); - return 1; // number of messages polled - } finally { - // log exception if an exception occurred and was not handled - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } - } - } -} diff --git a/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java b/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java index 4602722..abc06f6 100644 --- a/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java +++ b/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java @@ -38,10 +38,12 @@ public class EndpointEventListener implements EventListener { private static final Logger LOG = LoggerFactory.getLogger(EndpointEventListener.class); + private final JcrConsumer consumer; private final JcrEndpoint endpoint; private final Processor processor; - public EndpointEventListener(JcrEndpoint endpoint, Processor processor) { + public EndpointEventListener(JcrConsumer consumer, JcrEndpoint endpoint, Processor processor) { + this.consumer = consumer; this.endpoint = endpoint; this.processor = processor; } @@ -50,10 +52,10 @@ public class EndpointEventListener implements EventListener { public void onEvent(EventIterator events) { LOG.trace("onEvent START"); LOG.debug("{} consumer received JCR events: {}", endpoint, events); - RuntimeCamelException rce = null; + RuntimeCamelException rce; + final Exchange exchange = createExchange(events); try { - final Exchange exchange = createExchange(events); try { LOG.debug("Processor, {}, is processing exchange, {}", processor, exchange); @@ -65,6 +67,8 @@ public class EndpointEventListener implements EventListener { rce = exchange.getException(RuntimeCamelException.class); } catch (Exception e) { rce = wrapRuntimeCamelException(e); + } finally { + consumer.releaseExchange(exchange, false); } if (rce != null) { @@ -76,7 +80,7 @@ public class EndpointEventListener implements EventListener { } private Exchange createExchange(EventIterator events) { - Exchange exchange = endpoint.createExchange(); + Exchange exchange = consumer.createExchange(false); List<Event> eventList = new LinkedList<>(); if (events != null) { diff --git a/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java b/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java index ae7fd5d..9a39485 100644 --- a/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java +++ b/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java @@ -108,7 +108,7 @@ public class JcrConsumer extends DefaultConsumer { boolean noLocal = getJcrEndpoint().isNoLocal(); - eventListener = new EndpointEventListener(getJcrEndpoint(), getProcessor()); + eventListener = new EndpointEventListener(this, getJcrEndpoint(), getProcessor()); if (LOG.isDebugEnabled()) { LOG.debug("Adding JCR Event Listener, {}, on {}. eventTypes={}, isDeep={}, uuid={}, nodeTypeName={}, noLocal={}", diff --git a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/CamelRoleChangeListener.java b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/CamelRoleChangeListener.java index 917e052..baf3325 100644 --- a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/CamelRoleChangeListener.java +++ b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/CamelRoleChangeListener.java @@ -21,7 +21,6 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.support.AsyncProcessorConverterHelper; -import org.apache.camel.util.ObjectHelper; import org.jgroups.protocols.raft.RAFT; import org.jgroups.protocols.raft.Role; import org.slf4j.Logger; @@ -30,13 +29,12 @@ import org.slf4j.LoggerFactory; public class CamelRoleChangeListener implements RAFT.RoleChange { private static final transient Logger LOG = LoggerFactory.getLogger(CamelRoleChangeListener.class); + private final JGroupsRaftConsumer consumer; private final JGroupsRaftEndpoint endpoint; private final AsyncProcessor processor; - public CamelRoleChangeListener(JGroupsRaftEndpoint endpoint, Processor processor) { - ObjectHelper.notNull(endpoint, "endpoint"); - ObjectHelper.notNull(processor, "processor"); - + public CamelRoleChangeListener(JGroupsRaftConsumer consumer, JGroupsRaftEndpoint endpoint, Processor processor) { + this.consumer = consumer; this.endpoint = endpoint; this.processor = AsyncProcessorConverterHelper.convert(processor); } @@ -44,7 +42,7 @@ public class CamelRoleChangeListener implements RAFT.RoleChange { @Override public void roleChanged(Role role) { LOG.trace("New Role {} received.", role); - Exchange exchange = endpoint.createExchange(); + Exchange exchange = createExchange(); switch (role) { case Leader: exchange.getIn().setHeader(JGroupsRaftConstants.HEADER_JGROUPSRAFT_EVENT_TYPE, JGroupsRaftEventType.LEADER); @@ -76,4 +74,11 @@ public class CamelRoleChangeListener implements RAFT.RoleChange { throw new JGroupsRaftException("Error in consumer while dispatching exchange containing role " + role, e); } } + + private Exchange createExchange() { + Exchange exchange = consumer.createExchange(true); + endpoint.populateJGroupsRaftHeaders(exchange); + return exchange; + } + } diff --git a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftConsumer.java b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftConsumer.java index cf275c7..623b4d9 100644 --- a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftConsumer.java +++ b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftConsumer.java @@ -42,7 +42,7 @@ public class JGroupsRaftConsumer extends DefaultConsumer { this.clusterName = clusterName; this.enableRoleChangeEvents = enableRoleChangeEvents; - this.roleListener = new CamelRoleChangeListener(endpoint, processor); + this.roleListener = new CamelRoleChangeListener(this, endpoint, processor); } @Override diff --git a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftEndpoint.java b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftEndpoint.java index 3e0dfac..075f97e 100644 --- a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftEndpoint.java +++ b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftEndpoint.java @@ -82,13 +82,6 @@ public class JGroupsRaftEndpoint extends DefaultEndpoint { return consumer; } - @Override - public Exchange createExchange() { - Exchange exchange = super.createExchange(); - populateJGroupsRaftHeaders(exchange); - return exchange; - } - public void populateJGroupsRaftHeaders(Exchange exchange) { exchange.getIn().setHeader(JGroupsRaftConstants.HEADER_JGROUPSRAFT_COMMIT_INDEX, resolvedRaftHandle.commitIndex()); exchange.getIn().setHeader(JGroupsRaftConstants.HEADER_JGROUPSRAFT_CURRENT_TERM, resolvedRaftHandle.currentTerm()); diff --git a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/CamelJGroupsReceiver.java b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/CamelJGroupsReceiver.java index 9fcdef8..4383903 100644 --- a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/CamelJGroupsReceiver.java +++ b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/CamelJGroupsReceiver.java @@ -21,13 +21,14 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.support.AsyncProcessorConverterHelper; -import org.apache.camel.util.ObjectHelper; import org.jgroups.Message; import org.jgroups.ReceiverAdapter; import org.jgroups.View; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.camel.component.jgroups.JGroupsEndpoint.HEADER_JGROUPS_CHANNEL_ADDRESS; + /** * Implementation of JGroups message receiver ({@code org.jgroups.Receiver}) wrapping incoming messages into Camel * exchanges. Used by {@link JGroupsConsumer}. @@ -36,13 +37,12 @@ public class CamelJGroupsReceiver extends ReceiverAdapter { private static final transient Logger LOG = LoggerFactory.getLogger(CamelJGroupsReceiver.class); + private final JGroupsConsumer consumer; private final JGroupsEndpoint endpoint; private final AsyncProcessor processor; - public CamelJGroupsReceiver(JGroupsEndpoint endpoint, Processor processor) { - ObjectHelper.notNull(endpoint, "endpoint"); - ObjectHelper.notNull(processor, "processor"); - + public CamelJGroupsReceiver(JGroupsConsumer consumer, JGroupsEndpoint endpoint, Processor processor) { + this.consumer = consumer; this.endpoint = endpoint; this.processor = AsyncProcessorConverterHelper.convert(processor); } @@ -50,7 +50,7 @@ public class CamelJGroupsReceiver extends ReceiverAdapter { @Override public void viewAccepted(View view) { if (endpoint.isEnableViewMessages()) { - Exchange exchange = endpoint.createExchange(view); + Exchange exchange = createExchange(view); try { LOG.debug("Processing view: {}", view); processor.process(exchange, new AsyncCallback() { @@ -80,4 +80,11 @@ public class CamelJGroupsReceiver extends ReceiverAdapter { } } + public Exchange createExchange(View view) { + Exchange exchange = consumer.createExchange(true); + exchange.getIn().setHeader(HEADER_JGROUPS_CHANNEL_ADDRESS, endpoint.getResolvedChannel().getAddress()); + exchange.getIn().setBody(view); + return exchange; + } + } diff --git a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java index 7cb4e90..bb8ce3e 100644 --- a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java +++ b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java @@ -40,7 +40,7 @@ public class JGroupsConsumer extends DefaultConsumer { this.endpoint = endpoint; this.clusterName = clusterName; - this.receiver = new CamelJGroupsReceiver(endpoint, processor); + this.receiver = new CamelJGroupsReceiver(this, endpoint, processor); } @Override diff --git a/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraConstants.java b/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraConstants.java index d53dd95..7f9ed14 100644 --- a/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraConstants.java +++ b/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraConstants.java @@ -27,6 +27,7 @@ public interface JiraConstants { String ISSUE_ASSIGNEE = "IssueAssignee"; String ISSUE_COMPONENTS = "IssueComponents"; String ISSUE_COMMENT = "IssueComment"; + String ISSUE_CHANGED = "IssueChanged"; String ISSUE_KEY = "IssueKey"; String ISSUE_PRIORITY_ID = "IssuePriorityId"; String ISSUE_PRIORITY_NAME = "IssuePriorityName"; @@ -35,6 +36,7 @@ public interface JiraConstants { String ISSUE_TRANSITION_ID = "IssueTransitionId"; String ISSUE_TYPE_ID = "IssueTypeId"; String ISSUE_TYPE_NAME = "IssueTypeName"; + String ISSUE_WATCHED_ISSUES = "IssueWatchedIssues"; String ISSUE_WATCHERS_ADD = "IssueWatchersAdd"; String ISSUE_WATCHERS_REMOVE = "IssueWatchersRemove"; String JIRA_REST_CLIENT_FACTORY = "JiraRestClientFactory"; diff --git a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java index 0b080fb..00c0b38 100644 --- a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java +++ b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java @@ -52,7 +52,7 @@ public class NewCommentsConsumer extends AbstractJiraConsumer { // retrieve from last to first item LIFO for (int i = max; i > -1; i--) { Comment newComment = newComments.get(i); - Exchange e = getEndpoint().createExchange(); + Exchange e = createExchange(true); e.getIn().setBody(newComment); getProcessor().process(e); } diff --git a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java index b2fd19e..b5d09ef 100644 --- a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java +++ b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java @@ -61,7 +61,7 @@ public class NewIssuesConsumer extends AbstractJiraConsumer { // In the end, we want only *new* issues oldest to newest. for (int i = newIssues.size() - 1; i > -1; i--) { Issue newIssue = newIssues.get(i); - Exchange e = getEndpoint().createExchange(); + Exchange e = createExchange(true); e.getIn().setBody(newIssue); getProcessor().process(e); } diff --git a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java index a97d5fb..6a512c5 100644 --- a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java +++ b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java @@ -29,6 +29,7 @@ import java.util.stream.Collectors; import com.atlassian.jira.rest.client.api.domain.Issue; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.component.jira.JiraConstants; import org.apache.camel.component.jira.JiraEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,11 +108,11 @@ public class WatchUpdatesConsumer extends AbstractJiraConsumer { } private void processExchange(Object body, String issueKey, String changed) throws Exception { - Exchange e = getEndpoint().createExchange(); + Exchange e = createExchange(true); e.getIn().setBody(body); - e.getIn().setHeader("issueKey", issueKey); - e.getIn().setHeader("changed", changed); - e.getIn().setHeader("watchedIssues", watchedIssuesKeys); + e.getIn().setHeader(JiraConstants.ISSUE_KEY, issueKey); + e.getIn().setHeader(JiraConstants.ISSUE_CHANGED, changed); + e.getIn().setHeader(JiraConstants.ISSUE_WATCHED_ISSUES, watchedIssuesKeys); LOG.debug(" {}: {} changed to {}", issueKey, changed, body); getProcessor().process(e); } diff --git a/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumer.java b/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumer.java index 18171a3..7fa7853 100644 --- a/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumer.java +++ b/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumer.java @@ -319,7 +319,7 @@ public class JMXConsumer extends DefaultConsumer implements NotificationListener @Override public void handleNotification(Notification aNotification, Object aHandback) { JMXEndpoint ep = getEndpoint(); - Exchange exchange = getEndpoint().createExchange(); + Exchange exchange = createExchange(true); Message message = exchange.getIn(); message.setHeader("jmx.handback", aHandback); try { @@ -329,7 +329,7 @@ public class JMXConsumer extends DefaultConsumer implements NotificationListener message.setBody(aNotification); } - // process the notification from thred pool to not block this notification callback thread from the JVM + // process the notification from thread pool to not block this notification callback thread from the JVM executorService.submit(() -> { try { getProcessor().process(exchange); diff --git a/components/camel-jooq/src/main/java/org/apache/camel/component/jooq/JooqConsumer.java b/components/camel-jooq/src/main/java/org/apache/camel/component/jooq/JooqConsumer.java index b2868e3..b7ac8e2 100644 --- a/components/camel-jooq/src/main/java/org/apache/camel/component/jooq/JooqConsumer.java +++ b/components/camel-jooq/src/main/java/org/apache/camel/component/jooq/JooqConsumer.java @@ -78,7 +78,7 @@ public class JooqConsumer extends ScheduledBatchPollingConsumer { } protected Exchange createExchange(Object result) { - Exchange exchange = getEndpoint().createExchange(); + Exchange exchange = createExchange(true); exchange.getIn().setBody(result); return exchange; } diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java index 6bb0774..cf99d19 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java @@ -144,7 +144,7 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { "Error processing last message due: {}. Will commit all previous successful processed message, and ignore this last failure.", cause.getMessage(), cause); } else { - // rollback all by throwning exception + // rollback all by throwing exception throw cause; } } @@ -200,14 +200,23 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { // process the current exchange LOG.debug("Processing exchange: {}", exchange); - getProcessor().process(exchange); - if (exchange.getException() != null) { - // if we failed then throw exception - throw exchange.getException(); + try { + getProcessor().process(exchange); + } catch (Exception e) { + exchange.setException(e); } - // Run the @Consumed callback - getDeleteHandler().deleteObject(entityManager, result, exchange); + try { + if (exchange.getException() != null) { + // if we failed then throw exception + throw exchange.getException(); + } else { + // Run the @Consumed callback + getDeleteHandler().deleteObject(entityManager, result, exchange); + } + } finally { + releaseExchange(exchange, false); + } } } @@ -514,7 +523,7 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer { } protected Exchange createExchange(Object result, EntityManager entityManager) { - Exchange exchange = getEndpoint().createExchange(); + Exchange exchange = createExchange(false); exchange.getIn().setBody(result); exchange.getIn().setHeader(JpaConstants.ENTITY_MANAGER, entityManager); return exchange; diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java index d73ae11..e7f113c 100644 --- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java +++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java @@ -152,7 +152,7 @@ public class Jt400DataQueueConsumer extends ScheduledPollConsumer { entry = queue.read(key, -1, searchType); } - Exchange exchange = getEndpoint().createExchange(); + Exchange exchange = createExchange(true); if (entry != null) { exchange.getIn().setHeader(Jt400Endpoint.SENDER_INFORMATION, entry.getSenderInformation()); if (getEndpoint().getFormat() == Jt400Configuration.Format.binary) { diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java index 5dbbe93..8dac242 100755 --- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java +++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java @@ -121,7 +121,7 @@ public class Jt400MsgQueueConsumer extends ScheduledPollConsumer { this.messageKey = entry.getKey(); } - Exchange exchange = getEndpoint().createExchange(); + Exchange exchange = createExchange(true); exchange.getIn().setHeader(Jt400Constants.SENDER_INFORMATION, entry.getFromJobNumber() + "/" + entry.getUser() + "/" + entry.getFromJobName()); setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_ID, entry.getID());