CAMEL-11237: camel-grpc - Add a grpc consumer Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e72fc444 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e72fc444 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e72fc444
Branch: refs/heads/master Commit: e72fc4443700c04e30f625026c6d5d5c9e6cef83 Parents: ed85770 Author: Dmitry Volodin <dmvo...@gmail.com> Authored: Wed May 17 23:43:47 2017 +0300 Committer: Dmitry Volodin <dmvo...@gmail.com> Committed: Mon May 22 16:42:58 2017 +0300 ---------------------------------------------------------------------- components/camel-grpc/pom.xml | 12 + .../src/main/docs/grpc-component.adoc | 9 +- .../camel/component/grpc/GrpcConfiguration.java | 6 +- .../camel/component/grpc/GrpcConstants.java | 12 + .../camel/component/grpc/GrpcConsumer.java | 138 +++++++++++ .../camel/component/grpc/GrpcEndpoint.java | 6 +- .../apache/camel/component/grpc/GrpcUtils.java | 14 ++ .../grpc/server/GrpcHeaderInterceptor.java | 45 ++++ .../grpc/server/GrpcMethodHandler.java | 88 +++++++ .../GrpcRequestAbstractStreamObserver.java | 46 ++++ .../GrpcRequestAggregationStreamObserver.java | 68 +++++ .../GrpcRequestPropagationStreamObserver.java | 57 +++++ .../grpc/GrpcConsumerConcurrentTest.java | 192 ++++++++++++++ .../camel/component/grpc/GrpcConsumerTest.java | 248 +++++++++++++++++++ 14 files changed, 932 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-grpc/pom.xml b/components/camel-grpc/pom.xml index 8614d3d..37aff72 100644 --- a/components/camel-grpc/pom.xml +++ b/components/camel-grpc/pom.xml @@ -63,6 +63,12 @@ <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> </dependency> + + <dependency> + <groupId>org.javassist</groupId> + <artifactId>javassist</artifactId> + <version>3.20.0-GA</version> + </dependency> <!-- for testing --> <dependency> @@ -70,6 +76,12 @@ <artifactId>camel-test</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>com.googlecode.junit-toolbox</groupId> + <artifactId>junit-toolbox</artifactId> + <version>2.3</version> + <scope>test</scope> + </dependency> <!-- logging --> <dependency> http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/docs/grpc-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/docs/grpc-component.adoc b/components/camel-grpc/src/main/docs/grpc-component.adoc index a642263..7c53b9a 100644 --- a/components/camel-grpc/src/main/docs/grpc-component.adoc +++ b/components/camel-grpc/src/main/docs/grpc-component.adoc @@ -47,14 +47,17 @@ with the following path and query parameters: | **service** | *Required* Fully qualified service name from the protocol buffer descriptor file (package dot service definition name) | | String |======================================================================= -#### Query Parameters (6 parameters): +#### Query Parameters (9 parameters): [width="100%",cols="2,5,^1,2",options="header"] |======================================================================= | Name | Description | Default | Type -| **host** (producer) | The gRPC server host name | | String +| **host** (common) | The gRPC server host name | | String +| **port** (common) | The gRPC server port | | int +| **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean +| **exceptionHandler** (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler +| **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | **method** (producer) | gRPC method name | | String -| **port** (producer) | The gRPC server port | | int | **target** (producer) | The channel target name as alternative to host and port parameters | | String | **usePlainText** (producer) | The plaintext connection to the server flag | true | Boolean | **synchronous** (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java index 8909419..01d94b6 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java @@ -27,15 +27,15 @@ public class GrpcConfiguration { @UriPath @Metadata(required = "true") private String service; - @UriParam + @UriParam(label = "producer") private String method; @UriParam private String host; @UriParam private int port; - @UriParam + @UriParam(label = "producer") private String target; - @UriParam(defaultValue = "true") + @UriParam(label = "producer", defaultValue = "true") private Boolean usePlainText = true; private String serviceName; http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConstants.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConstants.java index 44051a3..f61f3a8 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConstants.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConstants.java @@ -22,7 +22,19 @@ package org.apache.camel.component.grpc; public interface GrpcConstants { String GRPC_SERVICE_CLASS_POSTFIX = "Grpc"; + String GRPC_SERVER_IMPL_POSTFIX = "ImplBase"; String GRPC_SERVICE_SYNC_STUB_METHOD = "newBlockingStub"; String GRPC_SERVICE_ASYNC_STUB_METHOD = "newStub"; String GRPC_SERVICE_FUTURE_STUB_METHOD = "newFutureStub"; + + /* + * This headers will be set after gRPC consumer method is invoked + */ + String GRPC_METHOD_NAME_HEADER = "CamelGrpcMethodName"; + String GRPC_USER_AGENT_HEADER = "CamelGrpcUserAgent"; + String GRPC_EVENT_TYPE_HEADER = "CamelGrpcEventType"; + + String GRPC_EVENT_TYPE_ON_NEXT = "onNext"; + String GRPC_EVENT_TYPE_ON_ERROR = "onError"; + String GRPC_EVENT_TYPE_ON_COMPLETED = "onCompleted"; } http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java new file mode 100644 index 0000000..29a2ada --- /dev/null +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.grpc; + +import java.lang.reflect.InvocationTargetException; +import java.net.InetSocketAddress; + +import io.grpc.BindableService; +import io.grpc.Server; +import io.grpc.ServerInterceptor; +import io.grpc.ServerInterceptors; +import io.grpc.netty.NettyServerBuilder; +import javassist.util.proxy.MethodHandler; +import javassist.util.proxy.ProxyFactory; +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.grpc.server.GrpcHeaderInterceptor; +import org.apache.camel.component.grpc.server.GrpcMethodHandler; +import org.apache.camel.impl.DefaultConsumer; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Represents gRPC server consumer implementation + */ +public class GrpcConsumer extends DefaultConsumer { + private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumer.class); + + protected final GrpcConfiguration configuration; + protected final GrpcEndpoint endpoint; + + private Server server; + + public GrpcConsumer(GrpcEndpoint endpoint, Processor processor, GrpcConfiguration configuration) { + super(endpoint, processor); + this.endpoint = endpoint; + this.configuration = configuration; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + if (server == null) { + LOG.info("Starting the gRPC server"); + initializeServer(); + server.start(); + LOG.info("gRPC server started and listening on port: " + server.getPort()); + } + } + + @Override + protected void doStop() throws Exception { + if (server != null) { + LOG.trace("Terminating gRPC server"); + server.shutdown().shutdownNow(); + server = null; + } + super.doStop(); + } + + protected void initializeServer() { + NettyServerBuilder serverBuilder = null; + BindableService bindableService = null; + ProxyFactory serviceProxy = new ProxyFactory(); + ServerInterceptor headerInterceptor = new GrpcHeaderInterceptor(); + MethodHandler methodHandler = new GrpcMethodHandler(endpoint, this); + + serviceProxy.setSuperclass(GrpcUtils.constructGrpcImplBaseClass(configuration.getServicePackage(), configuration.getServiceName())); + try { + bindableService = (BindableService)serviceProxy.create(new Class<?>[0], new Object[0], methodHandler); + } catch (NoSuchMethodException | IllegalArgumentException | InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new IllegalArgumentException("Unable to create bindable proxy service for " + configuration.getService()); + } + + if (!ObjectHelper.isEmpty(configuration.getHost()) && !ObjectHelper.isEmpty(configuration.getPort())) { + LOG.info("Building gRPC server on " + configuration.getHost() + ":" + configuration.getPort()); + serverBuilder = NettyServerBuilder.forAddress(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + } else if (ObjectHelper.isEmpty(configuration.getHost()) && !ObjectHelper.isEmpty(configuration.getPort())) { + LOG.info("Building gRPC server on <any address>" + ":" + configuration.getPort()); + serverBuilder = NettyServerBuilder.forPort(configuration.getPort()); + } else { + throw new IllegalArgumentException("No server start properties (host, port) specified"); + } + + server = serverBuilder.addService(ServerInterceptors.intercept(bindableService, headerInterceptor)).build(); + } + + public boolean process(Exchange exchange, AsyncCallback callback) { + exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT); + return doSend(exchange, callback); + } + + public void onCompleted(Exchange exchange) { + exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED); + doSend(exchange, done -> { + }); + } + + public void onError(Exchange exchange, Throwable error) { + exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR); + exchange.getIn().setBody(error); + + doSend(exchange, done -> { + }); + } + + private boolean doSend(Exchange exchange, AsyncCallback callback) { + if (this.isRunAllowed()) { + this.getAsyncProcessor().process(exchange, doneSync -> { + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); + } + callback.done(doneSync); + }); + return false; + } else { + LOG.warn("Consumer not ready to process exchanges. The exchange {} will be discarded", exchange); + callback.done(true); + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcEndpoint.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcEndpoint.java index 61f0e22..1f6010c 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcEndpoint.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcEndpoint.java @@ -25,9 +25,9 @@ import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; /** - * The gRPC component is using for calling remote procedures via HTTP/2 + * The gRPC component allows to call and expose remote procedures via HTTP/2 with protobuf dataformat */ -@UriEndpoint(firstVersion = "2.19.0", scheme = "grpc", title = "gRPC", syntax = "grpc:service", producerOnly = true, label = "rpc") +@UriEndpoint(firstVersion = "2.19.0", scheme = "grpc", title = "gRPC", syntax = "grpc:service", label = "rpc") public class GrpcEndpoint extends DefaultEndpoint { @UriParam protected final GrpcConfiguration configuration; @@ -47,7 +47,7 @@ public class GrpcEndpoint extends DefaultEndpoint { } public Consumer createConsumer(Processor processor) throws Exception { - throw new UnsupportedOperationException("Cannot consume from a gRPC endpoint: " + getEndpointUri()); + return new GrpcConsumer(this, processor, configuration); } public boolean isSingleton() { http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcUtils.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcUtils.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcUtils.java index 5ff1a88..96c79f5 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcUtils.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcUtils.java @@ -69,6 +69,20 @@ public final class GrpcUtils { return grpcBlockingStub; } + @SuppressWarnings("rawtypes") + public static Class constructGrpcImplBaseClass(String packageName, String serviceName) { + Class grpcServerImpl; + + String serverBaseImpl = packageName + "." + serviceName + GrpcConstants.GRPC_SERVICE_CLASS_POSTFIX + "$" + serviceName + GrpcConstants.GRPC_SERVER_IMPL_POSTFIX; + try { + grpcServerImpl = Class.forName(serverBaseImpl); + + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("gRPC server base class not found: " + serverBaseImpl); + } + return grpcServerImpl; + } + @SuppressWarnings({"rawtypes", "unchecked"}) public static void invokeAsyncMethod(Object asyncStubClass, String invokeMethod, Object request, StreamObserver responseObserver) { Class[] paramMethod = null; http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcHeaderInterceptor.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcHeaderInterceptor.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcHeaderInterceptor.java new file mode 100644 index 0000000..1ce582d --- /dev/null +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcHeaderInterceptor.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.grpc.server; + +import io.grpc.Context; +import io.grpc.Contexts; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCall.Listener; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.internal.GrpcUtil; +import org.apache.camel.Exchange; +import org.apache.camel.component.grpc.GrpcConstants; + +/** + * gRPC server header interceptor + */ +public class GrpcHeaderInterceptor implements ServerInterceptor { + public static final Context.Key<String> USER_AGENT_CONTEXT_KEY = Context.key(GrpcConstants.GRPC_USER_AGENT_HEADER); + public static final Context.Key<String> CONTENT_TYPE_CONTEXT_KEY = Context.key(Exchange.CONTENT_TYPE); + + @Override + public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata requestHeaders, ServerCallHandler<ReqT, RespT> next) { + Context context = Context.current() + .withValue(USER_AGENT_CONTEXT_KEY, requestHeaders.get(GrpcUtil.USER_AGENT_KEY)) + .withValue(CONTENT_TYPE_CONTEXT_KEY, requestHeaders.get(GrpcUtil.CONTENT_TYPE_KEY)); + + return Contexts.interceptCall(context, call, requestHeaders, next); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java new file mode 100644 index 0000000..2ed83cc --- /dev/null +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.grpc.server; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.grpc.stub.StreamObserver; +import javassist.util.proxy.MethodHandler; +import org.apache.camel.Exchange; +import org.apache.camel.component.grpc.GrpcConstants; +import org.apache.camel.component.grpc.GrpcConsumer; +import org.apache.camel.component.grpc.GrpcEndpoint; + +/** + * gRPC server method invocation handler + */ +public class GrpcMethodHandler implements MethodHandler { + private final GrpcEndpoint endpoint; + private final GrpcConsumer consumer; + + public GrpcMethodHandler(GrpcEndpoint endpoint, GrpcConsumer consumer) { + this.endpoint = endpoint; + this.consumer = consumer; + } + + @Override + @SuppressWarnings("unchecked") + public Object invoke(Object self, Method thisMethod, Method proceed, Object[] args) throws Throwable { + Map<String, Object> grcpHeaders = new HashMap<String, Object>(); + + grcpHeaders.put(GrpcHeaderInterceptor.USER_AGENT_CONTEXT_KEY.toString(), GrpcHeaderInterceptor.USER_AGENT_CONTEXT_KEY.get()); + grcpHeaders.put(GrpcHeaderInterceptor.CONTENT_TYPE_CONTEXT_KEY.toString(), GrpcHeaderInterceptor.CONTENT_TYPE_CONTEXT_KEY.get()); + grcpHeaders.put(GrpcConstants.GRPC_METHOD_NAME_HEADER, thisMethod.getName()); + + // Determines that the incoming parameters are transmitted in synchronous mode + // Two incoming parameters and second is instance of the io.grpc.stub.StreamObserver + if (args.length == 2 && args[1] instanceof StreamObserver) { + Exchange exchange = endpoint.createExchange(); + exchange.getIn().setBody(args[0]); + exchange.getIn().setHeaders(grcpHeaders); + + if (endpoint.isSynchronous()) { + consumer.getProcessor().process(exchange); + } else { + consumer.getAsyncProcessor().process(exchange); + } + + StreamObserver<Object> responseObserver = (StreamObserver<Object>)args[1]; + Object responseBody = exchange.getIn().getBody(); + if (responseBody instanceof List) { + List<Object> responseList = (List<Object>)responseBody; + responseList.forEach((responseItem) -> { + responseObserver.onNext(responseItem); + }); + } else { + responseObserver.onNext(responseBody); + } + responseObserver.onCompleted(); + } else if (args.length == 1 && args[0] instanceof StreamObserver) { + // Single incoming parameter is instance of the io.grpc.stub.StreamObserver + final StreamObserver<Object> responseObserver = (StreamObserver<Object>)args[0]; + final StreamObserver<Object> requestObserver = new GrpcRequestAggregationStreamObserver(endpoint, consumer, responseObserver, grcpHeaders); + + return requestObserver; + } else { + throw new IllegalArgumentException("Invalid to process gRPC method: " + thisMethod.getName()); + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAbstractStreamObserver.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAbstractStreamObserver.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAbstractStreamObserver.java new file mode 100644 index 0000000..b47e724 --- /dev/null +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAbstractStreamObserver.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.grpc.server; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import io.grpc.stub.StreamObserver; +import org.apache.camel.Exchange; +import org.apache.camel.component.grpc.GrpcConsumer; +import org.apache.camel.component.grpc.GrpcEndpoint; + +/** + * gRPC request abstract stream observer is the base class for other stream + * observer implementations + */ +public abstract class GrpcRequestAbstractStreamObserver implements StreamObserver<Object> { + protected final GrpcEndpoint endpoint; + protected final GrpcConsumer consumer; + protected Exchange exchange; + protected List<Object> requestList = new LinkedList<>(); + protected StreamObserver<Object> responseObserver; + protected Map<String, Object> headers; + + public GrpcRequestAbstractStreamObserver(GrpcEndpoint endpoint, GrpcConsumer consumer, StreamObserver<Object> responseObserver, Map<String, Object> headers) { + this.endpoint = endpoint; + this.consumer = consumer; + this.responseObserver = responseObserver; + this.headers = headers; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java new file mode 100644 index 0000000..8f7ff6a --- /dev/null +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.grpc.server; + +import java.util.List; +import java.util.Map; + +import io.grpc.stub.StreamObserver; +import org.apache.camel.component.grpc.GrpcConsumer; +import org.apache.camel.component.grpc.GrpcEndpoint; + +/** + * gRPC request stream observer which is collecting received objects every + * onNext() call into the list and processing them in onCompleted() + */ +public class GrpcRequestAggregationStreamObserver extends GrpcRequestAbstractStreamObserver { + + public GrpcRequestAggregationStreamObserver(GrpcEndpoint endpoint, GrpcConsumer consumer, StreamObserver<Object> responseObserver, Map<String, Object> headers) { + super(endpoint, consumer, responseObserver, headers); + } + + @Override + public void onNext(Object request) { + requestList.add(request); + } + + @Override + public void onError(Throwable t) { + exchange.setException(t); + } + + @Override + @SuppressWarnings("unchecked") + public void onCompleted() { + exchange = endpoint.createExchange(); + + exchange.getIn().setBody(requestList); + exchange.getIn().setHeaders(headers); + + consumer.process(exchange, doneSync -> { + }); + + Object responseBody = exchange.getIn().getBody(); + if (responseBody instanceof List) { + List<Object> responseList = (List<Object>)responseBody; + responseList.forEach((responseItem) -> { + responseObserver.onNext(responseItem); + }); + } else { + responseObserver.onNext(responseBody); + } + responseObserver.onCompleted(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java new file mode 100644 index 0000000..da3baff --- /dev/null +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.grpc.server; + +import java.util.Map; + +import io.grpc.stub.StreamObserver; +import org.apache.camel.component.grpc.GrpcConsumer; +import org.apache.camel.component.grpc.GrpcEndpoint; + +/** + * gRPC request stream observer which is propagating every onNext(), onError() + * or onCompleted() calls to the Camel route + */ +public class GrpcRequestPropagationStreamObserver extends GrpcRequestAbstractStreamObserver { + + public GrpcRequestPropagationStreamObserver(GrpcEndpoint endpoint, GrpcConsumer consumer, StreamObserver<Object> responseObserver, Map<String, Object> headers) { + super(endpoint, consumer, responseObserver, headers); + } + + @Override + public void onNext(Object request) { + exchange = endpoint.createExchange(); + exchange.getIn().setBody(request); + consumer.process(exchange, doneSync -> { + }); + responseObserver.onNext(exchange.getOut().getBody()); + } + + @Override + public void onError(Throwable throwable) { + exchange = endpoint.createExchange(); + consumer.onError(exchange, throwable); + responseObserver.onError(throwable); + } + + @Override + public void onCompleted() { + exchange = endpoint.createExchange(); + consumer.onCompleted(exchange); + responseObserver.onCompleted(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java new file mode 100644 index 0000000..b5febee --- /dev/null +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.grpc; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.googlecode.junittoolbox.MultithreadingTester; +import com.googlecode.junittoolbox.RunnableAssert; + +import io.grpc.ManagedChannel; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.stub.StreamObserver; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GrpcConsumerConcurrentTest extends CamelTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerConcurrentTest.class); + + private static final int GRPC_ASYNC_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable(); + private static final int GRPC_HEADERS_TEST_PORT = AvailablePortFinder.getNextAvailable(); + private static final int CONCURRENT_THREAD_COUNT = 300; + private static final int ROUNDS_PER_THREAD_COUNT = 10; + private static final String GRPC_TEST_PING_VALUE = "PING"; + private static final String GRPC_TEST_PONG_VALUE = "PONG"; + private static final String GRPC_USER_AGENT_PREFIX = "user-agent-"; + private static AtomicInteger idCounter = new AtomicInteger(); + + public static Integer createId() { + return idCounter.getAndIncrement(); + } + + public static Integer getId() { + return idCounter.get(); + } + + @Test + public void testAsyncWithConcurrentThreads() throws Exception { + RunnableAssert ra = new RunnableAssert("foo") { + + @Override + public void run() { + final CountDownLatch latch = new CountDownLatch(1); + ManagedChannel asyncRequestChannel = NettyChannelBuilder.forAddress("localhost", GRPC_ASYNC_REQUEST_TEST_PORT).usePlaintext(true).build(); + PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel); + + PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); + int instanceId = createId(); + + final PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build(); + StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver); + requestObserver.onNext(pingRequest); + requestObserver.onNext(pingRequest); + requestObserver.onCompleted(); + try { + latch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + PongResponse pongResponse = responseObserver.getPongResponse(); + + assertNotNull("instanceId = " + instanceId, pongResponse); + assertEquals(instanceId, pongResponse.getPongId()); + assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); + + asyncRequestChannel.shutdown().shutdownNow(); + } + }; + + new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT).run(); + } + + @Test + public void testHeadersWithConcurrentThreads() throws Exception { + RunnableAssert ra = new RunnableAssert("foo") { + + @Override + public void run() { + int instanceId = createId(); + final CountDownLatch latch = new CountDownLatch(1); + ManagedChannel asyncRequestChannel = NettyChannelBuilder.forAddress("localhost", GRPC_HEADERS_TEST_PORT).userAgent(GRPC_USER_AGENT_PREFIX + instanceId).usePlaintext(true).build(); + PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel); + + PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); + + final PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build(); + StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver); + requestObserver.onNext(pingRequest); + requestObserver.onNext(pingRequest); + requestObserver.onCompleted(); + try { + latch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + PongResponse pongResponse = responseObserver.getPongResponse(); + + assertNotNull("instanceId = " + instanceId, pongResponse); + assertEquals(instanceId, pongResponse.getPongId()); + assertEquals(GRPC_USER_AGENT_PREFIX + instanceId, pongResponse.getPongName()); + + asyncRequestChannel.shutdown().shutdownNow(); + } + }; + + new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT).run(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("grpc://org.apache.camel.component.grpc.PingPong?synchronous=true&host=localhost&port=" + GRPC_ASYNC_REQUEST_TEST_PORT).bean(new GrpcMessageBuilder(), "buildAsyncPongResponse"); + from("grpc://org.apache.camel.component.grpc.PingPong?synchronous=true&host=localhost&port=" + GRPC_HEADERS_TEST_PORT).process(new HeaderExchangeProcessor()); + } + }; + } + + public class PongResponseStreamObserver implements StreamObserver<PongResponse> { + private PongResponse pongResponse; + private final CountDownLatch latch; + + public PongResponseStreamObserver(CountDownLatch latch) { + this.latch = latch; + } + + public PongResponse getPongResponse() { + return pongResponse; + } + + @Override + public void onNext(PongResponse value) { + pongResponse = value; + } + + @Override + public void onError(Throwable t) { + LOG.info("Exception", t); + latch.countDown(); + } + + @Override + public void onCompleted() { + latch.countDown(); + } + } + + public class GrpcMessageBuilder { + public PongResponse buildAsyncPongResponse(List<PingRequest> pingRequests) { + PongResponse pongResponse = PongResponse.newBuilder().setPongName(pingRequests.get(0).getPingName() + GRPC_TEST_PONG_VALUE).setPongId(pingRequests.get(0).getPingId()).build(); + return pongResponse; + } + } + + public class HeaderExchangeProcessor implements Processor { + + @SuppressWarnings("unchecked") + public void process(Exchange exchange) throws Exception { + List<PingRequest> pingRequests = (List<PingRequest>)exchange.getIn().getBody(); + String userAgentName = (String)exchange.getIn().getHeader(GrpcConstants.GRPC_USER_AGENT_HEADER); + + // As user agent name is prepended the library's user agent information it's necessary to extract this value (before first space) + PongResponse pongResponse = PongResponse.newBuilder().setPongName(userAgentName.substring(0, userAgentName.indexOf(' '))).setPongId(pingRequests.get(0).getPingId()).build(); + exchange.getIn().setBody(pongResponse); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e72fc444/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerTest.java new file mode 100644 index 0000000..1ef9808 --- /dev/null +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerTest.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.grpc; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GrpcConsumerTest extends CamelTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerTest.class); + + private static final int GRPC_SYNC_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable(); + private static final int GRPC_ASYNC_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable(); + private static final int GRPC_TEST_PING_ID = 1; + private static final String GRPC_TEST_PING_VALUE = "PING"; + private static final String GRPC_TEST_PONG_VALUE = "PONG"; + + private ManagedChannel syncRequestChannel; + private ManagedChannel asyncRequestChannel; + private PingPongGrpc.PingPongBlockingStub blockingStub; + private PingPongGrpc.PingPongStub nonBlockingStub; + private PingPongGrpc.PingPongStub asyncNonBlockingStub; + + private PongResponse pongResponse; + + @Before + public void startGrpcChannels() { + syncRequestChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_SYNC_REQUEST_TEST_PORT).usePlaintext(true).build(); + asyncRequestChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_ASYNC_REQUEST_TEST_PORT).usePlaintext(true).build(); + blockingStub = PingPongGrpc.newBlockingStub(syncRequestChannel); + nonBlockingStub = PingPongGrpc.newStub(syncRequestChannel); + asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel); + } + + @After + public void stopGrpcChannels() { + syncRequestChannel.shutdown().shutdownNow(); + asyncRequestChannel.shutdown().shutdownNow(); + } + + @Test + public void testSyncSyncMethodInSync() throws Exception { + LOG.info("gRPC pingSyncSync method blocking test start"); + PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); + PongResponse pongResponse = blockingStub.pingSyncSync(pingRequest); + + assertNotNull(pongResponse); + assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId()); + assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); + } + + @Test + public void testSyncAsyncMethodInSync() throws Exception { + LOG.info("gRPC pingSyncAsync method blocking test start"); + PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); + Iterator<PongResponse> pongResponseIter = blockingStub.pingSyncAsync(pingRequest); + while (pongResponseIter.hasNext()) { + PongResponse pongResponse = pongResponseIter.next(); + assertNotNull(pongResponse); + assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId()); + assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); + } + } + + @Test + public void testSyncSyncMethodInAsync() throws Exception { + LOG.info("gRPC pingSyncSync method aync test start"); + final CountDownLatch latch = new CountDownLatch(1); + PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); + + StreamObserver<PongResponse> responseObserver = new StreamObserver<PongResponse>() { + + @Override + public void onNext(PongResponse value) { + pongResponse = value; + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + latch.countDown(); + } + + }; + + nonBlockingStub.pingSyncSync(pingRequest, responseObserver); + latch.await(1, TimeUnit.SECONDS); + + assertNotNull(pongResponse); + assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId()); + assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); + } + + @Test + public void testSyncAsyncMethodInAsync() throws Exception { + LOG.info("gRPC pingSyncAsync method aync test start"); + final CountDownLatch latch = new CountDownLatch(1); + PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); + + StreamObserver<PongResponse> responseObserver = new StreamObserver<PongResponse>() { + + @Override + public void onNext(PongResponse value) { + pongResponse = value; + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + latch.countDown(); + } + + }; + + nonBlockingStub.pingSyncAsync(pingRequest, responseObserver); + latch.await(1, TimeUnit.SECONDS); + + assertNotNull(pongResponse); + assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId()); + assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); + } + + @Test + public void testAsyncSyncMethodInAsync() throws Exception { + LOG.info("gRPC pingAsyncSync method aync test start"); + final CountDownLatch latch = new CountDownLatch(1); + PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); + + StreamObserver<PongResponse> responseObserver = new StreamObserver<PongResponse>() { + + @Override + public void onNext(PongResponse value) { + pongResponse = value; + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + latch.countDown(); + } + + }; + + StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncSync(responseObserver); + requestObserver.onNext(pingRequest); + requestObserver.onNext(pingRequest); + requestObserver.onCompleted(); + latch.await(10, TimeUnit.SECONDS); + + assertNotNull(pongResponse); + assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId()); + assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); + } + + @Test + public void testAsyncAsyncMethodInAsync() throws Exception { + LOG.info("gRPC pingAsyncAsync method aync test start"); + final CountDownLatch latch = new CountDownLatch(1); + PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); + + StreamObserver<PongResponse> responseObserver = new StreamObserver<PongResponse>() { + + @Override + public void onNext(PongResponse value) { + pongResponse = value; + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + latch.countDown(); + } + + }; + + StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver); + requestObserver.onNext(pingRequest); + requestObserver.onNext(pingRequest); + requestObserver.onCompleted(); + latch.await(1000, TimeUnit.SECONDS); + + assertNotNull(pongResponse); + assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId()); + assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("grpc://org.apache.camel.component.grpc.PingPong?synchronous=true&host=localhost&port=" + GRPC_SYNC_REQUEST_TEST_PORT).bean(new GrpcMessageBuilder(), "buildPongResponse"); + from("grpc://org.apache.camel.component.grpc.PingPong?synchronous=true&host=localhost&port=" + GRPC_ASYNC_REQUEST_TEST_PORT).bean(new GrpcMessageBuilder(), "buildAsyncPongResponse"); + } + }; + } + + public class GrpcMessageBuilder { + public PongResponse buildPongResponse(PingRequest pingRequest) { + PongResponse pongResponse = PongResponse.newBuilder().setPongName(pingRequest.getPingName() + GRPC_TEST_PONG_VALUE).setPongId(pingRequest.getPingId()).build(); + return pongResponse; + } + + public PongResponse buildAsyncPongResponse(List<PingRequest> pingRequests) { + PongResponse pongResponse = PongResponse.newBuilder().setPongName(pingRequests.get(0).getPingName() + GRPC_TEST_PONG_VALUE).setPongId(pingRequests.get(0).getPingId()).build(); + return pongResponse; + } + } +}