This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push: new 8afe1de CAMEL-16222: PooledExchangeFactory experiment 8afe1de is described below commit 8afe1dec0e9011afb6bd9f9a52fc9e37dfeb0573 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Feb 23 09:17:35 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../camel/component/kafka/KafkaConsumer.java | 36 ++++++++--- .../camel/component/kafka/KafkaEndpoint.java | 21 ------ .../camel/component/kafka/KafkaConsumerTest.java | 1 - .../camel/component/kafka/KafkaEndpointTest.java | 75 ---------------------- 4 files changed, 27 insertions(+), 106 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index b911881..7078044 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -33,6 +33,7 @@ import java.util.regex.Pattern; import java.util.stream.StreamSupport; import org.apache.camel.Exchange; +import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer; import org.apache.camel.spi.HeaderFilterStrategy; @@ -239,8 +240,7 @@ public class KafkaConsumer extends DefaultConsumer { @SuppressWarnings("unchecked") protected boolean doRun() { - // allow to re-connect thread in case we use that to retry failed - // messages + // allow to re-connect thread in case we use that to retry failed messages boolean reConnect = false; boolean unsubscribing = false; @@ -320,7 +320,7 @@ public class KafkaConsumer extends DefaultConsumer { LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value()); } - Exchange exchange = endpoint.createKafkaExchange(record); + 
Exchange exchange = createKafkaExchange(record); propagateHeaders(record, exchange, endpoint.getConfiguration()); @@ -355,14 +355,11 @@ public class KafkaConsumer extends DefaultConsumer { // processing failed due to an unhandled // exception, what should we do if (endpoint.getConfiguration().isBreakOnFirstError()) { - // we are failing and we should break - // out + // we are failing and we should break out LOG.warn( "Error during processing {} from topic: {}. Will seek consumer to offset: {} and re-connect and start polling again.", - exchange, - topicName, partitionLastOffset, exchange.getException()); - // force commit so we resume on next - // poll where we failed + exchange, topicName, partitionLastOffset, exchange.getException()); + // force commit so we resume on next poll where we failed commitOffset(offsetRepository, partition, partitionLastOffset, true); // continue to next partition breakOnErrorHit = true; @@ -380,6 +377,9 @@ public class KafkaConsumer extends DefaultConsumer { // offset state upon partition revoke lastProcessedOffset.put(serializeOffsetKey(partition), partitionLastOffset); } + + // success so release the exchange + releaseExchange(exchange, false); } if (!breakOnErrorHit) { @@ -506,6 +506,24 @@ public class KafkaConsumer extends DefaultConsumer { } } + @SuppressWarnings("rawtypes") + private Exchange createKafkaExchange(ConsumerRecord record) { + Exchange exchange = createExchange(false); + + Message message = exchange.getIn(); + message.setHeader(KafkaConstants.PARTITION, record.partition()); + message.setHeader(KafkaConstants.TOPIC, record.topic()); + message.setHeader(KafkaConstants.OFFSET, record.offset()); + message.setHeader(KafkaConstants.HEADERS, record.headers()); + message.setHeader(KafkaConstants.TIMESTAMP, record.timestamp()); + if (record.key() != null) { + message.setHeader(KafkaConstants.KEY, record.key()); + } + message.setBody(record.value()); + + return exchange; + } + private void propagateHeaders( 
ConsumerRecord<Object, Object> record, Exchange exchange, KafkaConfiguration kafkaConfiguration) { HeaderFilterStrategy headerFilterStrategy = kafkaConfiguration.getHeaderFilterStrategy(); diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index 2ffc47e..950cbfa 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -21,8 +21,6 @@ import java.util.concurrent.ExecutorService; import org.apache.camel.Category; import org.apache.camel.Consumer; -import org.apache.camel.Exchange; -import org.apache.camel.Message; import org.apache.camel.MultipleConsumersSupport; import org.apache.camel.Processor; import org.apache.camel.Producer; @@ -33,7 +31,6 @@ import org.apache.camel.support.DefaultEndpoint; import org.apache.camel.support.SynchronousDelegateProducer; import org.apache.camel.util.CastUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Deserializer; @@ -146,24 +143,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS "KafkaProducer[" + configuration.getTopic() + "]", core, max); } - @SuppressWarnings("rawtypes") - public Exchange createKafkaExchange(ConsumerRecord record) { - Exchange exchange = super.createExchange(); - - Message message = exchange.getIn(); - message.setHeader(KafkaConstants.PARTITION, record.partition()); - message.setHeader(KafkaConstants.TOPIC, record.topic()); - message.setHeader(KafkaConstants.OFFSET, record.offset()); - message.setHeader(KafkaConstants.HEADERS, record.headers()); - 
message.setHeader(KafkaConstants.TIMESTAMP, record.timestamp()); - if (record.key() != null) { - message.setHeader(KafkaConstants.KEY, record.key()); - } - message.setBody(record.value()); - - return exchange; - } - protected KafkaProducer createProducer(KafkaEndpoint endpoint) { return new KafkaProducer(endpoint); } diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java index 26188c4..7bad317 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java @@ -20,7 +20,6 @@ import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Processor; import org.apache.camel.spi.ExchangeFactory; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java deleted file mode 100644 index 0533013..0000000 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.kafka; - -import org.apache.camel.Exchange; -import org.apache.camel.Message; -import org.apache.camel.impl.DefaultCamelContext; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoSettings; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.when; - -@MockitoSettings -public class KafkaEndpointTest { - - private KafkaEndpoint endpoint; - - @Mock - private ConsumerRecord<String, String> mockRecord; - - @Mock - private KafkaComponent mockKafkaComponent; - - @BeforeEach - public void setup() { - KafkaComponent kafka = new KafkaComponent(new DefaultCamelContext()); - kafka.init(); - endpoint = new KafkaEndpoint("kafka:mytopic?brokers=localhost", kafka); - } - - @Test - public void createKafkaExchangeShouldSetHeaders() { - - when(mockRecord.key()).thenReturn("somekey"); - when(mockRecord.topic()).thenReturn("topic"); - when(mockRecord.partition()).thenReturn(4); - when(mockRecord.offset()).thenReturn(56L); - when(mockRecord.timestamp()).thenReturn(1518026587392L); - - Exchange exchange = endpoint.createKafkaExchange(mockRecord); - Message inMessage = exchange.getIn(); - assertNotNull(inMessage); - assertEquals("somekey", inMessage.getHeader(KafkaConstants.KEY)); - assertEquals("topic", 
inMessage.getHeader(KafkaConstants.TOPIC)); - assertEquals(4, inMessage.getHeader(KafkaConstants.PARTITION)); - assertEquals(56L, inMessage.getHeader(KafkaConstants.OFFSET)); - assertEquals(1518026587392L, inMessage.getHeader(KafkaConstants.TIMESTAMP)); - } - - @Test - public void isSingletonShouldReturnTrue() { - assertTrue(endpoint.isSingleton()); - } - -}