CAMEL-11237: camel-grpc - Add a grpc consumer

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e72fc444
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e72fc444
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e72fc444

Branch: refs/heads/master
Commit: e72fc4443700c04e30f625026c6d5d5c9e6cef83
Parents: ed85770
Author: Dmitry Volodin <dmvo...@gmail.com>
Authored: Wed May 17 23:43:47 2017 +0300
Committer: Dmitry Volodin <dmvo...@gmail.com>
Committed: Mon May 22 16:42:58 2017 +0300

----------------------------------------------------------------------
 components/camel-grpc/pom.xml                   |  12 +
 .../src/main/docs/grpc-component.adoc           |   9 +-
 .../camel/component/grpc/GrpcConfiguration.java |   6 +-
 .../camel/component/grpc/GrpcConstants.java     |  12 +
 .../camel/component/grpc/GrpcConsumer.java      | 138 +++++++++++
 .../camel/component/grpc/GrpcEndpoint.java      |   6 +-
 .../apache/camel/component/grpc/GrpcUtils.java  |  14 ++
 .../grpc/server/GrpcHeaderInterceptor.java      |  45 ++++
 .../grpc/server/GrpcMethodHandler.java          |  88 +++++++
 .../GrpcRequestAbstractStreamObserver.java      |  46 ++++
 .../GrpcRequestAggregationStreamObserver.java   |  68 +++++
 .../GrpcRequestPropagationStreamObserver.java   |  57 +++++
 .../grpc/GrpcConsumerConcurrentTest.java        | 192 ++++++++++++++
 .../camel/component/grpc/GrpcConsumerTest.java  | 248 +++++++++++++++++++
 14 files changed, 932 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-grpc/pom.xml b/components/camel-grpc/pom.xml
index 8614d3d..37aff72 100644
--- a/components/camel-grpc/pom.xml
+++ b/components/camel-grpc/pom.xml
@@ -63,6 +63,12 @@
       <groupId>org.springframework</groupId>
       <artifactId>spring-core</artifactId>
     </dependency>
+    
+    <dependency>
+      <groupId>org.javassist</groupId>
+      <artifactId>javassist</artifactId>
+      <version>3.20.0-GA</version>
+    </dependency>
 
     <!-- for testing -->
     <dependency>
@@ -70,6 +76,12 @@
       <artifactId>camel-test</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.googlecode.junit-toolbox</groupId>
+      <artifactId>junit-toolbox</artifactId>
+      <version>2.3</version>
+      <scope>test</scope>
+    </dependency>
 
     <!-- logging -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/docs/grpc-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-grpc/src/main/docs/grpc-component.adoc 
b/components/camel-grpc/src/main/docs/grpc-component.adoc
index a642263..7c53b9a 100644
--- a/components/camel-grpc/src/main/docs/grpc-component.adoc
+++ b/components/camel-grpc/src/main/docs/grpc-component.adoc
@@ -47,14 +47,17 @@ with the following path and query parameters:
 | **service** | *Required* Fully qualified service name from the protocol 
buffer descriptor file (package dot service definition name) |  | String
 |=======================================================================
 
-#### Query Parameters (6 parameters):
+#### Query Parameters (9 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
 | Name | Description | Default | Type
