This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/exchange-factory by this push:
     new 8afe1de  CAMEL-16222: PooledExchangeFactory experiment
8afe1de is described below

commit 8afe1dec0e9011afb6bd9f9a52fc9e37dfeb0573
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Feb 23 09:17:35 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../camel/component/kafka/KafkaConsumer.java       | 36 ++++++++---
 .../camel/component/kafka/KafkaEndpoint.java       | 21 ------
 .../camel/component/kafka/KafkaConsumerTest.java   |  1 -
 .../camel/component/kafka/KafkaEndpointTest.java   | 75 ----------------------
 4 files changed, 27 insertions(+), 106 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index b911881..7078044 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -33,6 +33,7 @@ import java.util.regex.Pattern;
 import java.util.stream.StreamSupport;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
 import org.apache.camel.spi.HeaderFilterStrategy;
@@ -239,8 +240,7 @@ public class KafkaConsumer extends DefaultConsumer {
 
         @SuppressWarnings("unchecked")
         protected boolean doRun() {
-            // allow to re-connect thread in case we use that to retry failed
-            // messages
+            // allow to re-connect thread in case we use that to retry failed messages
             boolean reConnect = false;
             boolean unsubscribing = false;
 
@@ -320,7 +320,7 @@ public class KafkaConsumer extends DefaultConsumer {
                                    LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", record.partition(),
                                            record.offset(), record.key(), record.value());
                                }
-                                Exchange exchange = endpoint.createKafkaExchange(record);
+                                Exchange exchange = createKafkaExchange(record);

                                propagateHeaders(record, exchange, endpoint.getConfiguration());
 
@@ -355,14 +355,11 @@ public class KafkaConsumer extends DefaultConsumer {
                                    // processing failed due to an unhandled
                                    // exception, what should we do
                                    if (endpoint.getConfiguration().isBreakOnFirstError()) {
-                                        // we are failing and we should break
-                                        // out
+                                        // we are failing and we should break out
                                        LOG.warn(
                                                "Error during processing {} from topic: {}. Will seek consumer to offset: {} and re-connect and start polling again.",
-                                                exchange,
-                                                topicName, partitionLastOffset, exchange.getException());
-                                        // force commit so we resume on next
-                                        // poll where we failed
+                                                exchange, topicName, partitionLastOffset, exchange.getException());
+                                        // force commit so we resume on next poll where we failed
                                        commitOffset(offsetRepository, partition, partitionLastOffset, true);
                                        // continue to next partition
                                        breakOnErrorHit = true;
@@ -380,6 +377,9 @@ public class KafkaConsumer extends DefaultConsumer {
                                    // offset state upon partition revoke
                                    lastProcessedOffset.put(serializeOffsetKey(partition), partitionLastOffset);
                                 }
+
+                                // success so release the exchange
+                                releaseExchange(exchange, false);
                             }
 
                             if (!breakOnErrorHit) {
@@ -506,6 +506,24 @@ public class KafkaConsumer extends DefaultConsumer {
         }
     }
 
+    @SuppressWarnings("rawtypes")
+    private Exchange createKafkaExchange(ConsumerRecord record) {
+        Exchange exchange = createExchange(false);
+
+        Message message = exchange.getIn();
+        message.setHeader(KafkaConstants.PARTITION, record.partition());
+        message.setHeader(KafkaConstants.TOPIC, record.topic());
+        message.setHeader(KafkaConstants.OFFSET, record.offset());
+        message.setHeader(KafkaConstants.HEADERS, record.headers());
+        message.setHeader(KafkaConstants.TIMESTAMP, record.timestamp());
+        if (record.key() != null) {
+            message.setHeader(KafkaConstants.KEY, record.key());
+        }
+        message.setBody(record.value());
+
+        return exchange;
+    }
+
     private void propagateHeaders(
            ConsumerRecord<Object, Object> record, Exchange exchange, KafkaConfiguration kafkaConfiguration) {
        HeaderFilterStrategy headerFilterStrategy = kafkaConfiguration.getHeaderFilterStrategy();
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 2ffc47e..950cbfa 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -21,8 +21,6 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -33,7 +31,6 @@ import org.apache.camel.support.DefaultEndpoint;
 import org.apache.camel.support.SynchronousDelegateProducer;
 import org.apache.camel.util.CastUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.Partitioner;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -146,24 +143,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
                 "KafkaProducer[" + configuration.getTopic() + "]", core, max);
     }
 
-    @SuppressWarnings("rawtypes")
-    public Exchange createKafkaExchange(ConsumerRecord record) {
-        Exchange exchange = super.createExchange();
-
-        Message message = exchange.getIn();
-        message.setHeader(KafkaConstants.PARTITION, record.partition());
-        message.setHeader(KafkaConstants.TOPIC, record.topic());
-        message.setHeader(KafkaConstants.OFFSET, record.offset());
-        message.setHeader(KafkaConstants.HEADERS, record.headers());
-        message.setHeader(KafkaConstants.TIMESTAMP, record.timestamp());
-        if (record.key() != null) {
-            message.setHeader(KafkaConstants.KEY, record.key());
-        }
-        message.setBody(record.value());
-
-        return exchange;
-    }
-
     protected KafkaProducer createProducer(KafkaEndpoint endpoint) {
         return new KafkaProducer(endpoint);
     }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index 26188c4..7bad317 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -20,7 +20,6 @@ import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.ExchangeFactory;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
deleted file mode 100644
index 0533013..0000000
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kafka;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoSettings;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.when;
-
-@MockitoSettings
-public class KafkaEndpointTest {
-
-    private KafkaEndpoint endpoint;
-
-    @Mock
-    private ConsumerRecord<String, String> mockRecord;
-
-    @Mock
-    private KafkaComponent mockKafkaComponent;
-
-    @BeforeEach
-    public void setup() {
-        KafkaComponent kafka = new KafkaComponent(new DefaultCamelContext());
-        kafka.init();
-        endpoint = new KafkaEndpoint("kafka:mytopic?brokers=localhost", kafka);
-    }
-
-    @Test
-    public void createKafkaExchangeShouldSetHeaders() {
-
-        when(mockRecord.key()).thenReturn("somekey");
-        when(mockRecord.topic()).thenReturn("topic");
-        when(mockRecord.partition()).thenReturn(4);
-        when(mockRecord.offset()).thenReturn(56L);
-        when(mockRecord.timestamp()).thenReturn(1518026587392L);
-
-        Exchange exchange = endpoint.createKafkaExchange(mockRecord);
-        Message inMessage = exchange.getIn();
-        assertNotNull(inMessage);
-        assertEquals("somekey", inMessage.getHeader(KafkaConstants.KEY));
-        assertEquals("topic", inMessage.getHeader(KafkaConstants.TOPIC));
-        assertEquals(4, inMessage.getHeader(KafkaConstants.PARTITION));
-        assertEquals(56L, inMessage.getHeader(KafkaConstants.OFFSET));
-        assertEquals(1518026587392L, inMessage.getHeader(KafkaConstants.TIMESTAMP));
-    }
-
-    @Test
-    public void isSingletonShouldReturnTrue() {
-        assertTrue(endpoint.isSingleton());
-    }
-
-}

Reply via email to