This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 89575e5 CAMEL-16208: camel-vertx-kafka - Optimize a little bit 89575e5 is described below commit 89575e552d6f7a11786191f543a56aac557717cb Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Feb 15 07:23:27 2021 +0100 CAMEL-16208: camel-vertx-kafka - Optimize a little bit --- .../component/vertx/kafka/VertxKafkaConsumer.java | 42 ++++++++++------------ 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java index a857e44..ae99290 100644 --- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java +++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java @@ -29,6 +29,7 @@ import org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfigurat import org.apache.camel.component.vertx.kafka.operations.VertxKafkaConsumerOperations; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.DefaultConsumer; +import org.apache.camel.support.SynchronizationAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +37,7 @@ public class VertxKafkaConsumer extends DefaultConsumer implements Suspendable { private static final Logger LOG = LoggerFactory.getLogger(VertxKafkaConsumer.class); + private Synchronization onCompletion; private KafkaConsumer<Object, Object> kafkaConsumer; public VertxKafkaConsumer(final VertxKafkaEndpoint endpoint, final Processor processor) { @@ -43,6 +45,12 @@ public class VertxKafkaConsumer extends DefaultConsumer implements Suspendable { } @Override + protected void doInit() throws Exception { + super.doInit(); + this.onCompletion = new ConsumerOnCompletion(); + } + + @Override protected void doStart() throws Exception { super.doStart(); @@ -105,33 +113,13 @@ public class VertxKafkaConsumer extends DefaultConsumer implements Suspendable { propagatedHeaders.forEach((key, value) -> exchange.getIn().setHeader(key, value)); // add exchange callback - exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { - @Override - public void onComplete(Exchange exchange) { - // at the moment we don't commit the offsets manually, we can add it in the future - } - - @Override - public void onFailure(Exchange exchange) { - // we do nothing here - processRollback(exchange); - } - }); + exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); // send message to next processor in the route getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange [{}] done.", exchange)); } private void onErrorListener(final Throwable error) { - final Exchange exchange = getEndpoint().createExchange(); - - // set the thrown exception - exchange.setException(error); - - // log exception if an exception occurred and was not handled - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, - exchange.getException()); - } + getExceptionHandler().handleException("Error from Kafka consumer.", error); } /** @@ -139,10 +127,18 @@ public class VertxKafkaConsumer extends DefaultConsumer implements Suspendable { * * @param exchange the exchange */ - private void processRollback(Exchange exchange) { + protected void processRollback(Exchange exchange) { final Exception cause = exchange.getException(); if (cause != null) { getExceptionHandler().handleException("Error during processing exchange.", exchange, cause); } } + + private class ConsumerOnCompletion extends SynchronizationAdapter { + + @Override + public void onFailure(Exchange exchange) { + processRollback(exchange); + } + } }