Repository: camel Updated Branches: refs/heads/master 5ab310241 -> 622e515d4
Added camel-distruptor docs to gitbook Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/622e515d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/622e515d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/622e515d Branch: refs/heads/master Commit: 622e515d44d86d699dc0eaebc0dd937015b4d944 Parents: 5ab3102 Author: Andrea Cosentino <anco...@gmail.com> Authored: Tue Feb 23 11:33:09 2016 +0100 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Tue Feb 23 11:33:09 2016 +0100 ---------------------------------------------------------------------- .../src/main/docs/disruptor.adoc | 350 +++++++++++++++++++ docs/user-manual/en/SUMMARY.md | 1 + 2 files changed, 351 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/622e515d/components/camel-disruptor/src/main/docs/disruptor.adoc ---------------------------------------------------------------------- diff --git a/components/camel-disruptor/src/main/docs/disruptor.adoc b/components/camel-disruptor/src/main/docs/disruptor.adoc new file mode 100644 index 0000000..fcda340 --- /dev/null +++ b/components/camel-disruptor/src/main/docs/disruptor.adoc @@ -0,0 +1,350 @@ +[[Disruptor-DisruptorComponent]] +Disruptor Component +~~~~~~~~~~~~~~~~~~~ + +*Available as of Camel 2.12* + +The *disruptor:* component provides asynchronous +http://www.eecs.harvard.edu/~mdw/proj/seda/[SEDA] behavior much as the +standard SEDA Component, but utilizes a +https://github.com/LMAX-Exchange/disruptor[Disruptor] instead of a +http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html[BlockingQueue] +utilized by the standard link:seda.html[SEDA]. Alternatively, a + +*disruptor-vm:* endpoint is supported by this component, providing an +alternative to the standard link:vm.html[VM]. As with the SEDA +component, buffers of the *disruptor:* endpoints are only visible within +a *single* link:camelcontext.html[CamelContext] and no support is +provided for persistence or recovery. The buffers of the +**disruptor-vm:** endpoints also provides support for communication +across CamelContexts instances so you can use this mechanism to +communicate across web applications (provided that *camel-disruptor.jar* +is on the *system/boot* classpath). + +The main advantage of choosing to use the Disruptor Component over the +SEDA or the VM Component is performance in use cases where there is high +contention between producer(s) and/or multicasted or concurrent +Consumers. In those cases, significant increases of throughput and +reduction of latency has been observed. Performance in scenarios without +contention is comparable to the SEDA and VM Components. + +The Disruptor is implemented with the intention of mimicing the +behaviour and options of the SEDA and VM Components as much as possible. +The main differences with the them are the following: + +* The buffer used is always bounded in size (default 1024 exchanges). +* As a the buffer is always bouded, the default behaviour for the +Disruptor is to block while the buffer is full instead of throwing an +exception. This default behaviour may be configured on the component +(see options). +* The Disruptor enpoints don't implement the BrowsableEndpoint +interface. As such, the exchanges currently in the Disruptor can't be +retrieved, only the amount of exchanges. +* The Disruptor requires its consumers (multicasted or otherwise) to be +statically configured. Adding or removing consumers on the fly requires +complete flushing of all pending exchanges in the Disruptor. +* As a result of the reconfiguration: Data sent over a Disruptor is +directly processed and 'gone' if there is at least one consumer, late +joiners only get new exchanges published after they've joined. +* The *pollTimeout* option is not supported by the Disruptor Component. +* When a producer blocks on a full Disruptor, it does not respond to +thread interrupts. + +Maven users will need to add the following dependency to their `pom.xml` +for this component: + +[source,java] +------------------------------------------------------------ +<dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-disruptor</artifactId> + <version>x.x.x</version> + <!-- use the same version as your Camel core version --> +</dependency> +------------------------------------------------------------ + +[[Disruptor-URIformat]] +URI format +^^^^^^^^^^ + +[source,java] +----------------------------- + disruptor:someName[?options] +----------------------------- + +or + +[source,java] +-------------------------------- + disruptor-vm:someName[?options] +-------------------------------- + +Where **someName** can be any string that uniquely identifies the +endpoint within the current link:camelcontext.html[CamelContext] (or +across contexts in case of + + **disruptor-vm:**). + + You can append query options to the URI in the following format: + +[source,java] +------------------------------ + ?option=value&option=value&⦠+------------------------------ + +[[Disruptor-Options]] +Options +^^^^^^^ + +All the following options are valid for both the **disruptor:** and +**disruptor-vm:** components. + +[width="100%",cols="10%,10%,80%",options="header",] +|======================================================================= +|Name | Default | Description + +|size |1024 |The maximum capacity of the Disruptors ringbuffer. Will be effectively +increased to the nearest power of two. *Notice:* Mind if you use this +option, then its the first endpoint being created with the queue name, +that determines the size. To make sure all endpoints use same size, then +configure the size option on all of them, or the first endpoint being +created. + +|bufferSize | | *Component only:* The maximum default size (capacity of the number of +messages it can hold) of the Disruptors ringbuffer. This option is used +if size is not in use. + +|queueSize | | *Component only:* Additional option to specify the <em>bufferSize</em> +to maintain maximum compatibility with the link:seda.html[SEDA] +Component. + +|concurrentConsumers |1 |Number of concurrent threads processing exchanges. + +|waitForTaskToComplete |IfReplyExpected |Option to specify whether the caller should wait for the async task to +complete or not before continuing. The following three options are +supported: _Always_, _Never_ or _IfReplyExpected_. The first two values +are self-explanatory. The last value, _IfReplyExpected_, will only wait +if the message is link:request-reply.html[Request Reply] based. See more +information about link:async.html[Async] messaging. + +|timeout |30000 |Timeout (in milliseconds) before a producer will stop waiting for an +asynchronous task to complete. See _waitForTaskToComplete_ and +link:async.html[Async] for more details. You can disable timeout by +using 0 or a negative value. + +|defaultMultipleConsumers | | *Component only:* Allows to set the default allowance of multiple +consumers for endpoints created by this component used when +_multipleConsumers_ is not provided. + +|multipleConsumers |false |Specifies whether multiple consumers are allowed. If enabled, you can +use Disruptor for http://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern[Publish-Subscribe] messaging. +That is, you can send a message to the SEDA queue and have each consumer +receive a copy of the message. When enabled, this option should be +specified on every consumer endpoint. + +|limitConcurrentConsumers |true |Whether to limit the number of concurrentConsumers to the maximum of +500. By default, an exception will be thrown if a Disruptor endpoint is +configured with a greater number. You can disable that check by turning +this option off. + +|blockWhenFull |true |Whether a thread that sends messages to a full Disruptor will block +until the ringbuffer's capacity is no longer exhausted. By default, the +calling thread will block and wait until the message can be accepted. By +disabling this option, an exception will be thrown stating that the +queue is full. + +|defaultBlockWhenFull | | *Component only:* Allows to set the default producer behaviour when the +ringbuffer is full for endpoints created by this comonent used when +_blockWhenFull_ is not provided. + +|waitStrategy |Blocking |Defines the strategy used by consumer threads to wait on new exchanges +to be published. The options allowed are:_Blocking_, _Sleeping_, +_BusySpin_ and _Yielding_. Refer to the section below for more +information on this subject + +|defaultWaitStrategy | | *Component only:* Allows to set the default wait strategy for endpoints +created by this comonent used when _waitStrategy_ is not provided. + +|producerType |Multi | Defines the producers allowed on the Disruptor. The options allowed are: +_Multi_ to allow multiple producers and _Single_ to enable certain +optimizations only allowed when one concurrent producer (on one thread +or otherwise synchronized) is active. + +|defaultProducerType | | *Component only:* Allows to set the default producer type for endpoints +created by this comonent used when _producerType_ is not provided. +|======================================================================= + +[[Disruptor-Waitstrategies]] +Wait strategies +^^^^^^^^^^^^^^^ + +The wait strategy effects the type of waiting performed by the consumer +threads that are currently waiting for the next exchange to be +published. The following strategies can be chosen: + +[width="100%",cols="10%,45%,45%",options="header",] +|======================================================================= +|Name |Description |Advice + +|Blocking | Blocking strategy that uses a lock and condition variable for Consumers +waiting on a barrier. | This strategy can be used when throughput and low-latency are not as +important as CPU resource. + +|Sleeping |Sleeping strategy that initially spins, then uses a Thread.yield(), and +eventually for the minimum number of nanos the OS and JVM will allow +while the Consumers are waiting on a barrier. |This strategy is a good compromise between performance and CPU resource. +Latency spikes can occur after quiet periods. + +|BusySpin |Busy Spin strategy that uses a busy spin loop for Consumers waiting on a +barrier. |This strategy will use CPU resource to avoid syscalls which can +introduce latency jitter. It is best used when threads can be bound to +specific CPU cores. + +|Yielding |Yielding strategy that uses a Thread.yield() for Consumers waiting on a +barrier after an initially spinning. |This strategy is a good compromise between performance and CPU resource +without incurring significant latency spikes. +|======================================================================= + +[[Disruptor-UseofRequestReply]] +Use of Request Reply +^^^^^^^^^^^^^^^^^^^^ + +The Disruptor component supports using link:request-reply.html[Request +Reply], where the caller will wait for the Async route to complete. For +instance: + +[source,java] +------------------------------------------------------------------------------ +from("mina:tcp://0.0.0.0:9876?textline=true&sync=true").to("disruptor:input"); +from("disruptor:input").to("bean:processInput").to("bean:createResponse"); +------------------------------------------------------------------------------ + +In the route above, we have a TCP listener on port 9876 that accepts +incoming requests. The request is routed to the _disruptor:input_ +buffer. As it is a link:request-reply.html[Request Reply] message, we +wait for the response. When the consumer on the _disruptor:input_ buffer +is complete, it copies the response to the original message response. + +[[Disruptor-Concurrentconsumers]] +Concurrent consumers +^^^^^^^^^^^^^^^^^^^^ + +By default, the Disruptor endpoint uses a single consumer thread, but +you can configure it to use concurrent consumer threads. So instead of +thread pools you can use: + +[source,java] +-------------------------------------------------------------- +from("disruptor:stageName?concurrentConsumers=5").process(...) +-------------------------------------------------------------- + +As for the difference between the two, note a thread pool can +increase/shrink dynamically at runtime depending on load, whereas the +number of concurrent consumers is always fixed and supported by the +Disruptor internally so performance will be higher. + +[[Disruptor-Threadpools]] +Thread pools +^^^^^^^^^^^^ + +Be aware that adding a thread pool to a Disruptor endpoint by doing +something like: + +[source,java] +-------------------------------------------------- +from("disruptor:stageName").thread(5).process(...) +-------------------------------------------------- + +Can wind up with adding a normal +http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html[BlockingQueue] +to be used in conjunction with the Disruptor, effectively negating part +of the performance gains achieved by using the Disruptor. Instead, it is +advices to directly configure number of threads that process messages on +a Disruptor endpoint using the concurrentConsumers option. + +[[Disruptor-Sample]] +Sample +^^^^^^ + +In the route below we use the Disruptor to send the request to this +async queue to be able to send a fire-and-forget message for further +processing in another thread, and return a constant reply in this thread +to the original caller. + +[source,java] +------------------------------------------------- +public void configure() throws Exception { + from("direct:start") + // send it to the disruptor that is async + .to("disruptor:next") + // return a constant response + .transform(constant("OK")); + + from("disruptor:next").to("mock:result"); +} +------------------------------------------------- + +Here we send a Hello World message and expects the reply to be OK. + +[source,java] +----------------------------------------------------------------- +Object out = template.requestBody("direct:start", "Hello World"); +assertEquals("OK", out); +----------------------------------------------------------------- + +The "Hello World" message will be consumed from the Disruptor from +another thread for further processing. Since this is from a unit test, +it will be sent to a mock endpoint where we can do assertions in the +unit test. + +[[Disruptor-UsingmultipleConsumers]] +Using multipleConsumers +^^^^^^^^^^^^^^^^^^^^^^^ + +In this example we have defined two consumers and registered them as +spring beans. + +[source,java] +------------------------------------------------------------------------------------------- +<!-- define the consumers as spring beans --> +<bean id="consumer1" class="org.apache.camel.spring.example.FooEventConsumer"/> + +<bean id="consumer2" class="org.apache.camel.spring.example.AnotherFooEventConsumer"/> + +<camelContext xmlns="http://camel.apache.org/schema/spring"> + <!-- define a shared endpoint which the consumers can refer to instead of using url --> + <endpoint id="foo" uri="disruptor:foo?multipleConsumers=true"/> +</camelContext> +------------------------------------------------------------------------------------------- + +Since we have specified multipleConsumers=true on the Disruptor foo +endpoint we can have those two or more consumers receive their own copy +of the message as a kind of pub-sub style messaging. As the beans are +part of an unit test they simply send the message to a mock endpoint, +but notice how we can use @Consume to consume from the Disruptor. + +[source,java] +------------------------------------------- +public class FooEventConsumer { + + @EndpointInject(uri = "mock:result") + private ProducerTemplate destination; + + @Consume(ref = "foo") + public void doSomething(String body) { + destination.sendBody("foo" + body); + } + +} +------------------------------------------- + +[[Disruptor-Extractingdisruptorinformation]] +Extracting disruptor information +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +If needed, information such as buffer size, etc. can be obtained without +using JMX in this fashion: + +[source,java] +-------------------------------------------------------------------- +DisruptorEndpoint disruptor = context.getEndpoint("disruptor:xxxx"); +int size = disruptor.getBufferSize(); +-------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/622e515d/docs/user-manual/en/SUMMARY.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/SUMMARY.md b/docs/user-manual/en/SUMMARY.md index ca286cd..8590d75 100644 --- a/docs/user-manual/en/SUMMARY.md +++ b/docs/user-manual/en/SUMMARY.md @@ -111,6 +111,7 @@ * [Crypto](crypto.adoc) * [Crypto Digital Signatures](crypto-digital-signatures.adoc) * [CSV](csv.adoc) + * [Disruptor](disruptor.adoc) * [JMS](jms.adoc) * [Metrics](metrics.adoc) * [Properties](properties.adoc)