This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6824daa4316007724f6a139a120b4007278b2ee6
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Feb 22 10:45:18 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../sip/listener/SipSubscriptionListener.java      |   7 +-
 .../camel/component/slack/SlackConsumer.java       |  23 +++-
 .../camel/component/slack/SlackEndpoint.java       |  26 -----
 .../smpp/MessageReceiverListenerImpl.java          |  24 ++++-
 .../apache/camel/component/smpp/SmppConsumer.java  |   3 +-
 .../apache/camel/component/smpp/SmppEndpoint.java  |  28 -----
 .../smpp/MessageReceiverListenerImplTest.java      | 117 ---------------------
 .../camel/component/smpp/SmppConsumerTest.java     |  12 +++
 .../apache/camel/component/snmp/SnmpEndpoint.java  |  14 ---
 .../camel/component/snmp/SnmpTrapConsumer.java     |  17 ++-
 .../component/SoroushBotAbstractConsumer.java      |  38 +++----
 .../component/SoroushBotMultiThreadConsumer.java   |   1 +
 .../component/SoroushBotSingleThreadConsumer.java  |   1 +
 .../camel/component/splunk/SplunkConsumer.java     |  28 ++---
 .../camel/component/cron/SpringCronConsumer.java   |   2 +-
 .../apache/camel/component/sql/SqlConsumer.java    |   8 +-
 .../apache/camel/component/ssh/SshConsumer.java    |  26 ++---
 .../camel/component/stomp/StompEndpoint.java       |  10 +-
 .../camel/component/stream/StreamConsumer.java     |  12 ++-
 .../camel/component/stream/StreamEndpoint.java     |   9 --
 20 files changed, 137 insertions(+), 269 deletions(-)

diff --git 
a/components/camel-sip/src/main/java/org/apache/camel/component/sip/listener/SipSubscriptionListener.java
 
b/components/camel-sip/src/main/java/org/apache/camel/component/sip/listener/SipSubscriptionListener.java
index 7951940..6738ed8 100644
--- 
a/components/camel-sip/src/main/java/org/apache/camel/component/sip/listener/SipSubscriptionListener.java
+++ 
b/components/camel-sip/src/main/java/org/apache/camel/component/sip/listener/SipSubscriptionListener.java
@@ -48,14 +48,15 @@ public class SipSubscriptionListener implements SipListener 
{
         this.setSipSubscriber(sipSubscriber);
     }
 
-    private void dispatchExchange(Object response) throws CamelException {
+    private void dispatchExchange(Object response) {
         LOG.debug("Consumer Dispatching the received notification along the 
route");
-        Exchange exchange = 
sipSubscriber.getEndpoint().createExchange(ExchangePattern.InOnly);
+        Exchange exchange = sipSubscriber.createExchange(true);
+        exchange.setPattern(ExchangePattern.InOnly);
         exchange.getIn().setBody(response);
         try {
             sipSubscriber.getProcessor().process(exchange);
         } catch (Exception e) {
-            throw new CamelException("Error in consumer while dispatching 
exchange", e);
+            sipSubscriber.getExceptionHandler().handleException("Error in 
consumer while dispatching exchange", e);
         }
     }
 
diff --git 
a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java
 
b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java
index 396e66e..8ea304c 100644
--- 
a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java
+++ 
b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java
@@ -25,8 +25,11 @@ import java.util.List;
 import java.util.Queue;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.slack.helper.SlackMessage;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
@@ -109,7 +112,7 @@ public class SlackConsumer extends 
ScheduledBatchPollingConsumer {
                     timestamp = (String) singleMess.get("ts");
                 }
                 i++;
-                Exchange exchange = slackEndpoint.createExchange(singleMess);
+                Exchange exchange = createExchange(singleMess);
                 answer.add(exchange);
             }
         }
@@ -185,4 +188,22 @@ public class SlackConsumer extends 
ScheduledBatchPollingConsumer {
         }
     }
 
+    public Exchange createExchange(JsonObject object) {
+        Exchange exchange = createExchange(true);
+        SlackMessage slackMessage = new SlackMessage();
+        String text = object.getString(SlackConstants.SLACK_TEXT_FIELD);
+        String user = object.getString("user");
+        slackMessage.setText(text);
+        slackMessage.setUser(user);
+        if (ObjectHelper.isNotEmpty(object.get("icons"))) {
+            JsonObject icons = object.getMap("icons");
+            if (ObjectHelper.isNotEmpty(icons.get("emoji"))) {
+                slackMessage.setIconEmoji(icons.getString("emoji"));
+            }
+        }
+        Message message = exchange.getIn();
+        message.setBody(slackMessage);
+        return exchange;
+    }
+
 }
