This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push: new 86881b4 CAMEL-16222: PooledExchangeFactory experiment 86881b4 is described below commit 86881b4b6e448b7b2346cc2a47485fde855636e1 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Feb 22 17:24:11 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../apache/camel/component/xmpp/XmppConsumer.java | 23 +++++++++++++++++----- .../apache/camel/component/xmpp/XmppEndpoint.java | 9 --------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java index cac49fd..67d5669 100644 --- a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java +++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java @@ -215,16 +215,20 @@ public class XmppConsumer extends DefaultConsumer implements IncomingChatMessage endpoint.getUser(), endpoint.getParticipant(), message.getBody()); } - Exchange exchange = endpoint.createExchange(message); - - if (endpoint.isDoc()) { - exchange.getIn().setHeader(XmppConstants.DOC_HEADER, message); - } + Exchange exchange = createExchange(message); try { + if (endpoint.isDoc()) { + exchange.getIn().setHeader(XmppConstants.DOC_HEADER, message); + } getProcessor().process(exchange); } catch (Exception e) { exchange.setException(e); } finally { + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); + } + releaseExchange(exchange, false); + // must remove message from muc to avoid messages stacking up and causing OutOfMemoryError // pollMessage is a non blocking method // (see http://issues.igniterealtime.org/browse/SMACK-129) @@ -233,9 +237,18 @@ public class XmppConsumer extends DefaultConsumer implements IncomingChatMessage 
muc.pollMessage(); } catch (MultiUserChatException.MucNotJoinedException e) { LOG.debug("Error while polling message from MultiUserChat. This exception will be ignored.", e); + } catch (Throwable e) { + // ignore others } } } } + private Exchange createExchange(Stanza packet) { + Exchange exchange = createExchange(false); + exchange.setProperty(Exchange.BINDING, endpoint.getBinding()); + exchange.setIn(new XmppMessage(exchange, packet)); + return exchange; + } + } diff --git a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java index 427fde4..e2f6cf0 100644 --- a/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java +++ b/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java @@ -23,7 +23,6 @@ import java.util.List; import org.apache.camel.Category; import org.apache.camel.Consumer; -import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.spi.HeaderFilterStrategy; @@ -41,7 +40,6 @@ import org.jivesoftware.smack.SmackException; import org.jivesoftware.smack.XMPPConnection; import org.jivesoftware.smack.XMPPException; import org.jivesoftware.smack.XMPPException.XMPPErrorException; -import org.jivesoftware.smack.packet.Stanza; import org.jivesoftware.smack.packet.StanzaError; import org.jivesoftware.smack.packet.StanzaError.Condition; import org.jivesoftware.smack.tcp.XMPPTCPConnection; @@ -155,13 +153,6 @@ public class XmppEndpoint extends DefaultEndpoint implements HeaderFilterStrateg return answer; } - public Exchange createExchange(Stanza packet) { - Exchange exchange = super.createExchange(); - exchange.setProperty(Exchange.BINDING, getBinding()); - exchange.setIn(new XmppMessage(exchange, packet)); - return exchange; - } - @Override protected String createEndpointUri() { return "xmpp://" + host + ":" + port + 
"/" + getParticipant() + "?serviceName=" + serviceName;