This is an automated email from the ASF dual-hosted git repository. dmvolod pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 7fc8adc06b989c9a37f8f487f0dda67ecfb42ef6 Author: Rajasekhar <rajasekh...@live.com> AuthorDate: Sun Oct 25 22:54:00 2020 +0530 camel-15754-Improve the propagation consumer strategy --- .../grpc/server/GrpcRequestPropagationStreamObserver.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java index 9dd7323..e4044eb 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java @@ -25,7 +25,8 @@ import org.apache.camel.component.grpc.GrpcConsumer; import org.apache.camel.component.grpc.GrpcEndpoint; /** - * gRPC request stream observer which is propagating every onNext(), onError() or onCompleted() calls to the Camel route + * This is the default consumer strategy for client-streaming and bi-directional streaming gRPC calls. gRPC request + * stream observer which is propagating every onNext(), onError() or onCompleted() calls to the Camel route. */ public class GrpcRequestPropagationStreamObserver extends GrpcRequestAbstractStreamObserver { @@ -37,15 +38,13 @@ public class GrpcRequestPropagationStreamObserver extends GrpcRequestAbstractStr @Override public void onNext(Object request) { CountDownLatch latch = new CountDownLatch(1); - Object responseBody = null; + Object responseBody; exchange = endpoint.createExchange(); exchange.getIn().setBody(request); exchange.getIn().setHeaders(headers); - consumer.process(exchange, doneSync -> { - latch.countDown(); - }); + consumer.process(exchange, doneSync -> latch.countDown()); try { latch.await(); @@ -62,8 +61,6 @@ public class GrpcRequestPropagationStreamObserver extends GrpcRequestAbstractStr } else { responseObserver.onNext(responseBody); } - responseObserver.onCompleted(); - } catch (InterruptedException e) { responseObserver.onError(e); }