This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new aa8dfb9 [core] use Endpoint instead of URI string to send exchanges using the producer template aa8dfb9 is described below commit aa8dfb994ab51fc8192562328453a52ef852fc6f Author: Luca Burgazzoli <lburgazz...@gmail.com> AuthorDate: Wed Oct 7 23:10:44 2020 +0200 [core] use Endpoint instead of URI string to send exchanges using the producer template --- .../main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index 3ac39e6..2a97de4 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExtendedCamelContext; import org.apache.camel.ProducerTemplate; @@ -57,6 +58,7 @@ public class CamelSinkTask extends SinkTask { private CamelKafkaConnectMain cms; private ProducerTemplate producer; private CamelSinkConnectorConfig config; + private Endpoint localEndpoint; @Override public String version() { @@ -93,9 +95,12 @@ public class CamelSinkTask extends SinkTask { .withAggregationTimeout(timeout) .build(camelContext); - producer = cms.getProducerTemplate(); cms.start(); + + producer = cms.getProducerTemplate(); + localEndpoint = cms.getCamelContext().getEndpoint(LOCAL_URL); + LOG.info("CamelSinkTask connector task started"); } catch (Exception e) { throw new ConnectException("Failed to create and start Camel context", e); @@ -136,7 +141,7 @@ public class CamelSinkTask extends SinkTask { exchange.getMessage().setBody(record.value()); LOG.debug("Sending exchange {} to {}", 
exchange.getExchangeId(), LOCAL_URL); - producer.send(LOCAL_URL, exchange); + producer.send(localEndpoint, exchange); if (exchange.isFailed()) { throw new ConnectException("Exchange delivery has failed!", exchange.getException());