This is an automated email from the ASF dual-hosted git repository.

dmvolod pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 7fc8adc06b989c9a37f8f487f0dda67ecfb42ef6
Author: Rajasekhar <rajasekh...@live.com>
AuthorDate: Sun Oct 25 22:54:00 2020 +0530

    camel-15754-Improve the propagation consumer strategy
---
 .../grpc/server/GrpcRequestPropagationStreamObserver.java     | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)

diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java
index 9dd7323..e4044eb 100644
--- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java
+++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java
@@ -25,7 +25,8 @@ import org.apache.camel.component.grpc.GrpcConsumer;
 import org.apache.camel.component.grpc.GrpcEndpoint;
 
 /**
- * gRPC request stream observer which is propagating every onNext(), onError() or onCompleted() calls to the Camel route
+ * This is the default consumer strategy for client-streaming and bi-directional streaming gRPC calls. gRPC request
+ * stream observer which is propagating every onNext(), onError() or onCompleted() calls to the Camel route.
  */
 public class GrpcRequestPropagationStreamObserver extends GrpcRequestAbstractStreamObserver {
 
@@ -37,15 +38,13 @@ public class GrpcRequestPropagationStreamObserver extends GrpcRequestAbstractStr
     @Override
     public void onNext(Object request) {
         CountDownLatch latch = new CountDownLatch(1);
-        Object responseBody = null;
+        Object responseBody;
 
         exchange = endpoint.createExchange();
         exchange.getIn().setBody(request);
         exchange.getIn().setHeaders(headers);
 
-        consumer.process(exchange, doneSync -> {
-            latch.countDown();
-        });
+        consumer.process(exchange, doneSync -> latch.countDown());
 
         try {
             latch.await();
@@ -62,8 +61,6 @@ public class GrpcRequestPropagationStreamObserver extends GrpcRequestAbstractStr
             } else {
                 responseObserver.onNext(responseBody);
             }
-            responseObserver.onCompleted();
-
         } catch (InterruptedException e) {
             responseObserver.onError(e);
         }

Reply via email to