CAMEL-11237: Add additional consumer test cases Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/57226689 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/57226689 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/57226689
Branch: refs/heads/master Commit: 572266899bcc11cd9b003bcd91870acc89ba760c Parents: dd9e7b0 Author: Dmitry Volodin <dmvo...@gmail.com> Authored: Fri May 19 18:57:20 2017 +0300 Committer: Dmitry Volodin <dmvo...@gmail.com> Committed: Mon May 22 16:42:58 2017 +0300 ---------------------------------------------------------------------- .../src/main/docs/grpc-component.adoc | 5 +- .../GrpcRequestPropagationStreamObserver.java | 10 +- .../grpc/GrpcConsumerAggregationTest.java | 217 ++++++++++++++++ .../grpc/GrpcConsumerConcurrentTest.java | 80 +++--- .../grpc/GrpcConsumerPropagationTest.java | 152 ++++++++++++ .../camel/component/grpc/GrpcConsumerTest.java | 248 ------------------- 6 files changed, 424 insertions(+), 288 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/57226689/components/camel-grpc/src/main/docs/grpc-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/docs/grpc-component.adoc b/components/camel-grpc/src/main/docs/grpc-component.adoc index 7c53b9a..08fe996 100644 --- a/components/camel-grpc/src/main/docs/grpc-component.adoc +++ b/components/camel-grpc/src/main/docs/grpc-component.adoc @@ -47,7 +47,7 @@ with the following path and query parameters: | **service** | *Required* Fully qualified service name from the protocol buffer descriptor file (package dot service definition name) | | String |======================================================================= -#### Query Parameters (9 parameters): +#### Query Parameters (10 parameters): [width="100%",cols="2,5,^1,2",options="header"] |======================================================================= @@ -55,11 +55,12 @@ with the following path and query parameters: | **host** (common) | The gRPC server host name | | String | **port** (common) | The gRPC server port | | int | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean +| **processingStrategy** (consumer) | TBD | | GrpcProcessing Strategies | **exceptionHandler** (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | **method** (producer) | gRPC method name | | String | **target** (producer) | The channel target name as alternative to host and port parameters | | String -| **usePlainText** (producer) | The plaintext connection to the server flag | true | Boolean +| **usePlainText** (producer) | The plain text connection to the server flag | true | Boolean | **synchronous** (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean |======================================================================= // endpoint options: END http://git-wip-us.apache.org/repos/asf/camel/blob/57226689/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java index da3baff..ae51100 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestPropagationStreamObserver.java @@ -36,14 +36,21 @@ public class GrpcRequestPropagationStreamObserver extends GrpcRequestAbstractStr public void onNext(Object request) { exchange = endpoint.createExchange(); exchange.getIn().setBody(request); + exchange.getIn().setHeaders(headers); consumer.process(exchange, doneSync -> { }); - responseObserver.onNext(exchange.getOut().getBody()); + if (exchange.hasOut()) { + responseObserver.onNext(exchange.getOut().getBody()); + } else { + responseObserver.onNext(exchange.getIn().getBody()); + } + responseObserver.onCompleted(); } @Override public void onError(Throwable throwable) { exchange = endpoint.createExchange(); + exchange.getIn().setHeaders(headers); consumer.onError(exchange, throwable); responseObserver.onError(throwable); } @@ -51,6 +58,7 @@ public class GrpcRequestPropagationStreamObserver extends GrpcRequestAbstractStr @Override public void onCompleted() { exchange = endpoint.createExchange(); + exchange.getIn().setHeaders(headers); consumer.onCompleted(exchange); responseObserver.onCompleted(); } http://git-wip-us.apache.org/repos/asf/camel/blob/57226689/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerAggregationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerAggregationTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerAggregationTest.java new file mode 100644 index 0000000..a6225ad --- /dev/null +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerAggregationTest.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.grpc; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.grpc.GrpcConsumerConcurrentTest.PongResponseStreamObserver; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GrpcConsumerAggregationTest extends CamelTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerAggregationTest.class); + + private static final int GRPC_SYNC_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable(); + private static final int GRPC_ASYNC_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable(); + private static final int GRPC_TEST_PING_ID = 1; + private static final String GRPC_TEST_PING_VALUE = "PING"; + private static final String GRPC_TEST_PONG_VALUE = "PONG"; + + private ManagedChannel syncRequestChannel; + private ManagedChannel asyncRequestChannel; + private PingPongGrpc.PingPongBlockingStub blockingStub; + private PingPongGrpc.PingPongStub nonBlockingStub; + private PingPongGrpc.PingPongStub asyncNonBlockingStub; + + @Before + public void startGrpcChannels() { + syncRequestChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_SYNC_REQUEST_TEST_PORT).usePlaintext(true).build(); + asyncRequestChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_ASYNC_REQUEST_TEST_PORT).usePlaintext(true).build(); + blockingStub = PingPongGrpc.newBlockingStub(syncRequestChannel); + nonBlockingStub = PingPongGrpc.newStub(syncRequestChannel); + asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel); + } + + @After + public void stopGrpcChannels() { + syncRequestChannel.shutdown().shutdownNow(); + asyncRequestChannel.shutdown().shutdownNow(); + } + + @Test + public void testSyncSyncMethodInSync() throws Exception { + LOG.info("gRPC pingSyncSync method blocking test start"); + PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); + PongResponse pongResponse = blockingStub.pingSyncSync(pingRequest); + + assertNotNull(pongResponse); + assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId()); + assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); + } + + @Test + public void testSyncAsyncMethodInSync() throws Exception { + LOG.info("gRPC pingSyncAsync method blocking test start"); + PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); + Iterator<PongResponse> pongResponseIter = blockingStub.pingSyncAsync(pingRequest); + while (pongResponseIter.hasNext()) { + PongResponse pongResponse = pongResponseIter.next(); + assertNotNull(pongResponse); + assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId()); + assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); + } + } + + @Test + public void testSyncSyncMethodInAsync() throws Exception { + LOG.info("gRPC pingSyncSync method aync test start"); + final CountDownLatch latch = new CountDownLatch(1); + PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); + PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); + + nonBlockingStub.pingSyncSync(pingRequest, responseObserver); + latch.await(5, TimeUnit.SECONDS); + + PongResponse pongResponse = responseObserver.getPongResponse(); + + assertNotNull(pongResponse); + assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId()); + assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); + } + + @Test + public void testSyncAsyncMethodInAsync() throws Exception { + LOG.info("gRPC pingSyncAsync method aync test start"); + final CountDownLatch latch = new CountDownLatch(1); + PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); + PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); + + nonBlockingStub.pingSyncAsync(pingRequest, responseObserver); + latch.await(5, TimeUnit.SECONDS); + + PongResponse pongResponse = responseObserver.getPongResponse(); + + assertNotNull(pongResponse); + assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId()); + assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); + } + + @Test + public void testAsyncSyncMethodInAsync() throws Exception { + LOG.info("gRPC pingAsyncSync method aync test start"); + final CountDownLatch latch = new CountDownLatch(1); + PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); + PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); + + StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncSync(responseObserver); + requestObserver.onNext(pingRequest); + requestObserver.onNext(pingRequest); + requestObserver.onCompleted(); + latch.await(5, TimeUnit.SECONDS); + + PongResponse pongResponse = responseObserver.getPongResponse(); + + assertNotNull(pongResponse); + assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId()); + assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); + } + + @Test + public void testAsyncAsyncMethodInAsync() throws Exception { + LOG.info("gRPC pingAsyncAsync method aync test start"); + final CountDownLatch latch = new CountDownLatch(1); + PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); + PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); + + StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver); + requestObserver.onNext(pingRequest); + requestObserver.onNext(pingRequest); + requestObserver.onCompleted(); + latch.await(5, TimeUnit.SECONDS); + + PongResponse pongResponse = responseObserver.getPongResponse(); + + assertNotNull(pongResponse); + assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId()); + assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("grpc://org.apache.camel.component.grpc.PingPong?synchronous=true&processingStrategy=AGGREGATION&host=localhost&port=" + GRPC_SYNC_REQUEST_TEST_PORT) + .bean(new GrpcMessageBuilder(), "buildPongResponse"); + + from("grpc://org.apache.camel.component.grpc.PingPong?synchronous=true&processingStrategy=AGGREGATION&host=localhost&port=" + GRPC_ASYNC_REQUEST_TEST_PORT) + .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse"); + } + }; + } + + public class PongResponseStreamObserver implements StreamObserver<PongResponse> { + private PongResponse pongResponse; + private final CountDownLatch latch; + + public PongResponseStreamObserver(CountDownLatch latch) { + this.latch = latch; + } + + public PongResponse getPongResponse() { + return pongResponse; + } + + @Override + public void onNext(PongResponse value) { + pongResponse = value; + } + + @Override + public void onError(Throwable t) { + LOG.info("Exception", t); + latch.countDown(); + } + + @Override + public void onCompleted() { + latch.countDown(); + } + } + + public class GrpcMessageBuilder { + public PongResponse buildPongResponse(PingRequest pingRequest) { + return PongResponse.newBuilder().setPongName(pingRequest.getPingName() + GRPC_TEST_PONG_VALUE).setPongId(pingRequest.getPingId()).build(); + } + + public PongResponse buildAsyncPongResponse(List<PingRequest> pingRequests) { + return PongResponse.newBuilder().setPongName(pingRequests.get(0).getPingName() + GRPC_TEST_PONG_VALUE).setPongId(pingRequests.get(0).getPingId()).build(); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/57226689/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java index b5febee..01b92d8 100644 --- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java @@ -47,28 +47,28 @@ public class GrpcConsumerConcurrentTest extends CamelTestSupport { private static final String GRPC_TEST_PONG_VALUE = "PONG"; private static final String GRPC_USER_AGENT_PREFIX = "user-agent-"; private static AtomicInteger idCounter = new AtomicInteger(); - + public static Integer createId() { return idCounter.getAndIncrement(); } - + public static Integer getId() { return idCounter.get(); } - + @Test public void testAsyncWithConcurrentThreads() throws Exception { RunnableAssert ra = new RunnableAssert("foo") { - + @Override public void run() { final CountDownLatch latch = new CountDownLatch(1); ManagedChannel asyncRequestChannel = NettyChannelBuilder.forAddress("localhost", GRPC_ASYNC_REQUEST_TEST_PORT).usePlaintext(true).build(); PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel); - - PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); + + PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); int instanceId = createId(); - + final PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build(); StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver); requestObserver.onNext(pingRequest); @@ -79,33 +79,34 @@ public class GrpcConsumerConcurrentTest extends CamelTestSupport { } catch (InterruptedException e) { e.printStackTrace(); } - + PongResponse pongResponse = responseObserver.getPongResponse(); - + assertNotNull("instanceId = " + instanceId, pongResponse); assertEquals(instanceId, pongResponse.getPongId()); assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); - + asyncRequestChannel.shutdown().shutdownNow(); } }; - + new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT).run(); } - + @Test public void testHeadersWithConcurrentThreads() throws Exception { RunnableAssert ra = new RunnableAssert("foo") { - + @Override public void run() { int instanceId = createId(); final CountDownLatch latch = new CountDownLatch(1); - ManagedChannel asyncRequestChannel = NettyChannelBuilder.forAddress("localhost", GRPC_HEADERS_TEST_PORT).userAgent(GRPC_USER_AGENT_PREFIX + instanceId).usePlaintext(true).build(); + ManagedChannel asyncRequestChannel = NettyChannelBuilder.forAddress("localhost", GRPC_HEADERS_TEST_PORT).userAgent(GRPC_USER_AGENT_PREFIX + instanceId) + .usePlaintext(true).build(); PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel); - - PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); - + + PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); + final PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build(); StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver); requestObserver.onNext(pingRequest); @@ -116,39 +117,42 @@ public class GrpcConsumerConcurrentTest extends CamelTestSupport { } catch (InterruptedException e) { e.printStackTrace(); } - + PongResponse pongResponse = responseObserver.getPongResponse(); - + assertNotNull("instanceId = " + instanceId, pongResponse); assertEquals(instanceId, pongResponse.getPongId()); assertEquals(GRPC_USER_AGENT_PREFIX + instanceId, pongResponse.getPongName()); - + asyncRequestChannel.shutdown().shutdownNow(); } }; - + new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT).run(); } - + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override - public void configure() { - from("grpc://org.apache.camel.component.grpc.PingPong?synchronous=true&host=localhost&port=" + GRPC_ASYNC_REQUEST_TEST_PORT).bean(new GrpcMessageBuilder(), "buildAsyncPongResponse"); - from("grpc://org.apache.camel.component.grpc.PingPong?synchronous=true&host=localhost&port=" + GRPC_HEADERS_TEST_PORT).process(new HeaderExchangeProcessor()); + public void configure() { + from("grpc://org.apache.camel.component.grpc.PingPong?synchronous=true&processingStrategy=AGGREGATION&host=localhost&port=" + GRPC_ASYNC_REQUEST_TEST_PORT) + .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse"); + + from("grpc://org.apache.camel.component.grpc.PingPong?synchronous=true&processingStrategy=AGGREGATION&host=localhost&port=" + GRPC_HEADERS_TEST_PORT) + .process(new HeaderExchangeProcessor()); } }; } - + public class PongResponseStreamObserver implements StreamObserver<PongResponse> { private PongResponse pongResponse; private final CountDownLatch latch; - + public PongResponseStreamObserver(CountDownLatch latch) { this.latch = latch; } - + public PongResponse getPongResponse() { return pongResponse; } @@ -169,24 +173,26 @@ public class GrpcConsumerConcurrentTest extends CamelTestSupport { latch.countDown(); } } - + public class GrpcMessageBuilder { public PongResponse buildAsyncPongResponse(List<PingRequest> pingRequests) { - PongResponse pongResponse = PongResponse.newBuilder().setPongName(pingRequests.get(0).getPingName() + GRPC_TEST_PONG_VALUE).setPongId(pingRequests.get(0).getPingId()).build(); - return pongResponse; + return PongResponse.newBuilder().setPongName(pingRequests.get(0).getPingName() + GRPC_TEST_PONG_VALUE).setPongId(pingRequests.get(0).getPingId()).build(); } } - + public class HeaderExchangeProcessor implements Processor { - + @SuppressWarnings("unchecked") public void process(Exchange exchange) throws Exception { List<PingRequest> pingRequests = (List<PingRequest>)exchange.getIn().getBody(); String userAgentName = (String)exchange.getIn().getHeader(GrpcConstants.GRPC_USER_AGENT_HEADER); - - // As user agent name is prepended the library's user agent information it's necessary to extract this value (before first space) - PongResponse pongResponse = PongResponse.newBuilder().setPongName(userAgentName.substring(0, userAgentName.indexOf(' '))).setPongId(pingRequests.get(0).getPingId()).build(); + + // As user agent name is prepended the library's user agent + // information it's necessary to extract this value (before first + // space) + PongResponse pongResponse = PongResponse.newBuilder().setPongName(userAgentName.substring(0, userAgentName.indexOf(' '))).setPongId(pingRequests.get(0).getPingId()) + .build(); exchange.getIn().setBody(pongResponse); } - } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/57226689/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java new file mode 100644 index 0000000..123afe0 --- /dev/null +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.grpc; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.grpc.GrpcConsumerAggregationTest.GrpcMessageBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GrpcConsumerPropagationTest extends CamelTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerPropagationTest.class); + + private static final int GRPC_ASYNC_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable(); + private static final int GRPC_TEST_PING_ID = 1; + private static final String GRPC_TEST_PING_VALUE = "PING"; + private static final String GRPC_TEST_PONG_VALUE = "PONG"; + + private ManagedChannel asyncRequestChannel; + private PingPongGrpc.PingPongStub nonBlockingStub; + private PingPongGrpc.PingPongStub asyncNonBlockingStub; + + @Before + public void startGrpcChannels() { + asyncRequestChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_ASYNC_REQUEST_TEST_PORT).usePlaintext(true).build(); + asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel); + } + + @After + public void stopGrpcChannels() throws Exception { + asyncRequestChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + } + + @Test + public void testOnNextPropagation() throws Exception { + LOG.info("gRPC pingAsyncSync method aync test start"); + + final CountDownLatch latch = new CountDownLatch(1); + PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); + PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); + + StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncSync(responseObserver); + requestObserver.onNext(pingRequest); + latch.await(5, TimeUnit.SECONDS); + + MockEndpoint mockEndpoint = getMockEndpoint("mock:async-propagation"); + mockEndpoint.expectedMessageCount(1); + mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT); + mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_METHOD_NAME_HEADER, "pingAsyncSync"); + + PongResponse pongResponse = responseObserver.getPongResponse(); + assertNotNull(pongResponse); + assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId()); + assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); + } + + + @Test + public void testOnCompletedPropagation() throws Exception { + LOG.info("gRPC pingAsyncAsync method aync test start"); + final CountDownLatch latch = new CountDownLatch(1); + PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); + PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); + + StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver); + requestObserver.onCompleted(); + latch.await(5, TimeUnit.SECONDS); + + MockEndpoint mockEndpoint = getMockEndpoint("mock:async-propagation"); + mockEndpoint.expectedMessageCount(1); + mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED); + mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GrpcConstants.GRPC_METHOD_NAME_HEADER, "pingAsyncAsync"); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + + from("grpc://org.apache.camel.component.grpc.PingPong?processingStrategy=PROPAGATION&host=localhost&port=" + GRPC_ASYNC_REQUEST_TEST_PORT) + .to("mock:async-propagation") + .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse"); + } + }; + } + + public class PongResponseStreamObserver implements StreamObserver<PongResponse> { + private PongResponse pongResponse; + private final CountDownLatch latch; + + public PongResponseStreamObserver(CountDownLatch latch) { + this.latch = latch; + } + + public PongResponse getPongResponse() { + return pongResponse; + } + + @Override + public void onNext(PongResponse value) { + pongResponse = value; + latch.countDown(); + } + + @Override + public void onError(Throwable t) { + LOG.info("Exception", t); + latch.countDown(); + } + + @Override + public void onCompleted() { + latch.countDown(); + } + } + + public class GrpcMessageBuilder { + + public PongResponse buildAsyncPongResponse(PingRequest pingRequests) { + return PongResponse.newBuilder().setPongName(pingRequests.getPingName() + GRPC_TEST_PONG_VALUE).setPongId(pingRequests.getPingId()).build(); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/57226689/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerTest.java deleted file mode 100644 index 1ef9808..0000000 --- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerTest.java +++ /dev/null @@ -1,248 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.grpc; - -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.stub.StreamObserver; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.test.AvailablePortFinder; -import org.apache.camel.test.junit4.CamelTestSupport; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class GrpcConsumerTest extends CamelTestSupport { - private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerTest.class); - - private static final int GRPC_SYNC_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable(); - private static final int GRPC_ASYNC_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable(); - private static final int GRPC_TEST_PING_ID = 1; - private static final String GRPC_TEST_PING_VALUE = "PING"; - private static final String GRPC_TEST_PONG_VALUE = "PONG"; - - private ManagedChannel syncRequestChannel; - private ManagedChannel asyncRequestChannel; - private PingPongGrpc.PingPongBlockingStub blockingStub; - private PingPongGrpc.PingPongStub nonBlockingStub; - private PingPongGrpc.PingPongStub asyncNonBlockingStub; - - private PongResponse pongResponse; - - @Before - public void startGrpcChannels() { - syncRequestChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_SYNC_REQUEST_TEST_PORT).usePlaintext(true).build(); - asyncRequestChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_ASYNC_REQUEST_TEST_PORT).usePlaintext(true).build(); - blockingStub = PingPongGrpc.newBlockingStub(syncRequestChannel); - nonBlockingStub = PingPongGrpc.newStub(syncRequestChannel); - asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel); - } - - @After - public void stopGrpcChannels() { - syncRequestChannel.shutdown().shutdownNow(); - asyncRequestChannel.shutdown().shutdownNow(); - } - - @Test - public void testSyncSyncMethodInSync() throws Exception { - LOG.info("gRPC pingSyncSync method blocking test start"); - PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); - PongResponse pongResponse = blockingStub.pingSyncSync(pingRequest); - - assertNotNull(pongResponse); - assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId()); - assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); - } - - @Test - public void testSyncAsyncMethodInSync() throws Exception { - LOG.info("gRPC pingSyncAsync method blocking test start"); - PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); - Iterator<PongResponse> pongResponseIter = blockingStub.pingSyncAsync(pingRequest); - while (pongResponseIter.hasNext()) { - PongResponse pongResponse = pongResponseIter.next(); - assertNotNull(pongResponse); - assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId()); - assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); - } - } - - @Test - public void testSyncSyncMethodInAsync() throws Exception { - LOG.info("gRPC pingSyncSync method aync test start"); - final CountDownLatch latch = new CountDownLatch(1); - PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); - - StreamObserver<PongResponse> responseObserver = new StreamObserver<PongResponse>() { - - @Override - public void onNext(PongResponse value) { - pongResponse = value; - } - - @Override - public void onError(Throwable t) { - } - - @Override - public void onCompleted() { - latch.countDown(); - } - - }; - - nonBlockingStub.pingSyncSync(pingRequest, responseObserver); - latch.await(1, TimeUnit.SECONDS); - - assertNotNull(pongResponse); - assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId()); - assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); - } - - @Test - public void testSyncAsyncMethodInAsync() throws Exception { - LOG.info("gRPC pingSyncAsync method aync test start"); - final CountDownLatch latch = new CountDownLatch(1); - PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); - - StreamObserver<PongResponse> responseObserver = new StreamObserver<PongResponse>() { - - @Override - public void onNext(PongResponse value) { - pongResponse = value; - } - - @Override - public void onError(Throwable t) { - } - - @Override - public void onCompleted() { - latch.countDown(); - } - - }; - - nonBlockingStub.pingSyncAsync(pingRequest, responseObserver); - latch.await(1, TimeUnit.SECONDS); - - assertNotNull(pongResponse); - assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId()); - assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); - } - - @Test - public void testAsyncSyncMethodInAsync() throws Exception { - LOG.info("gRPC pingAsyncSync method aync test start"); - final CountDownLatch latch = new CountDownLatch(1); - PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); - - StreamObserver<PongResponse> responseObserver = new StreamObserver<PongResponse>() { - - @Override - public void onNext(PongResponse value) { - pongResponse = value; - } - - @Override - public void onError(Throwable t) { - } - - @Override - public void onCompleted() { - latch.countDown(); - } - - }; - - StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncSync(responseObserver); - requestObserver.onNext(pingRequest); - requestObserver.onNext(pingRequest); - requestObserver.onCompleted(); - latch.await(10, TimeUnit.SECONDS); - - assertNotNull(pongResponse); - assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId()); - assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); - } - - @Test - public void testAsyncAsyncMethodInAsync() throws Exception { - LOG.info("gRPC pingAsyncAsync method aync test start"); - final CountDownLatch latch = new CountDownLatch(1); - PingRequest pingRequest = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build(); - - StreamObserver<PongResponse> responseObserver = new StreamObserver<PongResponse>() { - - @Override - public void onNext(PongResponse value) { - pongResponse = value; - } - - @Override - public void onError(Throwable t) { - } - - @Override - public void onCompleted() { - latch.countDown(); - } - - }; - - StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver); - requestObserver.onNext(pingRequest); - requestObserver.onNext(pingRequest); - requestObserver.onCompleted(); - latch.await(1000, TimeUnit.SECONDS); - - assertNotNull(pongResponse); - assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId()); - assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); - } - - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - @Override - public void configure() { - from("grpc://org.apache.camel.component.grpc.PingPong?synchronous=true&host=localhost&port=" + GRPC_SYNC_REQUEST_TEST_PORT).bean(new GrpcMessageBuilder(), "buildPongResponse"); - from("grpc://org.apache.camel.component.grpc.PingPong?synchronous=true&host=localhost&port=" + GRPC_ASYNC_REQUEST_TEST_PORT).bean(new GrpcMessageBuilder(), "buildAsyncPongResponse"); - } - }; - } - - public class GrpcMessageBuilder { - public PongResponse buildPongResponse(PingRequest pingRequest) { - PongResponse pongResponse = PongResponse.newBuilder().setPongName(pingRequest.getPingName() + GRPC_TEST_PONG_VALUE).setPongId(pingRequest.getPingId()).build(); - return pongResponse; - } - - public PongResponse buildAsyncPongResponse(List<PingRequest> pingRequests) { - PongResponse pongResponse = PongResponse.newBuilder().setPongName(pingRequests.get(0).getPingName() + GRPC_TEST_PONG_VALUE).setPongId(pingRequests.get(0).getPingId()).build(); - return pongResponse; - } - } -}