This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new eafe3e5  CAMEL-17026: initial refactoring to allow easier 
implementation of the feature
eafe3e5 is described below

commit eafe3e53eae767acbda051a58a118e8132369a6f
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Thu Sep 30 16:52:40 2021 +0200

    CAMEL-17026: initial refactoring to allow easier implementation of the 
feature
---
 .../camel/component/kafka/KafkaProducer.java       | 173 ++++++---------------
 .../kafka/producer/support/DelegatingCallback.java |  38 +++++
 .../producer/support/KafkaProducerCallBack.java    | 111 +++++++++++++
 3 files changed, 197 insertions(+), 125 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 8d908c6..0bc394f 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -29,16 +29,18 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.component.kafka.producer.support.DelegatingCallback;
+import org.apache.camel.component.kafka.producer.support.KafkaProducerCallBack;
 import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.util.KeyValueHolder;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -104,19 +106,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
     protected void doStart() throws Exception {
         Properties props = getProps();
         if (kafkaProducer == null) {
-            ClassLoader threadClassLoader = 
Thread.currentThread().getContextClassLoader();
-            try {
-                // Kafka uses reflection for loading authentication settings,
-                // use its classloader
-                Thread.currentThread()
-                        
.setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
-                LOG.trace("Creating KafkaProducer");
-                kafkaProducer = 
endpoint.getKafkaClientFactory().getProducer(props);
-                closeKafkaProducer = true;
-            } finally {
-                
Thread.currentThread().setContextClassLoader(threadClassLoader);
-            }
-            LOG.debug("Created KafkaProducer: {}", kafkaProducer);
+            createProducer(props);
         }
 
         // if we are in asynchronous mode we need a worker pool
@@ -127,6 +117,22 @@ public class KafkaProducer extends DefaultAsyncProducer {
         }
     }
 
