CAMEL-7999: More components include documentation
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ef88dcd8 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ef88dcd8 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ef88dcd8 Branch: refs/heads/master Commit: ef88dcd8173feb88156d066761b7af43fd1779ec Parents: 1faf266 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Jan 6 09:06:34 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Jan 6 09:06:34 2015 +0100 ---------------------------------------------------------------------- .../component/disruptor/DisruptorComponent.java | 45 +++++++++----------- .../component/disruptor/DisruptorEndpoint.java | 19 ++++++++- .../component/disruptor/DisruptorReference.java | 8 +++- 3 files changed, 44 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ef88dcd8/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java index 81ffa3c..85db091 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java @@ -14,14 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.disruptor; import java.util.HashMap; import java.util.Map; import org.apache.camel.Endpoint; -import org.apache.camel.impl.DefaultComponent; +import org.apache.camel.impl.UriEndpointComponent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +29,7 @@ import org.slf4j.LoggerFactory; * for asynchronous SEDA exchanges on an * <a href="https://github.com/LMAX-Exchange/disruptor">LMAX Disruptor</a> within a CamelContext */ -public class DisruptorComponent extends DefaultComponent { +public class DisruptorComponent extends UriEndpointComponent { public static final int DEFAULT_BUFFER_SIZE = 1024; public static final int MAX_CONCURRENT_CONSUMERS = 500; @@ -50,13 +49,17 @@ public class DisruptorComponent extends DefaultComponent { //synchronized access guarded by this private final Map<String, DisruptorReference> disruptors = new HashMap<String, DisruptorReference>(); + public DisruptorComponent() { + super(DisruptorEndpoint.class); + } + @Override protected Endpoint createEndpoint(final String uri, final String remaining, final Map<String, Object> parameters) throws Exception { - final int concurrentConsumers = getAndRemoveParameter(parameters, "concurrentConsumers", - Integer.class, defaultConcurrentConsumers); - final boolean limitConcurrentConsumers = getAndRemoveParameter(parameters, "limitConcurrentConsumers", - Boolean.class, true); + + final int concurrentConsumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, defaultConcurrentConsumers); + final boolean limitConcurrentConsumers = getAndRemoveParameter(parameters, "limitConcurrentConsumers", Boolean.class, true); + if (limitConcurrentConsumers && concurrentConsumers > MAX_CONCURRENT_CONSUMERS) { throw new IllegalArgumentException( "The limitConcurrentConsumers flag in set to true. ConcurrentConsumers cannot be set at a value greater than " @@ -79,32 +82,22 @@ public class DisruptorComponent extends DefaultComponent { // Check if the pollTimeout argument is set (may be the case if Disruptor component is used as drop-in // replacement for the SEDA component. if (parameters.containsKey("pollTimeout")) { - throw new IllegalArgumentException( - "The 'pollTimeout' argument is not supported by the Disruptor component"); + throw new IllegalArgumentException("The 'pollTimeout' argument is not supported by the Disruptor component"); } - final DisruptorWaitStrategy waitStrategy = getAndRemoveParameter(parameters, "waitStrategy", - DisruptorWaitStrategy.class, defaultWaitStrategy); - - final DisruptorProducerType producerType = getAndRemoveParameter(parameters, "producerType", - DisruptorProducerType.class, defaultProducerType); - - final boolean multipleConsumers = getAndRemoveParameter(parameters, "multipleConsumers", - boolean.class, defaultMultipleConsumers); - - final boolean blockWhenFull = getAndRemoveParameter(parameters, "blockWhenFull", boolean.class, - defaultBlockWhenFull); + final DisruptorWaitStrategy waitStrategy = getAndRemoveParameter(parameters, "waitStrategy", DisruptorWaitStrategy.class, defaultWaitStrategy); + final DisruptorProducerType producerType = getAndRemoveParameter(parameters, "producerType", DisruptorProducerType.class, defaultProducerType); + final boolean multipleConsumers = getAndRemoveParameter(parameters, "multipleConsumers", boolean.class, defaultMultipleConsumers); + final boolean blockWhenFull = getAndRemoveParameter(parameters, "blockWhenFull", boolean.class, defaultBlockWhenFull); - final DisruptorReference disruptorReference = getOrCreateDisruptor(uri, size, producerType, - waitStrategy); - final DisruptorEndpoint disruptorEndpoint = new DisruptorEndpoint(uri, this, disruptorReference, - concurrentConsumers, multipleConsumers, blockWhenFull); + final DisruptorReference disruptorReference = getOrCreateDisruptor(uri, remaining, size, producerType, waitStrategy); + final DisruptorEndpoint disruptorEndpoint = new DisruptorEndpoint(uri, this, disruptorReference, concurrentConsumers, multipleConsumers, blockWhenFull); disruptorEndpoint.configureProperties(parameters); return disruptorEndpoint; } - private DisruptorReference getOrCreateDisruptor(final String uri, final int size, + private DisruptorReference getOrCreateDisruptor(final String uri, final String name, final int size, final DisruptorProducerType producerType, final DisruptorWaitStrategy waitStrategy) throws Exception { final String key = getDisruptorKey(uri); @@ -125,7 +118,7 @@ public class DisruptorComponent extends DefaultComponent { DisruptorReference ref = getDisruptors().get(key); if (ref == null) { LOGGER.debug("Creating new disruptor for key {}", key); - ref = new DisruptorReference(this, uri, sizeToUse, producerType, waitStrategy); + ref = new DisruptorReference(this, uri, name, sizeToUse, producerType, waitStrategy); getDisruptors().put(key, ref); } else { //if size was explicitly requested, the size to use should match the retrieved DisruptorReference http://git-wip-us.apache.org/repos/asf/camel/blob/ef88dcd8/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java index 9ad8cfb..e2973e1 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java @@ -24,7 +24,6 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import com.lmax.disruptor.InsufficientCapacityException; - import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Exchange; @@ -36,6 +35,9 @@ import org.apache.camel.WaitForTaskToComplete; import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,15 +47,24 @@ import org.slf4j.LoggerFactory; * <a href="https://github.com/LMAX-Exchange/disruptor">LMAX Disruptor</a> within a CamelContext */ @ManagedResource(description = "Managed Disruptor Endpoint") +@UriEndpoint(scheme = "disruptor,disruptor-vm", consumerClass = DisruptorConsumer.class, label = "endpoint") public class DisruptorEndpoint extends DefaultEndpoint implements MultipleConsumersSupport { public static final String DISRUPTOR_IGNORE_EXCHANGE = "disruptor.ignoreExchange"; private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorEndpoint.class); + @UriPath(description = "Name of queue") + private String name; + @UriParam(defaultValue = "1") private final int concurrentConsumers; + @UriParam(defaultValue = "false") private final boolean multipleConsumers; + @UriParam(defaultValue = "IfReplyExpected") private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected; + @UriParam(defaultValue = "30000") private long timeout = 30000; + @UriParam(defaultValue = "false") private boolean blockWhenFull; + private final Set<DisruptorProducer> producers = new CopyOnWriteArraySet<DisruptorProducer>(); private final Set<DisruptorConsumer> consumers = new CopyOnWriteArraySet<DisruptorConsumer>(); @@ -64,6 +75,7 @@ public class DisruptorEndpoint extends DefaultEndpoint implements MultipleConsum final boolean multipleConsumers, boolean blockWhenFull) throws Exception { super(endpointUri, component); this.disruptorReference = disruptorReference; + this.name = disruptorReference.getName(); this.concurrentConsumers = concurrentConsumers; this.multipleConsumers = multipleConsumers; this.blockWhenFull = blockWhenFull; @@ -94,6 +106,11 @@ public class DisruptorEndpoint extends DefaultEndpoint implements MultipleConsum return status.name(); } + @ManagedAttribute(description = "Queue name") + public String getName() { + return name; + } + @ManagedAttribute(description = "Buffer max capacity") public int getBufferSize() { return disruptorReference.getBufferSize(); http://git-wip-us.apache.org/repos/asf/camel/blob/ef88dcd8/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorReference.java ---------------------------------------------------------------------- diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorReference.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorReference.java index 581473f..d5622b9 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorReference.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorReference.java @@ -56,6 +56,7 @@ public class DisruptorReference { .newSetFromMap(new WeakHashMap<DisruptorEndpoint, Boolean>(4)); private final DisruptorComponent component; private final String uri; + private final String name; //The mark on the reference indicates if we are in the process of reconfiguring the Disruptor: //(ref, mark) : Description @@ -82,10 +83,11 @@ public class DisruptorReference { private int uniqueConsumerCount; - DisruptorReference(final DisruptorComponent component, final String uri, final int size, + DisruptorReference(final DisruptorComponent component, final String uri, final String name, final int size, final DisruptorProducerType producerType, final DisruptorWaitStrategy waitStrategy) throws Exception { this.component = component; this.uri = uri; + this.name = name; this.size = size; this.producerType = producerType; this.waitStrategy = waitStrategy; @@ -318,6 +320,10 @@ public class DisruptorReference { resizeThreadPoolExecutor(0); } + public String getName() { + return name; + } + public long getRemainingCapacity() throws DisruptorNotStartedException { return getCurrentDisruptor().getRingBuffer().remainingCapacity(); }