CAMEL-9445 camel-ignite: Adjust endpoint metadata and add logs to consumers.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/99c55941 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/99c55941 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/99c55941 Branch: refs/heads/master Commit: 99c559419847d1d635fbe4c6af909aea121996f2 Parents: d26af3d Author: Raul Kripalani <ra...@apache.org> Authored: Wed Dec 23 15:55:55 2015 +0000 Committer: Raul Kripalani <ra...@apache.org> Committed: Wed Dec 23 15:55:55 2015 +0000 ---------------------------------------------------------------------- .../ignite/cache/IgniteCacheContinuousQueryConsumer.java | 10 ++++++++++ .../component/ignite/compute/IgniteComputeEndpoint.java | 2 +- .../component/ignite/events/IgniteEventsConsumer.java | 9 +++++++++ .../component/ignite/events/IgniteEventsEndpoint.java | 2 +- .../ignite/messaging/IgniteMessagingConsumer.java | 10 ++++++++++ .../ignite/messaging/IgniteMessagingEndpoint.java | 6 +++++- .../ignite/messaging/IgniteMessagingProducer.java | 3 +++ .../ignite/messaging/IgniteMessagingSendMode.java | 3 +++ 8 files changed, 42 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java index e4c7302..cb06424 100644 --- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java @@ 
-16,6 +16,8 @@ */ package org.apache.camel.component.ignite.cache; +import java.util.Arrays; + import javax.cache.Cache.Entry; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryListenerException; @@ -59,6 +61,8 @@ public class IgniteCacheContinuousQueryConsumer extends DefaultConsumer { launchContinuousQuery(); + LOG.info("Started Ignite Cache Continuous Query consumer for cache {} with query: {}.", cache.getName(), endpoint.getQuery()); + maybeFireExistingQueryResults(); } @@ -96,6 +100,10 @@ public class IgniteCacheContinuousQueryConsumer extends DefaultConsumer { continuousQuery.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> events) throws CacheEntryListenerException { + if (LOG.isTraceEnabled()) { + LOG.info("Processing Continuous Query event(s): {}.", events); + } + if (!endpoint.isOneExchangePerUpdate()) { fireGroupedExchange(events); return; @@ -119,6 +127,8 @@ public class IgniteCacheContinuousQueryConsumer extends DefaultConsumer { super.doStop(); cursor.close(); + + LOG.info("Stopped Ignite Cache Continuous Query consumer for cache {} with query: {}.", cache.getName(), endpoint.getQuery()); } private void fireSingleExchange(CacheEntryEvent<? extends Object, ? 
extends Object> entry) { http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java index 1dc0663..d6a3eb2 100644 --- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java @@ -33,7 +33,7 @@ import org.apache.ignite.IgniteCompute; /** * Ignite Compute endpoint. */ -@UriEndpoint(scheme = "ignite:compute", title = "Ignite Compute", syntax = "ignite:compute:endpointId", label = "nosql,cache,compute", producerOnly = true) +@UriEndpoint(scheme = "ignite:compute", title = "Ignite Compute", syntax = "ignite:compute:[endpointId]", label = "nosql,cache,compute", producerOnly = true) public class IgniteComputeEndpoint extends AbstractIgniteEndpoint { @UriParam http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java index 5d63611..7df33ac 100644 --- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java @@ -16,6 +16,8 @@ */ package 
org.apache.camel.component.ignite.events; +import java.util.Arrays; + import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; @@ -48,6 +50,9 @@ public class IgniteEventsConsumer extends DefaultConsumer { Message in = exchange.getIn(); in.setBody(event); try { + if (LOG.isTraceEnabled()) { + LOG.trace("Processing Ignite Event: {}.", event); + } getAsyncProcessor().process(exchange, new AsyncCallback() { @Override public void done(boolean doneSync) { @@ -80,6 +85,8 @@ public class IgniteEventsConsumer extends DefaultConsumer { } events.localListen(predicate, eventTypes); + + LOG.info("Started local Ignite Events consumer for events: {}.", Arrays.asList(eventTypes)); } @Override @@ -87,6 +94,8 @@ public class IgniteEventsConsumer extends DefaultConsumer { super.doStop(); events.stopLocalListen(predicate, eventTypes); + + LOG.info("Stopped local Ignite Events consumer for events: {}.", Arrays.asList(eventTypes)); } } http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java index 6237ad8..1d48a30 100644 --- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java @@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory; /** * Ignite Events endpoint. Only supports consumers. 
*/ -@UriEndpoint(scheme = "ignite:events", title = "Ignite Events", syntax = "ignite:events:endpointId", label = "nosql,cache,compute,messaging,data", +@UriEndpoint(scheme = "ignite:events", title = "Ignite Events", syntax = "ignite:events:[endpointId]", label = "nosql,cache,compute,messaging,data", consumerOnly = true, consumerClass = IgniteEventsConsumer.class) public class IgniteEventsEndpoint extends AbstractIgniteEndpoint { http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java index 6579437..03a8c67 100644 --- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java @@ -29,6 +29,9 @@ import org.apache.ignite.lang.IgniteBiPredicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Ignite Messaging consumer. 
+ */ public class IgniteMessagingConsumer extends DefaultConsumer { private static final Logger LOG = LoggerFactory.getLogger(IgniteMessagingConsumer.class); @@ -47,6 +50,9 @@ public class IgniteMessagingConsumer extends DefaultConsumer { in.setHeader(IgniteConstants.IGNITE_MESSAGING_TOPIC, endpoint.getTopic()); in.setHeader(IgniteConstants.IGNITE_MESSAGING_UUID, uuid); try { + if (LOG.isTraceEnabled()) { + LOG.trace("Processing Ignite message for subscription {} with payload {}.", uuid, payload); + } getProcessor().process(exchange); } catch (Exception e) { LOG.error(String.format("Exception while processing Ignite Message from topic %s", endpoint.getTopic()), e); @@ -66,6 +72,8 @@ public class IgniteMessagingConsumer extends DefaultConsumer { super.doStart(); messaging.localListen(endpoint.getTopic(), predicate); + + LOG.info("Started Ignite Messaging consumer for topic {}.", endpoint.getTopic()); } @Override @@ -73,6 +81,8 @@ public class IgniteMessagingConsumer extends DefaultConsumer { super.doStop(); messaging.stopLocalListen(endpoint.getTopic(), predicate); + + LOG.info("Stopped Ignite Messaging consumer for topic {}.", endpoint.getTopic()); } } http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingEndpoint.java index 2277a11..ca375f2 100644 --- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingEndpoint.java +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingEndpoint.java @@ -31,7 +31,11 @@ import org.apache.camel.spi.UriParam; import 
org.apache.ignite.Ignite; import org.apache.ignite.IgniteMessaging; -@UriEndpoint(scheme = "ignite:messaging", title = "Ignite Messaging", syntax = "ignite:messaging:[topic]", label = "nosql,cache,messaging") +/** + * Ignite Messaging endpoint. + */ +@UriEndpoint(scheme = "ignite:messaging", title = "Ignite Messaging", syntax = "ignite:messaging:[topic]", label = "nosql,cache,messaging", + consumerClass = IgniteMessagingConsumer.class) public class IgniteMessagingEndpoint extends AbstractIgniteEndpoint { @UriParam http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java index c4946b8..5a33c43 100644 --- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java @@ -28,6 +28,9 @@ import org.apache.camel.util.MessageHelper; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteMessaging; +/** + * Ignite Messaging producer. 
+ */ public class IgniteMessagingProducer extends DefaultAsyncProducer { private IgniteMessagingEndpoint endpoint; http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingSendMode.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingSendMode.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingSendMode.java index 0bf472c..a1c8900 100644 --- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingSendMode.java +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingSendMode.java @@ -16,6 +16,9 @@ */ package org.apache.camel.component.ignite.messaging; +/** + * Enum for Ignite Messaging send modes. + */ public enum IgniteMessagingSendMode { ORDERED, UNORDERED