CAMEL-11237: Changes based on @nicolaferraro code review comments

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/90bb2136
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/90bb2136
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/90bb2136

Branch: refs/heads/master
Commit: 90bb213612c24ebf29b8cc8891cb283d7aa0225a
Parents: 3025f91
Author: Dmitry Volodin <dmvo...@gmail.com>
Authored: Tue May 23 18:45:00 2017 +0300
Committer: Dmitry Volodin <dmvo...@gmail.com>
Committed: Tue May 23 18:45:00 2017 +0300

----------------------------------------------------------------------
 .../src/main/docs/grpc-component.adoc           |  6 ++--
 .../camel/component/grpc/GrpcConfiguration.java | 35 ++++++++++++++++++++
 .../camel/component/grpc/GrpcConsumer.java      | 18 ++++++----
 .../GrpcRequestAggregationStreamObserver.java   | 35 ++++++++++++++------
 .../GrpcRequestPropagationStreamObserver.java   | 34 ++++++++++++++++---
 .../grpc/GrpcConsumerPropagationTest.java       | 35 +++++++++++++-------
 6 files changed, 127 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/90bb2136/components/camel-grpc/src/main/docs/grpc-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-grpc/src/main/docs/grpc-component.adoc 
b/components/camel-grpc/src/main/docs/grpc-component.adoc
index d12bf85..a74c6df 100644
--- a/components/camel-grpc/src/main/docs/grpc-component.adoc
+++ b/components/camel-grpc/src/main/docs/grpc-component.adoc
@@ -47,7 +47,7 @@ with the following path and query parameters:
 | **service** | *Required* Fully qualified service name from the protocol 
buffer descriptor file (package dot service definition name) |  | String
 |=======================================================================
 
-#### Query Parameters (10 parameters):
+#### Query Parameters (12 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -55,6 +55,8 @@ with the following path and query parameters:
 | **host** (common) | The gRPC server host name |  | String
 | **port** (common) | The gRPC server port |  | int
 | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the 
Camel routing Error Handler which mean any exceptions occurred while the 
consumer is trying to pickup incoming messages or the likes will now be 
processed as a message and handled by the routing Error Handler. By default the 
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions that will be logged at WARN or ERROR level and ignored. | false | 
boolean
+| **forwardOnCompleted** (consumer) | Determines if onCompleted events should 
be pushed to the Camel route. | false | boolean
+| **forwardOnError** (consumer) | Determines if onError events should be 
pushed to the Camel route. Exceptions will be set as message body. | false | 
boolean
 | **processingStrategy** (consumer) | This option specifies the top-level 
