Repository: camel
Updated Branches:
  refs/heads/camel-2.19.x bc186de09 -> 18ac65279


CAMEL-11288: camel-grpc producer incorrectly called async services

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/18ac6527
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/18ac6527
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/18ac6527

Branch: refs/heads/camel-2.19.x
Commit: 18ac6527911b65c54be4e72db14025c120dd30a6
Parents: bc186de
Author: Dmitry Volodin <dmvo...@gmail.com>
Authored: Tue May 16 19:09:09 2017 +0300
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed May 17 09:29:36 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/grpc-component.adoc           |  22 +-
 .../camel/component/grpc/GrpcComponent.java     |   5 +-
 .../camel/component/grpc/GrpcConstants.java     |   2 +-
 .../camel/component/grpc/GrpcProducer.java      |  41 +--
 .../apache/camel/component/grpc/GrpcUtils.java  |  52 ++--
 .../GrpcResponseAggregationStreamObserver.java  |  58 +++++
 .../component/grpc/GrpcProducerAsyncTest.java   | 258 +++++++++++++++++++
 .../component/grpc/GrpcProducerSyncTest.java    | 158 ++++++++++++
 .../camel/component/grpc/GrpcProducerTest.java  | 200 --------------
 .../camel-grpc/src/test/proto/pingpong.proto    |  14 +-
 10 files changed, 551 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/18ac6527/components/camel-grpc/src/main/docs/grpc-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-grpc/src/main/docs/grpc-component.adoc 
b/components/camel-grpc/src/main/docs/grpc-component.adoc
index 3f06f14..a642263 100644
--- a/components/camel-grpc/src/main/docs/grpc-component.adoc
+++ b/components/camel-grpc/src/main/docs/grpc-component.adoc
@@ -61,9 +61,25 @@ with the following path and query parameters:
 |=======================================================================
 // endpoint options: END
 
-### gRPC Resource Type Mapping
-In case of synchronous invocation, component expects method request class in 
Body and returns response class as out Body.
-In case of asynchronous invocation, component expects method request class in 
Body and returns java.util.List of the response class as out Body.
+### gRPC producer resource type mapping
+
+The table below shows the types of objects in the message body, depending on 
the types (simple or stream) of incoming and outgoing parameters, as well as 
the invocation style (synchronous or asynchronous). Please note that 
invoking procedures with an incoming stream parameter in synchronous 
style is not allowed.
+
+[width="100%",cols="15%,15%,15%,25%,25%",options="header",]
+|=======================================================================
+|Invocation style |Request type|Response type|Request Body Type|Result Body 
Type
+
+|*synchronous*|simple|simple|Object|Object
+|*synchronous*|simple|stream|Object|List<Object>
+|synchronous|stream|simple|not allowed|not allowed
+|synchronous|stream|stream|not allowed|not allowed
+
+|*asynchronous*|simple|simple|Object|List<Object>
+|*asynchronous*|simple|stream|Object|List<Object>
+|*asynchronous*|stream|simple|Object or List<Object>|List<Object>
+|*asynchronous*|stream|stream|Object or List<Object>|List<Object>
+
+|=======================================================================
 
 ### Examples
 