-| **host** (producer) | The gRPC server host name |  | String
+| **host** (common) | The gRPC server host name |  | String
+| **port** (common) | The gRPC server port |  | int
+| **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the 
Camel routing Error Handler, which means any exceptions that occur while the 
consumer is trying to pick up incoming messages, or the like, will now be 
processed as a message and handled by the routing Error Handler. By default the 
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions, which will be logged at WARN or ERROR level and ignored. | false | 
boolean
+| **exceptionHandler** (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice that if the option bridgeErrorHandler is enabled then 
this option is not in use. By default the consumer will deal with exceptions, 
which will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
+| **exchangePattern** (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. |  | ExchangePattern
 | **method** (producer) | gRPC method name |  | String
-| **port** (producer) | The gRPC server port |  | int
 | **target** (producer) | The channel target name as alternative to host and 
port parameters |  | String
 | **usePlainText** (producer) | The plaintext connection to the server flag | 
true | Boolean
 | **synchronous** (advanced) | Sets whether synchronous processing should be 
strictly used or Camel is allowed to use asynchronous processing (if 
supported). | false | boolean

http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
index 8909419..01d94b6 100644
--- 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
@@ -27,15 +27,15 @@ public class GrpcConfiguration {
     @UriPath
     @Metadata(required = "true")
     private String service;
-    @UriParam
+    @UriParam(label = "producer")
     private String method;
     @UriParam
     private String host;
     @UriParam
     private int port;
-    @UriParam
+    @UriParam(label = "producer")
     private String target;
-    @UriParam(defaultValue = "true")
+    @UriParam(label = "producer", defaultValue = "true")
     private Boolean usePlainText = true;
 
     private String serviceName;

http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConstants.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConstants.java
index 44051a3..f61f3a8 100644
--- 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConstants.java
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConstants.java
@@ -22,7 +22,19 @@ package org.apache.camel.component.grpc;
 public interface GrpcConstants {
 
     String GRPC_SERVICE_CLASS_POSTFIX = "Grpc";
+    String GRPC_SERVER_IMPL_POSTFIX = "ImplBase";
     String GRPC_SERVICE_SYNC_STUB_METHOD = "newBlockingStub";
     String GRPC_SERVICE_ASYNC_STUB_METHOD = "newStub";
     String GRPC_SERVICE_FUTURE_STUB_METHOD = "newFutureStub";
+    
+    /*
+     * These headers will be set after the gRPC consumer method is invoked
+     */
+    String GRPC_METHOD_NAME_HEADER = "CamelGrpcMethodName";
+    String GRPC_USER_AGENT_HEADER = "CamelGrpcUserAgent";
+    String GRPC_EVENT_TYPE_HEADER = "CamelGrpcEventType";
+    
+    String GRPC_EVENT_TYPE_ON_NEXT = "onNext";
+    String GRPC_EVENT_TYPE_ON_ERROR = "onError";
+    String GRPC_EVENT_TYPE_ON_COMPLETED = "onCompleted";
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
new file mode 100644
index 0000000..29a2ada
--- /dev/null
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.grpc;
+
+import java.lang.reflect.InvocationTargetException;
+import java.net.InetSocketAddress;
+
+import io.grpc.BindableService;
+import io.grpc.Server;
+import io.grpc.ServerInterceptor;
+import io.grpc.ServerInterceptors;
+import io.grpc.netty.NettyServerBuilder;
+import javassist.util.proxy.MethodHandler;
+import javassist.util.proxy.ProxyFactory;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.grpc.server.GrpcHeaderInterceptor;
+import org.apache.camel.component.grpc.server.GrpcMethodHandler;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents gRPC server consumer implementation
+ */
+public class GrpcConsumer extends DefaultConsumer {
+    private static final Logger LOG = 
LoggerFactory.getLogger(GrpcConsumer.class);
+
+    protected final GrpcConfiguration configuration;
+    protected final GrpcEndpoint endpoint;
+
+    private Server server;
+
+    public GrpcConsumer(GrpcEndpoint endpoint, Processor processor, 
GrpcConfiguration configuration) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+        this.configuration = configuration;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        if (server == null) {
+            LOG.info("Starting the gRPC server");
+            initializeServer();
+            server.start();
+            LOG.info("gRPC server started and listening on port: " + 
server.getPort());
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (server != null) {
+            LOG.trace("Terminating gRPC server");
+            server.shutdown().shutdownNow();
+            server = null;
+        }
+        super.doStop();
+    }
+
+    protected void initializeServer() {
+        NettyServerBuilder serverBuilder = null;
+        BindableService bindableService = null;
+        ProxyFactory serviceProxy = new ProxyFactory();
+        ServerInterceptor headerInterceptor = new GrpcHeaderInterceptor();
+        MethodHandler methodHandler = new GrpcMethodHandler(endpoint, this);
+        
+        
serviceProxy.setSuperclass(GrpcUtils.constructGrpcImplBaseClass(configuration.getServicePackage(),
 configuration.getServiceName()));
+        try {
+            bindableService = (BindableService)serviceProxy.create(new 
Class<?>[0], new Object[0], methodHandler);
+        } catch (NoSuchMethodException | IllegalArgumentException | 
InstantiationException | IllegalAccessException | InvocationTargetException e) {
+            throw new IllegalArgumentException("Unable to create bindable 
proxy service for " + configuration.getService());
+        }
+        
+        if (!ObjectHelper.isEmpty(configuration.getHost()) && 
!ObjectHelper.isEmpty(configuration.getPort())) {
+            LOG.info("Building gRPC server on " + configuration.getHost() + 
":" + configuration.getPort());
+            serverBuilder = NettyServerBuilder.forAddress(new 
InetSocketAddress(configuration.getHost(), configuration.getPort()));
+        } else if (ObjectHelper.isEmpty(configuration.getHost()) && 
!ObjectHelper.isEmpty(configuration.getPort())) {
+            LOG.info("Building gRPC server on <any address>" + ":" + 
configuration.getPort());
+            serverBuilder = 
NettyServerBuilder.forPort(configuration.getPort());
+        } else {
+            throw new IllegalArgumentException("No server start properties 
(host, port) specified");
+        }
+        
+        server = 
serverBuilder.addService(ServerInterceptors.intercept(bindableService, 
headerInterceptor)).build();
+    }
+    
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, 
GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT);
+        return doSend(exchange, callback);
+    }
+    
+    public void onCompleted(Exchange exchange) {
+        exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, 
GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED);
+        doSend(exchange, done -> {
+        });
+    }
+
+    public void onError(Exchange exchange, Throwable error) {
+        exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, 
GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR);
+        exchange.getIn().setBody(error);
+        
+        doSend(exchange, done -> {
+        });
+    }
+        
+    private boolean doSend(Exchange exchange, AsyncCallback callback) {
+        if (this.isRunAllowed()) {
+            this.getAsyncProcessor().process(exchange, doneSync -> {
+                if (exchange.getException() != null) {
+                    getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
+                }
+                callback.done(doneSync);
+            });
+            return false;
+        } else {
+            LOG.warn("Consumer not ready to process exchanges. The exchange {} 
will be discarded", exchange);
+            callback.done(true);
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcEndpoint.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcEndpoint.java
index 61f0e22..1f6010c 100644
--- 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcEndpoint.java
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcEndpoint.java
@@ -25,9 +25,9 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 
 /**
- * The gRPC component is using for calling remote procedures via HTTP/2
+ * The gRPC component allows calling and exposing remote procedures via HTTP/2 
with the protobuf data format
  */
-@UriEndpoint(firstVersion = "2.19.0", scheme = "grpc", title = "gRPC", syntax 
= "grpc:service", producerOnly = true, label = "rpc")
+@UriEndpoint(firstVersion = "2.19.0", scheme = "grpc", title = "gRPC", syntax 
= "grpc:service", label = "rpc")
 public class GrpcEndpoint extends DefaultEndpoint {
     @UriParam
     protected final GrpcConfiguration configuration;
@@ -47,7 +47,7 @@ public class GrpcEndpoint extends DefaultEndpoint {
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
-        throw new UnsupportedOperationException("Cannot consume from a gRPC 
endpoint: " + getEndpointUri());
+        return new GrpcConsumer(this, processor, configuration);
     }
 
     public boolean isSingleton() {

http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcUtils.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcUtils.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcUtils.java
index 5ff1a88..96c79f5 100644
--- 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcUtils.java
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcUtils.java
@@ -69,6 +69,20 @@ public final class GrpcUtils {
         return grpcBlockingStub;
     }
     
+    /**
+     * Loads the generated gRPC server base class, whose name is built as
+     * {@code <packageName>.<serviceName>Grpc$<serviceName>ImplBase}.
+     *
+     * @param packageName package of the generated gRPC classes
+     * @param serviceName service definition name
+     * @return the server base implementation class
+     * @throws IllegalArgumentException if the class cannot be found; the
+     *             {@link ClassNotFoundException} is attached as the cause
+     */
+    @SuppressWarnings("rawtypes")
+    public static Class constructGrpcImplBaseClass(String packageName, String serviceName) {
+        String serverBaseImpl = packageName + "." + serviceName + GrpcConstants.GRPC_SERVICE_CLASS_POSTFIX + "$" + serviceName + GrpcConstants.GRPC_SERVER_IMPL_POSTFIX;
+        try {
+            return Class.forName(serverBaseImpl);
+        } catch (ClassNotFoundException e) {
+            // Keep the original exception so class loading problems can be diagnosed
+            throw new IllegalArgumentException("gRPC server base class not found: " + serverBaseImpl, e);
+        }
+    }
+
     @SuppressWarnings({"rawtypes", "unchecked"})
     public static void invokeAsyncMethod(Object asyncStubClass, String 
invokeMethod, Object request, StreamObserver responseObserver) {
         Class[] paramMethod = null;

http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcHeaderInterceptor.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcHeaderInterceptor.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcHeaderInterceptor.java
new file mode 100644
index 0000000..1ce582d
--- /dev/null
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcHeaderInterceptor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.grpc.server;
+
+import io.grpc.Context;
+import io.grpc.Contexts;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCall.Listener;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+import io.grpc.internal.GrpcUtil;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.grpc.GrpcConstants;
+
+/**
+ * Server interceptor that reads the user agent and content type entries from
+ * the request metadata and attaches them to the gRPC {@link Context} used for
+ * the intercepted call
+ */
+public class GrpcHeaderInterceptor implements ServerInterceptor {
+    public static final Context.Key<String> USER_AGENT_CONTEXT_KEY = Context.key(GrpcConstants.GRPC_USER_AGENT_HEADER);
+    public static final Context.Key<String> CONTENT_TYPE_CONTEXT_KEY = Context.key(Exchange.CONTENT_TYPE);
+
+    @Override
+    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata requestHeaders, ServerCallHandler<ReqT, RespT> next) {
+        // Pick the values of interest out of the incoming metadata
+        String userAgent = requestHeaders.get(GrpcUtil.USER_AGENT_KEY);
+        String contentType = requestHeaders.get(GrpcUtil.CONTENT_TYPE_KEY);
+
+        // Derive a context carrying both values and continue the call within it
+        Context withUserAgent = Context.current().withValue(USER_AGENT_CONTEXT_KEY, userAgent);
+        Context enriched = withUserAgent.withValue(CONTENT_TYPE_CONTEXT_KEY, contentType);
+        return Contexts.interceptCall(enriched, call, requestHeaders, next);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java
new file mode 100644
index 0000000..2ed83cc
--- /dev/null
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.grpc.server;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.grpc.stub.StreamObserver;
+import javassist.util.proxy.MethodHandler;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.grpc.GrpcConstants;
+import org.apache.camel.component.grpc.GrpcConsumer;
+import org.apache.camel.component.grpc.GrpcEndpoint;
+
+/**
+ * gRPC server method invocation handler
+ */
+public class GrpcMethodHandler implements MethodHandler {
+    private final GrpcEndpoint endpoint;
+    private final GrpcConsumer consumer;
+
+    public GrpcMethodHandler(GrpcEndpoint endpoint, GrpcConsumer consumer) {
+        this.endpoint = endpoint;
+        this.consumer = consumer;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Object invoke(Object self, Method thisMethod, Method proceed, 
Object[] args) throws Throwable {
+        Map<String, Object> grcpHeaders = new HashMap<String, Object>();
+        
+        
grcpHeaders.put(GrpcHeaderInterceptor.USER_AGENT_CONTEXT_KEY.toString(), 
GrpcHeaderInterceptor.USER_AGENT_CONTEXT_KEY.get());
+        
grcpHeaders.put(GrpcHeaderInterceptor.CONTENT_TYPE_CONTEXT_KEY.toString(), 
GrpcHeaderInterceptor.CONTENT_TYPE_CONTEXT_KEY.get());
+        grcpHeaders.put(GrpcConstants.GRPC_METHOD_NAME_HEADER, 
thisMethod.getName());
+        
+        // Determines that the incoming parameters are transmitted in 
synchronous mode
+        // Two incoming parameters and second is instance of the 
io.grpc.stub.StreamObserver
+        if (args.length == 2 && args[1] instanceof StreamObserver) {
+            Exchange exchange = endpoint.createExchange();
+            exchange.getIn().setBody(args[0]);
+            exchange.getIn().setHeaders(grcpHeaders);
+
+            if (endpoint.isSynchronous()) {
+                consumer.getProcessor().process(exchange);
+            } else {
+                consumer.getAsyncProcessor().process(exchange);
+            }
+            
+            StreamObserver<Object> responseObserver = 
(StreamObserver<Object>)args[1];
+            Object responseBody = exchange.getIn().getBody();
+            if (responseBody instanceof List) {
+                List<Object> responseList = (List<Object>)responseBody;
+                responseList.forEach((responseItem) -> {
+                    responseObserver.onNext(responseItem);
+                });
+            } else {
+                responseObserver.onNext(responseBody);
+            }
+            responseObserver.onCompleted();
+        } else if (args.length == 1 && args[0] instanceof StreamObserver) {
+            // Single incoming parameter is instance of the 
io.grpc.stub.StreamObserver
+            final StreamObserver<Object> responseObserver = 
(StreamObserver<Object>)args[0];
+            final StreamObserver<Object> requestObserver = new 
GrpcRequestAggregationStreamObserver(endpoint, consumer, responseObserver, 
grcpHeaders);
+            
+            return requestObserver;
+        } else {
+            throw new IllegalArgumentException("Invalid to process gRPC 
method: " + thisMethod.getName());
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAbstractStreamObserver.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAbstractStreamObserver.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAbstractStreamObserver.java
new file mode 100644
index 0000000..b47e724
--- /dev/null
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAbstractStreamObserver.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.grpc.server;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.grpc.GrpcConsumer;
+import org.apache.camel.component.grpc.GrpcEndpoint;
+
+/**
+ * Common state shared by the gRPC request stream observers: the endpoint and
+ * consumer they work with, the observer used to answer the caller, the headers
+ * to apply to exchanges and a list for received requests
+ */
+public abstract class GrpcRequestAbstractStreamObserver implements StreamObserver<Object> {
+    protected final GrpcEndpoint endpoint;
+    protected final GrpcConsumer consumer;
+    // Not assigned by this class; subclasses create it when they need one
+    protected Exchange exchange;
+    // Starts out empty
+    protected List<Object> requestList = new LinkedList<>();
+    protected StreamObserver<Object> responseObserver;
+    protected Map<String, Object> headers;
+
+    public GrpcRequestAbstractStreamObserver(GrpcEndpoint endpoint, GrpcConsumer consumer, StreamObserver<Object> responseObserver, Map<String, Object> headers) {
+        this.headers = headers;
+        this.responseObserver = responseObserver;
+        this.consumer = consumer;
+        this.endpoint = endpoint;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java
new file mode 100644
index 0000000..8f7ff6a
--- /dev/null
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.grpc.server;
+
+import java.util.List;
+import java.util.Map;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.camel.component.grpc.GrpcConsumer;
+import org.apache.camel.component.grpc.GrpcEndpoint;
+
+/**
+ * gRPC request stream observer which collects the object received on every
+ * onNext() call into a list and processes the whole list in onCompleted()
+ */
+public class GrpcRequestAggregationStreamObserver extends GrpcRequestAbstractStreamObserver {
+
+    public GrpcRequestAggregationStreamObserver(GrpcEndpoint endpoint, GrpcConsumer consumer, StreamObserver<Object> responseObserver, Map<String, Object> headers) {
+        super(endpoint, consumer, responseObserver, headers);
+    }
+
+    /**
+     * Remembers the request; nothing is sent to the route yet.
+     */
+    @Override
+    public void onNext(Object request) {
+        requestList.add(request);
+    }
+
+    /**
+     * Records the error on the exchange.
+     */
+    @Override
+    public void onError(Throwable t) {
+        // The exchange is only created in onCompleted(), so it does not exist when the
+        // stream fails first; create it on demand instead of failing with a NullPointerException
+        if (exchange == null) {
+            exchange = endpoint.createExchange();
+        }
+        exchange.setException(t);
+    }
+
+    /**
+     * Sends the collected requests as one exchange through the consumer and
+     * answers the caller with the resulting IN body, emitting list bodies
+     * element by element, then completes the response.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void onCompleted() {
+        exchange = endpoint.createExchange();
+
+        exchange.getIn().setBody(requestList);
+        exchange.getIn().setHeaders(headers);
+
+        consumer.process(exchange, doneSync -> {
+        });
+
+        Object responseBody = exchange.getIn().getBody();
+        if (responseBody instanceof List) {
+            ((List<Object>)responseBody).forEach(responseObserver::onNext);
+        } else {
+            responseObserver.onNext(responseBody);
+        }
+        responseObserver.onCompleted();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java
new file mode 100644
index 0000000..da3baff
--- /dev/null
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.grpc.server;
+
+import java.util.Map;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.camel.component.grpc.GrpcConsumer;
+import org.apache.camel.component.grpc.GrpcEndpoint;
+
+/**
+ * gRPC request stream observer which is propagating every onNext(), onError()
+ * or onCompleted() calls to the Camel route
+ */
+public class GrpcRequestPropagationStreamObserver extends 
GrpcRequestAbstractStreamObserver {
+
+    public GrpcRequestPropagationStreamObserver(GrpcEndpoint endpoint, 
GrpcConsumer consumer, StreamObserver<Object> responseObserver, Map<String, 
Object> headers) {
+        super(endpoint, consumer, responseObserver, headers);
+    }
+
+    @Override
+    public void onNext(Object request) {
+        exchange = endpoint.createExchange();
+        exchange.getIn().setBody(request);
+        consumer.process(exchange, doneSync -> {
+        });
+        responseObserver.onNext(exchange.getOut().getBody());
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+        exchange = endpoint.createExchange();
+        consumer.onError(exchange, throwable);
+        responseObserver.onError(throwable);
+    }
+
+    @Override
+    public void onCompleted() {
+        exchange = endpoint.createExchange();
+        consumer.onCompleted(exchange);
+        responseObserver.onCompleted();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java
 
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java
new file mode 100644
index 0000000..b5febee
--- /dev/null
+++ 
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.grpc;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.googlecode.junittoolbox.MultithreadingTester;
+import com.googlecode.junittoolbox.RunnableAssert;
+
+import io.grpc.ManagedChannel;
+import io.grpc.netty.NettyChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runs the gRPC consumer under many concurrent client threads and checks that
+ * every client receives the response that belongs to its own request.
+ */
+public class GrpcConsumerConcurrentTest extends CamelTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerConcurrentTest.class);
+
+    private static final int GRPC_ASYNC_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable();
+    private static final int GRPC_HEADERS_TEST_PORT = AvailablePortFinder.getNextAvailable();
+    private static final int CONCURRENT_THREAD_COUNT = 300;
+    private static final int ROUNDS_PER_THREAD_COUNT = 10;
+    private static final int RESPONSE_TIMEOUT_SECONDS = 5;
+    private static final String GRPC_TEST_PING_VALUE = "PING";
+    private static final String GRPC_TEST_PONG_VALUE = "PONG";
+    private static final String GRPC_USER_AGENT_PREFIX = "user-agent-";
+    private static final AtomicInteger idCounter = new AtomicInteger();
+
+    /**
+     * Returns the current counter value and increments the counter.
+     */
+    public static Integer createId() {
+        return idCounter.getAndIncrement();
+    }
+
+    /**
+     * Returns the current counter value without changing it.
+     */
+    public static Integer getId() {
+        return idCounter.get();
+    }
+
+    @Test
+    public void testAsyncWithConcurrentThreads() throws Exception {
+        RunnableAssert ra = new RunnableAssert("foo") {
+
+            @Override
+            public void run() {
+                int instanceId = createId();
+                ManagedChannel asyncRequestChannel = NettyChannelBuilder.forAddress("localhost", GRPC_ASYNC_REQUEST_TEST_PORT).usePlaintext(true).build();
+                try {
+                    PongResponse pongResponse = pingAndAwaitResponse(asyncRequestChannel, instanceId);
+
+                    assertNotNull("instanceId = " + instanceId, pongResponse);
+                    assertEquals(instanceId, pongResponse.getPongId());
+                    assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName());
+                } finally {
+                    // release the channel even when an assertion above fails
+                    asyncRequestChannel.shutdown().shutdownNow();
+                }
+            }
+        };
+
+        new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT).run();
+    }
+
+    @Test
+    public void testHeadersWithConcurrentThreads() throws Exception {
+        RunnableAssert ra = new RunnableAssert("foo") {
+
+            @Override
+            public void run() {
+                int instanceId = createId();
+                ManagedChannel asyncRequestChannel = NettyChannelBuilder.forAddress("localhost", GRPC_HEADERS_TEST_PORT)
+                    .userAgent(GRPC_USER_AGENT_PREFIX + instanceId).usePlaintext(true).build();
+                try {
+                    PongResponse pongResponse = pingAndAwaitResponse(asyncRequestChannel, instanceId);
+
+                    assertNotNull("instanceId = " + instanceId, pongResponse);
+                    assertEquals(instanceId, pongResponse.getPongId());
+                    assertEquals(GRPC_USER_AGENT_PREFIX + instanceId, pongResponse.getPongName());
+                } finally {
+                    // release the channel even when an assertion above fails
+                    asyncRequestChannel.shutdown().shutdownNow();
+                }
+            }
+        };
+
+        new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT).run();
+    }
+
+    /**
+     * Streams the same ping request twice over the given channel, completes the
+     * request stream and waits until the response stream terminates.
+     *
+     * @param channel    the channel to send the requests through
+     * @param instanceId the id placed into the ping requests
+     * @return the last response received, or <tt>null</tt> if none arrived
+     */
+    private PongResponse pingAndAwaitResponse(ManagedChannel channel, int instanceId) {
+        CountDownLatch latch = new CountDownLatch(1);
+        PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);
+        PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build();
+
+        StreamObserver<PingRequest> requestObserver = PingPongGrpc.newStub(channel).pingAsyncAsync(responseObserver);
+        requestObserver.onNext(pingRequest);
+        requestObserver.onNext(pingRequest);
+        requestObserver.onCompleted();
+
+        try {
+            // a timeout must fail the test instead of being silently ignored
+            assertTrue("Timed out waiting for the response, instanceId = " + instanceId, latch.await(RESPONSE_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        } catch (InterruptedException e) {
+            // restore the interrupt flag for the test runner before failing
+            Thread.currentThread().interrupt();
+            fail("Interrupted while waiting for the response, instanceId = " + instanceId);
+        }
+        return responseObserver.getPongResponse();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("grpc://org.apache.camel.component.grpc.PingPong?synchronous=true&host=localhost&port=" + GRPC_ASYNC_REQUEST_TEST_PORT)
+                    .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");
+
+                from("grpc://org.apache.camel.component.grpc.PingPong?synchronous=true&host=localhost&port=" + GRPC_HEADERS_TEST_PORT)
+                    .process(new HeaderExchangeProcessor());
+            }
+        };
+    }
+
+    /**
+     * Response observer which keeps the last received response and releases
+     * the given latch once the response stream ends or fails.
+     */
+    public class PongResponseStreamObserver implements StreamObserver<PongResponse> {
+        // written by the gRPC callback thread, read by the test thread
+        private volatile PongResponse pongResponse;
+        private final CountDownLatch latch;
+
+        public PongResponseStreamObserver(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        public PongResponse getPongResponse() {
+            return pongResponse;
+        }
+
+        @Override
+        public void onNext(PongResponse value) {
+            pongResponse = value;
+        }
+
+        @Override
+        public void onError(Throwable t) {
+            LOG.info("Exception", t);
+            latch.countDown();
+        }
+
+        @Override
+        public void onCompleted() {
+            latch.countDown();
+        }
+    }
+
+    /**
+     * Bean used by the route to build the response from the aggregated requests.
+     */
+    public class GrpcMessageBuilder {
+        public PongResponse buildAsyncPongResponse(List<PingRequest> pingRequests) {
+            PingRequest firstRequest = pingRequests.get(0);
+            return PongResponse.newBuilder().setPongName(firstRequest.getPingName() + GRPC_TEST_PONG_VALUE).setPongId(firstRequest.getPingId()).build();
+        }
+    }
+
+    /**
+     * Processor which echoes the client supplied user agent name back in the response.
+     */
+    public class HeaderExchangeProcessor implements Processor {
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public void process(Exchange exchange) throws Exception {
+            List<PingRequest> pingRequests = (List<PingRequest>)exchange.getIn().getBody();
+            String userAgentName = exchange.getIn().getHeader(GrpcConstants.GRPC_USER_AGENT_HEADER, String.class);
+
+            // The library's own user agent information follows the client supplied name after
+            // a space, so keep only the part before the first space (or everything if there is none)
+            int separatorIndex = userAgentName.indexOf(' ');
+            String clientAgentName = separatorIndex < 0 ? userAgentName : userAgentName.substring(0, separatorIndex);
+
+            PongResponse pongResponse = PongResponse.newBuilder().setPongName(clientAgentName).setPongId(pingRequests.get(0).getPingId()).build();
+            exchange.getIn().setBody(pongResponse);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerTest.java
 
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerTest.java
new file mode 100644
index 0000000..1ef9808
--- /dev/null
+++ 
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerTest.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.grpc;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Exercises the gRPC consumer through blocking and non-blocking client stubs
+ * for the four PingPong service methods used below.
+ */
+public class GrpcConsumerTest extends CamelTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerTest.class);
+
+    private static final int GRPC_SYNC_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable();
+    private static final int GRPC_ASYNC_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable();
+    private static final int GRPC_TEST_PING_ID = 1;
+    private static final int RESPONSE_TIMEOUT_SECONDS = 10;
+    private static final String GRPC_TEST_PING_VALUE = "PING";
+    private static final String GRPC_TEST_PONG_VALUE = "PONG";
+
+    private ManagedChannel syncRequestChannel;
+    private ManagedChannel asyncRequestChannel;
+    private PingPongGrpc.PingPongBlockingStub blockingStub;
+    private PingPongGrpc.PingPongStub nonBlockingStub;
+    private PingPongGrpc.PingPongStub asyncNonBlockingStub;
+
+    @Before
+    public void startGrpcChannels() {
+        syncRequestChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_SYNC_REQUEST_TEST_PORT).usePlaintext(true).build();
+        asyncRequestChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_ASYNC_REQUEST_TEST_PORT).usePlaintext(true).build();
+        blockingStub = PingPongGrpc.newBlockingStub(syncRequestChannel);
+        nonBlockingStub = PingPongGrpc.newStub(syncRequestChannel);
+        asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel);
+    }
+
+    @After
+    public void stopGrpcChannels() {
+        syncRequestChannel.shutdown().shutdownNow();
+        asyncRequestChannel.shutdown().shutdownNow();
+    }
+
+    @Test
+    public void testSyncSyncMethodInSync() throws Exception {
+        LOG.info("gRPC pingSyncSync method blocking test start");
+
+        assertPongResponse(blockingStub.pingSyncSync(newPingRequest()));
+    }
+
+    @Test
+    public void testSyncAsyncMethodInSync() throws Exception {
+        LOG.info("gRPC pingSyncAsync method blocking test start");
+        Iterator<PongResponse> pongResponseIter = blockingStub.pingSyncAsync(newPingRequest());
+
+        // without this check the test would pass vacuously on an empty response stream
+        assertTrue("Expected at least one response", pongResponseIter.hasNext());
+        while (pongResponseIter.hasNext()) {
+            assertPongResponse(pongResponseIter.next());
+        }
+    }
+
+    @Test
+    public void testSyncSyncMethodInAsync() throws Exception {
+        LOG.info("gRPC pingSyncSync method aync test start");
+        PongResponseStreamObserver responseObserver = new PongResponseStreamObserver();
+
+        nonBlockingStub.pingSyncSync(newPingRequest(), responseObserver);
+
+        assertPongResponse(responseObserver.awaitResponse());
+    }
+
+    @Test
+    public void testSyncAsyncMethodInAsync() throws Exception {
+        LOG.info("gRPC pingSyncAsync method aync test start");
+        PongResponseStreamObserver responseObserver = new PongResponseStreamObserver();
+
+        nonBlockingStub.pingSyncAsync(newPingRequest(), responseObserver);
+
+        assertPongResponse(responseObserver.awaitResponse());
+    }
+
+    @Test
+    public void testAsyncSyncMethodInAsync() throws Exception {
+        LOG.info("gRPC pingAsyncSync method aync test start");
+        PingRequest pingRequest = newPingRequest();
+        PongResponseStreamObserver responseObserver = new PongResponseStreamObserver();
+
+        StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncSync(responseObserver);
+        requestObserver.onNext(pingRequest);
+        requestObserver.onNext(pingRequest);
+        requestObserver.onCompleted();
+
+        assertPongResponse(responseObserver.awaitResponse());
+    }
+
+    @Test
+    public void testAsyncAsyncMethodInAsync() throws Exception {
+        LOG.info("gRPC pingAsyncAsync method aync test start");
+        PingRequest pingRequest = newPingRequest();
+        PongResponseStreamObserver responseObserver = new PongResponseStreamObserver();
+
+        StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver);
+        requestObserver.onNext(pingRequest);
+        requestObserver.onNext(pingRequest);
+        requestObserver.onCompleted();
+
+        assertPongResponse(responseObserver.awaitResponse());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("grpc://org.apache.camel.component.grpc.PingPong?synchronous=true&host=localhost&port=" + GRPC_SYNC_REQUEST_TEST_PORT)
+                    .bean(new GrpcMessageBuilder(), "buildPongResponse");
+
+                from("grpc://org.apache.camel.component.grpc.PingPong?synchronous=true&host=localhost&port=" + GRPC_ASYNC_REQUEST_TEST_PORT)
+                    .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");
+            }
+        };
+    }
+
+    /**
+     * Builds the ping request shared by all tests.
+     */
+    private static PingRequest newPingRequest() {
+        return PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
+    }
+
+    /**
+     * Asserts that the response carries the test ping id and the ping value
+     * followed by the pong value.
+     */
+    private static void assertPongResponse(PongResponse pongResponse) {
+        assertNotNull(pongResponse);
+        assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId());
+        assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName());
+    }
+
+    /**
+     * Response observer owned by a single test. It keeps the last received
+     * response and any reported failure, and lets the test wait until the
+     * response stream has ended.
+     */
+    private static final class PongResponseStreamObserver implements StreamObserver<PongResponse> {
+        private final CountDownLatch latch = new CountDownLatch(1);
+        // written by the gRPC callback thread, read by the test thread
+        private volatile PongResponse pongResponse;
+        private volatile Throwable error;
+
+        @Override
+        public void onNext(PongResponse value) {
+            pongResponse = value;
+        }
+
+        @Override
+        public void onError(Throwable t) {
+            error = t;
+            // release the waiting test right away instead of letting it run into the timeout
+            latch.countDown();
+        }
+
+        @Override
+        public void onCompleted() {
+            latch.countDown();
+        }
+
+        /**
+         * Waits until the response stream has ended and returns the last response.
+         * Fails the test if the wait times out or the call reported an error.
+         */
+        PongResponse awaitResponse() throws InterruptedException {
+            assertTrue("Timed out waiting for the gRPC response", latch.await(RESPONSE_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+            assertNull("gRPC call failed: " + error, error);
+            return pongResponse;
+        }
+    }
+
+    /**
+     * Bean used by the routes to build the response from one request or from
+     * the first of a list of aggregated requests.
+     */
+    public class GrpcMessageBuilder {
+        public PongResponse buildPongResponse(PingRequest pingRequest) {
+            return PongResponse.newBuilder().setPongName(pingRequest.getPingName() + GRPC_TEST_PONG_VALUE).setPongId(pingRequest.getPingId()).build();
+        }
+
+        public PongResponse buildAsyncPongResponse(List<PingRequest> pingRequests) {
+            return buildPongResponse(pingRequests.get(0));
+        }
+    }
+}

Reply via email to