This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 17c3b2c CAMEL-13118: Components should not depend on camel-core but camel-support 17c3b2c is described below commit 17c3b2c8b08a64bacbd40006642989796b72c6a9 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Apr 18 08:09:04 2019 +0200 CAMEL-13118: Components should not depend on camel-core but camel-support --- components/camel-grpc/pom.xml | 2 +- .../apache/camel/component/grpc/GrpcProducer.java | 10 ++++++- .../client/GrpcResponseRouterStreamObserver.java | 31 +++++++++++++--------- 3 files changed, 28 insertions(+), 15 deletions(-) diff --git a/components/camel-grpc/pom.xml b/components/camel-grpc/pom.xml index 5d3a06d..db4545c 100644 --- a/components/camel-grpc/pom.xml +++ b/components/camel-grpc/pom.xml @@ -43,7 +43,7 @@ <!-- requires camel-core --> <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-core</artifactId> + <artifactId>camel-support</artifactId> </dependency> <dependency> diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java index ad542be..3e99b9c 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java @@ -38,6 +38,7 @@ import org.apache.camel.component.grpc.client.GrpcResponseRouterStreamObserver; import org.apache.camel.spi.ClassResolver; import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.support.ResourceHelper; +import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.ObjectHelper; /** @@ -117,11 +118,18 @@ public class GrpcProducer extends DefaultAsyncProducer { if (configuration.getStreamRepliesTo() != null) { this.globalResponseObserver = new GrpcResponseRouterStreamObserver(configuration, getEndpoint()); } + + if (this.globalResponseObserver != null) { + ServiceHelper.startService(this.globalResponseObserver); + } } } @Override protected void doStop() throws Exception { + if (this.globalResponseObserver != null) { + ServiceHelper.stopService(this.globalResponseObserver); + } if (channel != null) { forwarder.shutdown(); forwarder = null; @@ -136,7 +144,7 @@ public class GrpcProducer extends DefaultAsyncProducer { } protected void initializeChannel() throws Exception { - NettyChannelBuilder channelBuilder = null; + NettyChannelBuilder channelBuilder; if (!ObjectHelper.isEmpty(configuration.getHost()) && !ObjectHelper.isEmpty(configuration.getPort())) { log.info("Creating channel to the remote gRPC server {}:{}", configuration.getHost(), configuration.getPort()); diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java index c1edbd0..749b53e 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java @@ -17,31 +17,28 @@ package org.apache.camel.component.grpc.client; import io.grpc.stub.StreamObserver; -import org.apache.camel.AsyncProcessor; +import org.apache.camel.AsyncProducer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.component.grpc.GrpcConfiguration; import org.apache.camel.component.grpc.GrpcConstants; -import org.apache.camel.impl.DefaultProducerCache; -import org.apache.camel.spi.ProducerCache; import org.apache.camel.support.CamelContextHelper; +import org.apache.camel.support.service.ServiceHelper; +import org.apache.camel.support.service.ServiceSupport; /** * A stream observer that routes all responses to another endpoint. */ -public class GrpcResponseRouterStreamObserver implements StreamObserver<Object> { +public class GrpcResponseRouterStreamObserver extends ServiceSupport implements StreamObserver<Object> { private final Endpoint sourceEndpoint; private final GrpcConfiguration configuration; - private final Endpoint endpoint; - private final ProducerCache producerCache; + private Endpoint endpoint; + private AsyncProducer producer; - public GrpcResponseRouterStreamObserver(GrpcConfiguration configuration, Endpoint sourceEndpoint) { + public GrpcResponseRouterStreamObserver(GrpcConfiguration configuration, Endpoint sourceEndpoint) throws Exception { this.configuration = configuration; this.sourceEndpoint = sourceEndpoint; - this.endpoint = CamelContextHelper.getMandatoryEndpoint(sourceEndpoint.getCamelContext(), configuration.getStreamRepliesTo()); - sourceEndpoint.getCamelContext().createProducerTemplate(-1); - this.producerCache = new DefaultProducerCache(this, sourceEndpoint.getCamelContext(), -1); } @Override @@ -50,7 +47,6 @@ public class GrpcResponseRouterStreamObserver implements StreamObserver<Object> exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER, GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT); exchange.getIn().setBody(o); doSend(exchange); - } @Override @@ -72,10 +68,19 @@ public class GrpcResponseRouterStreamObserver implements StreamObserver<Object> } } - private void doSend(Exchange exchange) { + producer.processAsync(exchange); + } - producerCache.doInAsyncProducer(endpoint, exchange, doneSync -> { }, AsyncProcessor::process); + @Override + protected void doStart() throws Exception { + this.endpoint = CamelContextHelper.getMandatoryEndpoint(sourceEndpoint.getCamelContext(), configuration.getStreamRepliesTo()); + this.producer = endpoint.createAsyncProducer(); + ServiceHelper.startService(producer); } + @Override + protected void doStop() throws Exception { + ServiceHelper.stopService(producer); + } }