This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new eafe3e5 CAMEL-17026: initial refactoring to allow easier implementation of the feature eafe3e5 is described below commit eafe3e53eae767acbda051a58a118e8132369a6f Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Thu Sep 30 16:52:40 2021 +0200 CAMEL-17026: initial refactoring to allow easier implementation of the feature --- .../camel/component/kafka/KafkaProducer.java | 173 ++++++--------------- .../kafka/producer/support/DelegatingCallback.java | 38 +++++ .../producer/support/KafkaProducerCallBack.java | 111 +++++++++++++ 3 files changed, 197 insertions(+), 125 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 8d908c6..0bc394f 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -29,16 +29,18 @@ import java.util.Objects; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Message; +import org.apache.camel.component.kafka.producer.support.DelegatingCallback; +import org.apache.camel.component.kafka.producer.support.KafkaProducerCallBack; import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.util.KeyValueHolder; +import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.ProducerConfig; @@ -104,19 +106,7 @@ public class KafkaProducer extends DefaultAsyncProducer { protected void doStart() throws Exception { Properties props = getProps(); if (kafkaProducer == null) { - ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader(); - try { - // Kafka uses reflection for loading authentication settings, - // use its classloader - Thread.currentThread() - .setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader()); - LOG.trace("Creating KafkaProducer"); - kafkaProducer = endpoint.getKafkaClientFactory().getProducer(props); - closeKafkaProducer = true; - } finally { - Thread.currentThread().setContextClassLoader(threadClassLoader); - } - LOG.debug("Created KafkaProducer: {}", kafkaProducer); + createProducer(props); } // if we are in asynchronous mode we need a worker pool @@ -127,6 +117,22 @@ public class KafkaProducer extends DefaultAsyncProducer { } } + private void createProducer(Properties props) { + ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader(); + try { + // Kafka uses reflection for loading authentication settings, + // use its classloader + Thread.currentThread() + .setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader()); + LOG.trace("Creating KafkaProducer"); + kafkaProducer = endpoint.getKafkaClientFactory().getProducer(props); + closeKafkaProducer = true; + } finally { + Thread.currentThread().setContextClassLoader(threadClassLoader); + } + LOG.debug("Created KafkaProducer: {}", kafkaProducer); + } + @Override protected void doStop() throws Exception { if (kafkaProducer != null && closeKafkaProducer) { @@ -217,18 +223,11 @@ public class KafkaProducer extends DefaultAsyncProducer { } if (innerMmessage.getHeader(KafkaConstants.PARTITION_KEY) != null) { - innerPartitionKey = endpoint.getConfiguration().getPartitionKey() != null - ? endpoint.getConfiguration().getPartitionKey() - : innerMmessage.getHeader(KafkaConstants.PARTITION_KEY, Integer.class); + innerPartitionKey = getInnerPartitionKey(innerMmessage); } if (innerMmessage.getHeader(KafkaConstants.KEY) != null) { - innerKey = endpoint.getConfiguration().getKey() != null - ? endpoint.getConfiguration().getKey() : innerMmessage.getHeader(KafkaConstants.KEY); - if (innerKey != null) { - innerKey = tryConvertToSerializedType(innerExchange, innerKey, - endpoint.getConfiguration().getKeySerializer()); - } + innerKey = getInnerKey(innerExchange, innerMmessage); } if (innerMmessage.getHeader(KafkaConstants.OVERRIDE_TIMESTAMP) != null) { @@ -240,7 +239,6 @@ public class KafkaProducer extends DefaultAsyncProducer { ex = innerExchange == null ? exchange : innerExchange; value = tryConvertToSerializedType(ex, innerMmessage.getBody(), endpoint.getConfiguration().getValueSerializer()); - } return new KeyValueHolder( @@ -249,6 +247,25 @@ public class KafkaProducer extends DefaultAsyncProducer { innerTopic, innerPartitionKey, innerTimestamp, innerKey, value, propagatedHeaders)); } + private Object getInnerKey(Exchange innerExchange, Message innerMmessage) { + Object innerKey; + innerKey = endpoint.getConfiguration().getKey() != null + ? endpoint.getConfiguration().getKey() : innerMmessage.getHeader(KafkaConstants.KEY); + if (innerKey != null) { + innerKey = tryConvertToSerializedType(innerExchange, innerKey, + endpoint.getConfiguration().getKeySerializer()); + } + return innerKey; + } + + private Integer getInnerPartitionKey(Message innerMmessage) { + Integer innerPartitionKey; + innerPartitionKey = endpoint.getConfiguration().getPartitionKey() != null + ? endpoint.getConfiguration().getPartitionKey() + : innerMmessage.getHeader(KafkaConstants.PARTITION_KEY, Integer.class); + return innerPartitionKey; + } + @Override public void remove() { msgList.remove(); @@ -257,13 +274,13 @@ public class KafkaProducer extends DefaultAsyncProducer { } // endpoint take precedence over header configuration - final Integer partitionKey = endpoint.getConfiguration().getPartitionKey() != null - ? endpoint.getConfiguration().getPartitionKey() - : exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class); + final Integer partitionKey = ObjectHelper.supplyIfEmpty(endpoint.getConfiguration().getPartitionKey(), + () -> exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class)); // endpoint take precedence over header configuration - Object key = endpoint.getConfiguration().getKey() != null - ? endpoint.getConfiguration().getKey() : exchange.getIn().getHeader(KafkaConstants.KEY); + Object key = ObjectHelper.supplyIfEmpty(endpoint.getConfiguration().getKey(), + () -> exchange.getIn().getHeader(KafkaConstants.KEY)); + if (key != null) { key = tryConvertToSerializedType(exchange, key, endpoint.getConfiguration().getKeySerializer()); } @@ -348,7 +365,7 @@ public class KafkaProducer extends DefaultAsyncProducer { public boolean process(Exchange exchange, AsyncCallback callback) { try { Iterator<KeyValueHolder<Object, ProducerRecord>> c = createRecorder(exchange); - KafkaProducerCallBack cb = new KafkaProducerCallBack(exchange, callback); + KafkaProducerCallBack cb = new KafkaProducerCallBack(exchange, callback, workerPool, endpoint.getConfiguration()); while (c.hasNext()) { cb.increment(); KeyValueHolder<Object, ProducerRecord> exrec = c.next(); @@ -358,7 +375,7 @@ public class KafkaProducer extends DefaultAsyncProducer { } List<Callback> delegates = new ArrayList<>(Arrays.asList(cb)); if (exrec.getKey() != null) { - delegates.add(new KafkaProducerCallBack(exrec.getKey())); + delegates.add(new KafkaProducerCallBack(exrec.getKey(), workerPool, endpoint.getConfiguration())); } kafkaProducer.send(rec, new DelegatingCallback(delegates.toArray(new Callback[0]))); } @@ -397,98 +414,4 @@ public class KafkaProducer extends DefaultAsyncProducer { return answer != null ? answer : object; } - private static final class DelegatingCallback implements Callback { - - private final List<Callback> callbacks; - - public DelegatingCallback(Callback... callbacks) { - this.callbacks = Arrays.asList(callbacks); - } - - @Override - public void onCompletion(RecordMetadata metadata, Exception exception) { - callbacks.forEach(c -> c.onCompletion(metadata, exception)); - } - } - - private final class KafkaProducerCallBack implements Callback { - - private final Object body; - private final AsyncCallback callback; - private final AtomicInteger count = new AtomicInteger(1); - private final List<RecordMetadata> recordMetadatas = new ArrayList<>(); - - KafkaProducerCallBack(Object body, AsyncCallback callback) { - this.body = body; - this.callback = callback; - if (endpoint.getConfiguration().isRecordMetadata()) { - if (body instanceof Exchange) { - Exchange ex = (Exchange) body; - ex.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas); - } - if (body instanceof Message) { - Message msg = (Message) body; - msg.setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas); - } - } - } - - public KafkaProducerCallBack(Exchange exchange) { - this(exchange, null); - } - - public KafkaProducerCallBack(Message message) { - this(message, null); - } - - public KafkaProducerCallBack(Object body) { - this(body, null); - } - - void increment() { - count.incrementAndGet(); - } - - boolean allSent() { - if (count.decrementAndGet() == 0) { - LOG.trace("All messages sent, continue routing."); - // was able to get all the work done while queuing the requests - if (callback != null) { - callback.done(true); - } - return true; - } - return false; - } - - @Override - public void onCompletion(RecordMetadata recordMetadata, Exception e) { - if (e != null) { - if (body instanceof Exchange) { - ((Exchange) body).setException(e); - } - if (body instanceof Message && ((Message) body).getExchange() != null) { - ((Message) body).getExchange().setException(e); - } - } - - recordMetadatas.add(recordMetadata); - - if (count.decrementAndGet() == 0) { - // use worker pool to continue routing the exchange - // as this thread is from Kafka Callback and should not be used - // by Camel routing - workerPool.submit(new Runnable() { - @Override - public void run() { - LOG.trace("All messages sent, continue routing."); - if (callback != null) { - callback.done(false); - } - } - }); - } - } - } - } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java new file mode 100644 index 0000000..21ced69 --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.kafka.producer.support; + +import java.util.Arrays; +import java.util.List; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; + +public final class DelegatingCallback implements Callback { + + private final List<Callback> callbacks; + + public DelegatingCallback(Callback... callbacks) { + this.callbacks = Arrays.asList(callbacks); + } + + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + callbacks.forEach(c -> c.onCompletion(metadata, exception)); + } +} diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java new file mode 100644 index 0000000..83f09a0 --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.kafka.producer.support; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.component.kafka.KafkaConfiguration; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaProducerCallBack implements Callback { + private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerCallBack.class); + + private final Object body; + private final AsyncCallback callback; + private final AtomicInteger count = new AtomicInteger(1); + private final List<RecordMetadata> recordMetadatas = new ArrayList<>(); + private final ExecutorService workerPool; + + public KafkaProducerCallBack(Object body, ExecutorService workerPool, KafkaConfiguration configuration) { + this(body, null, workerPool, configuration); + } + + public KafkaProducerCallBack(Object body, AsyncCallback callback, ExecutorService workerPool, + KafkaConfiguration configuration) { + this.body = body; + this.callback = callback; + this.workerPool = Objects.requireNonNull(workerPool, "A worker pool must be provided"); + + if (configuration.isRecordMetadata()) { + if (body instanceof Exchange) { + Exchange ex = (Exchange) body; + ex.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas); + } + if (body instanceof Message) { + Message msg = (Message) body; + msg.setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas); + } + } + } + + public void increment() { + count.incrementAndGet(); + } + + public boolean allSent() { + if (count.decrementAndGet() == 0) { + LOG.trace("All messages sent, continue routing."); + // was able to get all the work done while queuing the requests + if (callback != null) { + callback.done(true); + } + return true; + } + return false; + } + + @Override + public void onCompletion(RecordMetadata recordMetadata, Exception e) { + if (e != null) { + if (body instanceof Exchange) { + ((Exchange) body).setException(e); + } + if (body instanceof Message && ((Message) body).getExchange() != null) { + ((Message) body).getExchange().setException(e); + } + } + + recordMetadatas.add(recordMetadata); + + if (count.decrementAndGet() == 0) { + // use worker pool to continue routing the exchange + // as this thread is from Kafka Callback and should not be used + // by Camel routing + workerPool.submit(new Runnable() { + @Override + public void run() { + LOG.trace("All messages sent, continue routing."); + if (callback != null) { + callback.done(false); + } + } + }); + } + } +}