This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 89c2bd2327d8a64129ac613e5602ebcf3e646515 Author: Otavio R. Piske <angusyo...@gmail.com> AuthorDate: Sat Jan 27 20:21:04 2024 +0100 (chores) camel-jms: cleaned up duplicated code Signed-off-by: Otavio R. Piske <angusyo...@gmail.com> --- .../apache/camel/component/jms/JmsProducer.java | 27 ++++++++------ .../component/jms/StreamMessageInputStream.java | 20 +++------- .../camel/component/jms/reply/JmsReplyHelper.java | 41 +++++++++++++++++++++ .../component/jms/reply/QueueReplyManager.java | 43 ++++++++-------------- .../component/jms/reply/ReplyManagerSupport.java | 10 +++++ .../reply/SharedQueueMessageListenerContainer.java | 13 +------ .../SharedQueueSimpleMessageListenerContainer.java | 13 +------ .../jms/reply/TemporaryQueueReplyManager.java | 8 +--- 8 files changed, 92 insertions(+), 83 deletions(-) diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java index ca330c957aa..bb51ff9f97f 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java @@ -578,12 +578,7 @@ public class JmsProducer extends DefaultAsyncProducer { name = "JmsReplyManagerOnTimeout[" + getEndpoint().getEndpointConfiguredDestinationName() + "]"; // allow the timeout thread to timeout so during normal operation we do not have a idle thread - int max = getEndpoint().getReplyToOnTimeoutMaxConcurrentConsumers(); - if (max <= 0) { - throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1"); - } - ExecutorService replyManagerExecutorService - = getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager, name, 0, max); + ExecutorService replyManagerExecutorService = createReplyManagerExecutorService(replyManager, name); replyManager.setOnTimeoutExecutorService(replyManagerExecutorService); ServiceHelper.startService(replyManager); @@ -591,6 +586,19 @@ public class JmsProducer extends DefaultAsyncProducer { return replyManager; } + private ExecutorService createReplyManagerExecutorService(ReplyManager replyManager, String name) { + int max = doGetMax(); + return getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager, name, 0, max); + } + + private int doGetMax() { + int max = getEndpoint().getReplyToOnTimeoutMaxConcurrentConsumers(); + if (max <= 0) { + throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1"); + } + return max; + } + protected ReplyManager createReplyManager(String replyTo) throws Exception { // use a regular queue ReplyManager replyManager = new QueueReplyManager(getEndpoint().getCamelContext()); @@ -603,12 +611,7 @@ public class JmsProducer extends DefaultAsyncProducer { name = "JmsReplyManagerOnTimeout[" + replyTo + "]"; // allow the timeout thread to timeout so during normal operation we do not have a idle thread - int max = getEndpoint().getReplyToOnTimeoutMaxConcurrentConsumers(); - if (max <= 0) { - throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1"); - } - ExecutorService replyManagerExecutorService - = getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager, name, 0, max); + ExecutorService replyManagerExecutorService = createReplyManagerExecutorService(replyManager, name); replyManager.setOnTimeoutExecutorService(replyManagerExecutorService); ServiceHelper.startService(replyManager); diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java index 23016719aa3..72295b24abc 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java @@ -46,6 +46,10 @@ public class StreamMessageInputStream extends InputStream { @Override public int read(byte[] array) throws IOException { + return doRead(array); + } + + private int doRead(byte[] array) throws IOException { try { int num = message.readBytes(array); if (num < 0) { @@ -66,21 +70,7 @@ public class StreamMessageInputStream extends InputStream { @Override public int read(byte[] array, int off, int len) throws IOException { // we cannot honor off and len, but assuming off is always 0 - try { - int num = message.readBytes(array); - if (num < 0) { - //the first 128K(FileUtil.BUFFER_SIZE/128K is used when sending JMS StreamMessage) - //buffer reached, give a chance to see if there is the next 128K buffer - num = message.readBytes(array); - } - eof = num < 0; - return num; - } catch (MessageEOFException e) { - eof = true; - return -1; - } catch (JMSException e) { - throw new IOException(e); - } + return doRead(array); } @Override diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/JmsReplyHelper.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/JmsReplyHelper.java new file mode 100644 index 00000000000..b5d2efd885d --- /dev/null +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/JmsReplyHelper.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jms.reply; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class JmsReplyHelper { + private static final Logger LOG = LoggerFactory.getLogger(JmsReplyHelper.class); + + private JmsReplyHelper() { + + } + + static String getMessageSelector(String fixedMessageSelector, MessageSelectorCreator creator) { + // override this method and return the appropriate selector + String id = null; + if (fixedMessageSelector != null) { + id = fixedMessageSelector; + } else if (creator != null) { + id = creator.get(); + } + + LOG.trace("Using MessageSelector[{}]", id); + return id; + } +} diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java index b1d819a1ba8..dda1293838f 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java @@ -29,7 +29,6 @@ import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.component.jms.ConsumerType; import org.apache.camel.component.jms.DefaultSpringErrorHandler; -import org.apache.camel.component.jms.MessageListenerContainerFactory; import org.apache.camel.component.jms.ReplyToType; import org.apache.camel.component.jms.SimpleJmsMessageListenerContainer; import org.springframework.jms.listener.AbstractMessageListenerContainer; @@ -130,12 +129,7 @@ public class QueueReplyManager extends ReplyManagerSupport { } else if (endpoint.getConfiguration().getReplyToConsumerType() == ConsumerType.Simple) { return createSimpleListenerContainer(); } else { - MessageListenerContainerFactory factory = endpoint.getConfiguration().getMessageListenerContainerFactory(); - if (factory != null) { - return factory.createMessageListenerContainer(endpoint); - } - throw new IllegalArgumentException( - "ReplyToConsumerType.Custom requires that a MessageListenerContainerFactory has been configured"); + return getAbstractMessageListenerContainer(endpoint); } } @@ -194,16 +188,7 @@ public class QueueReplyManager extends ReplyManagerSupport { answer.setSessionTransacted(false); // other optional properties - if (endpoint.getExceptionListener() != null) { - answer.setExceptionListener(endpoint.getExceptionListener()); - } - if (endpoint.getErrorHandler() != null) { - answer.setErrorHandler(endpoint.getErrorHandler()); - } else { - answer.setErrorHandler(new DefaultSpringErrorHandler( - endpoint.getCamelContext(), QueueReplyManager.class, endpoint.getErrorHandlerLoggingLevel(), - endpoint.isErrorHandlerLogStackTrace())); - } + setOptionalProperties(answer); // set task executor if (endpoint.getTaskExecutor() != null) { log.debug("Using custom TaskExecutor: {} on listener container: {}", endpoint.getTaskExecutor(), answer); @@ -302,16 +287,7 @@ public class QueueReplyManager extends ReplyManagerSupport { answer.setSessionTransacted(false); // other optional properties - if (endpoint.getExceptionListener() != null) { - answer.setExceptionListener(endpoint.getExceptionListener()); - } - if (endpoint.getErrorHandler() != null) { - answer.setErrorHandler(endpoint.getErrorHandler()); - } else { - answer.setErrorHandler(new DefaultSpringErrorHandler( - endpoint.getCamelContext(), QueueReplyManager.class, endpoint.getErrorHandlerLoggingLevel(), - endpoint.isErrorHandlerLogStackTrace())); - } + setOptionalProperties(answer); if (endpoint.getReceiveTimeout() >= 0) { answer.setReceiveTimeout(endpoint.getReceiveTimeout()); } @@ -344,4 +320,17 @@ public class QueueReplyManager extends ReplyManagerSupport { return answer; } + + private <T extends AbstractMessageListenerContainer> void setOptionalProperties(T answer) { + if (endpoint.getExceptionListener() != null) { + answer.setExceptionListener(endpoint.getExceptionListener()); + } + if (endpoint.getErrorHandler() != null) { + answer.setErrorHandler(endpoint.getErrorHandler()); + } else { + answer.setErrorHandler(new DefaultSpringErrorHandler( + endpoint.getCamelContext(), QueueReplyManager.class, endpoint.getErrorHandlerLoggingLevel(), + endpoint.isErrorHandlerLogStackTrace())); + } + } } diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java index ef91941f669..8118c78ce27 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java @@ -35,6 +35,7 @@ import org.apache.camel.component.jms.JmsConstants; import org.apache.camel.component.jms.JmsEndpoint; import org.apache.camel.component.jms.JmsMessage; import org.apache.camel.component.jms.JmsMessageHelper; +import org.apache.camel.component.jms.MessageListenerContainerFactory; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.support.service.ServiceSupport; @@ -313,4 +314,13 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl answer.setClientId(clientId); } } + + protected static AbstractMessageListenerContainer getAbstractMessageListenerContainer(JmsEndpoint endpoint) { + MessageListenerContainerFactory factory = endpoint.getConfiguration().getMessageListenerContainerFactory(); + if (factory != null) { + return factory.createMessageListenerContainer(endpoint); + } + throw new IllegalArgumentException( + "ReplyToConsumerType.Custom requires that a MessageListenerContainerFactory has been configured"); + } } diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java index 0e9179de121..39bf18ad1d1 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java @@ -63,19 +63,10 @@ public class SharedQueueMessageListenerContainer extends DefaultJmsMessageListen setCacheLevel(DefaultMessageListenerContainer.CACHE_SESSION); } + // override this method and return the appropriate selector @Override public String getMessageSelector() { - // override this method and return the appropriate selector - String id = null; - if (fixedMessageSelector != null) { - id = fixedMessageSelector; - } else if (creator != null) { - id = creator.get(); - } - if (logger.isTraceEnabled()) { - logger.trace("Using MessageSelector[" + id + "]"); - } - return id; + return JmsReplyHelper.getMessageSelector(fixedMessageSelector, creator); } } diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueSimpleMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueSimpleMessageListenerContainer.java index e9931b8fa94..f145fb3e797 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueSimpleMessageListenerContainer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueSimpleMessageListenerContainer.java @@ -57,19 +57,10 @@ public class SharedQueueSimpleMessageListenerContainer extends SimpleJmsMessageL this.creator = creator; } + // override this method and return the appropriate selector @Override public String getMessageSelector() { - // override this method and return the appropriate selector - String id = null; - if (fixedMessageSelector != null) { - id = fixedMessageSelector; - } else if (creator != null) { - id = creator.get(); - } - if (logger.isTraceEnabled()) { - logger.trace("Using MessageSelector[" + id + "]"); - } - return id; + return JmsReplyHelper.getMessageSelector(fixedMessageSelector, creator); } } diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java index 2557f14e9e9..ba5d4a22e42 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java @@ -31,7 +31,6 @@ import org.apache.camel.Exchange; import org.apache.camel.component.jms.ConsumerType; import org.apache.camel.component.jms.DefaultJmsMessageListenerContainer; import org.apache.camel.component.jms.DefaultSpringErrorHandler; -import org.apache.camel.component.jms.MessageListenerContainerFactory; import org.apache.camel.component.jms.SimpleJmsMessageListenerContainer; import org.springframework.jms.listener.AbstractMessageListenerContainer; import org.springframework.jms.listener.DefaultMessageListenerContainer; @@ -107,12 +106,7 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport { } else if (endpoint.getConfiguration().getReplyToConsumerType() == ConsumerType.Simple) { return createSimpleListenerContainer(); } else { - MessageListenerContainerFactory factory = endpoint.getConfiguration().getMessageListenerContainerFactory(); - if (factory != null) { - return factory.createMessageListenerContainer(endpoint); - } - throw new IllegalArgumentException( - "ReplyToConsumerType.Custom requires that a MessageListenerContainerFactory has been configured"); + return getAbstractMessageListenerContainer(endpoint); } }