Repository: camel Updated Branches: refs/heads/master 18b0c4981 -> 7cff09902
CAMEL-10612: aligned examples with new API features Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7cff0990 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7cff0990 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7cff0990 Branch: refs/heads/master Commit: 7cff0990283bc600844572b87c5661f6b7bd2c0d Parents: 8195f3e Author: Nicola Ferraro <ni.ferr...@gmail.com> Authored: Fri Feb 3 18:41:30 2017 +0100 Committer: Nicola Ferraro <ni.ferr...@gmail.com> Committed: Fri Feb 3 18:41:41 2017 +0100 ---------------------------------------------------------------------- .../streams/ReactiveStreamsProducer.java | 2 +- .../api/CamelReactiveStreamsService.java | 2 +- .../engine/CamelReactiveStreamsServiceImpl.java | 2 +- .../streams/util/UnwrapStreamProcessor.java | 2 +- .../support/ReactiveStreamsTestService.java | 2 +- .../reactive/streams/ClientAPIRestExample.java | 85 +++++++++++++++++++ .../streams/ClientAPIWorkflowExample.java | 89 ++++++++++++++++++++ .../example/reactive/streams/RestExample.java | 2 +- .../main/resources/META-INF/spring.factories | 4 +- .../src/main/resources/application.yml | 6 +- 10 files changed, 187 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/7cff0990/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java index 4c75789..eab3e1b 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java @@ -41,7 +41,7 @@ public class ReactiveStreamsProducer extends DefaultAsyncProducer { @Override public boolean process(Exchange exchange, AsyncCallback callback) { - service.process(name, exchange, (data, error) -> { + service.sendCamelExchange(name, exchange, (data, error) -> { if (error != null) { data.setException(error); } http://git-wip-us.apache.org/repos/asf/camel/blob/7cff0990/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java index 6b639cf..3e3e17c 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java @@ -259,7 +259,7 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service * @param exchange the exchange to be forwarded to the external subscribers * @param callback the callback that signals the delivery of the exchange */ - void process(String name, Exchange exchange, DispatchCallback<Exchange> callback); + void sendCamelExchange(String name, Exchange exchange, DispatchCallback<Exchange> callback); /* * Methods for Camel consumers. http://git-wip-us.apache.org/repos/asf/camel/blob/7cff0990/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java index f5f5f4e..ec279b8 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java @@ -103,7 +103,7 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ } @Override - public void process(String name, Exchange exchange, DispatchCallback<Exchange> callback) { + public void sendCamelExchange(String name, Exchange exchange, DispatchCallback<Exchange> callback) { StreamPayload<Exchange> payload = new StreamPayload<>(exchange, callback); getPayloadPublisher(name).publish(payload); } http://git-wip-us.apache.org/repos/asf/camel/blob/7cff0990/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java index c5bb03f..2f11f2f 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java @@ -84,7 +84,7 @@ public class UnwrapStreamProcessor implements AsyncProcessor { exchange.getProperties().clear(); exchange.getProperties().putAll(copy.getProperties()); } else { - exchange.getIn().setBody(body); + exchange.getOut().setBody(body); } } http://git-wip-us.apache.org/repos/asf/camel/blob/7cff0990/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java index 186a9b5..822f41a 100644 --- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java +++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java @@ -82,7 +82,7 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService { } @Override - public void process(String name, Exchange exchange, DispatchCallback<Exchange> callback) { + public void sendCamelExchange(String name, Exchange exchange, DispatchCallback<Exchange> callback) { } http://git-wip-us.apache.org/repos/asf/camel/blob/7cff0990/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ClientAPIRestExample.java ---------------------------------------------------------------------- diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ClientAPIRestExample.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ClientAPIRestExample.java new file mode 100644 index 0000000..8a01965 --- /dev/null +++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ClientAPIRestExample.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.example.reactive.streams; + +import javax.annotation.PostConstruct; + +import org.apache.camel.Exchange; +import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService; +import org.reactivestreams.Publisher; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +import reactor.core.publisher.Flux; + +/** + * This example shows how to define Camel rest endpoints using the direct client API. + * + * Note: the code is not spring-boot related and could have been placed in a standalone main(). + */ +@Configuration +@ConditionalOnProperty("examples.client-api.rest") +public class ClientAPIRestExample { + + @Component + public static class ClientAPIRestExampleStreams { + + @Autowired + private CamelReactiveStreamsService camel; + + @PostConstruct + public void setup() { + + // Rest endpoint to retrieve all orders: http://localhost:8080/camel/orders + camel.processFromURI("rest:get:orders", exchange -> + Flux.from(exchange) + .flatMap(ex -> allOrders())); + + + // Rest endpoint to retrieve an order. + // Try: http://localhost:8080/camel/orders/1 + // Or: http://localhost:8080/camel/orders/xxx + camel.processFromURI("rest:get:orders/{orderId}", exchange -> + Flux.from(exchange) + .map(ex -> ex.getIn().getHeader("orderId", String.class)) + .flatMap(this::toOrderInfo) + .map(Object.class::cast) + .switchIfEmpty( + Flux.from(exchange) + .doOnNext(ex -> ex.getOut().setBody("Not found")) + .doOnNext(ex -> ex.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 404)) + )); + + } + + private Publisher<String> toOrderInfo(String orderId) { + // Simulate a retrieval from DB + return allOrders() + .filter(o -> o.equals(orderId)) // Ensure the order exists + .map("Detailed Info on "::concat) // Add detailed info + .next(); + } + + private Flux<String> allOrders() { + return Flux.just("1", "2"); + } + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/7cff0990/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ClientAPIWorkflowExample.java ---------------------------------------------------------------------- diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ClientAPIWorkflowExample.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ClientAPIWorkflowExample.java new file mode 100644 index 0000000..c1ad65c --- /dev/null +++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ClientAPIWorkflowExample.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.example.reactive.streams; + +import java.io.InputStream; +import javax.annotation.PostConstruct; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +import reactor.core.publisher.Flux; + +/** + * This example shows how to define a complex workflow using Camel direct client API. + * + * Note: the code is not spring-boot related and could have been placed in a standalone main(). + */ +@Configuration +@ConditionalOnProperty("examples.client-api.workflow") +public class ClientAPIWorkflowExample { + + /** + * The reactor streams. + */ + @Component + public static class ClientAPIWorkflowExampleStreams { + + @Autowired + private CamelReactiveStreamsService camel; + + @PostConstruct + public void setup() { + + /** + * This workflow reads all files from the directory named "input", + * marshals them using the Camel marshalling features (simulation) + * and sends them to an external system (simulation) + * only if they contain the word "camel". + */ + Flux.from(camel.publishURI("file:input", InputStream.class)) + .flatMap(camel.requestURI("direct:unmarshal", String.class)) + .filter(text -> text.contains("camel")) + .flatMap(camel.requestURI("direct:send", String.class)) + .subscribe(); + + } + + } + + /** + * The Camel Configuration. + */ + @Component + public static class BasicReactorToCamelExampleRoutes extends RouteBuilder { + + @Override + public void configure() throws Exception { + + from("direct:unmarshal") + // This can be far more complex, using marshal() + .convertBodyTo(String.class) + .log("Content marshalled to string: ${body}"); + + from("direct:send") + .log("Sending the file to an external system (simulation)"); + + } + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/7cff0990/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/RestExample.java ---------------------------------------------------------------------- diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/RestExample.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/RestExample.java index 4a650d2..bfae8a3 100644 --- a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/RestExample.java +++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/RestExample.java @@ -36,7 +36,7 @@ import reactor.core.publisher.Flux; * but they can be moved in their own files. */ @Configuration -@ConditionalOnProperty("examples.others.rest-example") +@ConditionalOnProperty("examples.others.rest") public class RestExample { /** http://git-wip-us.apache.org/repos/asf/camel/blob/7cff0990/examples/camel-example-reactive-streams/src/main/resources/META-INF/spring.factories ---------------------------------------------------------------------- diff --git a/examples/camel-example-reactive-streams/src/main/resources/META-INF/spring.factories b/examples/camel-example-reactive-streams/src/main/resources/META-INF/spring.factories index 8be4929..88f33e1 100644 --- a/examples/camel-example-reactive-streams/src/main/resources/META-INF/spring.factories +++ b/examples/camel-example-reactive-streams/src/main/resources/META-INF/spring.factories @@ -20,5 +20,7 @@ org.apache.camel.example.reactive.streams.BasicCamelToReactorExample,\ org.apache.camel.example.reactive.streams.BasicReactorToCamelExample,\ org.apache.camel.example.reactive.streams.BasicReactorToCamelInOutExample,\ org.apache.camel.example.reactive.streams.BasicCamelToReactorInOutExample,\ -org.apache.camel.example.reactive.streams.RestExample +org.apache.camel.example.reactive.streams.RestExample,\ +org.apache.camel.example.reactive.streams.ClientAPIRestExample,\ +org.apache.camel.example.reactive.streams.ClientAPIWorkflowExample http://git-wip-us.apache.org/repos/asf/camel/blob/7cff0990/examples/camel-example-reactive-streams/src/main/resources/application.yml ---------------------------------------------------------------------- diff --git a/examples/camel-example-reactive-streams/src/main/resources/application.yml b/examples/camel-example-reactive-streams/src/main/resources/application.yml index eeeb3ac..b8fb3ed 100644 --- a/examples/camel-example-reactive-streams/src/main/resources/application.yml +++ b/examples/camel-example-reactive-streams/src/main/resources/application.yml @@ -23,7 +23,9 @@ examples: reactor-to-camel: true camel-to-reactor-in-out: true reactor-to-camel-in-out: true + client-api: + rest: true + workflow: true others: - rest-example: true - + rest: true