CAMEL-11237: Changes based on @nicolaferraro code review comments Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/90bb2136 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/90bb2136 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/90bb2136
Branch: refs/heads/master Commit: 90bb213612c24ebf29b8cc8891cb283d7aa0225a Parents: 3025f91 Author: Dmitry Volodin <dmvo...@gmail.com> Authored: Tue May 23 18:45:00 2017 +0300 Committer: Dmitry Volodin <dmvo...@gmail.com> Committed: Tue May 23 18:45:00 2017 +0300 ---------------------------------------------------------------------- .../src/main/docs/grpc-component.adoc | 6 ++-- .../camel/component/grpc/GrpcConfiguration.java | 35 ++++++++++++++++++++ .../camel/component/grpc/GrpcConsumer.java | 18 ++++++---- .../GrpcRequestAggregationStreamObserver.java | 35 ++++++++++++++------ .../GrpcRequestPropagationStreamObserver.java | 34 ++++++++++++++++--- .../grpc/GrpcConsumerPropagationTest.java | 35 +++++++++++++------- 6 files changed, 127 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/90bb2136/components/camel-grpc/src/main/docs/grpc-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/docs/grpc-component.adoc b/components/camel-grpc/src/main/docs/grpc-component.adoc index d12bf85..a74c6df 100644 --- a/components/camel-grpc/src/main/docs/grpc-component.adoc +++ b/components/camel-grpc/src/main/docs/grpc-component.adoc @@ -47,7 +47,7 @@ with the following path and query parameters: | **service** | *Required* Fully qualified service name from the protocol buffer descriptor file (package dot service definition name) | | String |======================================================================= -#### Query Parameters (10 parameters): +#### Query Parameters (12 parameters): [width="100%",cols="2,5,^1,2",options="header"] |======================================================================= @@ -55,6 +55,8 @@ with the following path and query parameters: | **host** (common) | The gRPC server host name | | String | **port** (common) | The gRPC server port | | int | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean +| **forwardOnCompleted** (consumer) | Determines if onCompleted events should be pushed to the Camel route. | false | boolean +| **forwardOnError** (consumer) | Determines if onError events should be pushed to the Camel route. Exceptions will be set as message body. | false | boolean | **processingStrategy** (consumer) | This option specifies the top-level strategy for processing service requests and responses in streaming mode. If an aggregation strategy is selected all requests will be accumulated in the list then transferred to the flow and the accumulated responses will be sent to the sender. If a propagation strategy is selected request is sent to the stream and the response will be immediately sent back to the sender. | | GrpcProcessing Strategies | **exceptionHandler** (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern @@ -92,7 +94,7 @@ The table below shows the types of objects in the message body, depending on the |Header name |Description|Possible values |*CamelGrpcMethodName*|Method name handled by the consumer service| -|*CamelGrpcEventType*|Received event type from the sended request|onNext, onCompleted or onError +|*CamelGrpcEventType*|Received event type from the sent request|onNext, onCompleted or onError |*CamelGrpcUserAgent*|If provided, the given agent will prepend the gRPC library's user agent information| |======================================================================= http://git-wip-us.apache.org/repos/asf/camel/blob/90bb2136/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java index c298c02..123de61 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java @@ -27,18 +27,30 @@ public class GrpcConfiguration { @UriPath @Metadata(required = "true") private String service; + @UriParam(label = "producer") private String method; + @UriParam private String host; + @UriParam private int port; + @UriParam(label = "producer") private String target; + @UriParam(label = "producer", defaultValue = "true") private Boolean usePlainText = true; + @UriParam(label = "consumer") private GrpcProcessingStrategies processingStrategy = GrpcProcessingStrategies.PROPAGATION; + + @UriParam(label = "consumer", defaultValue = "false") + private boolean forwardOnCompleted; + + @UriParam(label = "consumer", defaultValue = "false") + private boolean forwardOnError; private String serviceName; private String servicePackage; @@ -127,6 +139,29 @@ public class GrpcConfiguration { } /** + * Determines if onCompleted events should be pushed to the Camel route. + */ + public void setForwardOnCompleted(boolean forwardOnCompleted) { + this.forwardOnCompleted = forwardOnCompleted; + } + + public boolean isForwardOnCompleted() { + return forwardOnCompleted; + } + + /** + * Determines if onError events should be pushed to the Camel route. + * Exceptions will be set as message body. + */ + public void setForwardOnError(boolean forwardOnError) { + this.forwardOnError = forwardOnError; + } + + public boolean isForwardOnError() { + return forwardOnError; + } + + /** * The service name extracted from the full service name */ protected String getServiceName() { http://git-wip-us.apache.org/repos/asf/camel/blob/90bb2136/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java index 3bcdec0..27a7d4a 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java @@ -111,17 +111,21 @@ public class GrpcConsumer extends DefaultConsumer { } public void onCompleted(Exchange exchange) { - exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED); - doSend(exchange, done -> { - }); + if (configuration.isForwardOnCompleted()) { + exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED); + doSend(exchange, done -> { + }); + } } public void onError(Exchange exchange, Throwable error) { - exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR); - exchange.getIn().setBody(error); + if (configuration.isForwardOnError()) { + exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR); + exchange.getIn().setBody(error); - doSend(exchange, done -> { - }); + doSend(exchange, done -> { + }); + } } private boolean doSend(Exchange exchange, AsyncCallback callback) { http://git-wip-us.apache.org/repos/asf/camel/blob/90bb2136/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java index 145029e..9f79b22 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java @@ -19,6 +19,7 @@ package org.apache.camel.component.grpc.server; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import io.grpc.stub.StreamObserver; import org.apache.camel.component.grpc.GrpcConsumer; @@ -47,23 +48,37 @@ public class GrpcRequestAggregationStreamObserver extends GrpcRequestAbstractStr } @Override - @SuppressWarnings("unchecked") public void onCompleted() { + CountDownLatch latch = new CountDownLatch(1); + Object responseBody = null; + exchange.getIn().setBody(requestList); exchange.getIn().setHeaders(headers); consumer.process(exchange, doneSync -> { + latch.countDown(); }); + + try { + latch.await(); + + if (exchange.hasOut()) { + responseBody = exchange.getOut().getBody(); + } else { + responseBody = exchange.getIn().getBody(); + } - Object responseBody = exchange.getIn().getBody(); - if (responseBody instanceof List) { - List<Object> responseList = (List<Object>)responseBody; - responseList.forEach((responseItem) -> { - responseObserver.onNext(responseItem); - }); - } else { - responseObserver.onNext(responseBody); + if (responseBody instanceof List) { + List<?> responseList = (List<?>)responseBody; + responseList.forEach((responseItem) -> { + responseObserver.onNext(responseItem); + }); + } else { + responseObserver.onNext(responseBody); + } + responseObserver.onCompleted(); + } catch (InterruptedException e) { + responseObserver.onError(e); } - responseObserver.onCompleted(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/90bb2136/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java index ae51100..632ff2d 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java @@ -16,7 +16,9 @@ */ package org.apache.camel.component.grpc.server; +import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import io.grpc.stub.StreamObserver; import org.apache.camel.component.grpc.GrpcConsumer; @@ -34,17 +36,39 @@ public class GrpcRequestPropagationStreamObserver extends GrpcRequestAbstractStr @Override public void onNext(Object request) { + CountDownLatch latch = new CountDownLatch(1); + Object responseBody = null; + exchange = endpoint.createExchange(); exchange.getIn().setBody(request); exchange.getIn().setHeaders(headers); + consumer.process(exchange, doneSync -> { + latch.countDown(); }); - if (exchange.hasOut()) { - responseObserver.onNext(exchange.getOut().getBody()); - } else { - responseObserver.onNext(exchange.getIn().getBody()); + + try { + latch.await(); + + if (exchange.hasOut()) { + responseBody = exchange.getOut().getBody(); + } else { + responseBody = exchange.getIn().getBody(); + } + + if (responseBody instanceof List) { + List<?> responseList = (List<?>)responseBody; + responseList.forEach((responseItem) -> { + responseObserver.onNext(responseItem); + }); + } else { + responseObserver.onNext(responseBody); + } + responseObserver.onCompleted(); + + } catch (InterruptedException e) { + responseObserver.onError(e); } - responseObserver.onCompleted(); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/90bb2136/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java index d4a0641..e7cb8c7 100644 --- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java @@ -35,23 +35,28 @@ import org.slf4j.LoggerFactory; public class GrpcConsumerPropagationTest extends CamelTestSupport { private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerPropagationTest.class); - private static final int GRPC_ASYNC_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable(); + private static final int GRPC_ASYNC_NEXT_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable(); + private static final int GRPC_ASYNC_COMPLETED_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable(); private static final int GRPC_TEST_PING_ID = 1; private static final String GRPC_TEST_PING_VALUE = "PING"; private static final String GRPC_TEST_PONG_VALUE = "PONG"; - private ManagedChannel asyncRequestChannel; - private PingPongGrpc.PingPongStub asyncNonBlockingStub; + private ManagedChannel asyncOnNextChannel; + private ManagedChannel asyncOnCompletedChannel; + private PingPongGrpc.PingPongStub asyncOnNextStub; + private PingPongGrpc.PingPongStub asyncOnCompletedStub; @Before public void startGrpcChannels() { - asyncRequestChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_ASYNC_REQUEST_TEST_PORT).usePlaintext(true).build(); - asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel); + asyncOnNextChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_ASYNC_NEXT_REQUEST_TEST_PORT).usePlaintext(true).build(); + asyncOnCompletedChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_ASYNC_COMPLETED_REQUEST_TEST_PORT).usePlaintext(true).build(); + asyncOnNextStub = PingPongGrpc.newStub(asyncOnNextChannel); + asyncOnCompletedStub = PingPongGrpc.newStub(asyncOnCompletedChannel); } @After public void stopGrpcChannels() throws Exception { - asyncRequestChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + asyncOnNextChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS); } @Test @@ -62,14 +67,15 @@ public class GrpcConsumerPropagationTest extends CamelTestSupport { PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); - StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncSync(responseObserver); + StreamObserver<PingRequest> requestObserver = asyncOnNextStub.pingAsyncSync(responseObserver); requestObserver.onNext(pingRequest); latch.await(5, TimeUnit.SECONDS); - MockEndpoint mockEndpoint = getMockEndpoint("mock:async-propagation"); + MockEndpoint mockEndpoint = getMockEndpoint("mock:async-on-next-propagation"); mockEndpoint.expectedMessageCount(1); mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT); mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_METHOD_NAME_HEADER, "pingAsyncSync"); + mockEndpoint.assertIsSatisfied(); PongResponse pongResponse = responseObserver.getPongResponse(); assertNotNull(pongResponse); @@ -85,14 +91,15 @@ public class GrpcConsumerPropagationTest extends CamelTestSupport { PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); - StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver); + StreamObserver<PingRequest> requestObserver = asyncOnCompletedStub.pingAsyncAsync(responseObserver); requestObserver.onCompleted(); latch.await(5, TimeUnit.SECONDS); - MockEndpoint mockEndpoint = getMockEndpoint("mock:async-propagation"); + MockEndpoint mockEndpoint = getMockEndpoint("mock:async-on-completed-propagation"); mockEndpoint.expectedMessageCount(1); mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED); mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_METHOD_NAME_HEADER, "pingAsyncAsync"); + mockEndpoint.assertIsSatisfied(); } @Override @@ -101,8 +108,12 @@ public class GrpcConsumerPropagationTest extends CamelTestSupport { @Override public void configure() { - from("grpc://org.apache.camel.component.grpc.PingPong?processingStrategy=PROPAGATION&host=localhost&port=" + GRPC_ASYNC_REQUEST_TEST_PORT) - .to("mock:async-propagation") + from("grpc://org.apache.camel.component.grpc.PingPong?processingStrategy=PROPAGATION&host=localhost&port=" + GRPC_ASYNC_NEXT_REQUEST_TEST_PORT) + .to("mock:async-on-next-propagation") + .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse"); + + from("grpc://org.apache.camel.component.grpc.PingPong?processingStrategy=PROPAGATION&forwardOnCompleted=true&host=localhost&port=" + GRPC_ASYNC_COMPLETED_REQUEST_TEST_PORT) + .to("mock:async-on-completed-propagation") .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse"); } };