This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch on-header in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5bc7cb60f2d6ba88ddab36fefa708ccdc6ca130f Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sat Feb 6 11:37:18 2021 +0100 CAMEL-16102: Source code generate @InvokeOnHeader for reflection free --- .../AtomixMapProducerInvokeOnHeaderFactory.java | 42 ++++---- ...omixMessagingProducerInvokeOnHeaderFactory.java | 6 +- ...tomixMultiMapProducerInvokeOnHeaderFactory.java | 30 +++--- .../AtomixQueueProducerInvokeOnHeaderFactory.java | 28 ++--- .../AtomixSetProducerInvokeOnHeaderFactory.java | 18 ++-- .../AtomixValueProducerInvokeOnHeaderFactory.java | 12 +-- ...l.component.atomix.client.map.AtomixMapProducer | 2 +- ...atomix.client.messaging.AtomixMessagingProducer | 2 +- ...t.atomix.client.multimap.AtomixMultiMapProducer | 2 +- ...mponent.atomix.client.queue.AtomixQueueProducer | 2 +- ...l.component.atomix.client.set.AtomixSetProducer | 2 +- ...mponent.atomix.client.value.AtomixValueProducer | 2 +- .../client/AbstractAtomixClientProducer.java | 15 +-- .../atomix/client/map/AtomixMapProducer.java | 56 +++------- .../client/messaging/AtomixMessagingProducer.java | 16 +-- .../client/multimap/AtomixMultiMapProducer.java | 118 ++++++++------------- .../atomix/client/queue/AtomixQueueProducer.java | 44 ++------ .../atomix/client/set/AtomixSetProducer.java | 32 ++---- .../atomix/client/value/AtomixValueProducer.java | 24 +---- .../camel/support/HeaderSelectorProducer.java | 20 +++- .../packaging/GenerateInvokeOnHeaderMojo.java | 12 ++- 21 files changed, 188 insertions(+), 297 deletions(-) diff --git a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/map/AtomixMapProducerInvokeOnHeaderFactory.java b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/map/AtomixMapProducerInvokeOnHeaderFactory.java index d8cc6e8..3480b94 100644 --- a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/map/AtomixMapProducerInvokeOnHeaderFactory.java +++ b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/map/AtomixMapProducerInvokeOnHeaderFactory.java @@ -16,30 +16,30 @@ public class AtomixMapProducerInvokeOnHeaderFactory implements InvokeOnHeaderStr public Object invoke(Object obj, String key, Exchange exchange, AsyncCallback callback) throws Exception { org.apache.camel.component.atomix.client.map.AtomixMapProducer target = (org.apache.camel.component.atomix.client.map.AtomixMapProducer) obj; switch (key) { - case "values": - case "VALUES": return target.onValues(exchange.getMessage(), callback); - case "is_empty": - case "IS_EMPTY": return target.onIsEmpty(exchange.getMessage(), callback); - case "replace": - case "REPLACE": return target.onReplace(exchange.getMessage(), callback); - case "put_if_absent": - case "PUT_IF_ABSENT": return target.onPutIfAbsent(exchange.getMessage(), callback); + case "clear": + case "CLEAR": target.onClear(exchange.getMessage(), callback); return callback; + case "contains_key": + case "CONTAINS_KEY": target.onContainsKey(exchange.getMessage(), callback); return callback; + case "contains_value": + case "CONTAINS_VALUE": target.onContainsValue(exchange.getMessage(), callback); return callback; + case "entry_set": + case "ENTRY_SET": target.onEntrySet(exchange.getMessage(), callback); return callback; case "get": - case "GET": return target.onGet(exchange.getMessage(), callback); + case "GET": target.onGet(exchange.getMessage(), callback); return callback; + case "is_empty": + case "IS_EMPTY": target.onIsEmpty(exchange.getMessage(), callback); return callback; case "put": - case "PUT": return target.onPut(exchange.getMessage(), callback); - case "entry_set": - case "ENTRY_SET": return target.onEntrySet(exchange.getMessage(), callback); - case "size": - case "SIZE": return target.onSize(exchange.getMessage(), callback); - case "clear": - case "CLEAR": return target.onClear(exchange.getMessage(), callback); + case "PUT": target.onPut(exchange.getMessage(), callback); return callback; + case "put_if_absent": + case "PUT_IF_ABSENT": target.onPutIfAbsent(exchange.getMessage(), callback); return callback; case "remove": - case "REMOVE": return target.onRemove(exchange.getMessage(), callback); - case "contains_value": - case "CONTAINS_VALUE": return target.onContainsValue(exchange.getMessage(), callback); - case "contains_key": - case "CONTAINS_KEY": return target.onContainsKey(exchange.getMessage(), callback); + case "REMOVE": target.onRemove(exchange.getMessage(), callback); return callback; + case "replace": + case "REPLACE": target.onReplace(exchange.getMessage(), callback); return callback; + case "size": + case "SIZE": target.onSize(exchange.getMessage(), callback); return callback; + case "values": + case "VALUES": target.onValues(exchange.getMessage(), callback); return callback; default: return null; } } diff --git a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducerInvokeOnHeaderFactory.java b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducerInvokeOnHeaderFactory.java index 445c845..995bae3 100644 --- a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducerInvokeOnHeaderFactory.java +++ b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducerInvokeOnHeaderFactory.java @@ -16,10 +16,10 @@ public class AtomixMessagingProducerInvokeOnHeaderFactory implements InvokeOnHea public Object invoke(Object obj, String key, Exchange exchange, AsyncCallback callback) throws Exception { org.apache.camel.component.atomix.client.messaging.AtomixMessagingProducer target = (org.apache.camel.component.atomix.client.messaging.AtomixMessagingProducer) obj; switch (key) { - case "direct": - case "DIRECT": return target.onDirect(exchange.getMessage(), callback); case "broadcast": - case "BROADCAST": return target.onBroadcast(exchange.getMessage(), callback); + case "BROADCAST": target.onBroadcast(exchange.getMessage(), callback); return callback; + case "direct": + case "DIRECT": target.onDirect(exchange.getMessage(), callback); return callback; default: return null; } } diff --git a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducerInvokeOnHeaderFactory.java b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducerInvokeOnHeaderFactory.java index 5597dca..8056159 100644 --- a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducerInvokeOnHeaderFactory.java +++ b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducerInvokeOnHeaderFactory.java @@ -16,22 +16,26 @@ public class AtomixMultiMapProducerInvokeOnHeaderFactory implements InvokeOnHead public Object invoke(Object obj, String key, Exchange exchange, AsyncCallback callback) throws Exception { org.apache.camel.component.atomix.client.multimap.AtomixMultiMapProducer target = (org.apache.camel.component.atomix.client.multimap.AtomixMultiMapProducer) obj; switch (key) { - case "is_empty": - case "IS_EMPTY": return target.onIsEmpty(exchange.getMessage(), callback); - case "size": - case "SIZE": return target.onSize(exchange.getMessage(), callback); + case "clear": + case "CLEAR": target.onClear(exchange.getMessage(), callback); return callback; + case "contains_entry": + case "CONTAINS_ENTRY": target.onContainsEntry(exchange.getMessage(), callback); return callback; case "contains_key": - case "CONTAINS_KEY": return target.onContainsKey(exchange.getMessage(), callback); - case "put": - case "PUT": return target.onPut(exchange.getMessage(), callback); - case "remove_value": - case "REMOVE_VALUE": return target.onRemoveValue(exchange.getMessage(), callback); + case "CONTAINS_KEY": target.onContainsKey(exchange.getMessage(), callback); return callback; + case "contains_value": + case "CONTAINS_VALUE": target.onContainsValue(exchange.getMessage(), callback); return callback; case "get": - case "GET": return target.onGet(exchange.getMessage(), callback); - case "clear": - case "CLEAR": return target.onClear(exchange.getMessage(), callback); + case "GET": target.onGet(exchange.getMessage(), callback); return callback; + case "is_empty": + case "IS_EMPTY": target.onIsEmpty(exchange.getMessage(), callback); return callback; + case "put": + case "PUT": target.onPut(exchange.getMessage(), callback); return callback; case "remove": - case "REMOVE": return target.onRemove(exchange.getMessage(), callback); + case "REMOVE": target.onRemove(exchange.getMessage(), callback); return callback; + case "remove_value": + case "REMOVE_VALUE": target.onRemoveValue(exchange.getMessage(), callback); return callback; + case "size": + case "SIZE": target.onSize(exchange.getMessage(), callback); return callback; default: return null; } } diff --git a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducerInvokeOnHeaderFactory.java b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducerInvokeOnHeaderFactory.java index 3f60ee6..e62c138 100644 --- a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducerInvokeOnHeaderFactory.java +++ b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducerInvokeOnHeaderFactory.java @@ -16,24 +16,24 @@ public class AtomixQueueProducerInvokeOnHeaderFactory implements InvokeOnHeaderS public Object invoke(Object obj, String key, Exchange exchange, AsyncCallback callback) throws Exception { org.apache.camel.component.atomix.client.queue.AtomixQueueProducer target = (org.apache.camel.component.atomix.client.queue.AtomixQueueProducer) obj; switch (key) { - case "poll": - case "POLL": return target.onPoll(exchange.getMessage(), callback); + case "add": + case "ADD": target.onAdd(exchange.getMessage(), callback); return callback; case "clear": - case "CLEAR": return target.onClear(exchange.getMessage(), callback); - case "size": - case "SIZE": return target.onSize(exchange.getMessage(), callback); + case "CLEAR": target.onClear(exchange.getMessage(), callback); return callback; case "contains": - case "CONTAINS": return target.onContains(exchange.getMessage(), callback); - case "remove": - case "REMOVE": return target.onRemove(exchange.getMessage(), callback); - case "peek": - case "PEEK": return target.onPeek(exchange.getMessage(), callback); - case "add": - case "ADD": return target.onAdd(exchange.getMessage(), callback); + case "CONTAINS": target.onContains(exchange.getMessage(), callback); return callback; case "is_empty": - case "IS_EMPTY": return target.onIsEmpty(exchange.getMessage(), callback); + case "IS_EMPTY": target.onIsEmpty(exchange.getMessage(), callback); return callback; case "offer": - case "OFFER": return target.onOffer(exchange.getMessage(), callback); + case "OFFER": target.onOffer(exchange.getMessage(), callback); return callback; + case "peek": + case "PEEK": target.onPeek(exchange.getMessage(), callback); return callback; + case "poll": + case "POLL": target.onPoll(exchange.getMessage(), callback); return callback; + case "remove": + case "REMOVE": target.onRemove(exchange.getMessage(), callback); return callback; + case "size": + case "SIZE": target.onSize(exchange.getMessage(), callback); return callback; default: return null; } } diff --git a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/set/AtomixSetProducerInvokeOnHeaderFactory.java b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/set/AtomixSetProducerInvokeOnHeaderFactory.java index d432ba5..9311986 100644 --- a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/set/AtomixSetProducerInvokeOnHeaderFactory.java +++ b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/set/AtomixSetProducerInvokeOnHeaderFactory.java @@ -16,18 +16,18 @@ public class AtomixSetProducerInvokeOnHeaderFactory implements InvokeOnHeaderStr public Object invoke(Object obj, String key, Exchange exchange, AsyncCallback callback) throws Exception { org.apache.camel.component.atomix.client.set.AtomixSetProducer target = (org.apache.camel.component.atomix.client.set.AtomixSetProducer) obj; switch (key) { + case "add": + case "ADD": target.onAdd(exchange.getMessage(), callback); return callback; case "clear": - case "CLEAR": return target.onClear(exchange.getMessage(), callback); + case "CLEAR": target.onClear(exchange.getMessage(), callback); return callback; case "contains": - case "CONTAINS": return target.onContains(exchange.getMessage(), callback); - case "size": - case "SIZE": return target.onSize(exchange.getMessage(), callback); - case "add": - case "ADD": return target.onAdd(exchange.getMessage(), callback); - case "remove": - case "REMOVE": return target.onRemove(exchange.getMessage(), callback); + case "CONTAINS": target.onContains(exchange.getMessage(), callback); return callback; case "is_empty": - case "IS_EMPTY": return target.onIsEmpty(exchange.getMessage(), callback); + case "IS_EMPTY": target.onIsEmpty(exchange.getMessage(), callback); return callback; + case "remove": + case "REMOVE": target.onRemove(exchange.getMessage(), callback); return callback; + case "size": + case "SIZE": target.onSize(exchange.getMessage(), callback); return callback; default: return null; } } diff --git a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/value/AtomixValueProducerInvokeOnHeaderFactory.java b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/value/AtomixValueProducerInvokeOnHeaderFactory.java index ce51057..2aef392 100644 --- a/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/value/AtomixValueProducerInvokeOnHeaderFactory.java +++ b/components/camel-atomix/src/generated/java/org/apache/camel/component/atomix/client/value/AtomixValueProducerInvokeOnHeaderFactory.java @@ -16,14 +16,14 @@ public class AtomixValueProducerInvokeOnHeaderFactory implements InvokeOnHeaderS public Object invoke(Object obj, String key, Exchange exchange, AsyncCallback callback) throws Exception { org.apache.camel.component.atomix.client.value.AtomixValueProducer target = (org.apache.camel.component.atomix.client.value.AtomixValueProducer) obj; switch (key) { - case "set": - case "SET": return target.onSet(exchange.getMessage(), callback); case "compare_and_set": - case "COMPARE_AND_SET": return target.onCompareAndSet(exchange.getMessage(), callback); - case "get_and_set": - case "GET_AND_SET": return target.onGetAndSet(exchange.getMessage(), callback); + case "COMPARE_AND_SET": target.onCompareAndSet(exchange.getMessage(), callback); return callback; case "get": - case "GET": return target.onGet(exchange.getMessage(), callback); + case "GET": target.onGet(exchange.getMessage(), callback); return callback; + case "get_and_set": + case "GET_AND_SET": target.onGetAndSet(exchange.getMessage(), callback); return callback; + case "set": + case "SET": target.onSet(exchange.getMessage(), callback); return callback; default: return null; } } diff --git a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.map.AtomixMapProducer b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.map.AtomixMapProducer index 457bff1..ad0a22c 100644 --- a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.map.AtomixMapProducer +++ b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.map.AtomixMapProducer @@ -1,2 +1,2 @@ # Generated by camel build tools - do NOT edit this file! -class=org.apache.camel.component.atomix.client.map.AtomixMapProducer.AtomixMapProducerInvokeOnHeaderFactory +class=org.apache.camel.component.atomix.client.map.AtomixMapProducerInvokeOnHeaderFactory diff --git a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.messaging.AtomixMessagingProducer b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.messaging.AtomixMessagingProducer index ab52975..845cc5b 100644 --- a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.messaging.AtomixMessagingProducer +++ b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.messaging.AtomixMessagingProducer @@ -1,2 +1,2 @@ # Generated by camel build tools - do NOT edit this file! -class=org.apache.camel.component.atomix.client.messaging.AtomixMessagingProducer.AtomixMessagingProducerInvokeOnHeaderFactory +class=org.apache.camel.component.atomix.client.messaging.AtomixMessagingProducerInvokeOnHeaderFactory diff --git a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.multimap.AtomixMultiMapProducer b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.multimap.AtomixMultiMapProducer index 0ffdfe3..d4e57ec 100644 --- a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.multimap.AtomixMultiMapProducer +++ b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.multimap.AtomixMultiMapProducer @@ -1,2 +1,2 @@ # Generated by camel build tools - do NOT edit this file! -class=org.apache.camel.component.atomix.client.multimap.AtomixMultiMapProducer.AtomixMultiMapProducerInvokeOnHeaderFactory +class=org.apache.camel.component.atomix.client.multimap.AtomixMultiMapProducerInvokeOnHeaderFactory diff --git a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.queue.AtomixQueueProducer b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.queue.AtomixQueueProducer index 4c242db..67df94d 100644 --- a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.queue.AtomixQueueProducer +++ b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.queue.AtomixQueueProducer @@ -1,2 +1,2 @@ # Generated by camel build tools - do NOT edit this file! -class=org.apache.camel.component.atomix.client.queue.AtomixQueueProducer.AtomixQueueProducerInvokeOnHeaderFactory +class=org.apache.camel.component.atomix.client.queue.AtomixQueueProducerInvokeOnHeaderFactory diff --git a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.set.AtomixSetProducer b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.set.AtomixSetProducer index 0c00cc8..03bb4d2 100644 --- a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.set.AtomixSetProducer +++ b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.set.AtomixSetProducer @@ -1,2 +1,2 @@ # Generated by camel build tools - do NOT edit this file! -class=org.apache.camel.component.atomix.client.set.AtomixSetProducer.AtomixSetProducerInvokeOnHeaderFactory +class=org.apache.camel.component.atomix.client.set.AtomixSetProducerInvokeOnHeaderFactory diff --git a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.value.AtomixValueProducer b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.value.AtomixValueProducer index 0bbabf2..2ca7b91 100644 --- a/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.value.AtomixValueProducer +++ b/components/camel-atomix/src/generated/resources/META-INF/services/org/apache/camel/invoke-on-header/org.apache.camel.component.atomix.client.value.AtomixValueProducer @@ -1,2 +1,2 @@ # Generated by camel build tools - do NOT edit this file! -class=org.apache.camel.component.atomix.client.value.AtomixValueProducer.AtomixValueProducerInvokeOnHeaderFactory +class=org.apache.camel.component.atomix.client.value.AtomixValueProducerInvokeOnHeaderFactory diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java index c2d766a..f39d217 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java @@ -16,35 +16,28 @@ */ package org.apache.camel.component.atomix.client; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import io.atomix.resource.Resource; import org.apache.camel.AsyncCallback; import org.apache.camel.Message; -import org.apache.camel.component.atomix.AtomixAsyncMessageProcessor; import org.apache.camel.support.HeaderSelectorProducer; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT; -import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.*; public abstract class AbstractAtomixClientProducer<E extends AbstractAtomixClientEndpoint, R extends Resource> extends HeaderSelectorProducer { private static final Logger LOG = LoggerFactory.getLogger(AbstractAtomixClientProducer.class); - private final Map<String, AtomixAsyncMessageProcessor> processors; private ConcurrentMap<String, R> resources; - protected AbstractAtomixClientProducer(E endpoint) { - super(endpoint, get); - - this.processors = new HashMap<>(); + protected AbstractAtomixClientProducer(E endpoint, String defaultHeader) { + super(endpoint, RESOURCE_ACTION, defaultHeader); this.resources = new ConcurrentHashMap<>(); } @@ -82,8 +75,6 @@ public abstract class AbstractAtomixClientProducer<E extends AbstractAtomixClien return resources.computeIfAbsent(resourceName, name -> createResource(name)); } - protected abstract String getProcessorKey(Message message); - protected abstract String getResourceName(Message message); protected abstract R createResource(String name); diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapProducer.java index 3d8eac4..ca79ee5 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapProducer.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapProducer.java @@ -26,7 +26,6 @@ import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer; import org.apache.camel.spi.InvokeOnHeader; import org.apache.camel.util.ObjectHelper; -import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION; import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_DEFAULT_VALUE; import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_KEY; import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME; @@ -39,7 +38,7 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix private final AtomixMapConfiguration configuration; protected AtomixMapProducer(AtomixMapEndpoint endpoint) { - super(endpoint); + super(endpoint, endpoint.getConfiguration().getDefaultAction().name()); this.configuration = endpoint.getConfiguration(); } @@ -53,7 +52,7 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix } @InvokeOnHeader("PUT") - boolean onPut(Message message, AsyncCallback callback) throws Exception { + void onPut(Message message, AsyncCallback callback) throws Exception { final DistributedMap<Object, Object> map = getResource(message); final Object key = message.getHeader(RESOURCE_KEY, configuration::getKey, Object.class); final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); @@ -69,12 +68,10 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix map.put(key, val).thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("PUT_IF_ABSENT") - boolean onPutIfAbsent(Message message, AsyncCallback callback) throws Exception { + void onPutIfAbsent(Message message, AsyncCallback callback) throws Exception { final DistributedMap<Object, Object> map = getResource(message); final Object key = message.getHeader(RESOURCE_KEY, configuration::getKey, Object.class); final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); @@ -90,12 +87,10 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix map.putIfAbsent(key, val).thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("GET") - boolean onGet(Message message, AsyncCallback callback) throws Exception { + void onGet(Message message, AsyncCallback callback) throws Exception { final DistributedMap<Object, Object> map = getResource(message); final Object key = message.getHeader(RESOURCE_KEY, configuration::getKey, Object.class); final Object defaultValue = message.getHeader(RESOURCE_DEFAULT_VALUE); @@ -121,22 +116,18 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix result -> processResult(message, callback, result)); } } - - return false; } @InvokeOnHeader("CLEAR") - boolean onClear(Message message, AsyncCallback callback) throws Exception { + void onClear(Message message, AsyncCallback callback) throws Exception { final DistributedMap<Object, Object> map = getResource(message); map.clear().thenAccept( result -> processResult(message, callback, result)); - - return false; } @InvokeOnHeader("SIZE") - boolean onSize(Message message, AsyncCallback callback) throws Exception { + void onSize(Message message, AsyncCallback callback) throws Exception { final DistributedMap<Object, Object> map = getResource(message); final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); @@ -148,12 +139,10 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix map.size().thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("IS_EMPTY") - boolean onIsEmpty(Message message, AsyncCallback callback) throws Exception { + void onIsEmpty(Message message, AsyncCallback callback) throws Exception { final DistributedMap<Object, Object> map = getResource(message); final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); @@ -165,12 +154,10 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix map.isEmpty().thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("ENTRY_SET") - boolean onEntrySet(Message message, AsyncCallback callback) throws Exception { + void onEntrySet(Message message, AsyncCallback callback) throws Exception { final DistributedMap<Object, Object> map = getResource(message); final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); @@ -182,12 +169,10 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix map.entrySet().thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("VALUES") - boolean onValues(Message message, AsyncCallback callback) throws Exception { + void onValues(Message message, AsyncCallback callback) throws Exception { final DistributedMap<Object, Object> map = getResource(message); final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); @@ -199,12 +184,10 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix map.values().thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("CONTAINS_KEY") - boolean onContainsKey(Message message, AsyncCallback callback) throws Exception { + void onContainsKey(Message message, AsyncCallback callback) throws Exception { final DistributedMap<Object, Object> map = getResource(message); final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); @@ -219,12 +202,10 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix map.containsKey(key).thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("CONTAINS_VALUE") - boolean onContainsValue(Message message, AsyncCallback callback) throws Exception { + void onContainsValue(Message message, AsyncCallback callback) throws Exception { final DistributedMap<Object, Object> map = getResource(message); final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); @@ -239,12 +220,10 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix map.containsValue(value).thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("REMOVE") - boolean onRemove(Message message, AsyncCallback callback) throws Exception { + void onRemove(Message message, AsyncCallback callback) throws Exception { final DistributedMap<Object, Object> map = getResource(message); final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class); final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); @@ -258,12 +237,10 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix map.remove(key).thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("REPLACE") - boolean onReplace(Message message, AsyncCallback callback) throws Exception { + void onReplace(Message message, AsyncCallback callback) throws Exception { final DistributedMap<Object, Object> map = getResource(message); final long ttl = getResourceTtl(message); final Object key = message.getHeader(RESOURCE_KEY, configuration::getKey, Object.class); @@ -290,8 +267,6 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix result -> processResult(message, callback, result)); } } - - return false; } // ********************************* @@ -299,11 +274,6 @@ public final class AtomixMapProducer extends AbstractAtomixClientProducer<Atomix // ********************************* @Override - protected String getProcessorKey(Message message) { - return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, String.class); - } - - @Override protected String getResourceName(Message message) { return message.getHeader(RESOURCE_NAME, getAtomixEndpoint()::getResourceName, String.class); } diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducer.java index 1dacf9e..a062824 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducer.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingProducer.java @@ -28,7 +28,6 @@ import org.apache.camel.util.ObjectHelper; import static org.apache.camel.component.atomix.client.AtomixClientConstants.BROADCAST_TYPE; import static org.apache.camel.component.atomix.client.AtomixClientConstants.CHANNEL_NAME; import static org.apache.camel.component.atomix.client.AtomixClientConstants.MEMBER_NAME; -import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION; import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME; import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_VALUE; import static org.apache.camel.component.atomix.client.messaging.AtomixMessaging.OPTIONS_BROADCAST; @@ -39,7 +38,7 @@ public final class AtomixMessagingProducer extends AbstractAtomixClientProducer< private final AtomixMessagingConfiguration configuration; protected AtomixMessagingProducer(AtomixMessagingEndpoint endpoint) { - super(endpoint); + super(endpoint, endpoint.getConfiguration().getDefaultAction().name()); this.configuration = endpoint.getConfiguration(); } @@ -48,7 +47,7 @@ public final class AtomixMessagingProducer extends AbstractAtomixClientProducer< // ********************************* @InvokeOnHeader("DIRECT") - boolean onDirect(Message message, AsyncCallback callback) throws Exception { + void onDirect(Message message, AsyncCallback callback) throws Exception { final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); final String memberName = message.getHeader(MEMBER_NAME, configuration::getMemberName, String.class); final String channelName = message.getHeader(CHANNEL_NAME, configuration::getChannelName, String.class); @@ -63,12 +62,10 @@ public final class AtomixMessagingProducer extends AbstractAtomixClientProducer< producer.send(value).thenAccept( result -> processResult(message, callback, result)); - - return false; } @InvokeOnHeader("BROADCAST") - boolean onBroadcast(Message message, AsyncCallback callback) throws Exception { + void onBroadcast(Message message, AsyncCallback callback) throws Exception { final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); final String channelName = message.getHeader(CHANNEL_NAME, configuration::getChannelName, String.class); final AtomixMessaging.BroadcastType type @@ -86,8 +83,6 @@ public final class AtomixMessagingProducer extends AbstractAtomixClientProducer< producer.send(value).thenRun( () -> processResult(message, callback, null)); - - return false; } // ********************************* @@ -95,11 +90,6 @@ public final class AtomixMessagingProducer extends AbstractAtomixClientProducer< // ********************************* @Override - protected String getProcessorKey(Message message) { - return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, String.class); - } - - @Override protected String getResourceName(Message message) { return message.getHeader(RESOURCE_NAME, getAtomixEndpoint()::getResourceName, String.class); } diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducer.java index 4e2972e..4d89c2f 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducer.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/multimap/AtomixMultiMapProducer.java @@ -26,7 +26,6 @@ import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer; import org.apache.camel.spi.InvokeOnHeader; import org.apache.camel.util.ObjectHelper; -import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION; import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_KEY; import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME; import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY; @@ -37,7 +36,7 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A private final AtomixMultiMapConfiguration configuration; protected AtomixMultiMapProducer(AtomixMultiMapEndpoint endpoint) { - super(endpoint); + super(endpoint, endpoint.getConfiguration().getDefaultAction().name()); this.configuration = endpoint.getConfiguration(); } @@ -51,7 +50,7 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A } @InvokeOnHeader("PUT") - boolean onPut(Message message, AsyncCallback callback) throws Exception { + void onPut(Message message, AsyncCallback callback) throws Exception { final DistributedMultiMap<Object, Object> map = getResource(message); final Object key = message.getHeader(RESOURCE_KEY, configuration::getKey, Object.class); final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); @@ -67,12 +66,10 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A map.put(key, val).thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("GET") - boolean onGet(Message message, AsyncCallback callback) throws Exception { + void onGet(Message message, AsyncCallback callback) throws Exception { final DistributedMultiMap<Object, Object> map = getResource(message); final Object key = message.getHeader(RESOURCE_KEY, configuration::getKey, Object.class); final ReadConsistency consistency @@ -87,22 +84,18 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A map.get(key).thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("CLEAR") - boolean onClear(Message message, AsyncCallback callback) throws Exception { + void onClear(Message message, AsyncCallback callback) throws Exception { final DistributedMultiMap<Object, Object> map = getResource(message); map.clear().thenAccept( result -> processResult(message, callback, result)); - - return false; } @InvokeOnHeader("SIZE") - boolean onSize(Message message, AsyncCallback callback) throws Exception { + void onSize(Message message, AsyncCallback callback) throws Exception { final DistributedMultiMap<Object, Object> map = getResource(message); final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); @@ -125,12 +118,10 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A result -> processResult(message, callback, result)); } } - - return false; } @InvokeOnHeader("IS_EMPTY") - boolean onIsEmpty(Message message, AsyncCallback callback) throws Exception { + void onIsEmpty(Message message, AsyncCallback callback) throws Exception { final DistributedMultiMap<Object, Object> map = getResource(message); final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); @@ -142,12 +133,10 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A map.isEmpty().thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("CONTAINS_KEY") - boolean onContainsKey(Message message, AsyncCallback callback) throws Exception { + void onContainsKey(Message message, AsyncCallback callback) throws Exception { final DistributedMultiMap<Object, Object> map = getResource(message); final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); @@ -162,56 +151,48 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A map.containsKey(key).thenAccept( result -> processResult(message, callback, result)); } + } + + @InvokeOnHeader("CONTAINS_VALUE") + void onContainsValue(Message message, AsyncCallback callback) throws Exception { + final DistributedMultiMap<Object, Object> map = getResource(message); + final ReadConsistency consistency + = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); - return false; + ObjectHelper.notNull(value, RESOURCE_VALUE); + + if (consistency != null) { + map.containsValue(value, consistency).thenAccept( + result -> processResult(message, callback, result)); + } else { + map.containsValue(value).thenAccept( + result -> processResult(message, callback, result)); + } } - // @InvokeOnHeader("CONTAINS_VALUE") - // boolean onContainsValue(Message message, AsyncCallback callback) throws Exception { - // final DistributedMultiMap<Object, Object> map = getResource(message); - // final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); - // final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); - // - // ObjectHelper.notNull(value, RESOURCE_VALUE); - // - // if (consistency != null) { - // map.containsValue(value, consistency).thenAccept( - // result -> processResult(message, callback, result) - // ); - // } else { - // map.containsValue(value).thenAccept( - // result -> processResult(message, callback, result) - // ); - // } - // - // return false; - // } - - // @InvokeOnHeader("CONTAINS_ENTRY") - // boolean onContainsEntry(Message message, AsyncCallback callback) throws Exception { - // final DistributedMultiMap<Object, Object> map = getResource(message); - // final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); - // final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class); - // final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); - // - // ObjectHelper.notNull(key, RESOURCE_VALUE); - // ObjectHelper.notNull(value, RESOURCE_KEY); - // - // if (consistency != null) { - // map.containsEntry(key, value, consistency).thenAccept( - // result -> processResult(message, callback, result) - // ); - // } else { - // map.containsEntry(key, value).thenAccept( - // result -> processResult(message, callback, result) - // ); - // } - // - // return false; - // } + @InvokeOnHeader("CONTAINS_ENTRY") + void onContainsEntry(Message message, AsyncCallback callback) throws Exception { + final DistributedMultiMap<Object, Object> map = getResource(message); + final ReadConsistency consistency + = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class); + final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + + ObjectHelper.notNull(key, RESOURCE_VALUE); + ObjectHelper.notNull(value, RESOURCE_KEY); + + if (consistency != null) { + map.containsEntry(key, value, consistency).thenAccept( + result -> processResult(message, callback, result)); + } else { + map.containsEntry(key, value).thenAccept( + result -> processResult(message, callback, result)); + } + } @InvokeOnHeader("REMOVE") - boolean onRemove(Message message, AsyncCallback callback) throws Exception { + void onRemove(Message message, AsyncCallback callback) throws Exception { final DistributedMultiMap<Object, Object> map = getResource(message); final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class); final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); @@ -225,12 +206,10 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A map.remove(key).thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("REMOVE_VALUE") - boolean onRemoveValue(Message message, AsyncCallback callback) throws Exception { + void onRemoveValue(Message message, AsyncCallback callback) throws Exception { final DistributedMultiMap<Object, Object> map = getResource(message); final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); @@ -238,8 +217,6 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A map.removeValue(value).thenAccept( result -> processResult(message, callback, result)); - - return false; } // ********************************* @@ -247,11 +224,6 @@ public final class AtomixMultiMapProducer extends AbstractAtomixClientProducer<A // ********************************* @Override - protected String getProcessorKey(Message message) { - return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, String.class); - } - - @Override protected String getResourceName(Message message) { return message.getHeader(RESOURCE_NAME, getAtomixEndpoint()::getResourceName, String.class); } diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducer.java index 5c3f21a..2415fdb 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducer.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueProducer.java @@ -24,7 +24,6 @@ import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer; import org.apache.camel.spi.InvokeOnHeader; import org.apache.camel.util.ObjectHelper; -import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION; import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME; import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY; import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_VALUE; @@ -33,7 +32,7 @@ public final class AtomixQueueProducer extends AbstractAtomixClientProducer<Atom private final AtomixQueueConfiguration configuration; protected AtomixQueueProducer(AtomixQueueEndpoint endpoint) { - super(endpoint); + super(endpoint, endpoint.getConfiguration().getDefaultAction().name()); this.configuration = endpoint.getConfiguration(); } @@ -42,7 +41,7 @@ public final class AtomixQueueProducer extends AbstractAtomixClientProducer<Atom // ********************************* @InvokeOnHeader("ADD") - boolean onAdd(Message message, AsyncCallback callback) throws Exception { + void onAdd(Message message, AsyncCallback callback) throws Exception { final DistributedQueue<Object> queue = getResource(message); final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); @@ -50,12 +49,10 @@ public final class AtomixQueueProducer extends AbstractAtomixClientProducer<Atom queue.add(val).thenAccept( result -> processResult(message, callback, result)); - - return false; } @InvokeOnHeader("OFFER") - boolean onOffer(Message message, AsyncCallback callback) throws Exception { + void onOffer(Message message, AsyncCallback callback) throws Exception { final DistributedQueue<Object> queue = getResource(message); final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); @@ -63,42 +60,34 @@ public final class AtomixQueueProducer extends AbstractAtomixClientProducer<Atom queue.offer(val).thenAccept( result -> processResult(message, callback, result)); - - return false; } @InvokeOnHeader("PEEK") - boolean onPeek(Message message, AsyncCallback callback) throws Exception { + void onPeek(Message message, AsyncCallback callback) throws Exception { final DistributedQueue<Object> queue = getResource(message); queue.peek().thenAccept( result -> processResult(message, callback, result)); - - return false; } @InvokeOnHeader("POLL") - boolean onPoll(Message message, AsyncCallback callback) throws Exception { + void onPoll(Message message, AsyncCallback callback) throws Exception { final DistributedQueue<Object> queue = getResource(message); queue.poll().thenAccept( result -> processResult(message, callback, result)); - - return false; } @InvokeOnHeader("CLEAR") - boolean onClear(Message message, AsyncCallback callback) throws Exception { + void onClear(Message message, AsyncCallback callback) throws Exception { final DistributedQueue<Object> queue = getResource(message); queue.clear().thenAccept( result -> processResult(message, callback, result)); - - return false; } @InvokeOnHeader("CONTAINS") - boolean onContains(Message message, AsyncCallback callback) throws Exception { + void onContains(Message message, AsyncCallback callback) throws Exception { final DistributedQueue<Object> queue = getResource(message); final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); @@ -113,12 +102,10 @@ public final class AtomixQueueProducer extends AbstractAtomixClientProducer<Atom queue.contains(value).thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("IS_EMPTY") - boolean onIsEmpty(Message message, AsyncCallback callback) throws Exception { + void onIsEmpty(Message message, AsyncCallback callback) throws Exception { final DistributedQueue<Object> queue = getResource(message); final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); @@ -130,12 +117,10 @@ public final class AtomixQueueProducer extends AbstractAtomixClientProducer<Atom queue.isEmpty().thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("REMOVE") - boolean onRemove(Message message, AsyncCallback callback) throws Exception { + void onRemove(Message message, AsyncCallback callback) throws Exception { final DistributedQueue<Object> queue = getResource(message); final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); @@ -146,12 +131,10 @@ public final class AtomixQueueProducer extends AbstractAtomixClientProducer<Atom queue.remove(value).thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("SIZE") - boolean onSize(Message message, AsyncCallback callback) throws Exception { + void onSize(Message message, AsyncCallback callback) throws Exception { final DistributedQueue<Object> queue = getResource(message); final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); @@ -163,8 +146,6 @@ public final class AtomixQueueProducer extends AbstractAtomixClientProducer<Atom queue.size().thenAccept( result -> processResult(message, callback, result)); } - - return false; } // ********************************* @@ -172,11 +153,6 @@ public final class AtomixQueueProducer extends AbstractAtomixClientProducer<Atom // ********************************* @Override - protected String getProcessorKey(Message message) { - return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, String.class); - } - - @Override protected String getResourceName(Message message) { return message.getHeader(RESOURCE_NAME, getAtomixEndpoint()::getResourceName, String.class); } diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetProducer.java index fef2eea..f10c1e4 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetProducer.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetProducer.java @@ -26,7 +26,6 @@ import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer; import org.apache.camel.spi.InvokeOnHeader; import org.apache.camel.util.ObjectHelper; -import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION; import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME; import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY; import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_TTL; @@ -36,7 +35,7 @@ public final class AtomixSetProducer extends AbstractAtomixClientProducer<Atomix private final AtomixSetConfiguration configuration; protected AtomixSetProducer(AtomixSetEndpoint endpoint) { - super(endpoint); + super(endpoint, endpoint.getConfiguration().getDefaultAction().name()); this.configuration = endpoint.getConfiguration(); } @@ -50,7 +49,7 @@ public final class AtomixSetProducer extends AbstractAtomixClientProducer<Atomix } @InvokeOnHeader("ADD") - boolean onAdd(Message message, AsyncCallback callback) throws Exception { + void onAdd(Message message, AsyncCallback callback) throws Exception { final DistributedSet<Object> set = getResource(message); final long ttl = getResourceTtl(message); final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); @@ -64,22 +63,18 @@ public final class AtomixSetProducer extends AbstractAtomixClientProducer<Atomix set.add(val).thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("CLEAR") - boolean onClear(Message message, AsyncCallback callback) throws Exception { + void onClear(Message message, AsyncCallback callback) throws Exception { final DistributedSet<Object> set = getResource(message); set.clear().thenAccept( result -> processResult(message, callback, result)); - - return false; } @InvokeOnHeader("CONTAINS") - boolean onContains(Message message, AsyncCallback callback) throws Exception { + void onContains(Message message, AsyncCallback callback) throws Exception { final DistributedSet<Object> set = getResource(message); final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); @@ -94,12 +89,10 @@ public final class AtomixSetProducer extends AbstractAtomixClientProducer<Atomix set.contains(value).thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("IS_EMPTY") - boolean onIsEmpty(Message message, AsyncCallback callback) throws Exception { + void onIsEmpty(Message message, AsyncCallback callback) throws Exception { final DistributedSet<Object> set = getResource(message); final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); @@ -111,12 +104,10 @@ public final class AtomixSetProducer extends AbstractAtomixClientProducer<Atomix set.isEmpty().thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("REMOVE") - boolean onRemove(Message message, AsyncCallback callback) throws Exception { + void onRemove(Message message, AsyncCallback callback) throws Exception { final DistributedSet<Object> set = getResource(message); final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); @@ -124,12 +115,10 @@ public final class AtomixSetProducer extends AbstractAtomixClientProducer<Atomix set.remove(value).thenAccept( result -> processResult(message, callback, result)); - - return false; } @InvokeOnHeader("SIZE") - boolean onSize(Message message, AsyncCallback callback) throws Exception { + void onSize(Message message, AsyncCallback callback) throws Exception { final DistributedSet<Object> set = getResource(message); final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); @@ -141,8 +130,6 @@ public final class AtomixSetProducer extends AbstractAtomixClientProducer<Atomix set.size().thenAccept( result -> processResult(message, callback, result)); } - - return false; } // ********************************* @@ -150,11 +137,6 @@ public final class AtomixSetProducer extends AbstractAtomixClientProducer<Atomix // ********************************* @Override - protected String getProcessorKey(Message message) { - return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, String.class); - } - - @Override protected String getResourceName(Message message) { return message.getHeader(RESOURCE_NAME, getAtomixEndpoint()::getResourceName, String.class); } diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueProducer.java index f5a5dfa..f827ecd 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueProducer.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueProducer.java @@ -26,7 +26,6 @@ import org.apache.camel.component.atomix.client.AbstractAtomixClientProducer; import org.apache.camel.spi.InvokeOnHeader; import org.apache.camel.util.ObjectHelper; -import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION; import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME; import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_OLD_VALUE; import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY; @@ -37,7 +36,7 @@ public final class AtomixValueProducer extends AbstractAtomixClientProducer<Atom private final AtomixValueConfiguration configuration; protected AtomixValueProducer(AtomixValueEndpoint endpoint) { - super(endpoint); + super(endpoint, endpoint.getConfiguration().getDefaultAction().name()); this.configuration = endpoint.getConfiguration(); } @@ -51,7 +50,7 @@ public final class AtomixValueProducer extends AbstractAtomixClientProducer<Atom } @InvokeOnHeader("SET") - boolean onSet(Message message, AsyncCallback callback) throws Exception { + void onSet(Message message, AsyncCallback callback) throws Exception { final DistributedValue<Object> value = getResource(message); final long ttl = getResourceTtl(message); final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); @@ -65,12 +64,10 @@ public final class AtomixValueProducer extends AbstractAtomixClientProducer<Atom value.set(val).thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("GET") - boolean onGet(Message message, AsyncCallback callback) throws Exception { + void onGet(Message message, AsyncCallback callback) throws Exception { final DistributedValue<Object> value = getResource(message); final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); @@ -82,12 +79,10 @@ public final class AtomixValueProducer extends AbstractAtomixClientProducer<Atom value.get().thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("GET_AND_SET") - boolean onGetAndSet(Message message, AsyncCallback callback) throws Exception { + void onGetAndSet(Message message, AsyncCallback callback) throws Exception { final DistributedValue<Object> value = getResource(message); final long ttl = getResourceTtl(message); final Object val = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); @@ -101,12 +96,10 @@ public final class AtomixValueProducer extends AbstractAtomixClientProducer<Atom value.getAndSet(val).thenAccept( result -> processResult(message, callback, result)); } - - return false; } @InvokeOnHeader("COMPARE_AND_SET") - boolean onCompareAndSet(Message message, AsyncCallback callback) throws Exception { + void onCompareAndSet(Message message, AsyncCallback callback) throws Exception { final DistributedValue<Object> value = getResource(message); final long ttl = getResourceTtl(message); final Object newVal = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); @@ -122,8 +115,6 @@ public final class AtomixValueProducer extends AbstractAtomixClientProducer<Atom value.compareAndSet(oldVal, newVal).thenAccept( result -> processResult(message, callback, result)); } - - return false; } // ********************************* @@ -131,11 +122,6 @@ public final class AtomixValueProducer extends AbstractAtomixClientProducer<Atom // ********************************* @Override - protected String getProcessorKey(Message message) { - return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, String.class); - } - - @Override protected String getResourceName(Message message) { return message.getHeader(RESOURCE_NAME, getAtomixEndpoint()::getResourceName, String.class); } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/HeaderSelectorProducer.java b/core/camel-support/src/main/java/org/apache/camel/support/HeaderSelectorProducer.java index 523923e..fa65e9a 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/HeaderSelectorProducer.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/HeaderSelectorProducer.java @@ -162,6 +162,7 @@ public abstract class HeaderSelectorProducer extends DefaultAsyncProducer implem @Override public boolean process(Exchange exchange, AsyncCallback callback) { + boolean sync = true; try { String header = headerSupplier.get(); String action = exchange.getIn().getHeader(header, String.class); @@ -178,17 +179,28 @@ public abstract class HeaderSelectorProducer extends DefaultAsyncProducer implem if (answer == null && parentStrategy != null) { answer = parentStrategy.invoke(target, action, exchange, callback); } - LOGGER.trace("Invoked @InvokeOnHeader method: {} -> {}", action, answer); + if (answer == callback) { + // okay it was an async invoked so we should return false + sync = false; + answer = null; + } + if (sync) { + LOGGER.trace("Invoked @InvokeOnHeader method: {} -> {}", action, answer); + } else { + LOGGER.trace("Invoked @InvokeOnHeader method: {} is continuing asynchronously", action); + } if (answer != null) { exchange.getMessage().setBody(answer); } - } catch (Exception e) { exchange.setException(e); } - callback.done(true); - return true; + if (sync) { + // callback was not in use, so we must done it here + callback.done(true); + } + return sync; } } diff --git a/tooling/maven/camel-package-maven-plugin/src/main/java/org/apache/camel/maven/packaging/GenerateInvokeOnHeaderMojo.java b/tooling/maven/camel-package-maven-plugin/src/main/java/org/apache/camel/maven/packaging/GenerateInvokeOnHeaderMojo.java index a78b90a..35e3eda 100644 --- a/tooling/maven/camel-package-maven-plugin/src/main/java/org/apache/camel/maven/packaging/GenerateInvokeOnHeaderMojo.java +++ b/tooling/maven/camel-package-maven-plugin/src/main/java/org/apache/camel/maven/packaging/GenerateInvokeOnHeaderMojo.java @@ -221,22 +221,30 @@ public class GenerateInvokeOnHeaderMojo extends AbstractGeneratorMojo { if (!models.isEmpty()) { w.write(" switch (key) {\n"); for (InvokeOnHeaderModel option : models) { + boolean sync = true; String invoke = "target." + option.getMethodName() + "("; if (!option.getArgs().isEmpty()) { StringJoiner sj = new StringJoiner(", "); for (String arg : option.getArgs()) { String ba = bindArg(arg); + // if callback is in use then we are no long synchronous + sync &= !ba.equals("callback"); sj.add(ba); } invoke += sj.toString(); } + String ret = "null"; + if (!sync) { + // return the callback instance in async mode to signal that callback are in use + ret = "callback"; + } invoke += ")"; if (!option.getKey().toLowerCase().equals(option.getKey())) { w.write(String.format(" case \"%s\":\n", option.getKey().toLowerCase())); } - if (option.getReturnType().equals("VOID")) { - w.write(String.format(" case \"%s\": %s; return null;\n", option.getKey(), invoke)); + if (!sync || option.getReturnType().equals("VOID")) { + w.write(String.format(" case \"%s\": %s; return %s;\n", option.getKey(), invoke, ret)); } else { w.write(String.format(" case \"%s\": return %s;\n", option.getKey(), invoke)); }