diff --git 
a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackEndpoint.java
 
b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackEndpoint.java
index c5bf928..9893b21 100644
--- 
a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackEndpoint.java
+++ 
b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackEndpoint.java
@@ -18,20 +18,15 @@ package org.apache.camel.component.slack;
 
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.component.slack.helper.SlackMessage;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.support.ScheduledPollEndpoint;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.json.JsonObject;
 
 /**
  * Send and receive messages to/from Slack.
@@ -183,25 +178,4 @@ public class SlackEndpoint extends ScheduledPollEndpoint {
         this.serverUrl = serverUrl;
     }
 
-    public Exchange createExchange(JsonObject object) {
-        return createExchange(getExchangePattern(), object);
-    }
-
-    public Exchange createExchange(ExchangePattern pattern, JsonObject object) 
{
-        Exchange exchange = super.createExchange(pattern);
-        SlackMessage slackMessage = new SlackMessage();
-        String text = object.getString(SlackConstants.SLACK_TEXT_FIELD);
-        String user = object.getString("user");
-        slackMessage.setText(text);
-        slackMessage.setUser(user);
-        if (ObjectHelper.isNotEmpty(object.get("icons"))) {
-            JsonObject icons = object.getMap("icons");
-            if (ObjectHelper.isNotEmpty(icons.get("emoji"))) {
-                slackMessage.setIconEmoji(icons.getString("emoji"));
-            }
-        }
-        Message message = exchange.getIn();
-        message.setBody(slackMessage);
-        return exchange;
-    }
 }
diff --git 
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/MessageReceiverListenerImpl.java
 
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/MessageReceiverListenerImpl.java
index c076335..9eb4e2e 100644
--- 
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/MessageReceiverListenerImpl.java
+++ 
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/MessageReceiverListenerImpl.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.smpp;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.ExceptionHandler;
 import org.jsmpp.bean.AlertNotification;
@@ -37,11 +38,14 @@ public class MessageReceiverListenerImpl implements 
MessageReceiverListener {
     private static final Logger LOG = 
LoggerFactory.getLogger(MessageReceiverListenerImpl.class);
 
     private MessageIDGenerator messageIDGenerator = new 
RandomMessageIDGenerator();
+    private SmppConsumer consumer;
     private SmppEndpoint endpoint;
     private Processor processor;
     private ExceptionHandler exceptionHandler;
 
-    public MessageReceiverListenerImpl(SmppEndpoint endpoint, Processor 
processor, ExceptionHandler exceptionHandler) {
+    public MessageReceiverListenerImpl(SmppConsumer consumer, SmppEndpoint 
endpoint, Processor processor,
+                                       ExceptionHandler exceptionHandler) {
+        this.consumer = consumer;
         this.endpoint = endpoint;
         this.processor = processor;
         this.exceptionHandler = exceptionHandler;
@@ -51,7 +55,7 @@ public class MessageReceiverListenerImpl implements 
MessageReceiverListener {
     public void onAcceptAlertNotification(AlertNotification alertNotification) 
{
         LOG.debug("Received an alertNotification {}", alertNotification);
 
-        Exchange exchange = 
endpoint.createOnAcceptAlertNotificationExchange(alertNotification);
+        Exchange exchange = 
createOnAcceptAlertNotificationExchange(alertNotification);
         try {
             processor.process(exchange);
         } catch (Exception e) {
@@ -62,6 +66,7 @@ public class MessageReceiverListenerImpl implements 
MessageReceiverListener {
             exceptionHandler.handleException("Cannot process exchange. This 
exception will be ignored.", exchange,
                     exchange.getException());
         }
+        consumer.releaseExchange(exchange, false);
     }
 
     @Override
@@ -117,4 +122,19 @@ public class MessageReceiverListenerImpl implements 
MessageReceiverListener {
     public void setMessageIDGenerator(MessageIDGenerator messageIDGenerator) {
         this.messageIDGenerator = messageIDGenerator;
     }
+
+    /**
+     * Create a new exchange for communicating with this endpoint from a SMSC 
with the specified {@link ExchangePattern}
+     * such as whether its going to be an {@link ExchangePattern#InOnly} or 
{@link ExchangePattern#InOut} exchange
+     *
+     * @param  alertNotification the received message from the SMSC
+     * @return                   a new exchange
+     */
+    public Exchange createOnAcceptAlertNotificationExchange(AlertNotification 
alertNotification) {
+        Exchange exchange = consumer.createExchange(false);
+        exchange.setProperty(Exchange.BINDING, endpoint.getBinding());
+        
exchange.setIn(endpoint.getBinding().createSmppMessage(endpoint.getCamelContext(),
 alertNotification));
+        return exchange;
+    }
+
 }