+    private void createProducer(Properties props) {
+        ClassLoader threadClassLoader = 
Thread.currentThread().getContextClassLoader();
+        try {
+            // Kafka uses reflection for loading authentication settings,
+            // use its classloader
+            Thread.currentThread()
+                    
.setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
+            LOG.trace("Creating KafkaProducer");
+            kafkaProducer = 
endpoint.getKafkaClientFactory().getProducer(props);
+            closeKafkaProducer = true;
+        } finally {
+            Thread.currentThread().setContextClassLoader(threadClassLoader);
+        }
+        LOG.debug("Created KafkaProducer: {}", kafkaProducer);
+    }
+
     @Override
     protected void doStop() throws Exception {
         if (kafkaProducer != null && closeKafkaProducer) {
@@ -217,18 +223,11 @@ public class KafkaProducer extends DefaultAsyncProducer {
                         }
 
                         if 
(innerMmessage.getHeader(KafkaConstants.PARTITION_KEY) != null) {
-                            innerPartitionKey = 
endpoint.getConfiguration().getPartitionKey() != null
-                                    ? 
endpoint.getConfiguration().getPartitionKey()
-                                    : 
innerMmessage.getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
+                            innerPartitionKey = 
getInnerPartitionKey(innerMmessage);
                         }
 
                         if (innerMmessage.getHeader(KafkaConstants.KEY) != 
null) {
-                            innerKey = endpoint.getConfiguration().getKey() != 
null
-                                    ? endpoint.getConfiguration().getKey() : 
innerMmessage.getHeader(KafkaConstants.KEY);
-                            if (innerKey != null) {
-                                innerKey = 
tryConvertToSerializedType(innerExchange, innerKey,
-                                        
endpoint.getConfiguration().getKeySerializer());
-                            }
+                            innerKey = getInnerKey(innerExchange, 
innerMmessage);
                         }
 
                         if 
(innerMmessage.getHeader(KafkaConstants.OVERRIDE_TIMESTAMP) != null) {
@@ -240,7 +239,6 @@ public class KafkaProducer extends DefaultAsyncProducer {
                         ex = innerExchange == null ? exchange : innerExchange;
                         value = tryConvertToSerializedType(ex, 
innerMmessage.getBody(),
                                 
endpoint.getConfiguration().getValueSerializer());
-
                     }
 
                     return new KeyValueHolder(
@@ -249,6 +247,25 @@ public class KafkaProducer extends DefaultAsyncProducer {
                                     innerTopic, innerPartitionKey, 
innerTimestamp, innerKey, value, propagatedHeaders));
                 }
 
+                private Object getInnerKey(Exchange innerExchange, Message 
innerMmessage) {
+                    Object innerKey;
+                    innerKey = endpoint.getConfiguration().getKey() != null
+                            ? endpoint.getConfiguration().getKey() : 
innerMmessage.getHeader(KafkaConstants.KEY);
+                    if (innerKey != null) {
+                        innerKey = tryConvertToSerializedType(innerExchange, 
innerKey,
+                                
endpoint.getConfiguration().getKeySerializer());
+                    }
+                    return innerKey;
+                }
+
+                private Integer getInnerPartitionKey(Message innerMmessage) {
+                    Integer innerPartitionKey;
+                    innerPartitionKey = 
endpoint.getConfiguration().getPartitionKey() != null
+                            ? endpoint.getConfiguration().getPartitionKey()
+                            : 
innerMmessage.getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
+                    return innerPartitionKey;
+                }
+
                 @Override
                 public void remove() {
                     msgList.remove();
@@ -257,13 +274,13 @@ public class KafkaProducer extends DefaultAsyncProducer {
         }
 
         // endpoint take precedence over header configuration
-        final Integer partitionKey = 
endpoint.getConfiguration().getPartitionKey() != null
-                ? endpoint.getConfiguration().getPartitionKey()
-                : exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, 
Integer.class);
+        final Integer partitionKey = 
ObjectHelper.supplyIfEmpty(endpoint.getConfiguration().getPartitionKey(),
+                () -> exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, 
Integer.class));
 
         // endpoint take precedence over header configuration
-        Object key = endpoint.getConfiguration().getKey() != null
-                ? endpoint.getConfiguration().getKey() : 
exchange.getIn().getHeader(KafkaConstants.KEY);
+        Object key = 
ObjectHelper.supplyIfEmpty(endpoint.getConfiguration().getKey(),
+                () -> exchange.getIn().getHeader(KafkaConstants.KEY));
+
         if (key != null) {
             key = tryConvertToSerializedType(exchange, key, 
endpoint.getConfiguration().getKeySerializer());
         }
@@ -348,7 +365,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
     public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
             Iterator<KeyValueHolder<Object, ProducerRecord>> c = 
createRecorder(exchange);
-            KafkaProducerCallBack cb = new KafkaProducerCallBack(exchange, 
callback);
+            KafkaProducerCallBack cb = new KafkaProducerCallBack(exchange, 
callback, workerPool, endpoint.getConfiguration());
             while (c.hasNext()) {
                 cb.increment();
                 KeyValueHolder<Object, ProducerRecord> exrec = c.next();
@@ -358,7 +375,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
                 }
                 List<Callback> delegates = new ArrayList<>(Arrays.asList(cb));
                 if (exrec.getKey() != null) {
-                    delegates.add(new KafkaProducerCallBack(exrec.getKey()));
+                    delegates.add(new KafkaProducerCallBack(exrec.getKey(), 
workerPool, endpoint.getConfiguration()));
                 }
                 kafkaProducer.send(rec, new 
DelegatingCallback(delegates.toArray(new Callback[0])));
             }
@@ -397,98 +414,4 @@ public class KafkaProducer extends DefaultAsyncProducer {
         return answer != null ? answer : object;
     }
 