strategy for processing service requests and responses in streaming mode. If an 
aggregation strategy is selected all requests will be accumulated in the list 
then transferred to the flow and the accumulated responses will be sent to the 
sender. If a propagation strategy is selected request is sent to the stream and 
the response will be immediately sent back to the sender. |  | 
GrpcProcessingStrategies
 | **exceptionHandler** (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
options is not in use. By default the consumer will deal with exceptions that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. |  | ExchangePattern
@@ -92,7 +94,7 @@ The table below shows the types of objects in the message 
body, depending on the
 |Header name |Description|Possible values
 
 |*CamelGrpcMethodName*|Method name handled by the consumer service|
-|*CamelGrpcEventType*|Received event type from the sended request|onNext, 
onCompleted or onError
+|*CamelGrpcEventType*|Received event type from the sent request|onNext, 
onCompleted or onError
 |*CamelGrpcUserAgent*|If provided, the given agent will prepend the gRPC 
library's user agent information|
 
 |=======================================================================

http://git-wip-us.apache.org/repos/asf/camel/blob/90bb2136/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
index c298c02..123de61 100644
--- 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
@@ -27,18 +27,30 @@ public class GrpcConfiguration {
     @UriPath
     @Metadata(required = "true")
     private String service;
+    
     @UriParam(label = "producer")
     private String method;
+    
     @UriParam
     private String host;
+    
     @UriParam
     private int port;
+    
     @UriParam(label = "producer")
     private String target;
+    
     @UriParam(label = "producer", defaultValue = "true")
     private Boolean usePlainText = true;
+    
     @UriParam(label = "consumer")
     private GrpcProcessingStrategies processingStrategy = 
GrpcProcessingStrategies.PROPAGATION;
+    
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean forwardOnCompleted;
+
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean forwardOnError;
 
     private String serviceName;
     private String servicePackage;
@@ -127,6 +139,29 @@ public class GrpcConfiguration {
     }
 
     /**
+     * Determines if onCompleted events should be pushed to the Camel route.
+     */
+    public void setForwardOnCompleted(boolean forwardOnCompleted) {
+        this.forwardOnCompleted = forwardOnCompleted;
+    }
+
+    public boolean isForwardOnCompleted() {
+        return forwardOnCompleted;
+    }
+
+    /**
+     * Determines if onError events should be pushed to the Camel route.
+     * Exceptions will be set as message body.
+     */
+    public void setForwardOnError(boolean forwardOnError) {
+        this.forwardOnError = forwardOnError;
+    }
+
+    public boolean isForwardOnError() {
+        return forwardOnError;
+    }
+
+    /**
      * The service name extracted from the full service name
      */
     protected String getServiceName() {

http://git-wip-us.apache.org/repos/asf/camel/blob/90bb2136/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
index 3bcdec0..27a7d4a 100644
--- 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
@@ -111,17 +111,21 @@ public class GrpcConsumer extends DefaultConsumer {
     }
     
     public void onCompleted(Exchange exchange) {
-        exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, 
GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED);
-        doSend(exchange, done -> {
-        });
+        if (configuration.isForwardOnCompleted()) {
+            exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, 
GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED);
+            doSend(exchange, done -> {
+            });
+        }
     }
 
     public void onError(Exchange exchange, Throwable error) {
-        exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, 
GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR);
-        exchange.getIn().setBody(error);
+        if (configuration.isForwardOnError()) {
+            exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, 
GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR);
+            exchange.getIn().setBody(error);
         
-        doSend(exchange, done -> {
-        });
+            doSend(exchange, done -> {
+            });
+        }
     }
         
     private boolean doSend(Exchange exchange, AsyncCallback callback) {

http://git-wip-us.apache.org/repos/asf/camel/blob/90bb2136/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java
index 145029e..9f79b22 100644
--- 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.grpc.server;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
 import io.grpc.stub.StreamObserver;
 import org.apache.camel.component.grpc.GrpcConsumer;
@@ -47,23 +48,37 @@ public class GrpcRequestAggregationStreamObserver extends 
GrpcRequestAbstractStr
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public void onCompleted() {
+        CountDownLatch latch = new CountDownLatch(1);
+        Object responseBody = null;
+        
         exchange.getIn().setBody(requestList);
         exchange.getIn().setHeaders(headers);
 
         consumer.process(exchange, doneSync -> {
+            latch.countDown();
         });
+        
+        try {
+            latch.await();
+            
+            if (exchange.hasOut()) {
+                responseBody = exchange.getOut().getBody();
+            } else {
+                responseBody = exchange.getIn().getBody();
+            }
 
-        Object responseBody = exchange.getIn().getBody();
-        if (responseBody instanceof List) {
-            List<Object> responseList = (List<Object>)responseBody;
-            responseList.forEach((responseItem) -> {
-                responseObserver.onNext(responseItem);
-            });
-        } else {
-            responseObserver.onNext(responseBody);
+            if (responseBody instanceof List) {
+                List<?> responseList = (List<?>)responseBody;
+                responseList.forEach((responseItem) -> {
+                    responseObserver.onNext(responseItem);
+                });
+            } else {
+                responseObserver.onNext(responseBody);
+            }
+            responseObserver.onCompleted();
+        } catch (InterruptedException e) {
+            responseObserver.onError(e);
         }
-        responseObserver.onCompleted();
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/90bb2136/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java
index ae51100..632ff2d 100644
--- 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.component.grpc.server;
 
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
 import io.grpc.stub.StreamObserver;
 import org.apache.camel.component.grpc.GrpcConsumer;
@@ -34,17 +36,39 @@ public class GrpcRequestPropagationStreamObserver extends 
GrpcRequestAbstractStr
 
     @Override
     public void onNext(Object request) {
+        CountDownLatch latch = new CountDownLatch(1);
+        Object responseBody = null;
+        
         exchange = endpoint.createExchange();
         exchange.getIn().setBody(request);
         exchange.getIn().setHeaders(headers);
+        
         consumer.process(exchange, doneSync -> {
+            latch.countDown();
         });
-        if (exchange.hasOut()) {
-            responseObserver.onNext(exchange.getOut().getBody());
-        } else {
-            responseObserver.onNext(exchange.getIn().getBody());
+        
+        try {
+            latch.await();
+            
+            if (exchange.hasOut()) {
+                responseBody = exchange.getOut().getBody();
+            } else {
+                responseBody = exchange.getIn().getBody();
+            }
+            
+            if (responseBody instanceof List) {
+                List<?> responseList = (List<?>)responseBody;
+                responseList.forEach((responseItem) -> {
+                    responseObserver.onNext(responseItem);
+                });
+            } else {
+                responseObserver.onNext(responseBody);
+            }
+            responseObserver.onCompleted();
+
+        } catch (InterruptedException e) {
+            responseObserver.onError(e);
         }
-        responseObserver.onCompleted();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/90bb2136/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java
 
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java
index d4a0641..e7cb8c7 100644
--- 
a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java
+++ 
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java
@@ -35,23 +35,28 @@ import org.slf4j.LoggerFactory;
 public class GrpcConsumerPropagationTest extends CamelTestSupport {
     private static final Logger LOG = 
LoggerFactory.getLogger(GrpcConsumerPropagationTest.class);
 
-    private static final int GRPC_ASYNC_REQUEST_TEST_PORT = 
AvailablePortFinder.getNextAvailable();
+    private static final int GRPC_ASYNC_NEXT_REQUEST_TEST_PORT = 
AvailablePortFinder.getNextAvailable();
+    private static final int GRPC_ASYNC_COMPLETED_REQUEST_TEST_PORT = 
AvailablePortFinder.getNextAvailable();
     private static final int GRPC_TEST_PING_ID = 1;
     private static final String GRPC_TEST_PING_VALUE = "PING";
     private static final String GRPC_TEST_PONG_VALUE = "PONG";
 
-    private ManagedChannel asyncRequestChannel;
-    private PingPongGrpc.PingPongStub asyncNonBlockingStub;
+    private ManagedChannel asyncOnNextChannel;
+    private ManagedChannel asyncOnCompletedChannel;
+    private PingPongGrpc.PingPongStub asyncOnNextStub;
+    private PingPongGrpc.PingPongStub asyncOnCompletedStub;
 
     @Before
     public void startGrpcChannels() {
-        asyncRequestChannel = ManagedChannelBuilder.forAddress("localhost", 
GRPC_ASYNC_REQUEST_TEST_PORT).usePlaintext(true).build();
-        asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel);
+        asyncOnNextChannel = ManagedChannelBuilder.forAddress("localhost", 
GRPC_ASYNC_NEXT_REQUEST_TEST_PORT).usePlaintext(true).build();
+        asyncOnCompletedChannel = 
ManagedChannelBuilder.forAddress("localhost", 
GRPC_ASYNC_COMPLETED_REQUEST_TEST_PORT).usePlaintext(true).build();
+        asyncOnNextStub = PingPongGrpc.newStub(asyncOnNextChannel);
+        asyncOnCompletedStub = PingPongGrpc.newStub(asyncOnCompletedChannel);
     }
 
     @After
     public void stopGrpcChannels() throws Exception {
-        asyncRequestChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+        asyncOnNextChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
     }
 
     @Test
@@ -62,14 +67,15 @@ public class GrpcConsumerPropagationTest extends 
CamelTestSupport {
         PingRequest pingRequest = 
PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
         PongResponseStreamObserver responseObserver = new 
PongResponseStreamObserver(latch);
 
-        StreamObserver<PingRequest> requestObserver = 
asyncNonBlockingStub.pingAsyncSync(responseObserver);
+        StreamObserver<PingRequest> requestObserver = 
asyncOnNextStub.pingAsyncSync(responseObserver);
         requestObserver.onNext(pingRequest);
         latch.await(5, TimeUnit.SECONDS);
 
-        MockEndpoint mockEndpoint = getMockEndpoint("mock:async-propagation");
+        MockEndpoint mockEndpoint = 
getMockEndpoint("mock:async-on-next-propagation");
         mockEndpoint.expectedMessageCount(1);
         
mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_EVENT_TYPE_HEADER,
 GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT);
         
mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_METHOD_NAME_HEADER,
 "pingAsyncSync");
+        mockEndpoint.assertIsSatisfied();
         
         PongResponse pongResponse = responseObserver.getPongResponse();
         assertNotNull(pongResponse);
@@ -85,14 +91,15 @@ public class GrpcConsumerPropagationTest extends 
CamelTestSupport {
         PingRequest pingRequest = 
PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
         PongResponseStreamObserver responseObserver = new 
PongResponseStreamObserver(latch);
 
-        StreamObserver<PingRequest> requestObserver = 
asyncNonBlockingStub.pingAsyncAsync(responseObserver);
+        StreamObserver<PingRequest> requestObserver = 
asyncOnCompletedStub.pingAsyncAsync(responseObserver);
         requestObserver.onCompleted();
         latch.await(5, TimeUnit.SECONDS);
 
-        MockEndpoint mockEndpoint = getMockEndpoint("mock:async-propagation");
+        MockEndpoint mockEndpoint = 
getMockEndpoint("mock:async-on-completed-propagation");
         mockEndpoint.expectedMessageCount(1);
         
mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_EVENT_TYPE_HEADER,
 GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED);
         
mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_METHOD_NAME_HEADER,
 "pingAsyncAsync");
+        mockEndpoint.assertIsSatisfied();
     }
 
     @Override
@@ -101,8 +108,12 @@ public class GrpcConsumerPropagationTest extends 
CamelTestSupport {
             @Override
             public void configure() {
                 
-                
from("grpc://org.apache.camel.component.grpc.PingPong?processingStrategy=PROPAGATION&host=localhost&port="
 + GRPC_ASYNC_REQUEST_TEST_PORT)
-                    .to("mock:async-propagation")
+                
from("grpc://org.apache.camel.component.grpc.PingPong?processingStrategy=PROPAGATION&host=localhost&port="
 + GRPC_ASYNC_NEXT_REQUEST_TEST_PORT)
+                    .to("mock:async-on-next-propagation")
+                    .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");
+                
+                
from("grpc://org.apache.camel.component.grpc.PingPong?processingStrategy=PROPAGATION&forwardOnCompleted=true&host=localhost&port="
 + GRPC_ASYNC_COMPLETED_REQUEST_TEST_PORT)
+                    .to("mock:async-on-completed-propagation")
                     .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");
             }
         };

Reply via email to