This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 10f89da6e66 CAMEL-20682: camel-kafka - KafkaIdempotentRepository 
misses continuou… (#14750)
10f89da6e66 is described below

commit 10f89da6e661564b724bb839d015966f5b9c4424
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sun Jul 7 12:23:53 2024 +0200

    CAMEL-20682: camel-kafka - KafkaIdempotentRepository misses continuou… 
(#14750)
    
    * CAMEL-20682: camel-kafka - KafkaIdempotentRepository misses continuous 
updates from its topic after startup
---
 .../catalog/beans/KafkaIdempotentRepository.json   |   2 +-
 .../kafka/KafkaIdempotentRepositoryConfigurer.java |  12 ++
 .../camel/bean/KafkaIdempotentRepository.json      |   2 +-
 .../camel-kafka/src/main/docs/kafka-component.adoc |  19 +--
 .../kafka/KafkaIdempotentRepository.java           | 171 ++++++++++++++-------
 .../ROOT/pages/camel-4x-upgrade-guide-4_8.adoc     |  13 ++
 .../modules/ROOT/pages/camel-4x-upgrade-guide.adoc |   1 +
 7 files changed, 154 insertions(+), 66 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/KafkaIdempotentRepository.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/KafkaIdempotentRepository.json
index 78ec3900e64..de825e33c41 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/KafkaIdempotentRepository.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/KafkaIdempotentRepository.json
@@ -10,7 +10,7 @@
     "groupId": "org.apache.camel",
     "artifactId": "camel-kafka",
     "version": "4.7.0-SNAPSHOT",
-    "properties": { "topic": { "index": 0, "kind": "property", "displayName": 
"Topic", "required": true, "type": "string", "javaType": "java.lang.String", 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the name of the Kafka topic used by this idempotent repository. Each 
functionally-separate repository should use a different topic." }, 
"bootstrapServers": { "index": 1, "kind": "property", "displayName": "Bootstrap 
Servers", "required": true, "type": "strin [...]
+    "properties": { "topic": { "index": 0, "kind": "property", "displayName": 
"Topic", "required": true, "type": "string", "javaType": "java.lang.String", 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the name of the Kafka topic used by this idempotent repository. Each 
functionally-separate repository should use a different topic." }, 
"bootstrapServers": { "index": 1, "kind": "property", "displayName": "Bootstrap 
Servers", "required": true, "type": "strin [...]
   }
 }
 
diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryConfigurer.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryConfigurer.java
index 2ea11d6163d..d8bc437de27 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryConfigurer.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryConfigurer.java
@@ -25,10 +25,14 @@ public class KafkaIdempotentRepositoryConfigurer extends 
org.apache.camel.suppor
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "bootstrapservers":
         case "bootstrapServers": 
target.setBootstrapServers(property(camelContext, java.lang.String.class, 
value)); return true;
+        case "groupid":
+        case "groupId": target.setGroupId(property(camelContext, 
java.lang.String.class, value)); return true;
         case "maxcachesize":
         case "maxCacheSize": target.setMaxCacheSize(property(camelContext, 
int.class, value)); return true;
         case "polldurationms":
         case "pollDurationMs": target.setPollDurationMs(property(camelContext, 
int.class, value)); return true;
+        case "startuponly":
+        case "startupOnly": target.setStartupOnly(property(camelContext, 
boolean.class, value)); return true;
         case "topic": target.setTopic(property(camelContext, 
java.lang.String.class, value)); return true;
         default: return false;
         }
@@ -39,10 +43,14 @@ public class KafkaIdempotentRepositoryConfigurer extends 
org.apache.camel.suppor
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "bootstrapservers":
         case "bootstrapServers": return java.lang.String.class;
+        case "groupid":
+        case "groupId": return java.lang.String.class;
         case "maxcachesize":
         case "maxCacheSize": return int.class;
         case "polldurationms":
         case "pollDurationMs": return int.class;
+        case "startuponly":
+        case "startupOnly": return boolean.class;
         case "topic": return java.lang.String.class;
         default: return null;
         }
@@ -54,10 +62,14 @@ public class KafkaIdempotentRepositoryConfigurer extends 
org.apache.camel.suppor
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "bootstrapservers":
         case "bootstrapServers": return target.getBootstrapServers();
+        case "groupid":
+        case "groupId": return target.getGroupId();
         case "maxcachesize":
         case "maxCacheSize": return target.getMaxCacheSize();
         case "polldurationms":
         case "pollDurationMs": return target.getPollDurationMs();
+        case "startuponly":
+        case "startupOnly": return target.isStartupOnly();
         case "topic": return target.getTopic();
         default: return null;
         }
diff --git 
a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/bean/KafkaIdempotentRepository.json
 
b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/bean/KafkaIdempotentRepository.json
index 78ec3900e64..de825e33c41 100644
--- 
a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/bean/KafkaIdempotentRepository.json
+++ 
b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/bean/KafkaIdempotentRepository.json
@@ -10,7 +10,7 @@
     "groupId": "org.apache.camel",
     "artifactId": "camel-kafka",
     "version": "4.7.0-SNAPSHOT",
-    "properties": { "topic": { "index": 0, "kind": "property", "displayName": 
"Topic", "required": true, "type": "string", "javaType": "java.lang.String", 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the name of the Kafka topic used by this idempotent repository. Each 
functionally-separate repository should use a different topic." }, 
"bootstrapServers": { "index": 1, "kind": "property", "displayName": "Bootstrap 
Servers", "required": true, "type": "strin [...]
+    "properties": { "topic": { "index": 0, "kind": "property", "displayName": 
"Topic", "required": true, "type": "string", "javaType": "java.lang.String", 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the name of the Kafka topic used by this idempotent repository. Each 
functionally-separate repository should use a different topic." }, 
"bootstrapServers": { "index": 1, "kind": "property", "displayName": "Bootstrap 
Servers", "required": true, "type": "strin [...]
   }
 }
 
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index a00f0e78863..4968467e4af 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -250,17 +250,18 @@ On startup, the instance subscribes to the topic, rewinds 
the offset to the begi
 Be mindful of the format of the header used for the uniqueness check. By 
default, it uses Strings as the data types. When using primitive numeric 
formats, the header must be deserialized accordingly. Check the samples below 
for examples.
 
 A `KafkaIdempotentRepository` has the following properties:
-[width="100%",cols="2m,5",options="header"]
+[width="100%",cols="2m,2m,5",options="header"]
 |===
-| Property | Description
-| topic | The name of the Kafka topic to use to broadcast changes. (required)
-| bootstrapServers | The `bootstrap.servers` property on the internal Kafka 
producer and consumer. Use this as shorthand if not setting `consumerConfig` 
and `producerConfig`. If used, this component will apply sensible default 
configurations for the producer and consumer.
-| producerConfig | Sets the properties that will be used by the Kafka producer 
that broadcasts changes. Overrides `bootstrapServers`, so must define the Kafka 
`bootstrap.servers` property itself
-| consumerConfig | Sets the properties that will be used by the Kafka consumer 
that populates the cache from the topic. Overrides `bootstrapServers`, so must 
define the Kafka `bootstrap.servers` property itself
-| maxCacheSize | How many of the most recently used keys should be stored in 
memory (default 1000).
-| pollDurationMs | The poll duration of the Kafka consumer. The local caches 
are updated immediately. This value will affect how far behind other peers that 
update their caches from the topic are relative to the idempotent consumer 
instance that sent the cache action message. The default value of this is 100 
ms. +
+| Property | Default | Description
+| topic | | *Required* The name of the Kafka topic to use to broadcast 
changes. (required)
+| bootstrapServers | | *Required* The `bootstrap.servers` property on the 
internal Kafka producer and consumer. Use this as shorthand if not setting 
`consumerConfig` and `producerConfig`. If used, this component will apply 
sensible default configurations for the producer and consumer.
+| groupId | | The groupId to assign to the idempotent consumer.
+| startupOnly | false | Whether to sync on startup only, or to continue 
syncing while Camel is running.
+| maxCacheSize | 1000 | How many of the most recently used keys should be 
stored in memory (default 1000).
+| pollDurationMs | 100 | The poll duration of the Kafka consumer. The local 
caches are updated immediately. This value will affect how far behind other 
peers that update their caches from the topic are relative to the idempotent 
consumer instance that sent the cache action message. The default value of this 
is 100 ms.
 If setting this value explicitly, be aware that there is a tradeoff between 
the remote cache liveness and the volume of network traffic between this 
repository's consumer and the Kafka brokers. The cache warmup process also 
depends on there being one poll that fetches nothing - this indicates that the 
stream has been consumed up to the current point. If the poll duration is 
excessively long for the rate at which messages are sent on the topic, there 
exists a possibility that the cache ca [...]
-| groupId | The groupId to assign to the idempotent consumer. If not 
specified, it will be randomized.
+| producerConfig | | Sets the properties that will be used by the Kafka 
producer that broadcasts changes. Overrides `bootstrapServers`, so must define 
the Kafka `bootstrap.servers` property itself
+| consumerConfig | | Sets the properties that will be used by the Kafka 
consumer that populates the cache from the topic. Overrides `bootstrapServers`, 
so must define the Kafka `bootstrap.servers` property itself
 |===
 
 The repository can be instantiated by defining the `topic` and 
`bootstrapServers`, or the `producerConfig` and `consumerConfig` property sets 
can be explicitly defined to enable features such as SSL/SASL.
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index b5ce20da233..6a9e05db116 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -22,7 +22,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
@@ -33,10 +35,13 @@ import org.apache.camel.spi.Configurer;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.support.LRUCacheFactory;
+import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.StringHelper;
+import org.apache.camel.util.TimeUtils;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -63,8 +68,7 @@ import org.slf4j.LoggerFactory;
  * instance that uses the topic (e.g. typically on different machines running 
in parallel) controls its own consumer
  * group, so in a cluster of 10 Camel processes using the same topic each will 
control its own offset. On startup, the
  * instance consumes the full content of the topic, rebuilding the cache to 
the latest state. To use, this repository
- * must be placed in the Camel registry, either manually or by registration as 
a bean in Spring/Blueprint, as it is
- * CamelContext aware.
+ * must be placed in the Camel registry.
  */
 @Metadata(label = "bean",
           description = "Idempotent repository that uses Kafka to store 
message ids. Uses a local cache of previously seen Message IDs."
@@ -81,6 +85,9 @@ public class KafkaIdempotentRepository extends ServiceSupport 
implements Idempot
     private static final int DEFAULT_POLL_DURATION_MS = 100;
 
     private CamelContext camelContext;
+    private ExecutorService executorService;
+    private TopicPoller poller;
+    private final AtomicLong cacheCounter = new AtomicLong();
     // internal properties
     private Map<String, Object> cache;
     private Consumer<String, String> consumer;
@@ -88,7 +95,6 @@ public class KafkaIdempotentRepository extends ServiceSupport 
implements Idempot
 
     private Properties producerConfig;
     private Properties consumerConfig;
-    private String groupId; // not in use
 
     // configurable
     @Metadata(description = "Sets the name of the Kafka topic used by this 
idempotent repository."
@@ -97,6 +103,9 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
     private String topic;
     @Metadata(description = "The URL for the kafka brokers to use", required = 
true)
     private String bootstrapServers;
+    @Metadata(description = "A string that uniquely identifies the group of 
consumer processes to which this consumer belongs. By setting the"
+                            + " same group id, multiple processes can indicate 
that they are all part of the same consumer group.")
+    private String groupId;
     @Metadata(description = "Sets the maximum size of the local key cache.", 
defaultValue = "" + DEFAULT_MAXIMUM_CACHE_SIZE)
     private int maxCacheSize = DEFAULT_MAXIMUM_CACHE_SIZE;
     @Metadata(description = "Sets the poll duration of the Kafka consumer. The 
local caches are updated immediately; this value will affect"
@@ -110,6 +119,8 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
                             + " operate in an inconsistent state relative to 
its peers until it catches up.",
               defaultValue = "" + DEFAULT_POLL_DURATION_MS)
     private int pollDurationMs = DEFAULT_POLL_DURATION_MS;
+    @Metadata(description = "Whether to sync on startup only, or to continue 
syncing while Camel is running.")
+    private boolean startupOnly;
 
     enum CacheAction {
         add,
@@ -124,10 +135,6 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
         this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, 
DEFAULT_POLL_DURATION_MS);
     }
 
-    /**
-     * @deprecated Use the constructor without groupId; the parameter groupId 
is ignored.
-     */
-    @Deprecated
     public KafkaIdempotentRepository(String topic, String bootstrapServers, 
String groupId) {
         this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, 
DEFAULT_POLL_DURATION_MS, groupId);
     }
@@ -143,9 +150,6 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
         this(topic, consumerConfig, producerConfig, 
DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
     }
 
-    /**
-     * @deprecated Use the constructor without groupId; the parameter groupId 
is ignored.
-     */
     @Deprecated
     public KafkaIdempotentRepository(String topic, Properties consumerConfig, 
Properties producerConfig, String groupId) {
         this(topic, consumerConfig, producerConfig, 
DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, groupId);
@@ -160,10 +164,6 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
         this.pollDurationMs = pollDurationMs;
     }
 
-    /**
-     * @deprecated Use the constructor without groupId; the parameter groupId 
is ignored.
-     */
-    @Deprecated
     public KafkaIdempotentRepository(String topic, String bootstrapServers, 
int maxCacheSize, int pollDurationMs,
                                      String groupId) {
         this.topic = topic;
@@ -173,10 +173,6 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
         this.groupId = groupId;
     }
 
-    /**
-     * @deprecated Use the constructor without groupId; the parameter groupId 
is ignored.
-     */
-    @Deprecated
     public KafkaIdempotentRepository(String topic, Properties consumerConfig, 
Properties producerConfig, int maxCacheSize,
                                      int pollDurationMs, String groupId) {
         this.topic = topic;
@@ -214,6 +210,17 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
         this.bootstrapServers = bootstrapServers;
     }
 
+    public boolean isStartupOnly() {
+        return startupOnly;
+    }
+
+    /**
+     * Whether to sync on startup only, or to continue syncing while Camel is 
running.
+     */
+    public void setStartupOnly(boolean startupOnly) {
+        this.startupOnly = startupOnly;
+    }
+
     public Properties getProducerConfig() {
         return producerConfig;
     }
@@ -225,7 +232,7 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
      * <pre>
      * bootstrap.servers
      * </pre>
-     *
+     * <p>
      * property itself. Prefer using {@link #bootstrapServers} for default 
configuration unless you specifically need
      * non-standard configuration options such as SSL/SASL.
      *
@@ -246,7 +253,7 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
      * <pre>
      * bootstrap.servers
      * </pre>
-     *
+     * <p>
      * property itself. Prefer using {@link #bootstrapServers} for default 
configuration unless you specifically need
      * non-standard configuration options such as SSL/SASL.
      *
@@ -290,21 +297,14 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
         this.pollDurationMs = pollDurationMs;
     }
 
-    /**
-     * @deprecated The parameter groupId is ignored.
-     */
-    @Deprecated
     public String getGroupId() {
         return groupId;
     }
 
     /**
-     * Sets the group id of the Kafka consumer.
-     *
-     * @param      groupId The poll duration in milliseconds.
-     * @deprecated         The parameter groupId is ignored.
+     * A string that uniquely identifies the group of consumer processes to 
which this consumer belongs. By setting the
+     * same group id, multiple processes can indicate that they are all part 
of the same consumer group.
      */
-    @Deprecated
     public void setGroupId(String groupId) {
         this.groupId = groupId;
     }
@@ -330,6 +330,9 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
             consumerConfig = new Properties();
             StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
             consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
+            if (groupId != null) {
+                consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+            }
         }
 
         if (producerConfig == null) {
@@ -355,7 +358,34 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
         producerConfig.putIfAbsent(ProducerConfig.BATCH_SIZE_CONFIG, "0");
         producer = new KafkaProducer<>(producerConfig);
 
-        populateCache();
+        poller = new TopicPoller();
+        ServiceHelper.startService(poller);
+        // populate cache on startup to be ready
+        StopWatch watch = new StopWatch();
+        LOG.info("Syncing KafkaIdempotentRepository from topic: {} starting", 
topic);
+        poller.run();
+        LOG.info("Syncing KafkaIdempotentRepository from topic: {} complete: 
{}", topic,
+                TimeUtils.printDuration(watch.taken(), true));
+
+        if (!startupOnly) {
+            // continue sync job in background
+            executorService
+                    = 
camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, 
"KafkaIdempotentRepositorySync");
+            LOG.info("Syncing KafkaIdempotentRepository from topic: {} 
continuously using background thread", topic);
+            executorService.submit(poller);
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (executorService != null && camelContext != null) {
+            camelContext.getExecutorServiceManager().shutdown(executorService);
+            executorService = null;
+        }
+        ServiceHelper.stopService(poller);
+        IOHelper.close(consumer, "consumer", LOG);
+        IOHelper.close(producer, "producer", LOG);
+        LOG.debug("Stopped KafkaIdempotentRepository. Cache counter: {}", 
cacheCounter.get());
     }
 
     private void populateCache() {
@@ -363,7 +393,7 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
         List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
         Collection<TopicPartition> partitions = partitionInfos.stream()
                 .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
-                .collect(Collectors.toUnmodifiableList());
+                .toList();
 
         LOG.debug("Assigning consumer to partitions {}", partitions);
         consumer.assign(partitions);
@@ -379,38 +409,60 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
                 addToCache(consumerRecord);
             }
         }
+    }
 
+    private class TopicPoller extends ServiceSupport implements Runnable {
+
+        private final AtomicBoolean init = new AtomicBoolean();
+
+        @Override
+        public void run() {
+            if (init.compareAndSet(false, true)) {
+                // sync cache on startup
+                LOG.debug("TopicPoller populating cache on startup");
+                populateCache();
+                LOG.debug("TopicPoller populated cache on startup complete");
+                return;
+            }
+
+            LOG.debug("TopicPoller running");
+            while (isRunAllowed()) {
+                try {
+                    ConsumerRecords<String, String> consumerRecords = 
consumer.poll(Duration.ofMillis(pollDurationMs));
+                    for (ConsumerRecord<String, String> consumerRecord : 
consumerRecords) {
+                        addToCache(consumerRecord);
+                    }
+                } catch (Exception e) {
+                    LOG.warn("TopicPoller error syncing due to: " + 
e.getMessage() + ". This exception is ignored.", e);
+                }
+            }
+            LOG.debug("TopicPoller stopping");
+        }
     }
 
     private void addToCache(ConsumerRecord<String, String> consumerRecord) {
-        CacheAction action = null;
+        cacheCounter.incrementAndGet();
+        CacheAction action;
         try {
             action = CacheAction.valueOf(consumerRecord.value());
+            String messageId = consumerRecord.key();
+            if (action == CacheAction.add) {
+                LOG.debug("Adding to cache messageId:{}", messageId);
+                cache.put(messageId, messageId);
+            } else if (action == CacheAction.remove) {
+                LOG.debug("Removing from cache messageId:{}", messageId);
+                cache.remove(messageId);
+            } else if (action == CacheAction.clear) {
+                cache.clear();
+            } else {
+                throw new IllegalArgumentException("Unknown action");
+            }
         } catch (IllegalArgumentException iax) {
-            LOG.error(
-                    "Unexpected action value:\"{}\" received on [topic:{}, 
partition:{}, offset:{}]. Shutting down.",
+            LOG.warn(
+                    "Unexpected action value:\"{}\" received on [topic:{}, 
partition:{}, offset:{}]. Ignoring.",
                     consumerRecord.key(), consumerRecord.topic(),
                     consumerRecord.partition(), consumerRecord.offset());
         }
-        String messageId = consumerRecord.key();
-        if (action == CacheAction.add) {
-            LOG.debug("Adding to cache messageId:{}", messageId);
-            cache.put(messageId, messageId);
-        } else if (action == CacheAction.remove) {
-            LOG.debug("Removing from cache messageId:{}", messageId);
-            cache.remove(messageId);
-        } else if (action == CacheAction.clear) {
-            cache.clear();
-        } else {
-            // this should never happen
-            throw new RuntimeException("Illegal action " + action + " for key 
" + consumerRecord.key());
-        }
-    }
-
-    @Override
-    protected void doStop() {
-        IOHelper.close(consumer, "consumer", LOG);
-        IOHelper.close(producer, "producer", LOG);
     }
 
     @Override
@@ -444,7 +496,6 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
     @Override
     @ManagedOperation(description = "Does the store contain the given key")
     public boolean contains(String key) {
-        LOG.debug("Checking cache for key:{}", key);
         return cache.containsKey(key);
     }
 
@@ -468,4 +519,14 @@ public class KafkaIdempotentRepository extends 
ServiceSupport implements Idempot
     public void clear() {
         broadcastAction(null, CacheAction.clear);
     }
+
+    @ManagedOperation(description = "Number of sync events received from the 
kafka topic")
+    public long getCacheCounter() {
+        return cacheCounter.get();
+    }
+
+    @ManagedOperation(description = "Number of elements currently in the 
cache")
+    public long getCacheSize() {
+        return cache != null ? cache.size() : 0;
+    }
 }
diff --git 
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_8.adoc 
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_8.adoc
new file mode 100644
index 00000000000..367e1bb595a
--- /dev/null
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_8.adoc
@@ -0,0 +1,13 @@
+= Apache Camel 4.x Upgrade Guide
+
+This document is for helping you upgrade your Apache Camel application
+from Camel 4.x to 4.y. For example, if you are upgrading Camel 4.0 to 4.2, 
then you should follow the guides
+from both 4.0 to 4.1 and 4.1 to 4.2.
+
+== Upgrading Camel 4.7 to 4.8
+
+=== camel-kafka
+
+The `KafkaIdempotentRepository` will now continue to sync cache updates after 
Camel has been started.
+You can configure `startupOnly=true` to only sync the cache once on startup
+(however then the cache is not synced with other Camel nodes in a cluster).
diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide.adoc 
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide.adoc
index f3eea1fec9b..8dfad84fd20 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide.adoc
@@ -16,4 +16,5 @@ You can find upgrade guide for each release in the following 
pages:
 - xref:camel-4x-upgrade-guide-4_5.adoc[Upgrade guide 4.4 -> 4.5]
 - xref:camel-4x-upgrade-guide-4_6.adoc[Upgrade guide 4.5 -> 4.6]
 - xref:camel-4x-upgrade-guide-4_7.adoc[Upgrade guide 4.6 -> 4.7]
+- xref:camel-4x-upgrade-guide-4_8.adoc[Upgrade guide 4.7 -> 4.8]
 

Reply via email to