This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6824daa4316007724f6a139a120b4007278b2ee6 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Feb 22 10:45:18 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../sip/listener/SipSubscriptionListener.java | 7 +- .../camel/component/slack/SlackConsumer.java | 23 +++- .../camel/component/slack/SlackEndpoint.java | 26 ----- .../smpp/MessageReceiverListenerImpl.java | 24 ++++- .../apache/camel/component/smpp/SmppConsumer.java | 3 +- .../apache/camel/component/smpp/SmppEndpoint.java | 28 ----- .../smpp/MessageReceiverListenerImplTest.java | 117 --------------------- .../camel/component/smpp/SmppConsumerTest.java | 12 +++ .../apache/camel/component/snmp/SnmpEndpoint.java | 14 --- .../camel/component/snmp/SnmpTrapConsumer.java | 17 ++- .../component/SoroushBotAbstractConsumer.java | 38 +++---- .../component/SoroushBotMultiThreadConsumer.java | 1 + .../component/SoroushBotSingleThreadConsumer.java | 1 + .../camel/component/splunk/SplunkConsumer.java | 28 ++--- .../camel/component/cron/SpringCronConsumer.java | 2 +- .../apache/camel/component/sql/SqlConsumer.java | 8 +- .../apache/camel/component/ssh/SshConsumer.java | 26 ++--- .../camel/component/stomp/StompEndpoint.java | 10 +- .../camel/component/stream/StreamConsumer.java | 12 ++- .../camel/component/stream/StreamEndpoint.java | 9 -- 20 files changed, 137 insertions(+), 269 deletions(-) diff --git a/components/camel-sip/src/main/java/org/apache/camel/component/sip/listener/SipSubscriptionListener.java b/components/camel-sip/src/main/java/org/apache/camel/component/sip/listener/SipSubscriptionListener.java index 7951940..6738ed8 100644 --- a/components/camel-sip/src/main/java/org/apache/camel/component/sip/listener/SipSubscriptionListener.java +++ b/components/camel-sip/src/main/java/org/apache/camel/component/sip/listener/SipSubscriptionListener.java @@ -48,14 +48,15 @@ public class SipSubscriptionListener implements SipListener { this.setSipSubscriber(sipSubscriber); } - private void dispatchExchange(Object response) throws CamelException { + private void dispatchExchange(Object response) { LOG.debug("Consumer Dispatching the received notification along the route"); - Exchange exchange = sipSubscriber.getEndpoint().createExchange(ExchangePattern.InOnly); + Exchange exchange = sipSubscriber.createExchange(true); + exchange.setPattern(ExchangePattern.InOnly); exchange.getIn().setBody(response); try { sipSubscriber.getProcessor().process(exchange); } catch (Exception e) { - throw new CamelException("Error in consumer while dispatching exchange", e); + sipSubscriber.getExceptionHandler().handleException("Error in consumer while dispatching exchange", e); } } diff --git a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java index 396e66e..8ea304c 100644 --- a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java +++ b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java @@ -25,8 +25,11 @@ import java.util.List; import java.util.Queue; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.slack.helper.SlackMessage; import org.apache.camel.support.ScheduledBatchPollingConsumer; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; @@ -109,7 +112,7 @@ public class SlackConsumer extends ScheduledBatchPollingConsumer { timestamp = (String) singleMess.get("ts"); } i++; - Exchange exchange = slackEndpoint.createExchange(singleMess); + Exchange exchange = createExchange(singleMess); answer.add(exchange); } } @@ -185,4 +188,22 @@ public class SlackConsumer extends ScheduledBatchPollingConsumer { } } + public Exchange createExchange(JsonObject object) { + Exchange exchange = createExchange(true); + SlackMessage slackMessage = new SlackMessage(); + String text = object.getString(SlackConstants.SLACK_TEXT_FIELD); + String user = object.getString("user"); + slackMessage.setText(text); + slackMessage.setUser(user); + if (ObjectHelper.isNotEmpty(object.get("icons"))) { + JsonObject icons = object.getMap("icons"); + if (ObjectHelper.isNotEmpty(icons.get("emoji"))) { + slackMessage.setIconEmoji(icons.getString("emoji")); + } + } + Message message = exchange.getIn(); + message.setBody(slackMessage); + return exchange; + } + } diff --git a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackEndpoint.java b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackEndpoint.java index c5bf928..9893b21 100644 --- a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackEndpoint.java +++ b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackEndpoint.java @@ -18,20 +18,15 @@ package org.apache.camel.component.slack; import org.apache.camel.Category; import org.apache.camel.Consumer; -import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; -import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.RuntimeCamelException; -import org.apache.camel.component.slack.helper.SlackMessage; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; import org.apache.camel.support.ScheduledPollEndpoint; import org.apache.camel.util.ObjectHelper; -import org.apache.camel.util.json.JsonObject; /** * Send and receive messages to/from Slack. @@ -183,25 +178,4 @@ public class SlackEndpoint extends ScheduledPollEndpoint { this.serverUrl = serverUrl; } - public Exchange createExchange(JsonObject object) { - return createExchange(getExchangePattern(), object); - } - - public Exchange createExchange(ExchangePattern pattern, JsonObject object) { - Exchange exchange = super.createExchange(pattern); - SlackMessage slackMessage = new SlackMessage(); - String text = object.getString(SlackConstants.SLACK_TEXT_FIELD); - String user = object.getString("user"); - slackMessage.setText(text); - slackMessage.setUser(user); - if (ObjectHelper.isNotEmpty(object.get("icons"))) { - JsonObject icons = object.getMap("icons"); - if (ObjectHelper.isNotEmpty(icons.get("emoji"))) { - slackMessage.setIconEmoji(icons.getString("emoji")); - } - } - Message message = exchange.getIn(); - message.setBody(slackMessage); - return exchange; - } } diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/MessageReceiverListenerImpl.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/MessageReceiverListenerImpl.java index c076335..9eb4e2e 100644 --- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/MessageReceiverListenerImpl.java +++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/MessageReceiverListenerImpl.java @@ -17,6 +17,7 @@ package org.apache.camel.component.smpp; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; import org.apache.camel.spi.ExceptionHandler; import org.jsmpp.bean.AlertNotification; @@ -37,11 +38,14 @@ public class MessageReceiverListenerImpl implements MessageReceiverListener { private static final Logger LOG = LoggerFactory.getLogger(MessageReceiverListenerImpl.class); private MessageIDGenerator messageIDGenerator = new RandomMessageIDGenerator(); + private SmppConsumer consumer; private SmppEndpoint endpoint; private Processor processor; private ExceptionHandler exceptionHandler; - public MessageReceiverListenerImpl(SmppEndpoint endpoint, Processor processor, ExceptionHandler exceptionHandler) { + public MessageReceiverListenerImpl(SmppConsumer consumer, SmppEndpoint endpoint, Processor processor, + ExceptionHandler exceptionHandler) { + this.consumer = consumer; this.endpoint = endpoint; this.processor = processor; this.exceptionHandler = exceptionHandler; @@ -51,7 +55,7 @@ public class MessageReceiverListenerImpl implements MessageReceiverListener { public void onAcceptAlertNotification(AlertNotification alertNotification) { LOG.debug("Received an alertNotification {}", alertNotification); - Exchange exchange = endpoint.createOnAcceptAlertNotificationExchange(alertNotification); + Exchange exchange = createOnAcceptAlertNotificationExchange(alertNotification); try { processor.process(exchange); } catch (Exception e) { @@ -62,6 +66,7 @@ public class MessageReceiverListenerImpl implements MessageReceiverListener { exceptionHandler.handleException("Cannot process exchange. This exception will be ignored.", exchange, exchange.getException()); } + consumer.releaseExchange(exchange, false); } @Override @@ -117,4 +122,19 @@ public class MessageReceiverListenerImpl implements MessageReceiverListener { public void setMessageIDGenerator(MessageIDGenerator messageIDGenerator) { this.messageIDGenerator = messageIDGenerator; } + + /** + * Create a new exchange for communicating with this endpoint from a SMSC with the specified {@link ExchangePattern} + * such as whether its going to be an {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut} exchange + * + * @param alertNotification the received message from the SMSC + * @return a new exchange + */ + public Exchange createOnAcceptAlertNotificationExchange(AlertNotification alertNotification) { + Exchange exchange = consumer.createExchange(false); + exchange.setProperty(Exchange.BINDING, endpoint.getBinding()); + exchange.setIn(endpoint.getBinding().createSmppMessage(endpoint.getCamelContext(), alertNotification)); + return exchange; + } + } diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java index dee6ec3..c04c7f1 100644 --- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java +++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java @@ -72,7 +72,8 @@ public class SmppConsumer extends DefaultConsumer { } } }; - this.messageReceiverListener = new MessageReceiverListenerImpl(getEndpoint(), getProcessor(), getExceptionHandler()); + this.messageReceiverListener + = new MessageReceiverListenerImpl(this, getEndpoint(), getProcessor(), getExceptionHandler()); } @Override diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppEndpoint.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppEndpoint.java index f9751e0..6f3b6d2 100644 --- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppEndpoint.java +++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppEndpoint.java @@ -26,7 +26,6 @@ import org.apache.camel.Producer; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.support.DefaultEndpoint; -import org.jsmpp.bean.AlertNotification; import org.jsmpp.bean.DataSm; import org.jsmpp.bean.DeliverSm; @@ -71,33 +70,6 @@ public class SmppEndpoint extends DefaultEndpoint { /** * Create a new exchange for communicating with this endpoint from a SMSC * - * @param alertNotification the received message from the SMSC - * @return a new exchange - */ - public Exchange createOnAcceptAlertNotificationExchange(AlertNotification alertNotification) { - return createOnAcceptAlertNotificationExchange(getExchangePattern(), alertNotification); - } - - /** - * Create a new exchange for communicating with this endpoint from a SMSC with the specified {@link ExchangePattern} - * such as whether its going to be an {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut} exchange - * - * @param exchangePattern the message exchange pattern for the exchange - * @param alertNotification the received message from the SMSC - * @return a new exchange - */ - public Exchange createOnAcceptAlertNotificationExchange( - ExchangePattern exchangePattern, - AlertNotification alertNotification) { - Exchange exchange = createExchange(exchangePattern); - exchange.setProperty(Exchange.BINDING, getBinding()); - exchange.setIn(getBinding().createSmppMessage(getCamelContext(), alertNotification)); - return exchange; - } - - /** - * Create a new exchange for communicating with this endpoint from a SMSC - * * @param deliverSm the received message from the SMSC * @return a new exchange */ diff --git a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/MessageReceiverListenerImplTest.java b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/MessageReceiverListenerImplTest.java deleted file mode 100644 index a34f6c8..0000000 --- a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/MessageReceiverListenerImplTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.smpp; - -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.spi.ExceptionHandler; -import org.jsmpp.PDUStringException; -import org.jsmpp.bean.AlertNotification; -import org.jsmpp.bean.DataSm; -import org.jsmpp.bean.DeliverSm; -import org.jsmpp.bean.OptionalParameter; -import org.jsmpp.session.DataSmResult; -import org.jsmpp.session.SMPPSession; -import org.jsmpp.util.MessageIDGenerator; -import org.jsmpp.util.MessageId; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertSame; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class MessageReceiverListenerImplTest { - - private MessageReceiverListenerImpl listener; - private SmppEndpoint endpoint; - private Processor processor; - private ExceptionHandler exceptionHandler; - - @BeforeEach - public void setUp() { - endpoint = mock(SmppEndpoint.class); - processor = mock(Processor.class); - exceptionHandler = mock(ExceptionHandler.class); - - listener = new MessageReceiverListenerImpl(endpoint, processor, exceptionHandler); - listener.setMessageIDGenerator(new MessageIDGenerator() { - public MessageId newMessageId() { - try { - return new MessageId("1"); - } catch (PDUStringException e) { - throw new RuntimeException(e); - } - } - }); - } - - @Test - public void onAcceptAlertNotificationSuccess() throws Exception { - AlertNotification alertNotification = mock(AlertNotification.class); - Exchange exchange = mock(Exchange.class); - - when(endpoint.createOnAcceptAlertNotificationExchange(alertNotification)) - .thenReturn(exchange); - when(exchange.getException()).thenReturn(null); - - listener.onAcceptAlertNotification(alertNotification); - - verify(endpoint).createOnAcceptAlertNotificationExchange(alertNotification); - verify(processor).process(exchange); - } - - @Test - public void onAcceptDeliverSmException() throws Exception { - DeliverSm deliverSm = mock(DeliverSm.class); - Exchange exchange = mock(Exchange.class); - - when(endpoint.createOnAcceptDeliverSmExchange(deliverSm)) - .thenReturn(exchange); - when(exchange.getException()).thenReturn(null); - - listener.onAcceptDeliverSm(deliverSm); - - verify(endpoint).createOnAcceptDeliverSmExchange(deliverSm); - verify(processor).process(exchange); - } - - @Test - public void onAcceptDataSmSuccess() throws Exception { - SMPPSession session = mock(SMPPSession.class); - DataSm dataSm = mock(DataSm.class); - Exchange exchange = mock(Exchange.class); - OptionalParameter[] optionalParameters = new OptionalParameter[] {}; - - when(endpoint.createOnAcceptDataSm(dataSm, "1")) - .thenReturn(exchange); - when(exchange.getException()).thenReturn(null); - when(dataSm.getOptionalParameters()) - .thenReturn(optionalParameters); - - DataSmResult result = listener.onAcceptDataSm(dataSm, session); - - verify(endpoint).createOnAcceptDataSm(dataSm, "1"); - verify(processor).process(exchange); - - assertEquals("1", result.getMessageId()); - assertSame(optionalParameters, result.getOptionalParameters()); - } - -} diff --git a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java index 6404991..2274dbd 100644 --- a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java +++ b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java @@ -16,7 +16,9 @@ */ package org.apache.camel.component.smpp; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Processor; +import org.apache.camel.spi.ExchangeFactory; import org.jsmpp.bean.BindType; import org.jsmpp.bean.NumberingPlanIndicator; import org.jsmpp.bean.TypeOfNumber; @@ -28,6 +30,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertSame; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; @@ -39,6 +42,8 @@ import static org.mockito.Mockito.when; */ public class SmppConsumerTest { + private ExchangeFactory exchangeFactory; + private ExtendedCamelContext context; private SmppConsumer consumer; private SmppEndpoint endpoint; private SmppConfiguration configuration; @@ -51,10 +56,17 @@ public class SmppConsumerTest { configuration.setServiceType("CMT"); configuration.setSystemType("cp"); configuration.setPassword("password"); + context = mock(ExtendedCamelContext.class); + exchangeFactory = mock(ExchangeFactory.class); endpoint = mock(SmppEndpoint.class); processor = mock(Processor.class); session = mock(SMPPSession.class); + when(endpoint.getCamelContext()).thenReturn(context); + when(context.adapt(ExtendedCamelContext.class)).thenReturn(context); + when(context.getExchangeFactory()).thenReturn(exchangeFactory); + when(exchangeFactory.newExchangeFactory(any())).thenReturn(exchangeFactory); + // the construction of SmppConsumer will trigger the getCamelContext call consumer = new SmppConsumer( endpoint, diff --git a/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpEndpoint.java b/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpEndpoint.java index 3cf11b7..69303b4 100644 --- a/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpEndpoint.java +++ b/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpEndpoint.java @@ -30,7 +30,6 @@ import org.apache.camel.spi.UriPath; import org.apache.camel.support.DefaultPollingEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.snmp4j.CommandResponderEvent; import org.snmp4j.PDU; import org.snmp4j.mp.SnmpConstants; import org.snmp4j.security.SecurityLevel; @@ -140,19 +139,6 @@ public class SnmpEndpoint extends DefaultPollingEndpoint { } /** - * creates an exchange for the given message - * - * @param pdu the pdu - * @param event a snmp4j CommandResponderEvent - * @return an exchange - */ - public Exchange createExchange(PDU pdu, CommandResponderEvent event) { - Exchange exchange = super.createExchange(); - exchange.setIn(new SnmpMessage(getCamelContext(), pdu, event)); - return exchange; - } - - /** * creates and configures the endpoint * * @throws Exception if unable to setup connection diff --git a/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpTrapConsumer.java b/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpTrapConsumer.java index 4639e8c..0de0f65 100644 --- a/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpTrapConsumer.java +++ b/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpTrapConsumer.java @@ -137,7 +137,7 @@ public class SnmpTrapConsumer extends DefaultConsumer implements CommandResponde if (LOG.isDebugEnabled()) { LOG.debug("Received trap event for {} : {}", this.endpoint.getAddress(), pdu); } - Exchange exchange = endpoint.createExchange(pdu, event); + Exchange exchange = createExchange(pdu, event); try { getProcessor().process(exchange); } catch (Exception e) { @@ -146,5 +146,20 @@ public class SnmpTrapConsumer extends DefaultConsumer implements CommandResponde if (exchange.getException() != null) { getExceptionHandler().handleException(exchange.getException()); } + releaseExchange(exchange, false); } + + /** + * creates an exchange for the given message + * + * @param pdu the pdu + * @param event a snmp4j CommandResponderEvent + * @return an exchange + */ + public Exchange createExchange(PDU pdu, CommandResponderEvent event) { + Exchange exchange = createExchange(false); + exchange.setIn(new SnmpMessage(getEndpoint().getCamelContext(), pdu, event)); + return exchange; + } + } diff --git a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotAbstractConsumer.java b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotAbstractConsumer.java index fbff371..c9340e7 100644 --- a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotAbstractConsumer.java +++ b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotAbstractConsumer.java @@ -16,7 +16,6 @@ */ package org.apache.camel.component.soroushbot.component; -import java.io.IOException; import java.util.concurrent.TimeUnit; import com.fasterxml.jackson.databind.ObjectMapper; @@ -65,15 +64,6 @@ public abstract class SoroushBotAbstractConsumer extends DefaultConsumer impleme run(); } - protected final void handleExceptionThrownWhileCreatingOrProcessingExchange( - Exchange exchange, SoroushMessage soroushMessage, Exception ex) { - //set originalMessage property to the created soroushMessage to let Error Handler access the message - exchange.setProperty("OriginalMessage", soroushMessage); - //use this instead of handleException() to manually set the exchange. - getExceptionHandler().handleException("message can not be processed due to :" + ex.getMessage(), exchange, ex); - - } - /** * handle how processing of the exchange should be started * @@ -142,25 +132,21 @@ public abstract class SoroushBotAbstractConsumer extends DefaultConsumer impleme @Override public void onEvent(EventSource eventSource, String id, String type, String data) { - Exchange exchange = endpoint.createExchange(); + Exchange exchange = createExchange(false); try { SoroushMessage soroushMessage = objectMapper.readValue(data, SoroushMessage.class); - try { - exchange.getIn().setBody(soroushMessage); - if (LOG.isDebugEnabled()) { - LOG.debug("event data is: " + data); - } - // if autoDownload is true, download the resource if provided in the message - if (endpoint.isAutoDownload()) { - endpoint.handleDownloadFiles(soroushMessage); - } - //let each subclass decide how to start processing of each exchange - sendExchange(exchange); - } catch (Exception ex) { - handleExceptionThrownWhileCreatingOrProcessingExchange(exchange, soroushMessage, ex); + exchange.getIn().setBody(soroushMessage); + if (LOG.isDebugEnabled()) { + LOG.debug("event data is: " + data); + } + // if autoDownload is true, download the resource if provided in the message + if (endpoint.isAutoDownload()) { + endpoint.handleDownloadFiles(soroushMessage); } - } catch (IOException ex) { - LOG.error("can not parse data due to following error", ex); + //let each subclass decide how to start processing of each exchange + sendExchange(exchange); + } catch (Exception ex) { + getExceptionHandler().handleException(ex); } } diff --git a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotMultiThreadConsumer.java b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotMultiThreadConsumer.java index 96cecac..4badfee 100644 --- a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotMultiThreadConsumer.java +++ b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotMultiThreadConsumer.java @@ -57,6 +57,7 @@ public class SoroushBotMultiThreadConsumer extends SoroushBotAbstractConsumer { getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } + releaseExchange(exchange, false); }); } catch (IllegalStateException ex) { throw new CongestionException(ex, exchange.getIn().getBody(SoroushMessage.class)); diff --git a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotSingleThreadConsumer.java b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotSingleThreadConsumer.java index 5926b3b..1ed907c 100644 --- a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotSingleThreadConsumer.java +++ b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotSingleThreadConsumer.java @@ -42,5 +42,6 @@ public class SoroushBotSingleThreadConsumer extends SoroushBotAbstractConsumer { getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } + releaseExchange(exchange, false); } } diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java index dca8f97..72834d2 100644 --- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java +++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java @@ -66,28 +66,18 @@ public class SplunkConsumer extends ScheduledBatchPollingConsumer { @Override public void process(SplunkEvent splunkEvent) { - final Exchange exchange = getEndpoint().createExchange(); + final Exchange exchange = createExchange(true); Message message = exchange.getIn(); message.setBody(splunkEvent); - try { - LOG.trace("Processing exchange [{}]...", exchange); - getAsyncProcessor().process(exchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - LOG.trace("Done processing exchange [{}]...", exchange); - } - }); - } catch (Exception e) { - exchange.setException(e); - } - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, - exchange.getException()); - } - + LOG.trace("Processing exchange [{}]...", exchange); + getAsyncProcessor().process(exchange, new AsyncCallback() { + @Override + public void done(boolean doneSync) { + LOG.trace("Done processing exchange [{}]...", exchange); + } + }); } - }); // Return 0: no exchanges returned by poll, as exchanges have been returned asynchronously return 0; @@ -107,7 +97,7 @@ public class SplunkConsumer extends ScheduledBatchPollingConsumer { LOG.trace("Received {} messages in this poll", splunkEvents.size()); Queue<Exchange> answer = new LinkedList<>(); for (SplunkEvent splunkEvent : splunkEvents) { - Exchange exchange = getEndpoint().createExchange(); + Exchange exchange = createExchange(true); Message message = exchange.getIn(); message.setBody(splunkEvent); answer.add(exchange); diff --git a/components/camel-spring/src/main/java/org/apache/camel/component/cron/SpringCronConsumer.java b/components/camel-spring/src/main/java/org/apache/camel/component/cron/SpringCronConsumer.java index 217d9e7..e60ad73 100644 --- a/components/camel-spring/src/main/java/org/apache/camel/component/cron/SpringCronConsumer.java +++ b/components/camel-spring/src/main/java/org/apache/camel/component/cron/SpringCronConsumer.java @@ -29,7 +29,7 @@ public class SpringCronConsumer extends ScheduledPollConsumer { @Override protected int poll() throws Exception { - Exchange exchange = getEndpoint().createExchange(); + Exchange exchange = createExchange(true); getProcessor().process(exchange); return 1; } diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java index 036e9b7..145b7f1 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java @@ -231,7 +231,7 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer { } protected Exchange createExchange(Object data) { - final Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOnly); + final Exchange exchange = createExchange(false); Message msg = exchange.getIn(); if (getEndpoint().getOutputHeader() != null) { msg.setHeader(getEndpoint().getOutputHeader(), data); @@ -274,10 +274,12 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer { if (getEndpoint().isTransacted() && exchange.isFailed()) { // break out as we are transacted and should rollback Exception cause = exchange.getException(); + // must release exchange + releaseExchange(exchange, false); if (cause != null) { throw cause; } else { - throw new RollbackExchangeException("Rollback transaction due error processing exchange", exchange); + throw new RollbackExchangeException("Rollback transaction due error processing exchange", null); } } @@ -306,6 +308,8 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer { } else { handleException("Error executing onConsume/onConsumeFailed query " + sql, e); } + } finally { + releaseExchange(exchange, false); } } diff --git a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java index 8f7c229..a4b29f1 100644 --- a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java +++ b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java @@ -56,22 +56,21 @@ public class SshConsumer extends ScheduledPollConsumer { } String command = endpoint.getPollCommand(); - Exchange exchange = endpoint.createExchange(); - - String knownHostResource = endpoint.getKnownHostsResource(); - if (knownHostResource != null) { - client.setServerKeyVerifier(new ResourceBasedSSHKeyVerifier( - exchange.getContext(), knownHostResource, - endpoint.isFailOnUnknownHost())); - } + Exchange exchange = createExchange(false); + try { + String knownHostResource = endpoint.getKnownHostsResource(); + if (knownHostResource != null) { + client.setServerKeyVerifier(new ResourceBasedSSHKeyVerifier( + exchange.getContext(), knownHostResource, + endpoint.isFailOnUnknownHost())); + } - SshResult result = SshHelper.sendExecCommand(exchange.getIn().getHeaders(), command, endpoint, client); + SshResult result = SshHelper.sendExecCommand(exchange.getIn().getHeaders(), command, endpoint, client); - exchange.getIn().setBody(result.getStdout()); - exchange.getIn().setHeader(SshResult.EXIT_VALUE, result.getExitValue()); - exchange.getIn().setHeader(SshResult.STDERR, result.getStderr()); + exchange.getIn().setBody(result.getStdout()); + exchange.getIn().setHeader(SshResult.EXIT_VALUE, result.getExitValue()); + exchange.getIn().setHeader(SshResult.STDERR, result.getStderr()); - try { // send message to next processor in the route getProcessor().process(exchange); return 1; // number of messages polled @@ -80,6 +79,7 @@ public class SshConsumer extends ScheduledPollConsumer { if (exchange.getException() != null) { getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } + releaseExchange(exchange, false); } } } diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java index e1143d6..d6f122d 100644 --- a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java +++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java @@ -133,13 +133,15 @@ public class StompEndpoint extends DefaultEndpoint implements AsyncEndpoint, Hea @Override public void onSuccess(StompFrame value) { if (!consumers.isEmpty()) { - Exchange exchange = createExchange(); - exchange.getIn().setBody(value.content()); - exchange.getIn().setHeaders(value.headerMap().entrySet().stream() - .collect(Collectors.toMap(e -> e.getKey().toString(), Map.Entry::getValue))); for (StompConsumer consumer : consumers) { + Exchange exchange = consumer.createExchange(false); + exchange.getIn().setBody(value.content()); + exchange.getIn().setHeaders(value.headerMap().entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().toString(), Map.Entry::getValue))); consumer.processExchange(exchange); + consumer.releaseExchange(exchange, false); } + } } }); diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java index 8416abf..a88886a 100644 --- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java +++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java @@ -237,7 +237,7 @@ public class StreamConsumer extends DefaultConsumer implements Runnable { List<String> copy = new ArrayList<>(lines); Object body = endpoint.getGroupStrategy().groupLines(copy); // remember to inc index when we create an exchange - Exchange exchange = endpoint.createExchange(body, index++, last); + Exchange exchange = createExchange(body, index++, last); // clear lines lines.clear(); @@ -247,7 +247,7 @@ public class StreamConsumer extends DefaultConsumer implements Runnable { } else if (line != null) { // single line // remember to inc index when we create an exchange - Exchange exchange = endpoint.createExchange(line, index++, last); + Exchange exchange = createExchange(line, index++, last); getProcessor().process(exchange); } @@ -333,4 +333,12 @@ public class StreamConsumer extends DefaultConsumer implements Runnable { } } + protected Exchange createExchange(Object body, long index, boolean last) { + Exchange exchange = createExchange(true); + exchange.getIn().setBody(body); + exchange.getIn().setHeader(StreamConstants.STREAM_INDEX, index); + exchange.getIn().setHeader(StreamConstants.STREAM_COMPLETE, last); + return exchange; + } + } diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java index cf1f7c3..8a3a675 100644 --- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java +++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java @@ -21,7 +21,6 @@ import java.nio.charset.Charset; import org.apache.camel.Category; import org.apache.camel.Component; import org.apache.camel.Consumer; -import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.spi.Metadata; @@ -96,14 +95,6 @@ public class StreamEndpoint extends DefaultEndpoint { return new StreamProducer(this, getEndpointUri()); } - protected Exchange createExchange(Object body, long index, boolean last) { - Exchange exchange = createExchange(); - exchange.getIn().setBody(body); - exchange.getIn().setHeader(StreamConstants.STREAM_INDEX, index); - exchange.getIn().setHeader(StreamConstants.STREAM_COMPLETE, last); - return exchange; - } - // Properties //-------------------------------------------------------------------------