CAMEL-11237: Adding request/response processing strategies Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dd9e7b04 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dd9e7b04 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dd9e7b04
Branch: refs/heads/master Commit: dd9e7b048f277beeb2d4b04f4800dea1084fcd28 Parents: e72fc44 Author: Dmitry Volodin <dmvo...@gmail.com> Authored: Thu May 18 18:45:06 2017 +0300 Committer: Dmitry Volodin <dmvo...@gmail.com> Committed: Mon May 22 16:42:58 2017 +0300 ---------------------------------------------------------------------- .../camel/component/grpc/GrpcConfiguration.java | 15 +++++++- .../camel/component/grpc/GrpcConsumer.java | 4 +++ .../grpc/GrpcProcessingStrategies.java | 38 ++++++++++++++++++++ .../grpc/server/GrpcMethodHandler.java | 11 +++++- .../GrpcRequestAbstractStreamObserver.java | 3 -- .../GrpcRequestAggregationStreamObserver.java | 5 +-- 6 files changed, 69 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/dd9e7b04/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java index 01d94b6..3589738 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java @@ -37,6 +37,8 @@ public class GrpcConfiguration { private String target; @UriParam(label = "producer", defaultValue = "true") private Boolean usePlainText = true; + @UriParam(label = "consumer") + private GrpcProcessingStrategies processingStrategy = GrpcProcessingStrategies.PROPAGATION; private String serviceName; private String servicePackage; @@ -98,7 +100,7 @@ public class GrpcConfiguration { } /** - * The plaintext connection to the server flag + * The plain text connection to the server flag */ public Boolean getUsePlainText() { return usePlainText; @@ -109,6 +111,17 @@ public class GrpcConfiguration { } /** + * TBD + */ + public GrpcProcessingStrategies getProcessingStrategy() { + return processingStrategy; + } + + public void setProcessingStrategy(GrpcProcessingStrategies processingStrategy) { + this.processingStrategy = processingStrategy; + } + + /** * The service name extracted from the full service name */ protected String getServiceName() { http://git-wip-us.apache.org/repos/asf/camel/blob/dd9e7b04/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java index 29a2ada..3bcdec0 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java @@ -52,6 +52,10 @@ public class GrpcConsumer extends DefaultConsumer { this.endpoint = endpoint; this.configuration = configuration; } + + public GrpcConfiguration getConfiguration() { + return configuration; + } @Override protected void doStart() throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/dd9e7b04/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProcessingStrategies.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProcessingStrategies.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProcessingStrategies.java new file mode 100644 index 0000000..69766bf --- /dev/null +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProcessingStrategies.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.grpc; + +/* + * Available values for the request and response processing strategies + */ +public enum GrpcProcessingStrategies { + + AGGREGATION("AGGREGATION"), + PROPAGATION("PROPAGATION"); + + private final String strategy; + + GrpcProcessingStrategies(final String strategy) { + this.strategy = strategy; + } + + @Override + public String toString() { + return strategy; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/dd9e7b04/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java index 2ed83cc..5e41932 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java @@ -27,6 +27,7 @@ import org.apache.camel.Exchange; import org.apache.camel.component.grpc.GrpcConstants; import org.apache.camel.component.grpc.GrpcConsumer; import org.apache.camel.component.grpc.GrpcEndpoint; +import org.apache.camel.component.grpc.GrpcProcessingStrategies; /** * gRPC server method invocation handler @@ -76,7 +77,15 @@ public class GrpcMethodHandler implements MethodHandler { } else if (args.length == 1 && args[0] instanceof StreamObserver) { // Single incoming parameter is instance of the io.grpc.stub.StreamObserver final StreamObserver<Object> responseObserver = (StreamObserver<Object>)args[0]; - final StreamObserver<Object> requestObserver = new GrpcRequestAggregationStreamObserver(endpoint, consumer, responseObserver, grcpHeaders); + StreamObserver<Object> requestObserver = null; + + if (consumer.getConfiguration().getProcessingStrategy() == GrpcProcessingStrategies.AGGREGATION) { + requestObserver = new GrpcRequestAggregationStreamObserver(endpoint, consumer, responseObserver, grcpHeaders); + } else if (consumer.getConfiguration().getProcessingStrategy() == GrpcProcessingStrategies.PROPAGATION) { + requestObserver = new GrpcRequestPropagationStreamObserver(endpoint, consumer, responseObserver, grcpHeaders); + } else { + throw new IllegalArgumentException("gRPC processing strategy not implemented " + consumer.getConfiguration().getProcessingStrategy()); + } return requestObserver; } else { http://git-wip-us.apache.org/repos/asf/camel/blob/dd9e7b04/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAbstractStreamObserver.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAbstractStreamObserver.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAbstractStreamObserver.java index b47e724..0850e97 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAbstractStreamObserver.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAbstractStreamObserver.java @@ -16,8 +16,6 @@ */ package org.apache.camel.component.grpc.server; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import io.grpc.stub.StreamObserver; @@ -33,7 +31,6 @@ public abstract class GrpcRequestAbstractStreamObserver implements StreamObserve protected final GrpcEndpoint endpoint; protected final GrpcConsumer consumer; protected Exchange exchange; - protected List<Object> requestList = new LinkedList<>(); protected StreamObserver<Object> responseObserver; protected Map<String, Object> headers; http://git-wip-us.apache.org/repos/asf/camel/blob/dd9e7b04/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java ---------------------------------------------------------------------- diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java index 8f7ff6a..145029e 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.grpc.server; +import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -28,9 +29,11 @@ import org.apache.camel.component.grpc.GrpcEndpoint; * onNext() call into the list and processing them in onCompleted() */ public class GrpcRequestAggregationStreamObserver extends GrpcRequestAbstractStreamObserver { + private List<Object> requestList = new LinkedList<>(); public GrpcRequestAggregationStreamObserver(GrpcEndpoint endpoint, GrpcConsumer consumer, StreamObserver<Object> responseObserver, Map<String, Object> headers) { super(endpoint, consumer, responseObserver, headers); + exchange = endpoint.createExchange(); } @Override @@ -46,8 +49,6 @@ public class GrpcRequestAggregationStreamObserver extends GrpcRequestAbstractStr @Override @SuppressWarnings("unchecked") public void onCompleted() { - exchange = endpoint.createExchange(); - exchange.getIn().setBody(requestList); exchange.getIn().setHeaders(headers);