http://git-wip-us.apache.org/repos/asf/camel/blob/18ac6527/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcComponent.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcComponent.java
index d77c916..5a64aec 100644
--- 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcComponent.java
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcComponent.java
@@ -20,6 +20,7 @@ import java.util.Map;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.impl.DefaultComponent;
+import org.springframework.util.ObjectUtils;
 
 /**
  * Represents the component that manages {@link GrpcEndpoint}.
@@ -35,7 +36,9 @@ public class GrpcComponent extends DefaultComponent {
         config.setServicePackage(extractServicePackage(remaining));
         // Convert method name to the camel case style
         // This is required when the method name is given exactly as declared 
in the .proto file
-        
config.setMethod(GrpcUtils.convertMethod2CamelCase(config.getMethod()));
+        if (!ObjectUtils.isEmpty(config.getMethod())) {
+            
config.setMethod(GrpcUtils.convertMethod2CamelCase(config.getMethod()));
+        }
 
         Endpoint endpoint = new GrpcEndpoint(uri, this, config);
         return endpoint;

http://git-wip-us.apache.org/repos/asf/camel/blob/18ac6527/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConstants.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConstants.java
index 4658650..44051a3 100644
--- 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConstants.java
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConstants.java
@@ -21,7 +21,7 @@ package org.apache.camel.component.grpc;
  */
 public interface GrpcConstants {
 
-    String GRPC_SERVICE_CLASS_PREFIX = "Grpc";
+    String GRPC_SERVICE_CLASS_POSTFIX = "Grpc";
     String GRPC_SERVICE_SYNC_STUB_METHOD = "newBlockingStub";
     String GRPC_SERVICE_ASYNC_STUB_METHOD = "newStub";
     String GRPC_SERVICE_FUTURE_STUB_METHOD = "newFutureStub";

http://git-wip-us.apache.org/repos/asf/camel/blob/18ac6527/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
index 8018802..01668fe 100644
--- 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
@@ -16,16 +16,13 @@
  */
 package org.apache.camel.component.grpc;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.stub.StreamObserver;
+import io.grpc.netty.NettyChannelBuilder;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import 
org.apache.camel.component.grpc.client.GrpcResponseAggregationStreamObserver;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
@@ -52,34 +49,8 @@ public class GrpcProducer extends DefaultProducer implements 
AsyncProcessor {
     public boolean process(Exchange exchange, AsyncCallback callback) {
         Message message = exchange.getIn();
 
-        StreamObserver<Object> asyncHandler = new StreamObserver<Object>() {
-
-            @SuppressWarnings("unchecked")
-            @Override
-            public void onNext(Object response) {
-                final Object currentBody = exchange.getOut().getBody();
-                List<Object> returnBody = new ArrayList<Object>();
-                if (currentBody instanceof List) {
-                    returnBody = (List<Object>)currentBody;
-                }
-                returnBody.add(response);
-                exchange.getOut().setBody(returnBody);
-            }
-
-            @Override
-            public void onError(Throwable t) {
-                exchange.setException(t);
-                callback.done(false);
-            }
-
-            @Override
-            public void onCompleted() {
-                exchange.getOut().setHeaders(exchange.getIn().getHeaders());
-                callback.done(false);
-            }
-        };
         try {
-            GrpcUtils.invokeAsyncMethod(grpcStub, configuration.getMethod(), 
message.getBody(), asyncHandler);
+            GrpcUtils.invokeAsyncMethod(grpcStub, configuration.getMethod(), 
message.getBody(), new GrpcResponseAggregationStreamObserver(exchange, 
callback));
         } catch (Exception e) {
             exchange.setException(e);
             callback.done(true);
@@ -122,14 +93,16 @@ public class GrpcProducer extends DefaultProducer 
implements AsyncProcessor {
     }
 
     protected void initializeChannel() {
+        NettyChannelBuilder channelBuilder = null;
         if (!ObjectHelper.isEmpty(configuration.getHost()) && 
!ObjectHelper.isEmpty(configuration.getPort())) {
             LOG.info("Creating channel to the remote gRPC server " + 
configuration.getHost() + ":" + configuration.getPort());
-            channel = 
ManagedChannelBuilder.forAddress(configuration.getHost(), 
configuration.getPort()).usePlaintext(configuration.getUsePlainText()).build();
+            channelBuilder = 
NettyChannelBuilder.forAddress(configuration.getHost(), 
configuration.getPort());
         } else if (!ObjectHelper.isEmpty(configuration.getTarget())) {
             LOG.info("Creating channel to the remote gRPC server " + 
configuration.getTarget());
-            channel = 
ManagedChannelBuilder.forTarget(configuration.getTarget()).usePlaintext(configuration.getUsePlainText()).build();
+            channelBuilder =  
NettyChannelBuilder.forTarget(configuration.getTarget());
         } else {
             throw new IllegalArgumentException("No connection properties 
(host, port or target) specified");
         }
+        channel = 
channelBuilder.usePlaintext(configuration.getUsePlainText()).build();
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/18ac6527/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcUtils.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcUtils.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcUtils.java
index 07bcda1..5ff1a88 100644
--- 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcUtils.java
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcUtils.java
@@ -17,6 +17,9 @@
 package org.apache.camel.component.grpc;
 
 import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 
 import io.grpc.Channel;
 import io.grpc.stub.StreamObserver;
@@ -51,7 +54,7 @@ public final class GrpcUtils {
         paramChannel[0] = Channel.class;
         Object grpcBlockingStub = null;
 
-        String serviceClassName = packageName + "." + serviceName + 
GrpcConstants.GRPC_SERVICE_CLASS_PREFIX;
+        String serviceClassName = packageName + "." + serviceName + 
GrpcConstants.GRPC_SERVICE_CLASS_POSTFIX;
         try {
             Class grpcServiceClass = Class.forName(serviceClassName);
             Method grpcBlockingMethod = 
ReflectionUtils.findMethod(grpcServiceClass, stubMethod, paramChannel);
@@ -65,32 +68,49 @@ public final class GrpcUtils {
         }
         return grpcBlockingStub;
     }
-
-    @SuppressWarnings("rawtypes")
-    public static void invokeAsyncMethod(Object asyncStubClass, String 
invokeMethod, Object request, StreamObserver asyncHandler) {
-        Class[] paramMethod = new Class[2];
-        paramMethod[0] = request.getClass();
-        paramMethod[1] = StreamObserver.class;
+    
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static void invokeAsyncMethod(Object asyncStubClass, String 
invokeMethod, Object request, StreamObserver responseObserver) {
+        Class[] paramMethod = null;
 
         Method method = ReflectionUtils.findMethod(asyncStubClass.getClass(), 
invokeMethod, paramMethod);
         if (method == null) {
-            throw new IllegalArgumentException("gRPC service method not found: 
" + invokeMethod + " with parameter: " + request.getClass().getName());
+            throw new IllegalArgumentException("gRPC service method not found: 
" + asyncStubClass.getClass().getName() + "." + invokeMethod);
+        }
+        if (method.getReturnType().equals(StreamObserver.class)) {
+            StreamObserver<Object> requestObserver = 
(StreamObserver<Object>)ReflectionUtils.invokeMethod(method, asyncStubClass, 
responseObserver);
+            if (request instanceof List) {
+                List<Object> requestList = (List<Object>)request;
+                requestList.forEach((requestItem) -> {
+                    requestObserver.onNext(requestItem);
+                });
+            } else {
+                requestObserver.onNext(request);
+            }
+            requestObserver.onCompleted();
+        } else {
+            ReflectionUtils.invokeMethod(method, asyncStubClass, request, 
responseObserver);
         }
-        ReflectionUtils.invokeMethod(method, asyncStubClass, request, 
asyncHandler);
     }
 
-    @SuppressWarnings("rawtypes")
+    @SuppressWarnings({"rawtypes", "unchecked"})
     public static Object invokeSyncMethod(Object blockingStubClass, String 
invokeMethod, Object request) {
-        Class[] paramMethod = new Class[1];
-        paramMethod[0] = request.getClass();
-        Object responseObject = null;
+        Class[] paramMethod = null;
 
         Method method = 
ReflectionUtils.findMethod(blockingStubClass.getClass(), invokeMethod, 
paramMethod);
         if (method == null) {
-            throw new IllegalArgumentException("gRPC service method not found: 
" + invokeMethod + " with parameter: " + request.getClass().getName());
+            throw new IllegalArgumentException("gRPC service method not found: 
" + blockingStubClass.getClass().getName() + "." + invokeMethod);
+        }
+        if (method.getReturnType().equals(Iterator.class)) {
+            Iterator<Object> responseObjects = 
(Iterator<Object>)ReflectionUtils.invokeMethod(method, blockingStubClass, 
request);
+            List<Object> objectList = new ArrayList<Object>();
+            while (responseObjects.hasNext()) {
+                objectList.add(responseObjects.next());
+            }
+            return objectList;
+        } else {
+            return ReflectionUtils.invokeMethod(method, blockingStubClass, 
request);
         }
-        responseObject = ReflectionUtils.invokeMethod(method, 
blockingStubClass, request);
-        return responseObject;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/18ac6527/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseAggregationStreamObserver.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseAggregationStreamObserver.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseAggregationStreamObserver.java
new file mode 100644
index 0000000..d12bcc0
--- /dev/null
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseAggregationStreamObserver.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.grpc.client;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/*
+ * gRPC response stream observer that collects the response object from every
+ * onNext() call into a list and sets that list as the out Body when onCompleted() 
is invoked
+ */
+public class GrpcResponseAggregationStreamObserver implements 
StreamObserver<Object> {
+    private final Exchange exchange;
+    private final AsyncCallback callback;
+    private List<Object> responseCollection = new LinkedList<Object>();
+    
+    public GrpcResponseAggregationStreamObserver(Exchange exchange, 
AsyncCallback callback) {
+        this.exchange = exchange;
+        this.callback = callback;
+    }
+
+    @Override
+    public void onNext(Object response) {
+        responseCollection.add(response);
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+        exchange.setException(throwable);
+        callback.done(false);
+    }
+
+    @Override
+    public void onCompleted() {
+        exchange.getOut().setHeaders(exchange.getIn().getHeaders());
+        exchange.getOut().setBody(responseCollection);
+        callback.done(false);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/18ac6527/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerAsyncTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerAsyncTest.java
 
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerAsyncTest.java
new file mode 100644
index 0000000..fa7156d
--- /dev/null
+++ 
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerAsyncTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.grpc;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.stub.StreamObserver;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GrpcProducerAsyncTest extends CamelTestSupport {
+    private static final Logger LOG = 
LoggerFactory.getLogger(GrpcProducerAsyncTest.class);
+
+    private static final int GRPC_TEST_PORT = 
AvailablePortFinder.getNextAvailable();
+    private static final int GRPC_TEST_PING_ID = 1;
+    private static final int GRPC_TEST_PONG_ID01 = 1;
+    private static final int GRPC_TEST_PONG_ID02 = 2;
+    private static final String GRPC_TEST_PING_VALUE = "PING";
+    private static final String GRPC_TEST_PONG_VALUE = "PONG";
+
+    private static Server grpcServer;
+    private Object asyncPongResponse;
+
+    @BeforeClass
+    public static void startGrpcServer() throws Exception {
+        grpcServer = ServerBuilder.forPort(GRPC_TEST_PORT).addService(new 
PingPongImpl()).build().start();
+        LOG.info("gRPC server started on port " + GRPC_TEST_PORT);
+    }
+
+    @AfterClass
+    public static void stopGrpcServer() throws IOException {
+        if (grpcServer != null) {
+            grpcServer.shutdown();
+            LOG.info("gRPC server stoped");
+        }
+    }
+
+    @Test
+    public void testPingSyncSyncMethodInvocation() throws Exception {
+        LOG.info("gRPC PingSyncSync method test start");
+        final CountDownLatch latch = new CountDownLatch(1);
+        PingRequest pingRequest = 
PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
+
+        // Testing sync service call with async style invocation
+        template.asyncCallbackSendBody("direct:grpc-sync-sync", pingRequest, 
new SynchronizationAdapter() {
+
+            @Override
+            public void onComplete(Exchange exchange) {
+                asyncPongResponse = exchange.getOut().getBody();
+                latch.countDown();
+            }
+        });
+        latch.await(1, TimeUnit.SECONDS);
+
+        assertNotNull(asyncPongResponse);
+        assertTrue(asyncPongResponse instanceof List);
+
+        @SuppressWarnings("unchecked")
+        List<PongResponse> asyncPongResponseList = 
(List<PongResponse>)asyncPongResponse;
+        assertEquals(1, asyncPongResponseList.size());
+        assertEquals(asyncPongResponseList.get(0).getPongId(), 
GRPC_TEST_PING_ID);
+        assertEquals(asyncPongResponseList.get(0).getPongName(), 
GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE);
+    }
+
+    @Test
+    public void testPingSyncAsyncMethodInvocation() throws Exception {
+        LOG.info("gRPC PingSyncAsync method test start");
+        final CountDownLatch latch = new CountDownLatch(1);
+        PingRequest pingRequest = 
PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
+
+        // Testing async service call
+        template.asyncCallbackSendBody("direct:grpc-sync-async", pingRequest, 
new SynchronizationAdapter() {
+
+            @Override
+            public void onComplete(Exchange exchange) {
+                asyncPongResponse = exchange.getOut().getBody();
+                latch.countDown();
+            }
+        });
+        latch.await(1, TimeUnit.SECONDS);
+
+        assertNotNull(asyncPongResponse);
+        assertTrue(asyncPongResponse instanceof List);
+
+        @SuppressWarnings("unchecked")
+        List<PongResponse> asyncPongResponseList = 
(List<PongResponse>)asyncPongResponse;
+        assertEquals(2, asyncPongResponseList.size());
+        assertEquals(asyncPongResponseList.get(0).getPongId(), 
GRPC_TEST_PONG_ID01);
+        assertEquals(asyncPongResponseList.get(1).getPongId(), 
GRPC_TEST_PONG_ID02);
+        assertEquals(asyncPongResponseList.get(0).getPongName(), 
GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE);
+    }
+    
+    @Test
+    public void testPingAsyncSyncMethodInvocation() throws Exception {
+        LOG.info("gRPC PingAsyncSync method test start");
+        final CountDownLatch latch = new CountDownLatch(1);
+        PingRequest pingRequest = 
PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
+
+        // Testing async service call with async style invocation
+        template.asyncCallbackSendBody("direct:grpc-async-sync", pingRequest, 
new SynchronizationAdapter() {
+
+            @Override
+            public void onComplete(Exchange exchange) {
+                asyncPongResponse = exchange.getOut().getBody();
+                latch.countDown();
+            }
+        });
+        latch.await(1, TimeUnit.SECONDS);
+
+        assertNotNull(asyncPongResponse);
+        assertTrue(asyncPongResponse instanceof List);
+
+        @SuppressWarnings("unchecked")
+        List<PongResponse> asyncPongResponseList = 
(List<PongResponse>)asyncPongResponse;
+        assertEquals(1, asyncPongResponseList.size());
+        assertEquals(asyncPongResponseList.get(0).getPongId(), 
GRPC_TEST_PING_ID);
+        assertEquals(asyncPongResponseList.get(0).getPongName(), 
GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE);
+    }
+    
+    @Test
+    public void testPingAsyncAsyncMethodInvocation() throws Exception {
+        LOG.info("gRPC PingAsyncAsync method test start");
+        final CountDownLatch latch = new CountDownLatch(1);
+        PingRequest pingRequest = 
PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
+
+        // Testing async service call with async style invocation
+        template.asyncCallbackSendBody("direct:grpc-async-async", pingRequest, 
new SynchronizationAdapter() {
+
+            @Override
+            public void onComplete(Exchange exchange) {
+                asyncPongResponse = exchange.getOut().getBody();
+                latch.countDown();
+            }
+        });
+        latch.await(1, TimeUnit.SECONDS);
+
+        assertNotNull(asyncPongResponse);
+        assertTrue(asyncPongResponse instanceof List);
+
+        @SuppressWarnings("unchecked")
+        List<PongResponse> asyncPongResponseList = 
(List<PongResponse>)asyncPongResponse;
+        assertEquals(1, asyncPongResponseList.size());
+        assertEquals(asyncPongResponseList.get(0).getPongId(), 
GRPC_TEST_PING_ID);
+        assertEquals(asyncPongResponseList.get(0).getPongName(), 
GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                
from("direct:grpc-sync-sync").to("grpc://org.apache.camel.component.grpc.PingPong?method=pingSyncSync&host=localhost&port="
 + GRPC_TEST_PORT);
+                
from("direct:grpc-sync-async").to("grpc://org.apache.camel.component.grpc.PingPong?method=pingSyncAsync&host=localhost&port="
 + GRPC_TEST_PORT);
+                
from("direct:grpc-async-sync").to("grpc://org.apache.camel.component.grpc.PingPong?method=pingAsyncSync&host=localhost&port="
 + GRPC_TEST_PORT);
+                
from("direct:grpc-async-async").to("grpc://org.apache.camel.component.grpc.PingPong?method=pingAsyncAsync&host=localhost&port="
 + GRPC_TEST_PORT);
+            }
+        };
+    }
+
+    /**
+     * Test gRPC PingPong server implementation
+     */
+    static class PingPongImpl extends PingPongGrpc.PingPongImplBase {
+        @Override
+        public void pingSyncSync(PingRequest request, 
StreamObserver<PongResponse> responseObserver) {
+            LOG.info("gRPC server received data from PingPong service 
PingId={} PingName={}", request.getPingId(), request.getPingName());
+            PongResponse response = 
PongResponse.newBuilder().setPongName(request.getPingName() + 
GRPC_TEST_PONG_VALUE).setPongId(request.getPingId()).build();
+            responseObserver.onNext(response);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void pingSyncAsync(PingRequest request, 
StreamObserver<PongResponse> responseObserver) {
+            LOG.info("gRPC server received data from PingAsyncResponse service 
PingId={} PingName={}", request.getPingId(), request.getPingName());
+            PongResponse response01 = 
PongResponse.newBuilder().setPongName(request.getPingName() + 
GRPC_TEST_PONG_VALUE).setPongId(GRPC_TEST_PONG_ID01).build();
+            PongResponse response02 = 
PongResponse.newBuilder().setPongName(request.getPingName() + 
GRPC_TEST_PONG_VALUE).setPongId(GRPC_TEST_PONG_ID02).build();
+            responseObserver.onNext(response01);
+            responseObserver.onNext(response02);
+            responseObserver.onCompleted();
+        }
+        
+        @Override
+        public StreamObserver<PingRequest> 
pingAsyncSync(StreamObserver<PongResponse> responseObserver) {
+            @SuppressWarnings({"unchecked", "rawtypes"})
+            StreamObserver<PingRequest> requestObserver = new 
StreamObserver<PingRequest>() {
+
+                @Override
+                public void onNext(PingRequest request) {
+                    PongResponse response = 
PongResponse.newBuilder().setPongName(request.getPingName() + 
GRPC_TEST_PONG_VALUE).setPongId(request.getPingId()).build();
+                    responseObserver.onNext(response);
+                }
+
+                @Override
+                public void onError(Throwable t) {
+                    LOG.info("Error in pingAsyncSync() " + t.getMessage());
+                }
+
+                @Override
+                public void onCompleted() {
+                    responseObserver.onCompleted();
+                }
+            };
+            return requestObserver;
+        }
+        
+        @Override
+        public StreamObserver<PingRequest> 
pingAsyncAsync(StreamObserver<PongResponse> responseObserver) {
+            @SuppressWarnings({"unchecked", "rawtypes"})
+            StreamObserver<PingRequest> requestObserver = new 
StreamObserver<PingRequest>() {
+
+                @Override
+                public void onNext(PingRequest request) {
+                    PongResponse response = 
PongResponse.newBuilder().setPongName(request.getPingName() + 
GRPC_TEST_PONG_VALUE).setPongId(request.getPingId()).build();
+                    responseObserver.onNext(response);
+                }
+
+                @Override
+                public void onError(Throwable t) {
+                    LOG.info("Error in pingAsyncAsync() " + t.getMessage());
+                }
+
+                @Override
+                public void onCompleted() {
+                    responseObserver.onCompleted();
+                }
+            };
+            return requestObserver;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/18ac6527/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerSyncTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerSyncTest.java
 
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerSyncTest.java
new file mode 100644
index 0000000..7bb9bf1
--- /dev/null
+++ 
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerSyncTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.grpc;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
+import io.grpc.BindableService;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.stub.StreamObserver;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GrpcProducerSyncTest extends CamelTestSupport {
+    private static final Logger LOG = 
LoggerFactory.getLogger(GrpcProducerSyncTest.class);
+
+    private static final int GRPC_TEST_PORT = 
AvailablePortFinder.getNextAvailable();
+    private static final int GRPC_TEST_PING_ID = 1;
+    private static final int GRPC_TEST_PONG_ID01 = 1;
+    private static final int GRPC_TEST_PONG_ID02 = 2;
+    private static final int MULTIPLE_RUN_TEST_COUNT = 100;
+    private static final String GRPC_TEST_PING_VALUE = "PING";
+    private static final String GRPC_TEST_PONG_VALUE = "PONG";
+
+    private static Server grpcServer;
+
+    @BeforeClass
+    public static void startGrpcServer() throws Exception {
+        grpcServer = ServerBuilder.forPort(GRPC_TEST_PORT).addService(new 
PingPongImpl()).build().start();
+        LOG.info("gRPC server started on port " + GRPC_TEST_PORT);
+    }
+
+    @AfterClass
+    public static void stopGrpcServer() throws IOException {
+        if (grpcServer != null) {
+            grpcServer.shutdown();
+            LOG.info("gRPC server stoped");
+        }
+    }
+
+    @Test
+    public void testPingSyncSyncMethodInvocation() throws Exception {
+        LOG.info("gRPC PingSyncSync method test start");
+        // Testing simple sync method invoke with host and port parameters
+        PingRequest pingRequest = 
PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
+        Object pongResponse = template.requestBody("direct:grpc-sync-sync", 
pingRequest);
+        assertNotNull(pongResponse);
+        assertTrue(pongResponse instanceof PongResponse);
+        assertEquals(((PongResponse)pongResponse).getPongId(), 
GRPC_TEST_PING_ID);
+        assertEquals(((PongResponse)pongResponse).getPongName(), 
GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE);
+
+        // Testing simple sync method invoke with target instead of host and
+        // port parameters
+        pongResponse = template.requestBody("direct:grpc-sync-target", 
pingRequest);
+        assertNotNull(pongResponse);
+        assertTrue(pongResponse instanceof PongResponse);
+        assertEquals(((PongResponse)pongResponse).getPongId(), 
GRPC_TEST_PING_ID);
+
+        // Testing simple sync method with name described in .proto file 
instead
+        // of generated class
+        pongResponse = 
template.requestBody("direct:grpc-sync-proto-method-name", pingRequest);
+        assertNotNull(pongResponse);
+        assertTrue(pongResponse instanceof PongResponse);
+        assertEquals(((PongResponse)pongResponse).getPongId(), 
GRPC_TEST_PING_ID);
+    }
+
+    @Test
+    public void testPingSyncSyncMultipleInvocation() throws Exception {
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        // Multiple sync methods call for average performance estimation
+        for (int id = 0; id < MULTIPLE_RUN_TEST_COUNT; id++) {
+            PingRequest pingRequest = 
PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE + 
id).setPingId(id).build();
+            Object pongResponse = 
template.requestBody("direct:grpc-sync-sync", pingRequest);
+            assertEquals(((PongResponse)pongResponse).getPongId(), id);
+        }
+        LOG.info("Multiple sync invocation time {} milliseconds, everage 
operations/sec {} ", stopwatch.stop().elapsed(TimeUnit.MILLISECONDS),
+                 Math.round(1000 * MULTIPLE_RUN_TEST_COUNT / 
stopwatch.elapsed(TimeUnit.MILLISECONDS)));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testPingSyncAsyncMethodInvocation() throws Exception {
+        LOG.info("gRPC PingSyncAsync method test start");
+        // Testing simple method with sync request and asyc response in 
synchronous invocation style
+        PingRequest pingRequest = 
PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
+        Object pongResponse = template.requestBody("direct:grpc-sync-async", 
pingRequest);
+        assertNotNull(pongResponse);
+        assertTrue(pongResponse instanceof List<?>);
+        assertEquals(((List<PongResponse>)pongResponse).get(0).getPongId(), 
GRPC_TEST_PONG_ID01);
+        assertEquals(((List<PongResponse>)pongResponse).get(1).getPongId(), 
GRPC_TEST_PONG_ID02);
+        assertEquals(((List<PongResponse>)pongResponse).get(0).getPongName(), 
GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                
from("direct:grpc-sync-sync").to("grpc://org.apache.camel.component.grpc.PingPong?method=pingSyncSync&host=localhost&port="
 + GRPC_TEST_PORT + "&synchronous=true");
+                from("direct:grpc-sync-target")
+                    
.to("grpc://org.apache.camel.component.grpc.PingPong?method=pingSyncSync&target=dns:///localhost:"
 + GRPC_TEST_PORT + "&synchronous=true");
+                from("direct:grpc-sync-proto-method-name")
+                    
.to("grpc://org.apache.camel.component.grpc.PingPong?method=PingSyncSync&host=localhost&port="
 + GRPC_TEST_PORT + "&synchronous=true");
+                
from("direct:grpc-sync-async").to("grpc://org.apache.camel.component.grpc.PingPong?method=pingSyncAsync&host=localhost&port="
 + GRPC_TEST_PORT + "&synchronous=true");
+            }
+        };
+    }
+
+    /**
+     * Test gRPC PingPong server implementation
+     */
+    static class PingPongImpl extends PingPongGrpc.PingPongImplBase {
+        @Override
+        public void pingSyncSync(PingRequest request, 
StreamObserver<PongResponse> responseObserver) {
+            LOG.info("gRPC server received data from PingPong service 
PingId={} PingName={}", request.getPingId(), request.getPingName());
+            PongResponse response = 
PongResponse.newBuilder().setPongName(request.getPingName() + 
GRPC_TEST_PONG_VALUE).setPongId(request.getPingId()).build();
+            responseObserver.onNext(response);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void pingSyncAsync(PingRequest request, 
StreamObserver<PongResponse> responseObserver) {
+            LOG.info("gRPC server received data from PingAsyncResponse service 
PingId={} PingName={}", request.getPingId(), request.getPingName());
+            PongResponse response01 = 
PongResponse.newBuilder().setPongName(request.getPingName() + 
GRPC_TEST_PONG_VALUE).setPongId(GRPC_TEST_PONG_ID01).build();
+            PongResponse response02 = 
PongResponse.newBuilder().setPongName(request.getPingName() + 
GRPC_TEST_PONG_VALUE).setPongId(GRPC_TEST_PONG_ID02).build();
+            responseObserver.onNext(response01);
+            responseObserver.onNext(response02);
+            responseObserver.onCompleted();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/18ac6527/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerTest.java
 
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerTest.java
deleted file mode 100644
index e713549..0000000
--- 
a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.grpc;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Stopwatch;
-import io.grpc.Server;
-import io.grpc.ServerBuilder;
-import io.grpc.stub.StreamObserver;
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.support.SynchronizationAdapter;
-import org.apache.camel.test.AvailablePortFinder;
-import org.apache.camel.test.junit4.CamelTestSupport;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class GrpcProducerTest extends CamelTestSupport {
-    private static final Logger LOG = 
LoggerFactory.getLogger(GrpcProducerTest.class);
-
-    private static final int GRPC_TEST_PORT = 
AvailablePortFinder.getNextAvailable();
-    private static final int GRPC_TEST_PING_ID = 1;
-    private static final int GRPC_TEST_PONG_ID01 = 1;
-    private static final int GRPC_TEST_PONG_ID02 = 2;
-    private static final int MULTIPLE_RUN_TEST_COUNT = 100;
-    private static final String GRPC_TEST_PING_VALUE = "PING";
-    private static final String GRPC_TEST_PONG_VALUE = "PONG";
-
-    private static Server grpcServer;
-    private Object asyncPongResponse;
-
-    @BeforeClass
-    public static void startGrpcServer() throws Exception {
-        grpcServer = ServerBuilder.forPort(GRPC_TEST_PORT).addService(new 
PingPongImpl()).build().start();
-        LOG.info("gRPC server started on port " + GRPC_TEST_PORT);
-    }
-
-    @AfterClass
-    public static void stopGrpcServer() throws IOException {
-        if (grpcServer != null) {
-            grpcServer.shutdown();
-            LOG.info("gRPC server stoped...");
-        }
-    }
-
-    @Test
-    public void testSyncMethodInvocation() throws Exception {
-        LOG.info("gRPC sync test start");
-        // Testing simple sync method invoke with host and port parameters
-        PingRequest pingRequest = 
PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
-        Object pongResponse = template.requestBody("direct:grpc-sync", 
pingRequest);
-        assertNotNull(pongResponse);
-        assertTrue(pongResponse instanceof PongResponse);
-        assertEquals(((PongResponse)pongResponse).getPongId(), 
GRPC_TEST_PING_ID);
-        assertEquals(((PongResponse)pongResponse).getPongName(), 
GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE);
-
-        // Testing simple sync method invoke with target instead of host and
-        // port parameters
-        pongResponse = template.requestBody("direct:grpc-sync-target", 
pingRequest);
-        assertNotNull(pongResponse);
-        assertTrue(pongResponse instanceof PongResponse);
-        assertEquals(((PongResponse)pongResponse).getPongId(), 
GRPC_TEST_PING_ID);
-
-        // Testing simple sync method with name described in .proto file 
instead
-        // of generated class
-        pongResponse = 
template.requestBody("direct:grpc-sync-proto-method-name", pingRequest);
-        assertNotNull(pongResponse);
-        assertTrue(pongResponse instanceof PongResponse);
-        assertEquals(((PongResponse)pongResponse).getPongId(), 
GRPC_TEST_PING_ID);
-    }
-
-    @Test
-    public void testSyncMethodAsyncStyleInvocation() throws Exception {
-        LOG.info("gRPC sync method with async style test start");
-        final CountDownLatch latch = new CountDownLatch(1);
-        PingRequest pingRequest = 
PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
-
-        // Testing sync service call with async style invocation
-        template.asyncCallbackSendBody("direct:grpc-sync-async", pingRequest, 
new SynchronizationAdapter() {
-
-            @Override
-            public void onComplete(Exchange exchange) {
-                asyncPongResponse = exchange.getOut().getBody();
-                latch.countDown();
-            }
-        });
-        latch.await(1, TimeUnit.SECONDS);
-
-        assertNotNull(asyncPongResponse);
-        assertTrue(asyncPongResponse instanceof List);
-
-        @SuppressWarnings("unchecked")
-        List<PongResponse> asyncPongResponseList = 
(List<PongResponse>)asyncPongResponse;
-        assertEquals(1, asyncPongResponseList.size());
-        assertEquals(asyncPongResponseList.get(0).getPongId(), 
GRPC_TEST_PING_ID);
-        assertEquals(asyncPongResponseList.get(0).getPongName(), 
GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE);
-    }
-
-    @Test
-    public void testMultipleSyncInvocation() throws Exception {
-        final Stopwatch stopwatch = Stopwatch.createStarted();
-        // Multiple sync methods call for average performance estimation
-        for (int id = 0; id < MULTIPLE_RUN_TEST_COUNT; id++) {
-            PingRequest pingRequest = 
PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE + 
id).setPingId(id).build();
-            Object pongResponse = template.requestBody("direct:grpc-sync", 
pingRequest);
-            assertEquals(((PongResponse)pongResponse).getPongId(), id);
-        }
-        LOG.info("Multiple sync invocation time {} milliseconds, everage 
operations/sec {} ", stopwatch.stop().elapsed(TimeUnit.MILLISECONDS),
-                 Math.round(1000 * MULTIPLE_RUN_TEST_COUNT / 
stopwatch.elapsed(TimeUnit.MILLISECONDS)));
-    }
-
-    @Test
-    public void testAsyncMethodInvocation() throws Exception {
-        LOG.info("gRPC async method reponse test start");
-        final CountDownLatch latch = new CountDownLatch(1);
-        PingRequest pingRequest = 
PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
-
-        // Testing async service call
-        template.asyncCallbackSendBody("direct:grpc-async", pingRequest, new 
SynchronizationAdapter() {
-
-            @Override
-            public void onComplete(Exchange exchange) {
-                asyncPongResponse = exchange.getOut().getBody();
-                latch.countDown();
-            }
-        });
-        latch.await(1, TimeUnit.SECONDS);
-
-        assertNotNull(asyncPongResponse);
-        assertTrue(asyncPongResponse instanceof List);
-
-        @SuppressWarnings("unchecked")
-        List<PongResponse> asyncPongResponseList = 
(List<PongResponse>)asyncPongResponse;
-        assertEquals(2, asyncPongResponseList.size());
-        assertEquals(asyncPongResponseList.get(0).getPongId(), 
GRPC_TEST_PONG_ID01);
-        assertEquals(asyncPongResponseList.get(1).getPongId(), 
GRPC_TEST_PONG_ID02);
-        assertEquals(asyncPongResponseList.get(0).getPongName(), 
GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE);
-    }
-
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() {
-                
from("direct:grpc-sync").to("grpc://org.apache.camel.component.grpc.PingPong?method=sendPing&host=localhost&port="
 + GRPC_TEST_PORT + "&synchronous=true");
-                from("direct:grpc-sync-target")
-                    
.to("grpc://org.apache.camel.component.grpc.PingPong?method=sendPing&target=dns:///localhost:"
 + GRPC_TEST_PORT + "&synchronous=true");
-                from("direct:grpc-sync-proto-method-name")
-                    
.to("grpc://org.apache.camel.component.grpc.PingPong?method=SendPing&host=localhost&port="
 + GRPC_TEST_PORT + "&synchronous=true");
-                
from("direct:grpc-sync-async").to("grpc://org.apache.camel.component.grpc.PingPong?method=sendPing&host=localhost&port="
 + GRPC_TEST_PORT);
-                
from("direct:grpc-async").to("grpc://org.apache.camel.component.grpc.PingPong?method=pingAsyncResponse&host=localhost&port="
 + GRPC_TEST_PORT);
-            }
-        };
-    }
-
-    /**
-     * Test gRPC PingPong server implementation
-     */
-    static class PingPongImpl extends PingPongGrpc.PingPongImplBase {
-        @Override
-        public void sendPing(PingRequest request, StreamObserver<PongResponse> 
responseObserver) {
-            LOG.info("gRPC server received data from PingPong service 
PingId={} PingName={}", request.getPingId(), request.getPingName());
-            PongResponse response = 
PongResponse.newBuilder().setPongName(request.getPingName() + 
GRPC_TEST_PONG_VALUE).setPongId(request.getPingId()).build();
-            responseObserver.onNext(response);
-            responseObserver.onCompleted();
-        }
-
-        @Override
-        public void pingAsyncResponse(PingRequest request, 
StreamObserver<PongResponse> responseObserver) {
-            LOG.info("gRPC server received data from PingAsyncResponse service 
PingId={} PingName={}", request.getPingId(), request.getPingName());
-            PongResponse response01 = 
PongResponse.newBuilder().setPongName(request.getPingName() + 
GRPC_TEST_PONG_VALUE).setPongId(GRPC_TEST_PONG_ID01).build();
-            PongResponse response02 = 
PongResponse.newBuilder().setPongName(request.getPingName() + 
GRPC_TEST_PONG_VALUE).setPongId(GRPC_TEST_PONG_ID02).build();
-            responseObserver.onNext(response01);
-            responseObserver.onNext(response02);
-            responseObserver.onCompleted();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/18ac6527/components/camel-grpc/src/test/proto/pingpong.proto
----------------------------------------------------------------------
diff --git a/components/camel-grpc/src/test/proto/pingpong.proto 
b/components/camel-grpc/src/test/proto/pingpong.proto
index ff1a4b9..4ee45b7 100644
--- a/components/camel-grpc/src/test/proto/pingpong.proto
+++ b/components/camel-grpc/src/test/proto/pingpong.proto
@@ -24,11 +24,17 @@ package org.apache.camel.component.grpc.test;
 
 // The PingPong service definition.
 service PingPong {
-  // Sending ping message and getting answer
-  rpc SendPing (PingRequest) returns (PongResponse) {}
+  // Sending ping message and getting pong answer synchronously
+  rpc PingSyncSync (PingRequest) returns (PongResponse) {}
   
-  // Sending ping message and getting answer in async mode (multiple response messages)
-  rpc PingAsyncResponse (PingRequest) returns (stream PongResponse) {}
+  // Sending ping message synchronously and getting pong answer asynchronously in streaming mode (multiple response messages)
+  rpc PingSyncAsync (PingRequest) returns (stream PongResponse) {}
+  
+  // Sending ping message asynchronously and getting pong answer synchronously
+  rpc PingAsyncSync (stream PingRequest) returns (PongResponse) {}
+  
+  // Sending ping message asynchronously and getting pong answer asynchronously in streaming mode (multiple response messages)
+  rpc PingAsyncAsync (stream PingRequest) returns (stream PongResponse) {}
 }
 
 // The request message containing the user's name.

Reply via email to