This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 17c3b2c  CAMEL-13118: Components should not depend on camel-core but camel-support
17c3b2c is described below

commit 17c3b2c8b08a64bacbd40006642989796b72c6a9
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Thu Apr 18 08:09:04 2019 +0200

    CAMEL-13118: Components should not depend on camel-core but camel-support
---
 components/camel-grpc/pom.xml                      |  2 +-
 .../apache/camel/component/grpc/GrpcProducer.java  | 10 ++++++-
 .../client/GrpcResponseRouterStreamObserver.java   | 31 +++++++++++++---------
 3 files changed, 28 insertions(+), 15 deletions(-)

diff --git a/components/camel-grpc/pom.xml b/components/camel-grpc/pom.xml
index 5d3a06d..db4545c 100644
--- a/components/camel-grpc/pom.xml
+++ b/components/camel-grpc/pom.xml
@@ -43,7 +43,7 @@
         <!-- requires camel-core -->
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-core</artifactId>
+            <artifactId>camel-support</artifactId>
         </dependency>
 
         <dependency>
diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
index ad542be..3e99b9c 100644
--- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
+++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
@@ -38,6 +38,7 @@ import org.apache.camel.component.grpc.client.GrpcResponseRouterStreamObserver;
 import org.apache.camel.spi.ClassResolver;
 import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.ResourceHelper;
+import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
 
 /**
@@ -117,11 +118,18 @@ public class GrpcProducer extends DefaultAsyncProducer {
             if (configuration.getStreamRepliesTo() != null) {
                 this.globalResponseObserver = new 
GrpcResponseRouterStreamObserver(configuration, getEndpoint());
             }
+
+            if (this.globalResponseObserver != null) {
+                ServiceHelper.startService(this.globalResponseObserver);
+            }
         }
     }
 
     @Override
     protected void doStop() throws Exception {
+        if (this.globalResponseObserver != null) {
+            ServiceHelper.stopService(this.globalResponseObserver);
+        }
         if (channel != null) {
             forwarder.shutdown();
             forwarder = null;
@@ -136,7 +144,7 @@ public class GrpcProducer extends DefaultAsyncProducer {
     }
 
     protected void initializeChannel() throws Exception {
-        NettyChannelBuilder channelBuilder = null;
+        NettyChannelBuilder channelBuilder;
         
         if (!ObjectHelper.isEmpty(configuration.getHost()) && 
!ObjectHelper.isEmpty(configuration.getPort())) {
             log.info("Creating channel to the remote gRPC server {}:{}", 
configuration.getHost(), configuration.getPort());
diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java
index c1edbd0..749b53e 100644
--- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java
+++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java
@@ -17,31 +17,28 @@
 package org.apache.camel.component.grpc.client;
 
 import io.grpc.stub.StreamObserver;
-import org.apache.camel.AsyncProcessor;
+import org.apache.camel.AsyncProducer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.grpc.GrpcConfiguration;
 import org.apache.camel.component.grpc.GrpcConstants;
-import org.apache.camel.impl.DefaultProducerCache;
-import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.support.CamelContextHelper;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.support.service.ServiceSupport;
 
 /**
  * A stream observer that routes all responses to another endpoint.
  */
-public class GrpcResponseRouterStreamObserver implements 
StreamObserver<Object> {
+public class GrpcResponseRouterStreamObserver extends ServiceSupport 
implements StreamObserver<Object> {
 
     private final Endpoint sourceEndpoint;
     private final GrpcConfiguration configuration;
-    private final Endpoint endpoint;
-    private final ProducerCache producerCache;
+    private Endpoint endpoint;
+    private AsyncProducer producer;
 
-    public GrpcResponseRouterStreamObserver(GrpcConfiguration configuration, 
Endpoint sourceEndpoint) {
+    public GrpcResponseRouterStreamObserver(GrpcConfiguration configuration, 
Endpoint sourceEndpoint) throws Exception {
         this.configuration = configuration;
         this.sourceEndpoint = sourceEndpoint;
-        this.endpoint = 
CamelContextHelper.getMandatoryEndpoint(sourceEndpoint.getCamelContext(), 
configuration.getStreamRepliesTo());
-        sourceEndpoint.getCamelContext().createProducerTemplate(-1);
-        this.producerCache = new DefaultProducerCache(this, 
sourceEndpoint.getCamelContext(), -1);
     }
 
     @Override
@@ -50,7 +47,6 @@ public class GrpcResponseRouterStreamObserver implements 
StreamObserver<Object>
         exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, 
GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT);
         exchange.getIn().setBody(o);
         doSend(exchange);
-
     }
 
     @Override
@@ -72,10 +68,19 @@ public class GrpcResponseRouterStreamObserver implements 
StreamObserver<Object>
         }
     }
 
-
     private void doSend(Exchange exchange) {
+        producer.processAsync(exchange);
+    }
 
-        producerCache.doInAsyncProducer(endpoint, exchange, doneSync -> { }, 
AsyncProcessor::process);
+    @Override
+    protected void doStart() throws Exception {
+        this.endpoint = 
CamelContextHelper.getMandatoryEndpoint(sourceEndpoint.getCamelContext(), 
configuration.getStreamRepliesTo());
+        this.producer = endpoint.createAsyncProducer();
+        ServiceHelper.startService(producer);
     }
 
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(producer);
+    }
 }

Reply via email to