CAMEL-11237: Adding request/response processing strategies

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dd9e7b04
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dd9e7b04
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dd9e7b04

Branch: refs/heads/master
Commit: dd9e7b048f277beeb2d4b04f4800dea1084fcd28
Parents: e72fc44
Author: Dmitry Volodin <dmvo...@gmail.com>
Authored: Thu May 18 18:45:06 2017 +0300
Committer: Dmitry Volodin <dmvo...@gmail.com>
Committed: Mon May 22 16:42:58 2017 +0300

----------------------------------------------------------------------
 .../camel/component/grpc/GrpcConfiguration.java | 15 +++++++-
 .../camel/component/grpc/GrpcConsumer.java      |  4 +++
 .../grpc/GrpcProcessingStrategies.java          | 38 ++++++++++++++++++++
 .../grpc/server/GrpcMethodHandler.java          | 11 +++++-
 .../GrpcRequestAbstractStreamObserver.java      |  3 --
 .../GrpcRequestAggregationStreamObserver.java   |  5 +--
 6 files changed, 69 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/dd9e7b04/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
index 01d94b6..3589738 100644
--- 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
@@ -37,6 +37,8 @@ public class GrpcConfiguration {
     private String target;
     @UriParam(label = "producer", defaultValue = "true")
     private Boolean usePlainText = true;
+    @UriParam(label = "consumer")
+    private GrpcProcessingStrategies processingStrategy = 
GrpcProcessingStrategies.PROPAGATION;
 
     private String serviceName;
     private String servicePackage;
@@ -98,7 +100,7 @@ public class GrpcConfiguration {
     }
 
     /**
-     * The plaintext connection to the server flag
+     * The plain text connection to the server flag
      */
     public Boolean getUsePlainText() {
         return usePlainText;
@@ -109,6 +111,17 @@ public class GrpcConfiguration {
     }
 
     /**
+     * TBD
+     */
+    public GrpcProcessingStrategies getProcessingStrategy() {
+        return processingStrategy;
+    }
+
+    public void setProcessingStrategy(GrpcProcessingStrategies 
processingStrategy) {
+        this.processingStrategy = processingStrategy;
+    }
+
+    /**
      * The service name extracted from the full service name
      */
     protected String getServiceName() {

http://git-wip-us.apache.org/repos/asf/camel/blob/dd9e7b04/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
index 29a2ada..3bcdec0 100644
--- 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
@@ -52,6 +52,10 @@ public class GrpcConsumer extends DefaultConsumer {
         this.endpoint = endpoint;
         this.configuration = configuration;
     }
+    
+    public GrpcConfiguration getConfiguration() {
+        return configuration;
+    }
 
     @Override
     protected void doStart() throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/dd9e7b04/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProcessingStrategies.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProcessingStrategies.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProcessingStrategies.java
new file mode 100644
index 0000000..69766bf
--- /dev/null
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProcessingStrategies.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.grpc;
+
+/*
+ * Available values for the request and response processing strategies
+ */
+public enum GrpcProcessingStrategies {
+    
+    AGGREGATION("AGGREGATION"),
+    PROPAGATION("PROPAGATION");
+
+    private final String strategy;
+
+    GrpcProcessingStrategies(final String strategy) {
+        this.strategy = strategy;
+    }
+
+    @Override
+    public String toString() {
+        return strategy;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/dd9e7b04/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java
index 2ed83cc..5e41932 100644
--- 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java
@@ -27,6 +27,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.component.grpc.GrpcConstants;
 import org.apache.camel.component.grpc.GrpcConsumer;
 import org.apache.camel.component.grpc.GrpcEndpoint;
+import org.apache.camel.component.grpc.GrpcProcessingStrategies;
 
 /**
  * gRPC server method invocation handler
@@ -76,7 +77,15 @@ public class GrpcMethodHandler implements MethodHandler {
         } else if (args.length == 1 && args[0] instanceof StreamObserver) {
             // Single incoming parameter is instance of the 
io.grpc.stub.StreamObserver
             final StreamObserver<Object> responseObserver = 
(StreamObserver<Object>)args[0];
-            final StreamObserver<Object> requestObserver = new 
GrpcRequestAggregationStreamObserver(endpoint, consumer, responseObserver, 
grcpHeaders);
+            StreamObserver<Object> requestObserver = null;
+            
+            if (consumer.getConfiguration().getProcessingStrategy() == 
GrpcProcessingStrategies.AGGREGATION) {
+                requestObserver = new 
GrpcRequestAggregationStreamObserver(endpoint, consumer, responseObserver, 
grcpHeaders);
+            } else if (consumer.getConfiguration().getProcessingStrategy() == 
GrpcProcessingStrategies.PROPAGATION) {
+                requestObserver = new 
GrpcRequestPropagationStreamObserver(endpoint, consumer, responseObserver, 
grcpHeaders);
+            } else {
+                throw new IllegalArgumentException("gRPC processing strategy 
not implemented " + consumer.getConfiguration().getProcessingStrategy());
+            }
             
             return requestObserver;
         } else {

http://git-wip-us.apache.org/repos/asf/camel/blob/dd9e7b04/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAbstractStreamObserver.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAbstractStreamObserver.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAbstractStreamObserver.java
index b47e724..0850e97 100644
--- 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAbstractStreamObserver.java
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAbstractStreamObserver.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.grpc.server;
 
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 
 import io.grpc.stub.StreamObserver;
@@ -33,7 +31,6 @@ public abstract class GrpcRequestAbstractStreamObserver 
implements StreamObserve
     protected final GrpcEndpoint endpoint;
     protected final GrpcConsumer consumer;
     protected Exchange exchange;
-    protected List<Object> requestList = new LinkedList<>();
     protected StreamObserver<Object> responseObserver;
     protected Map<String, Object> headers;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/dd9e7b04/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java
----------------------------------------------------------------------
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java
index 8f7ff6a..145029e 100644
--- 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestAggregationStreamObserver.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.grpc.server;
 
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -28,9 +29,11 @@ import org.apache.camel.component.grpc.GrpcEndpoint;
  * onNext() call into the list and processing them in onCompleted()
  */
 public class GrpcRequestAggregationStreamObserver extends 
GrpcRequestAbstractStreamObserver {
+    private List<Object> requestList = new LinkedList<>();
 
     public GrpcRequestAggregationStreamObserver(GrpcEndpoint endpoint, 
GrpcConsumer consumer, StreamObserver<Object> responseObserver, Map<String, 
Object> headers) {
         super(endpoint, consumer, responseObserver, headers);
+        exchange = endpoint.createExchange();
     }
 
     @Override
@@ -46,8 +49,6 @@ public class GrpcRequestAggregationStreamObserver extends 
GrpcRequestAbstractStr
     @Override
     @SuppressWarnings("unchecked")
     public void onCompleted() {
-        exchange = endpoint.createExchange();
-
         exchange.getIn().setBody(requestList);
         exchange.getIn().setHeaders(headers);
 

Reply via email to