diff --git 
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
 
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
index dee6ec3..c04c7f1 100644
--- 
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
+++ 
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
@@ -72,7 +72,8 @@ public class SmppConsumer extends DefaultConsumer {
                 }
             }
         };
-        this.messageReceiverListener = new 
MessageReceiverListenerImpl(getEndpoint(), getProcessor(), 
getExceptionHandler());
+        this.messageReceiverListener
+                = new MessageReceiverListenerImpl(this, getEndpoint(), 
getProcessor(), getExceptionHandler());
     }
 
     @Override
diff --git 
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppEndpoint.java
 
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppEndpoint.java
index f9751e0..6f3b6d2 100644
--- 
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppEndpoint.java
+++ 
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppEndpoint.java
@@ -26,7 +26,6 @@ import org.apache.camel.Producer;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.support.DefaultEndpoint;
-import org.jsmpp.bean.AlertNotification;
 import org.jsmpp.bean.DataSm;
 import org.jsmpp.bean.DeliverSm;
 
@@ -71,33 +70,6 @@ public class SmppEndpoint extends DefaultEndpoint {
     /**
      * Create a new exchange for communicating with this endpoint from a SMSC
      *
-     * @param  alertNotification the received message from the SMSC
-     * @return                   a new exchange
-     */
-    public Exchange createOnAcceptAlertNotificationExchange(AlertNotification 
alertNotification) {
-        return createOnAcceptAlertNotificationExchange(getExchangePattern(), 
alertNotification);
-    }
-
-    /**
-     * Create a new exchange for communicating with this endpoint from a SMSC 
with the specified {@link ExchangePattern}
-     * such as whether its going to be an {@link ExchangePattern#InOnly} or 
{@link ExchangePattern#InOut} exchange
-     *
-     * @param  exchangePattern   the message exchange pattern for the exchange
-     * @param  alertNotification the received message from the SMSC
-     * @return                   a new exchange
-     */
-    public Exchange createOnAcceptAlertNotificationExchange(
-            ExchangePattern exchangePattern,
-            AlertNotification alertNotification) {
-        Exchange exchange = createExchange(exchangePattern);
-        exchange.setProperty(Exchange.BINDING, getBinding());
-        exchange.setIn(getBinding().createSmppMessage(getCamelContext(), 
alertNotification));
-        return exchange;
-    }
-
-    /**
-     * Create a new exchange for communicating with this endpoint from a SMSC
-     *
      * @param  deliverSm the received message from the SMSC
      * @return           a new exchange
      */
diff --git 
a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/MessageReceiverListenerImplTest.java
 
b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/MessageReceiverListenerImplTest.java
deleted file mode 100644
index a34f6c8..0000000
--- 
a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/MessageReceiverListenerImplTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.smpp;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.spi.ExceptionHandler;
-import org.jsmpp.PDUStringException;
-import org.jsmpp.bean.AlertNotification;
-import org.jsmpp.bean.DataSm;
-import org.jsmpp.bean.DeliverSm;
-import org.jsmpp.bean.OptionalParameter;
-import org.jsmpp.session.DataSmResult;
-import org.jsmpp.session.SMPPSession;
-import org.jsmpp.util.MessageIDGenerator;
-import org.jsmpp.util.MessageId;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class MessageReceiverListenerImplTest {
-
-    private MessageReceiverListenerImpl listener;
-    private SmppEndpoint endpoint;
-    private Processor processor;
-    private ExceptionHandler exceptionHandler;
-
-    @BeforeEach
-    public void setUp() {
-        endpoint = mock(SmppEndpoint.class);
-        processor = mock(Processor.class);
-        exceptionHandler = mock(ExceptionHandler.class);
-
-        listener = new MessageReceiverListenerImpl(endpoint, processor, 
exceptionHandler);
-        listener.setMessageIDGenerator(new MessageIDGenerator() {
-            public MessageId newMessageId() {
-                try {
-                    return new MessageId("1");
-                } catch (PDUStringException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        });
-    }
-
-    @Test
-    public void onAcceptAlertNotificationSuccess() throws Exception {
-        AlertNotification alertNotification = mock(AlertNotification.class);
-        Exchange exchange = mock(Exchange.class);
-
-        
when(endpoint.createOnAcceptAlertNotificationExchange(alertNotification))
-                .thenReturn(exchange);
-        when(exchange.getException()).thenReturn(null);
-
-        listener.onAcceptAlertNotification(alertNotification);
-
-        
verify(endpoint).createOnAcceptAlertNotificationExchange(alertNotification);
-        verify(processor).process(exchange);
-    }
-
-    @Test
-    public void onAcceptDeliverSmException() throws Exception {
-        DeliverSm deliverSm = mock(DeliverSm.class);
-        Exchange exchange = mock(Exchange.class);
-
-        when(endpoint.createOnAcceptDeliverSmExchange(deliverSm))
-                .thenReturn(exchange);
-        when(exchange.getException()).thenReturn(null);
-
-        listener.onAcceptDeliverSm(deliverSm);
-
-        verify(endpoint).createOnAcceptDeliverSmExchange(deliverSm);
-        verify(processor).process(exchange);
-    }
-
-    @Test
-    public void onAcceptDataSmSuccess() throws Exception {
-        SMPPSession session = mock(SMPPSession.class);
-        DataSm dataSm = mock(DataSm.class);
-        Exchange exchange = mock(Exchange.class);
-        OptionalParameter[] optionalParameters = new OptionalParameter[] {};
-
-        when(endpoint.createOnAcceptDataSm(dataSm, "1"))
-                .thenReturn(exchange);
-        when(exchange.getException()).thenReturn(null);
-        when(dataSm.getOptionalParameters())
-                .thenReturn(optionalParameters);
-
-        DataSmResult result = listener.onAcceptDataSm(dataSm, session);
-
-        verify(endpoint).createOnAcceptDataSm(dataSm, "1");
-        verify(processor).process(exchange);
-
-        assertEquals("1", result.getMessageId());
-        assertSame(optionalParameters, result.getOptionalParameters());
-    }
-
-}
diff --git 
a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java
 
b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java
index 6404991..2274dbd 100644
--- 
a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java
+++ 
b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.component.smpp;
 
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Processor;
+import org.apache.camel.spi.ExchangeFactory;
 import org.jsmpp.bean.BindType;
 import org.jsmpp.bean.NumberingPlanIndicator;
 import org.jsmpp.bean.TypeOfNumber;
@@ -28,6 +30,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
@@ -39,6 +42,8 @@ import static org.mockito.Mockito.when;
  */
 public class SmppConsumerTest {
 
+    private ExchangeFactory exchangeFactory;
+    private ExtendedCamelContext context;
     private SmppConsumer consumer;
     private SmppEndpoint endpoint;
     private SmppConfiguration configuration;
@@ -51,10 +56,17 @@ public class SmppConsumerTest {
         configuration.setServiceType("CMT");
         configuration.setSystemType("cp");
         configuration.setPassword("password");
+        context = mock(ExtendedCamelContext.class);
+        exchangeFactory = mock(ExchangeFactory.class);
         endpoint = mock(SmppEndpoint.class);
         processor = mock(Processor.class);
         session = mock(SMPPSession.class);
 
+        when(endpoint.getCamelContext()).thenReturn(context);
+        when(context.adapt(ExtendedCamelContext.class)).thenReturn(context);
+        when(context.getExchangeFactory()).thenReturn(exchangeFactory);
+        
when(exchangeFactory.newExchangeFactory(any())).thenReturn(exchangeFactory);
+
         // the construction of SmppConsumer will trigger the getCamelContext 
call
         consumer = new SmppConsumer(
                 endpoint,
diff --git 
a/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpEndpoint.java
 
b/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpEndpoint.java
index 3cf11b7..69303b4 100644
--- 
a/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpEndpoint.java
+++ 
b/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpEndpoint.java
@@ -30,7 +30,6 @@ import org.apache.camel.spi.UriPath;
 import org.apache.camel.support.DefaultPollingEndpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.snmp4j.CommandResponderEvent;
 import org.snmp4j.PDU;
 import org.snmp4j.mp.SnmpConstants;
 import org.snmp4j.security.SecurityLevel;
@@ -140,19 +139,6 @@ public class SnmpEndpoint extends DefaultPollingEndpoint {
     }
 
     /**
-     * creates an exchange for the given message
-     *
-     * @param  pdu   the pdu
-     * @param  event a snmp4j CommandResponderEvent
-     * @return       an exchange
-     */
-    public Exchange createExchange(PDU pdu, CommandResponderEvent event) {
-        Exchange exchange = super.createExchange();
-        exchange.setIn(new SnmpMessage(getCamelContext(), pdu, event));
-        return exchange;
-    }
-
-    /**
      * creates and configures the endpoint
      *
      * @throws     Exception if unable to setup connection
diff --git 
a/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpTrapConsumer.java
 
b/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpTrapConsumer.java
index 4639e8c..0de0f65 100644
--- 
a/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpTrapConsumer.java
+++ 
b/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpTrapConsumer.java
@@ -137,7 +137,7 @@ public class SnmpTrapConsumer extends DefaultConsumer 
implements CommandResponde
         if (LOG.isDebugEnabled()) {
             LOG.debug("Received trap event for {} : {}", 
this.endpoint.getAddress(), pdu);
         }
-        Exchange exchange = endpoint.createExchange(pdu, event);
+        Exchange exchange = createExchange(pdu, event);
         try {
             getProcessor().process(exchange);
         } catch (Exception e) {
@@ -146,5 +146,20 @@ public class SnmpTrapConsumer extends DefaultConsumer 
implements CommandResponde
         if (exchange.getException() != null) {
             getExceptionHandler().handleException(exchange.getException());
         }
+        releaseExchange(exchange, false);
     }
+
+    /**
+     * creates an exchange for the given message
+     *
+     * @param  pdu   the pdu
+     * @param  event a snmp4j CommandResponderEvent
+     * @return       an exchange
+     */
+    public Exchange createExchange(PDU pdu, CommandResponderEvent event) {
+        Exchange exchange = createExchange(false);
+        exchange.setIn(new SnmpMessage(getEndpoint().getCamelContext(), pdu, 
event));
+        return exchange;
+    }
+
 }
diff --git 
a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotAbstractConsumer.java
 
b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotAbstractConsumer.java
index fbff371..c9340e7 100644
--- 
a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotAbstractConsumer.java
+++ 
b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotAbstractConsumer.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.soroushbot.component;
 
-import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -65,15 +64,6 @@ public abstract class SoroushBotAbstractConsumer extends 
DefaultConsumer impleme
         run();
     }
 
-    protected final void 
handleExceptionThrownWhileCreatingOrProcessingExchange(
-            Exchange exchange, SoroushMessage soroushMessage, Exception ex) {
-        //set originalMessage property to the created soroushMessage to let  
Error Handler access the message
-        exchange.setProperty("OriginalMessage", soroushMessage);
-        //use this instead of handleException() to manually set the exchange.
-        getExceptionHandler().handleException("message can not be processed 
due to :" + ex.getMessage(), exchange, ex);
-
-    }
-
     /**
      * handle how processing of the exchange should be started
      *
@@ -142,25 +132,21 @@ public abstract class SoroushBotAbstractConsumer extends 
DefaultConsumer impleme
 
             @Override
             public void onEvent(EventSource eventSource, String id, String 
type, String data) {
-                Exchange exchange = endpoint.createExchange();
+                Exchange exchange = createExchange(false);
                 try {
                     SoroushMessage soroushMessage = 
objectMapper.readValue(data, SoroushMessage.class);
-                    try {
-                        exchange.getIn().setBody(soroushMessage);
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("event data is: " + data);
-                        }
-                        // if autoDownload is true, download the resource if 
provided in the message
-                        if (endpoint.isAutoDownload()) {
-                            endpoint.handleDownloadFiles(soroushMessage);
-                        }
-                        //let each subclass decide how to start processing of 
each exchange
-                        sendExchange(exchange);
-                    } catch (Exception ex) {
-                        
handleExceptionThrownWhileCreatingOrProcessingExchange(exchange, 
soroushMessage, ex);
+                    exchange.getIn().setBody(soroushMessage);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("event data is: " + data);
+                    }
+                    // if autoDownload is true, download the resource if 
provided in the message
+                    if (endpoint.isAutoDownload()) {
+                        endpoint.handleDownloadFiles(soroushMessage);
                     }
-                } catch (IOException ex) {
-                    LOG.error("can not parse data due to following error", ex);
+                    //let each subclass decide how to start processing of each 
exchange
+                    sendExchange(exchange);
+                } catch (Exception ex) {
+                    getExceptionHandler().handleException(ex);
                 }
             }
 
diff --git 
a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotMultiThreadConsumer.java
 
b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotMultiThreadConsumer.java
index 96cecac..4badfee 100644
--- 
a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotMultiThreadConsumer.java
+++ 
b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotMultiThreadConsumer.java
@@ -57,6 +57,7 @@ public class SoroushBotMultiThreadConsumer extends 
SoroushBotAbstractConsumer {
                     getExceptionHandler().handleException("Error processing 
exchange",
                             exchange, exchange.getException());
                 }
+                releaseExchange(exchange, false);
             });
         } catch (IllegalStateException ex) {
             throw new CongestionException(ex, 
exchange.getIn().getBody(SoroushMessage.class));
diff --git 
a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotSingleThreadConsumer.java
 
b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotSingleThreadConsumer.java
index 5926b3b..1ed907c 100644
--- 
a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotSingleThreadConsumer.java
+++ 
b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotSingleThreadConsumer.java
@@ -42,5 +42,6 @@ public class SoroushBotSingleThreadConsumer extends 
SoroushBotAbstractConsumer {
             getExceptionHandler().handleException("Error processing exchange",
                     exchange, exchange.getException());
         }
+        releaseExchange(exchange, false);
     }
 }
diff --git 
a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
 
b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
index dca8f97..72834d2 100644
--- 
a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
+++ 
b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
@@ -66,28 +66,18 @@ public class SplunkConsumer extends 
ScheduledBatchPollingConsumer {
 
                     @Override
                     public void process(SplunkEvent splunkEvent) {
-                        final Exchange exchange = 
getEndpoint().createExchange();
+                        final Exchange exchange = createExchange(true);
                         Message message = exchange.getIn();
                         message.setBody(splunkEvent);
 
-                        try {
-                            LOG.trace("Processing exchange [{}]...", exchange);
-                            getAsyncProcessor().process(exchange, new 
AsyncCallback() {
-                                @Override
-                                public void done(boolean doneSync) {
-                                    LOG.trace("Done processing exchange 
[{}]...", exchange);
-                                }
-                            });
-                        } catch (Exception e) {
-                            exchange.setException(e);
-                        }
-                        if (exchange.getException() != null) {
-                            getExceptionHandler().handleException("Error 
processing exchange", exchange,
-                                    exchange.getException());
-                        }
-
+                        LOG.trace("Processing exchange [{}]...", exchange);
+                        getAsyncProcessor().process(exchange, new 
AsyncCallback() {
+                            @Override
+                            public void done(boolean doneSync) {
+                                LOG.trace("Done processing exchange [{}]...", 
exchange);
+                            }
+                        });
                     }
-
                 });
                 // Return 0: no exchanges returned by poll, as exchanges have 
been returned asynchronously
                 return 0;
@@ -107,7 +97,7 @@ public class SplunkConsumer extends 
ScheduledBatchPollingConsumer {
         LOG.trace("Received {} messages in this poll", splunkEvents.size());
         Queue<Exchange> answer = new LinkedList<>();
         for (SplunkEvent splunkEvent : splunkEvents) {
-            Exchange exchange = getEndpoint().createExchange();
+            Exchange exchange = createExchange(true);
             Message message = exchange.getIn();
             message.setBody(splunkEvent);
             answer.add(exchange);
diff --git 
a/components/camel-spring/src/main/java/org/apache/camel/component/cron/SpringCronConsumer.java
 
b/components/camel-spring/src/main/java/org/apache/camel/component/cron/SpringCronConsumer.java
index 217d9e7..e60ad73 100644
--- 
a/components/camel-spring/src/main/java/org/apache/camel/component/cron/SpringCronConsumer.java
+++ 
b/components/camel-spring/src/main/java/org/apache/camel/component/cron/SpringCronConsumer.java
@@ -29,7 +29,7 @@ public class SpringCronConsumer extends ScheduledPollConsumer 
{
 
     @Override
     protected int poll() throws Exception {
-        Exchange exchange = getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         getProcessor().process(exchange);
         return 1;
     }
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
 
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
index 036e9b7..145b7f1 100644
--- 
a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
@@ -231,7 +231,7 @@ public class SqlConsumer extends 
ScheduledBatchPollingConsumer {
     }
 
     protected Exchange createExchange(Object data) {
-        final Exchange exchange = 
getEndpoint().createExchange(ExchangePattern.InOnly);
+        final Exchange exchange = createExchange(false);
         Message msg = exchange.getIn();
         if (getEndpoint().getOutputHeader() != null) {
             msg.setHeader(getEndpoint().getOutputHeader(), data);
@@ -274,10 +274,12 @@ public class SqlConsumer extends 
ScheduledBatchPollingConsumer {
             if (getEndpoint().isTransacted() && exchange.isFailed()) {
                 // break out as we are transacted and should rollback
                 Exception cause = exchange.getException();
+                // must release exchange
+                releaseExchange(exchange, false);
                 if (cause != null) {
                     throw cause;
                 } else {
-                    throw new RollbackExchangeException("Rollback transaction 
due error processing exchange", exchange);
+                    throw new RollbackExchangeException("Rollback transaction 
due error processing exchange", null);
                 }
             }
 
@@ -306,6 +308,8 @@ public class SqlConsumer extends 
ScheduledBatchPollingConsumer {
                 } else {
                     handleException("Error executing onConsume/onConsumeFailed 
query " + sql, e);
                 }
+            } finally {
+                releaseExchange(exchange, false);
             }
         }
 
diff --git 
a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java
 
b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java
index 8f7c229..a4b29f1 100644
--- 
a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java
+++ 
b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java
@@ -56,22 +56,21 @@ public class SshConsumer extends ScheduledPollConsumer {
         }
 
         String command = endpoint.getPollCommand();
-        Exchange exchange = endpoint.createExchange();
-
-        String knownHostResource = endpoint.getKnownHostsResource();
-        if (knownHostResource != null) {
-            client.setServerKeyVerifier(new ResourceBasedSSHKeyVerifier(
-                    exchange.getContext(), knownHostResource,
-                    endpoint.isFailOnUnknownHost()));
-        }
+        Exchange exchange = createExchange(false);
+        try {
+            String knownHostResource = endpoint.getKnownHostsResource();
+            if (knownHostResource != null) {
+                client.setServerKeyVerifier(new ResourceBasedSSHKeyVerifier(
+                        exchange.getContext(), knownHostResource,
+                        endpoint.isFailOnUnknownHost()));
+            }
 
-        SshResult result = 
SshHelper.sendExecCommand(exchange.getIn().getHeaders(), command, endpoint, 
client);
+            SshResult result = 
SshHelper.sendExecCommand(exchange.getIn().getHeaders(), command, endpoint, 
client);
 
-        exchange.getIn().setBody(result.getStdout());
-        exchange.getIn().setHeader(SshResult.EXIT_VALUE, 
result.getExitValue());
-        exchange.getIn().setHeader(SshResult.STDERR, result.getStderr());
+            exchange.getIn().setBody(result.getStdout());
+            exchange.getIn().setHeader(SshResult.EXIT_VALUE, 
result.getExitValue());
+            exchange.getIn().setHeader(SshResult.STDERR, result.getStderr());
 
-        try {
             // send message to next processor in the route
             getProcessor().process(exchange);
             return 1; // number of messages polled
@@ -80,6 +79,7 @@ public class SshConsumer extends ScheduledPollConsumer {
             if (exchange.getException() != null) {
                 getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
             }
+            releaseExchange(exchange, false);
         }
     }
 }
diff --git 
a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
 
b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
index e1143d6..d6f122d 100644
--- 
a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
+++ 
b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
@@ -133,13 +133,15 @@ public class StompEndpoint extends DefaultEndpoint 
implements AsyncEndpoint, Hea
                     @Override
                     public void onSuccess(StompFrame value) {
                         if (!consumers.isEmpty()) {
-                            Exchange exchange = createExchange();
-                            exchange.getIn().setBody(value.content());
-                            
exchange.getIn().setHeaders(value.headerMap().entrySet().stream()
-                                    .collect(Collectors.toMap(e -> 
e.getKey().toString(), Map.Entry::getValue)));
                             for (StompConsumer consumer : consumers) {
+                                Exchange exchange = 
consumer.createExchange(false);
+                                exchange.getIn().setBody(value.content());
+                                
exchange.getIn().setHeaders(value.headerMap().entrySet().stream()
+                                        .collect(Collectors.toMap(e -> 
e.getKey().toString(), Map.Entry::getValue)));
                                 consumer.processExchange(exchange);
+                                consumer.releaseExchange(exchange, false);
                             }
+
                         }
                     }
                 });
diff --git 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
index 8416abf..a88886a 100644
--- 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
+++ 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
@@ -237,7 +237,7 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
                 List<String> copy = new ArrayList<>(lines);
                 Object body = endpoint.getGroupStrategy().groupLines(copy);
                 // remember to inc index when we create an exchange
-                Exchange exchange = endpoint.createExchange(body, index++, 
last);
+                Exchange exchange = createExchange(body, index++, last);
 
                 // clear lines
                 lines.clear();
@@ -247,7 +247,7 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
         } else if (line != null) {
             // single line
             // remember to inc index when we create an exchange
-            Exchange exchange = endpoint.createExchange(line, index++, last);
+            Exchange exchange = createExchange(line, index++, last);
             getProcessor().process(exchange);
         }
 
@@ -333,4 +333,12 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
         }
     }
 
+    protected Exchange createExchange(Object body, long index, boolean last) {
+        Exchange exchange = createExchange(true);
+        exchange.getIn().setBody(body);
+        exchange.getIn().setHeader(StreamConstants.STREAM_INDEX, index);
+        exchange.getIn().setHeader(StreamConstants.STREAM_COMPLETE, last);
+        return exchange;
+    }
+
 }
diff --git 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
index cf1f7c3..8a3a675 100644
--- 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
+++ 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
@@ -21,7 +21,6 @@ import java.nio.charset.Charset;
 import org.apache.camel.Category;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.Metadata;
@@ -96,14 +95,6 @@ public class StreamEndpoint extends DefaultEndpoint {
         return new StreamProducer(this, getEndpointUri());
     }
 
-    protected Exchange createExchange(Object body, long index, boolean last) {
-        Exchange exchange = createExchange();
-        exchange.getIn().setBody(body);
-        exchange.getIn().setHeader(StreamConstants.STREAM_INDEX, index);
-        exchange.getIn().setHeader(StreamConstants.STREAM_COMPLETE, last);
-        return exchange;
-    }
-
     // Properties
     //-------------------------------------------------------------------------
 

Reply via email to