-    private static final class DelegatingCallback implements Callback {
-
-        private final List<Callback> callbacks;
-
-        public DelegatingCallback(Callback... callbacks) {
-            this.callbacks = Arrays.asList(callbacks);
-        }
-
-        @Override
-        public void onCompletion(RecordMetadata metadata, Exception exception) 
{
-            callbacks.forEach(c -> c.onCompletion(metadata, exception));
-        }
-    }
-
-    private final class KafkaProducerCallBack implements Callback {
-
-        private final Object body;
-        private final AsyncCallback callback;
-        private final AtomicInteger count = new AtomicInteger(1);
-        private final List<RecordMetadata> recordMetadatas = new ArrayList<>();
-
-        KafkaProducerCallBack(Object body, AsyncCallback callback) {
-            this.body = body;
-            this.callback = callback;
-            if (endpoint.getConfiguration().isRecordMetadata()) {
-                if (body instanceof Exchange) {
-                    Exchange ex = (Exchange) body;
-                    ex.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, 
recordMetadatas);
-                }
-                if (body instanceof Message) {
-                    Message msg = (Message) body;
-                    msg.setHeader(KafkaConstants.KAFKA_RECORDMETA, 
recordMetadatas);
-                }
-            }
-        }
-
-        public KafkaProducerCallBack(Exchange exchange) {
-            this(exchange, null);
-        }
-
-        public KafkaProducerCallBack(Message message) {
-            this(message, null);
-        }
-
-        public KafkaProducerCallBack(Object body) {
-            this(body, null);
-        }
-
-        void increment() {
-            count.incrementAndGet();
-        }
-
-        boolean allSent() {
-            if (count.decrementAndGet() == 0) {
-                LOG.trace("All messages sent, continue routing.");
-                // was able to get all the work done while queuing the requests
-                if (callback != null) {
-                    callback.done(true);
-                }
-                return true;
-            }
-            return false;
-        }
-
-        @Override
-        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
-            if (e != null) {
-                if (body instanceof Exchange) {
-                    ((Exchange) body).setException(e);
-                }
-                if (body instanceof Message && ((Message) body).getExchange() 
!= null) {
-                    ((Message) body).getExchange().setException(e);
-                }
-            }
-
-            recordMetadatas.add(recordMetadata);
-
-            if (count.decrementAndGet() == 0) {
-                // use worker pool to continue routing the exchange
-                // as this thread is from Kafka Callback and should not be used
-                // by Camel routing
-                workerPool.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                        LOG.trace("All messages sent, continue routing.");
-                        if (callback != null) {
-                            callback.done(false);
-                        }
-                    }
-                });
-            }
-        }
-    }
-
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
new file mode 100644
index 0000000..21ced69
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.producer.support;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+public final class DelegatingCallback implements Callback {
+
+    private final List<Callback> callbacks;
+
+    public DelegatingCallback(Callback... callbacks) {
+        this.callbacks = Arrays.asList(callbacks);
+    }
+
+    @Override
+    public void onCompletion(RecordMetadata metadata, Exception exception) {
+        callbacks.forEach(c -> c.onCompletion(metadata, exception));
+    }
+}
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java
new file mode 100644
index 0000000..83f09a0
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.producer.support;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaProducerCallBack implements Callback {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaProducerCallBack.class);
+
+    private final Object body;
+    private final AsyncCallback callback;
+    private final AtomicInteger count = new AtomicInteger(1);
+    private final List<RecordMetadata> recordMetadatas = new ArrayList<>();
+    private final ExecutorService workerPool;
+
+    public KafkaProducerCallBack(Object body, ExecutorService workerPool, 
KafkaConfiguration configuration) {
+        this(body, null, workerPool, configuration);
+    }
+
+    public KafkaProducerCallBack(Object body, AsyncCallback callback, 
ExecutorService workerPool,
+                                 KafkaConfiguration configuration) {
+        this.body = body;
+        this.callback = callback;
+        this.workerPool = Objects.requireNonNull(workerPool, "A worker pool 
must be provided");
+
+        if (configuration.isRecordMetadata()) {
+            if (body instanceof Exchange) {
+                Exchange ex = (Exchange) body;
+                ex.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, 
recordMetadatas);
+            }
+            if (body instanceof Message) {
+                Message msg = (Message) body;
+                msg.setHeader(KafkaConstants.KAFKA_RECORDMETA, 
recordMetadatas);
+            }
+        }
+    }
+
+    public void increment() {
+        count.incrementAndGet();
+    }
+
+    public boolean allSent() {
+        if (count.decrementAndGet() == 0) {
+            LOG.trace("All messages sent, continue routing.");
+            // was able to get all the work done while queuing the requests
+            if (callback != null) {
+                callback.done(true);
+            }
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+        if (e != null) {
+            if (body instanceof Exchange) {
+                ((Exchange) body).setException(e);
+            }
+            if (body instanceof Message && ((Message) body).getExchange() != 
null) {
+                ((Message) body).getExchange().setException(e);
+            }
+        }
+
+        recordMetadatas.add(recordMetadata);
+
+        if (count.decrementAndGet() == 0) {
+            // use worker pool to continue routing the exchange
+            // as this thread is from Kafka Callback and should not be used
+            // by Camel routing
+            workerPool.submit(new Runnable() {
+                @Override
+                public void run() {
+                    LOG.trace("All messages sent, continue routing.");
+                    if (callback != null) {
+                        callback.done(false);
+                    }
+                }
+            });
+        }
+    }
+}

Reply via email to