Component docs
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bf5cea17 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bf5cea17 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bf5cea17 Branch: refs/heads/master Commit: bf5cea17182d3bb33ac31b380d35a2c284688d26 Parents: e6c3bb2 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu May 7 15:35:10 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu May 7 15:35:10 2015 +0200 ---------------------------------------------------------------------- .../component/disruptor/DisruptorComponent.java | 29 +++++++++- .../component/disruptor/DisruptorEndpoint.java | 61 +++++++++++++++----- 2 files changed, 72 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/bf5cea17/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java index 112c158..3ce8c17 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java @@ -172,6 +172,9 @@ public class DisruptorComponent extends UriEndpointComponent { return defaultConcurrentConsumers; } + /** + * To configure the default number of concurrent consumers + */ public void setDefaultConcurrentConsumers(final int defaultConcurrentConsumers) { this.defaultConcurrentConsumers = defaultConcurrentConsumers; } @@ -180,6 +183,9 @@ public class DisruptorComponent extends UriEndpointComponent { return defaultMultipleConsumers; 
} + /** + * To configure the default value for multiple consumers + */ public void setDefaultMultipleConsumers(final boolean defaultMultipleConsumers) { this.defaultMultipleConsumers = defaultMultipleConsumers; } @@ -188,6 +194,11 @@ public class DisruptorComponent extends UriEndpointComponent { return defaultProducerType; } + /** + * To configure the default value for DisruptorProducerType + * <p/> + * The default value is Multi. + */ public void setDefaultProducerType(final DisruptorProducerType defaultProducerType) { this.defaultProducerType = defaultProducerType; } @@ -196,6 +207,11 @@ public class DisruptorComponent extends UriEndpointComponent { return defaultWaitStrategy; } + /** + * To configure the default value for DisruptorWaitStrategy + * <p/> + * The default value is Blocking. + */ public void setDefaultWaitStrategy(final DisruptorWaitStrategy defaultWaitStrategy) { this.defaultWaitStrategy = defaultWaitStrategy; } @@ -204,22 +220,31 @@ public class DisruptorComponent extends UriEndpointComponent { return defaultBlockWhenFull; } + /** + * To configure the default value for block when full + * <p/> + * The default value is true. 
+ */ public void setDefaultBlockWhenFull(boolean defaultBlockWhenFull) { this.defaultBlockWhenFull = defaultBlockWhenFull; } + /** + * To configure the ring buffer size + */ @Deprecated public void setQueueSize(final int size) { - LOGGER.warn("Using deprecated queueSize parameter for SEDA compatibility, use bufferSize instead"); queueSize = size; } @Deprecated public int getQueueSize() { - LOGGER.warn("Using deprecated queueSize parameter for SEDA compatibility, use bufferSize instead"); return queueSize; } + /** + * To configure the ring buffer size + */ public void setBufferSize(final int size) { bufferSize = size; } http://git-wip-us.apache.org/repos/asf/camel/blob/bf5cea17/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java index 8132472..ef83f41 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java @@ -35,6 +35,7 @@ import org.apache.camel.WaitForTaskToComplete; import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; @@ -52,28 +53,27 @@ public class DisruptorEndpoint extends DefaultEndpoint implements MultipleConsum public static final String DISRUPTOR_IGNORE_EXCHANGE = "disruptor.ignoreExchange"; private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorEndpoint.class); - @UriPath(description = "Name 
of queue") + private final Set<DisruptorProducer> producers = new CopyOnWriteArraySet<DisruptorProducer>(); + private final Set<DisruptorConsumer> consumers = new CopyOnWriteArraySet<DisruptorConsumer>(); + private final DisruptorReference disruptorReference; + + @UriPath(description = "Name of queue") @Metadata(required = "true") private String name; - @UriParam(defaultValue = "1") + @UriParam(label = "consumer", defaultValue = "1") private final int concurrentConsumers; - @UriParam + @UriParam(label = "consumer") private final boolean multipleConsumers; - @UriParam(defaultValue = "IfReplyExpected") + @UriParam(label = "producer", defaultValue = "IfReplyExpected") private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected; - @UriParam(defaultValue = "30000") + @UriParam(label = "producer", defaultValue = "30000") private long timeout = 30000; - @UriParam + @UriParam(label = "producer") private boolean blockWhenFull; - @UriParam(defaultValue = "Blocking") + @UriParam(label = "consumer", defaultValue = "Blocking") private DisruptorWaitStrategy waitStrategy; - @UriParam(defaultValue = "Multi") + @UriParam(label = "producer", defaultValue = "Multi") private DisruptorProducerType producerType; - private final Set<DisruptorProducer> producers = new CopyOnWriteArraySet<DisruptorProducer>(); - private final Set<DisruptorConsumer> consumers = new CopyOnWriteArraySet<DisruptorConsumer>(); - - private final DisruptorReference disruptorReference; - public DisruptorEndpoint(final String endpointUri, final Component component, final DisruptorReference disruptorReference, final int concurrentConsumers, final boolean multipleConsumers, boolean blockWhenFull) throws Exception { @@ -130,6 +130,9 @@ public class DisruptorEndpoint extends DefaultEndpoint implements MultipleConsum return getDisruptor().getPendingExchangeCount(); } + /** + * Number of concurrent threads processing exchanges. 
+ */ @ManagedAttribute(description = "Number of concurrent consumers") public int getConcurrentConsumers() { return concurrentConsumers; @@ -139,6 +142,11 @@ public class DisruptorEndpoint extends DefaultEndpoint implements MultipleConsum return waitForTaskToComplete; } + /** + * Option to specify whether the caller should wait for the async task to complete or not before continuing. + * The following three options are supported: Always, Never or IfReplyExpected. The first two values are self-explanatory. + * The last value, IfReplyExpected, will only wait if the message is Request Reply based. + */ public void setWaitForTaskToComplete(final WaitForTaskToComplete waitForTaskToComplete) { this.waitForTaskToComplete = waitForTaskToComplete; } @@ -148,10 +156,20 @@ public class DisruptorEndpoint extends DefaultEndpoint implements MultipleConsum return timeout; } + /** + * Timeout (in milliseconds) before a producer will stop waiting for an asynchronous task to complete. + * You can disable timeout by using 0 or a negative value. + */ public void setTimeout(final long timeout) { this.timeout = timeout; } + /** + * Specifies whether multiple consumers are allowed. + * If enabled, you can use Disruptor for Publish-Subscribe messaging. + * That is, you can send a message to the queue and have each consumer receive a copy of the message. + * When enabled, this option should be specified on every consumer endpoint. + */ @ManagedAttribute public boolean isMultipleConsumers() { return multipleConsumers; @@ -182,6 +200,11 @@ public class DisruptorEndpoint extends DefaultEndpoint implements MultipleConsum return blockWhenFull; } + /** + * Whether a thread that sends messages to a full Disruptor will block until the ringbuffer's capacity is no longer exhausted. + * By default, the calling thread will block and wait until the message can be accepted. + * By disabling this option, an exception will be thrown stating that the queue is full. 
+ */ public void setBlockWhenFull(boolean blockWhenFull) { this.blockWhenFull = blockWhenFull; } @@ -190,6 +213,10 @@ public class DisruptorEndpoint extends DefaultEndpoint implements MultipleConsum return waitStrategy; } + /** + * Defines the strategy used by consumer threads to wait on new exchanges to be published. + * The options allowed are: Blocking, Sleeping, BusySpin and Yielding. + */ public void setWaitStrategy(DisruptorWaitStrategy waitStrategy) { this.waitStrategy = waitStrategy; } @@ -198,6 +225,11 @@ public class DisruptorEndpoint extends DefaultEndpoint implements MultipleConsum return producerType; } + /** + * Defines the producers allowed on the Disruptor. + * The options allowed are: Multi to allow multiple producers and Single to enable certain optimizations only + * allowed when one concurrent producer (on one thread or otherwise synchronized) is active. + */ public void setProducerType(DisruptorProducerType producerType) { this.producerType = producerType; } @@ -299,8 +331,6 @@ public class DisruptorEndpoint extends DefaultEndpoint implements MultipleConsum /** * Called by DisruptorProducers to publish new exchanges on the RingBuffer, blocking when full - * - * @param exchange */ void publish(final Exchange exchange) throws DisruptorNotStartedException { disruptorReference.publish(exchange); @@ -310,7 +340,6 @@ public class DisruptorEndpoint extends DefaultEndpoint implements MultipleConsum * Called by DisruptorProducers to publish new exchanges on the RingBuffer, throwing InsufficientCapacityException * when full * - * @param exchange * @throws InsufficientCapacityException when the Ringbuffer is full. */ void tryPublish(final Exchange exchange) throws DisruptorNotStartedException, InsufficientCapacityException {