This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 1fc5e9a CAMEL-16861: Cleanup and update EIP docs 1fc5e9a is described below commit 1fc5e9a96d4d048c0b5716d3c5945ec15110f4b2 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Oct 14 14:46:13 2021 +0200 CAMEL-16861: Cleanup and update EIP docs --- .../main/docs/modules/eips/pages/wireTap-eip.adoc | 240 ++++++++++----------- 1 file changed, 113 insertions(+), 127 deletions(-) diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/wireTap-eip.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/wireTap-eip.adoc index 39a88c2..4b4f058 100644 --- a/core/camel-core-engine/src/main/docs/modules/eips/pages/wireTap-eip.adoc +++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/wireTap-eip.adoc @@ -6,181 +6,167 @@ :supportlevel: Stable http://www.enterpriseintegrationpatterns.com/WireTap.html[Wire Tap] -(from the xref:enterprise-integration-patterns.adoc[EIP patterns]) +from the xref:enterprise-integration-patterns.adoc[EIP patterns] allows you to route messages to a separate location while they are being forwarded to the ultimate destination. image::eip/WireTap.gif[image] -== Streams - -If you xref:wireTap-eip.adoc[Wire Tap] a stream message body then you -should consider enabling xref:latest@manual:ROOT:stream-caching.adoc[Stream caching] to -ensure the message body can be read at each endpoint. See more details -at xref:latest@manual:ROOT:stream-caching.adoc[Stream caching]. - -TIP: See the `cacheSize` option for more details on _how much cache_ to use depending on how many or few unique endpoints are used. - == Options // eip options: START include::partial$eip-options.adoc[] // eip options: END -== WireTap Thread pool - -The WireTap uses a thread pool to process the -tapped messages. This thread pool will by default use the settings -detailed at xref:latest@manual:ROOT:threading-model.adoc[Threading Model]. In particular, -when the pool is exhausted (with all threads utilized), further wiretaps -will be executed synchronously by the calling thread. To remedy this, -you can configure an explicit thread pool on the xref:wireTap-eip.adoc[Wire -Tap] having either a different rejection policy, a larger worker queue, -or more worker threads. - -== WireTap Node +== WireTap Nodes -Camel's Wire Tap node supports two flavors when tapping an +Camel's Wire Tap node supports two modes when wire tapping an xref:latest@manual:ROOT:exchange.adoc[Exchange]: -- With the traditional Wire Tap, Camel will copy the original +- _Default mode_: Camel will copy the original xref:latest@manual:ROOT:exchange.adoc[Exchange] and set its xref:latest@manual:ROOT:exchange-pattern.adoc[Exchange Pattern] to *`InOnly`*, as we want the tapped xref:latest@manual:ROOT:exchange.adoc[Exchange] to be sent in a fire and forget style. The tapped xref:latest@manual:ROOT:exchange.adoc[Exchange] is then sent in a -separate thread so it can run in parallel with the original. Beware that -only the Exchange is copied - Wire Tap won't do a deep clone (unless you +separate thread, so it can run in parallel with the original. Beware that +only the `Exchange` is copied - Wire Tap won't do a deep clone (unless you specify a custom processor via *`onPrepareRef`* which does that). So all -copies could share objects from the original Exchange. -- Camel also provides an option of sending a new +copies could share objects from the original `Exchange`. + +- _New mode_: Camel also provides an option of sending a new xref:latest@manual:ROOT:exchange.adoc[Exchange] allowing you to populate it with new values. -== Sending a Copy (traditional wiretap) +=== Using Wire Tap + +In the example below the exchange is wire tapped to the direct:tap route. +This route delays the message 1 second before continuing. This is because +it allows you to see that the tapped message is routed independently from the original +route, so that you would see log:result happens before log:tap [source,java] ---- - protected RouteBuilder createRouteBuilder() { - return new RouteBuilder() { - public void configure() { - // START SNIPPET: e1 - from("direct:start") - .to("log:foo") - .wireTap("direct:tap") - .to("mock:result"); - // END SNIPPET: e1 - - from("direct:tap") - .delay(1000).setBody().constant("Tapped") - .to("mock:result", "mock:tap"); - - from("direct:test").wireTap("direct:a").id("wiretap_1").to("mock:a"); - from("direct:a").to("mock:b"); - } - }; - } +from("direct:start") + .to("log:foo") + .wireTap("direct:tap") + .to("log:result"); + +from("direct:tap") + .delay(1000).setBody().constant("Tapped") + .to("log:tap"); ---- -== Sending a New xref:latest@manual:ROOT:exchange.adoc[Exchange] +And in XML: -Camel supports either a processor or an -xref:latest@manual:ROOT:expression.adoc[Expression] to populate the new -xref:latest@manual:ROOT:exchange.adoc[Exchange]. Using a processor gives you full power -over how the xref:latest@manual:ROOT:exchange.adoc[Exchange] is populated as you can set -properties, headers, etc. An xref:latest@manual:ROOT:expression.adoc[Expression] can only -be used to set the *`IN`* body. +[source,xml] +---- +<routes> -The xref:latest@manual:ROOT:expression.adoc[Expression] or -xref:latest@manual:ROOT:processor.adoc[Processor] is pre-populated with a copy of the -original xref:latest@manual:ROOT:exchange.adoc[Exchange], which allows you to access the -original message when you prepare a new xref:latest@manual:ROOT:exchange.adoc[Exchange] to -be sent. You can use the *`copy`* option (enabled by default) to -indicate whether you want this. + <route> + <from uri="direct:start"/> + <wireTap uri="direct:tap"/> + <to uri="log:result"/> + </route> -Below is the processor variation, -where we disable *`copy`* by passing in *`false`* to create a new, empty -xref:latest@manual:ROOT:exchange.adoc[Exchange] + <route> + <from uri="direct:tap"/> + <to uri="log:log"/> + </route> -[source,java] ----- - public void testFireAndForgetUsingProcessor() throws Exception { - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - // START SNIPPET: e1 - from("direct:start") - .wireTap("direct:foo").copy(false).newExchange(exchange -> { - exchange.getIn().setBody("Bye World"); - exchange.getIn().setHeader("foo", "bar"); - }).to("mock:result"); - - from("direct:foo").to("mock:foo"); - // END SNIPPET: e1 - } - }); - } +</routes> ---- +=== Wire tapping with dynamic URIs -== Using Dynamic URIs +For example to wire tap to a dynamic URI, then the URI +uses the xref:components:languages:simple-language.adoc[Simple] language that +allows to construct dynamic URIs. -For example to wire tap to a dynamic URI, then it supports the same -dynamic URIs as documented in xref:message-endpoint.adoc[Message -Endpoint]. For example to wire tap to a JMS queue where the header ID is -part of the queue name: +For example to wire tap to a JMS queue where the header ID is part of the queue name: [source,java] ---- - from("direct:start") .wireTap("jms:queue:backup-$\{header.id}") - .to("bean:doSomething"); +from("direct:start") + .wireTap("jms:queue:backup-${header.id}") + .to("bean:doSomething"); ---- -== Sending a New exchange and Set Headers in DSL +And in XML: -If you send a new message using xref:wireTap-eip.adoc[Wire Tap], then you -could only set the message body using an -xref:latest@manual:ROOT:expression.adoc[Expression] from the DSL. If you also need to set -headers, you would have to use a xref:latest@manual:ROOT:processor.adoc[Processor]. From -It's possible to set headers as well using the DSL. +[source,xml] +---- +<route> + <from uri="direct:start"/> + <wireTap uri="jms:queue:backup-${header.id}"/> + <to uri="bean:doSomething"/> +</route> +---- -The following example sends a new message which has +=== Wire tapping a new message -* *`Bye World`* as message body. -* A header with key *`id`* with the value *`123`*. -* A header with key *`date`* which has current date as value. +In the _new mode_ then the wire tap will create a new `Exchange` that gets populated with new data, to be sent +out from the wire tap. The idea is to allow you to construct the tapped message with information from the original +message. -== Java DSL +The construction of the new message is done via a `Processor` where you populate the `Exchange` [source,java] ---- - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - @Override - public void configure() throws Exception { - // START SNIPPET: e1 - from("direct:start") - // tap a new message and send it to direct:tap - // the new message should be Bye World with 2 headers - .wireTap("direct:tap") - // create the new tap message body and headers - .newExchangeBody(constant("Bye World")) - .newExchangeHeader("id", constant(123)) - .newExchangeHeader("date", simple("${date:now:yyyyMMdd}")) - .end() - // here we continue routing the original messages - .to("mock:result"); - - // this is the tapped route - from("direct:tap") - .to("mock:tap"); - // END SNIPPET: e1 - } - }; - } +from("direct:start") + .wireTap("direct:tap").newExchange(exchange -> { + String oldBody = exchange.getMessage().getBody(String.class); + String newBody = "We are listening: " + oldBody; + exchange.getMessage().setBody(newBody); + exchange.getMessage().setHeader("tapId", "123"); + }) + .to("log:result"); + +from("direct:tap") + .to("log:tap"); +---- + +And in XML we are using `<body>` and `<setHeader`> (This is also possible in Java DSL) +to construct the message, instead of using a `Processor`: + +[source,xml] +---- +<routes> + + <route> + <from uri="direct:start"/> + <wireTap uri="direct:tap"> + <body><simple>We are listening: ${body}</simple></body> + <setHeader name="tapId"><constant>123</constant></setHeader> + </wireTap> + <to uri="log:result"/> + </route> + + <route> + <from uri="direct:tap"/> + <to uri="log:log"/> + </route> + +</routes> ---- -== Using `onPrepare` to Execute Custom Logic when Preparing Messages -See details at xref:multicast-eip.adoc[Multicast] +== WireTap Thread Pools + +The WireTap uses a thread pool to process the +tapped messages. This thread pool will by default use the settings +detailed at xref:latest@manual:ROOT:threading-model.adoc[Threading Model]. + +In particular, when the pool is exhausted (with all threads utilized), further wiretaps +will be executed synchronously by the calling thread. To remedy this, +you can configure an explicit thread pool on the Wire Tap having either +a different rejection policy, a larger worker queue, or more worker threads. + +== Wire tapping Streaming based messages + +If you Wire Tap a stream message body then you +should consider enabling xref:latest@manual:ROOT:stream-caching.adoc[Stream caching] to +ensure the message body can be read at each endpoint. + +See more details at xref:latest@manual:ROOT:stream-caching.adoc[Stream caching]. +