This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push: new 401890f CAMEL-16222: PooledExchangeFactory experiment 401890f is described below commit 401890f0696e15b0646206f5dc19e387651152aa Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Feb 22 09:38:50 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../apache/camel/component/nats/NatsConsumer.java | 40 ++++++++++++---------- .../camel/component/nitrite/NitriteConsumer.java | 3 +- .../apache/camel/component/nsq/NsqConsumer.java | 13 ++++--- .../camel/oaipmh/handler/AbstractHandler.java | 8 +++-- .../org/apache/camel/oaipmh/handler/Harvester.java | 8 ++--- .../component/optaplanner/OptaPlannerConsumer.java | 8 ++--- 6 files changed, 44 insertions(+), 36 deletions(-) diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java index 11b15d7..0c78677 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java @@ -153,30 +153,32 @@ public class NatsConsumer extends DefaultConsumer { @Override public void onMessage(Message msg) throws InterruptedException { LOG.debug("Received Message: {}", msg); - Exchange exchange = getEndpoint().createExchange(); - exchange.getIn().setBody(msg.getData()); - exchange.getIn().setHeader(NatsConstants.NATS_REPLY_TO, msg.getReplyTo()); - exchange.getIn().setHeader(NatsConstants.NATS_SID, msg.getSID()); - exchange.getIn().setHeader(NatsConstants.NATS_SUBJECT, msg.getSubject()); - exchange.getIn().setHeader(NatsConstants.NATS_QUEUE_NAME, msg.getSubscription().getQueueName()); - exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis()); + Exchange exchange = createExchange(false); try { + exchange.getIn().setBody(msg.getData()); + 
exchange.getIn().setHeader(NatsConstants.NATS_REPLY_TO, msg.getReplyTo()); + exchange.getIn().setHeader(NatsConstants.NATS_SID, msg.getSID()); + exchange.getIn().setHeader(NatsConstants.NATS_SUBJECT, msg.getSubject()); + exchange.getIn().setHeader(NatsConstants.NATS_QUEUE_NAME, msg.getSubscription().getQueueName()); + exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis()); + processor.process(exchange); - } catch (Exception e) { - getExceptionHandler().handleException("Error during processing", exchange, e); - } - // is there a reply? - if (!configuration.isReplyToDisabled() - && msg.getReplyTo() != null && msg.getConnection() != null) { - Connection con = msg.getConnection(); - byte[] data = exchange.getMessage().getBody(byte[].class); - if (data != null) { - LOG.debug("Publishing replyTo: {} message", msg.getReplyTo()); - con.publish(msg.getReplyTo(), data); + // is there a reply? + if (!configuration.isReplyToDisabled() + && msg.getReplyTo() != null && msg.getConnection() != null) { + Connection con = msg.getConnection(); + byte[] data = exchange.getMessage().getBody(byte[].class); + if (data != null) { + LOG.debug("Publishing replyTo: {} message", msg.getReplyTo()); + con.publish(msg.getReplyTo(), data); + } } + } catch (Exception e){ + getExceptionHandler().handleException("Error during processing", exchange, e); + } finally { + releaseExchange(exchange, false); } - } } } diff --git a/components/camel-nitrite/src/main/java/org/apache/camel/component/nitrite/NitriteConsumer.java b/components/camel-nitrite/src/main/java/org/apache/camel/component/nitrite/NitriteConsumer.java index a9fd003..8742460 100644 --- a/components/camel-nitrite/src/main/java/org/apache/camel/component/nitrite/NitriteConsumer.java +++ b/components/camel-nitrite/src/main/java/org/apache/camel/component/nitrite/NitriteConsumer.java @@ -61,7 +61,7 @@ public class NitriteConsumer extends DefaultConsumer { @Override public void onChange(ChangeInfo 
changeInfo) { for (ChangedItem changedItem : changeInfo.getChangedItems()) { - Exchange exchange = endpoint.createExchange(); + Exchange exchange = createExchange(false); Message message = exchange.getMessage(); message.setHeader(NitriteConstants.CHANGE_TIMESTAMP, changedItem.getChangeTimestamp()); message.setHeader(NitriteConstants.CHANGE_TYPE, changedItem.getChangeType()); @@ -75,6 +75,7 @@ public class NitriteConsumer extends DefaultConsumer { if (exchange.getException() != null) { getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } + releaseExchange(exchange, false); } } } diff --git a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java index 10c44fb..04b9514 100644 --- a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java +++ b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java @@ -111,12 +111,13 @@ public class NsqConsumer extends DefaultConsumer { @Override public void message(NSQMessage msg) { LOG.debug("Received Message: {}", msg); - Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOnly); - exchange.getIn().setBody(msg.getMessage()); - exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_ID, msg.getId()); - exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_ATTEMPTS, msg.getAttempts()); - exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_TIMESTAMP, msg.getTimestamp()); + Exchange exchange = createExchange(false); try { + exchange.setPattern(ExchangePattern.InOnly); + exchange.getIn().setBody(msg.getMessage()); + exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_ID, msg.getId()); + exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_ATTEMPTS, msg.getAttempts()); + exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_TIMESTAMP, msg.getTimestamp()); if (configuration.getAutoFinish()) { msg.finished(); } else { 
@@ -129,6 +130,8 @@ public class NsqConsumer extends DefaultConsumer { msg.requeue((int) configuration.getRequeueInterval()); } getExceptionHandler().handleException("Error during processing", exchange, e); + } finally { + releaseExchange(exchange, false); } } } diff --git a/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/AbstractHandler.java b/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/AbstractHandler.java index 88cfd7d..c204028 100644 --- a/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/AbstractHandler.java +++ b/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/AbstractHandler.java @@ -16,6 +16,7 @@ */ package org.apache.camel.oaipmh.handler; +import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -30,18 +31,20 @@ public abstract class AbstractHandler { private static final Logger LOG = LoggerFactory.getLogger(AbstractHandler.class); + protected final Consumer consumer; protected final Endpoint endpoint; protected final Processor processor; protected final ExceptionHandler exceptionHandler; public AbstractHandler(OAIPMHConsumer consumer) { + this.consumer = consumer; this.endpoint = consumer.getEndpoint(); this.processor = consumer.getAsyncProcessor(); this.exceptionHandler = consumer.getExceptionHandler(); } protected void send(OAIPMHResponse message) { - Exchange exchange = endpoint.createExchange(); + Exchange exchange = consumer.createExchange(false); String xml = message.getRawResponse(); exchange.getIn().setBody(xml); try { @@ -49,12 +52,13 @@ public abstract class AbstractHandler { LOG.trace("sending exchange: {}", exchange); processor.process(exchange); } catch (Exception e) { - throw new RuntimeCamelException("Error sending exchange: " + exchange, e); + exchange.setException(e); } finally { // log exception if an exception occurred and was not handled if (exchange.getException() 
!= null) { exceptionHandler.handleException("Error processing exchange", exchange, exchange.getException()); } + consumer.releaseExchange(exchange, false); } } diff --git a/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/Harvester.java b/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/Harvester.java index 494b426..84a795b 100644 --- a/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/Harvester.java +++ b/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/Harvester.java @@ -69,7 +69,7 @@ public class Harvester { } - private boolean harvest() throws IOException, URISyntaxException, ParserConfigurationException, SAXException, Exception { + private boolean harvest() throws Exception { boolean hasNext = false; if (!this.empty) { String responseXML = httpClient.doRequest(this.baseURI, this.verb, this.set, this.from, this.until, this.metadata, @@ -88,13 +88,11 @@ public class Harvester { return hasNext; } - public void asynHarvest() throws IOException, URISyntaxException, ParserConfigurationException, SAXException, Exception { + public void asynHarvest() throws Exception { this.harvest(); - } - public List<String> synHarvest(boolean onlyFirst) - throws IOException, URISyntaxException, ParserConfigurationException, SAXException, Exception { + public List<String> synHarvest(boolean onlyFirst) throws Exception { while (this.harvest()) { if (onlyFirst) { break; diff --git a/components/camel-optaplanner/src/main/java/org/apache/camel/component/optaplanner/OptaPlannerConsumer.java b/components/camel-optaplanner/src/main/java/org/apache/camel/component/optaplanner/OptaPlannerConsumer.java index 3ea6ccf..a4dc33f 100644 --- a/components/camel-optaplanner/src/main/java/org/apache/camel/component/optaplanner/OptaPlannerConsumer.java +++ b/components/camel-optaplanner/src/main/java/org/apache/camel/component/optaplanner/OptaPlannerConsumer.java @@ -59,22 +59,22 @@ public class OptaPlannerConsumer 
extends DefaultConsumer { } public void processEvent(BestSolutionChangedEvent<Object> event) { - Exchange exchange = getEndpoint().createExchange(); + Exchange exchange = createExchange(true); exchange.getMessage().setHeader(OptaPlannerConstants.BEST_SOLUTION, event.getNewBestSolution()); try { getProcessor().process(exchange); } catch (Exception e) { - LOG.error("Error processing event ", e); + getExceptionHandler().handleException(e); } } public void processSolverJobEvent(OptaplannerSolutionEvent event) { - Exchange exchange = getEndpoint().createExchange(); + Exchange exchange = createExchange(true); exchange.getMessage().setHeader(OptaPlannerConstants.BEST_SOLUTION, event.getBestSolution()); try { getProcessor().process(exchange); } catch (Exception e) { - LOG.error("Error processing event ", e); + getExceptionHandler().handleException(e); } }