This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9be726dd3ebcb0293c19ac5d6978a07840910378 Author: rnetuka <rnet...@redhat.com> AuthorDate: Thu Nov 19 15:50:11 2020 +0100 [CAMEL-16008] Implement consumer priority in camel --- components/camel-jms/src/main/docs/jms-component.adoc | 3 ++- .../apache/camel/component/jms/JmsConfiguration.java | 18 ++++++++++++++++++ .../org/apache/camel/component/jms/JmsEndpoint.java | 3 +++ 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/components/camel-jms/src/main/docs/jms-component.adoc b/components/camel-jms/src/main/docs/jms-component.adoc index f370065..e119799 100644 --- a/components/camel-jms/src/main/docs/jms-component.adoc +++ b/components/camel-jms/src/main/docs/jms-component.adoc @@ -331,7 +331,7 @@ with the following path and query parameters: |=== -=== Query Parameters (94 parameters): +=== Query Parameters (95 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -350,6 +350,7 @@ with the following path and query parameters: | *cacheLevel* (consumer) | Sets the cache level by ID for the underlying JMS resources. See cacheLevelName option for more details. | | int | *cacheLevelName* (consumer) | Sets the cache level by name for the underlying JMS resources. Possible values are: CACHE_AUTO, CACHE_CONNECTION, CACHE_CONSUMER, CACHE_NONE, and CACHE_SESSION. The default setting is CACHE_AUTO. See the Spring documentation and Transactions Cache Levels for more information. There are 5 enums and the value can be one of: CACHE_AUTO, CACHE_CONNECTION, CACHE_CONSUMER, CACHE_NONE, CACHE_SESSION | CACHE_AUTO | String | *concurrentConsumers* (consumer) | Specifies the default number of concurrent consumers when consuming from JMS (not for request/reply over JMS). See also the maxMessagesPerTask option to control dynamic scaling up/down of threads. When doing request/reply over JMS then the option replyToConcurrentConsumers is used to control number of concurrent consumers on the reply message listener. 
| 1 | int +| *consumerPriority* (consumer) | Consumer priorities allow you to ensure that high priority consumers receive messages while they are active. Normally, active consumers connected to a queue receive messages from it in a round-robin fashion. When consumer priorities are in use, messages are delivered round-robin if multiple active consumers exist with the same high priority. Messages will only go to lower priority consumers when the high priority consumers do not have credit available [...] | *maxConcurrentConsumers* (consumer) | Specifies the maximum number of concurrent consumers when consuming from JMS (not for request/reply over JMS). See also the maxMessagesPerTask option to control dynamic scaling up/down of threads. When doing request/reply over JMS then the option replyToMaxConcurrentConsumers is used to control number of concurrent consumers on the reply message listener. | | int | *replyToDeliveryPersistent* (consumer) | Specifies whether to use persistent delivery by default for replies. | true | boolean | *selector* (consumer) | Sets the JMS selector to use | | String diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java index 57e9a41..9efcdac 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java @@ -521,6 +521,15 @@ public class JmsConfiguration implements Cloneable { description = "Sets whether synchronous processing should be strictly used") private boolean synchronous; + @UriParam(label = "consumer", description = "Consumer priorities allow you to ensure that high priority consumers" + + " receive messages while they are active. Normally, active consumers connected to a queue receive messages" + + " from it in a round-robin fashion. 
When consumer priorities are in use, messages are delivered round-robin" + + " if multiple active consumers exist with the same high priority. Messages will only go to lower priority" + + " consumers when the high priority consumers do not have credit available to consume the message, or those" + + " high priority consumers have declined to accept the message (for instance because it does not meet the" + + " criteria of any selectors associated with the consumer).") + private int consumerPriority; + public JmsConfiguration() { } @@ -2291,4 +2300,13 @@ public class JmsConfiguration implements Cloneable { public void setSynchronous(boolean synchronous) { this.synchronous = synchronous; } + + public void setConsumerPriority(int priority) { + this.consumerPriority = priority; + } + + public int getConsumerPriority() { + return consumerPriority; + } + } diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java index fe37b4b..8388e65 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java @@ -197,6 +197,9 @@ public class JmsEndpoint extends DefaultEndpoint public void configureListenerContainer(AbstractMessageListenerContainer listenerContainer, JmsConsumer consumer) { if (destinationName != null) { + if (getConfiguration().getConsumerPriority() != 0) { + destinationName += "?consumer-priority=" + getConfiguration().getConsumerPriority(); + } listenerContainer.setDestinationName(destinationName); LOG.debug("Using destinationName: {} on listenerContainer: {}", destinationName, listenerContainer); } else if (destination != null) {