This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 10f89da6e66 CAMEL-20682: camel-kafka - KafkaIdempotentRepository misses continuou… (#14750) 10f89da6e66 is described below commit 10f89da6e661564b724bb839d015966f5b9c4424 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun Jul 7 12:23:53 2024 +0200 CAMEL-20682: camel-kafka - KafkaIdempotentRepository misses continuou… (#14750) * CAMEL-20682: camel-kafka - KafkaIdempotentRepository misses continuous updates from its topic after startup --- .../catalog/beans/KafkaIdempotentRepository.json | 2 +- .../kafka/KafkaIdempotentRepositoryConfigurer.java | 12 ++ .../camel/bean/KafkaIdempotentRepository.json | 2 +- .../camel-kafka/src/main/docs/kafka-component.adoc | 19 +-- .../kafka/KafkaIdempotentRepository.java | 171 ++++++++++++++------- .../ROOT/pages/camel-4x-upgrade-guide-4_8.adoc | 13 ++ .../modules/ROOT/pages/camel-4x-upgrade-guide.adoc | 1 + 7 files changed, 154 insertions(+), 66 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/KafkaIdempotentRepository.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/KafkaIdempotentRepository.json index 78ec3900e64..de825e33c41 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/KafkaIdempotentRepository.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/beans/KafkaIdempotentRepository.json @@ -10,7 +10,7 @@ "groupId": "org.apache.camel", "artifactId": "camel-kafka", "version": "4.7.0-SNAPSHOT", - "properties": { "topic": { "index": 0, "kind": "property", "displayName": "Topic", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the name of the Kafka topic used by this idempotent repository. Each functionally-separate repository should use a different topic." }, "bootstrapServers": { "index": 1, "kind": "property", "displayName": "Bootstrap Servers", "required": true, "type": "strin [...] + "properties": { "topic": { "index": 0, "kind": "property", "displayName": "Topic", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the name of the Kafka topic used by this idempotent repository. Each functionally-separate repository should use a different topic." }, "bootstrapServers": { "index": 1, "kind": "property", "displayName": "Bootstrap Servers", "required": true, "type": "strin [...] } } diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryConfigurer.java index 2ea11d6163d..d8bc437de27 100644 --- a/components/camel-kafka/src/generated/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryConfigurer.java +++ b/components/camel-kafka/src/generated/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryConfigurer.java @@ -25,10 +25,14 @@ public class KafkaIdempotentRepositoryConfigurer extends org.apache.camel.suppor switch (ignoreCase ? name.toLowerCase() : name) { case "bootstrapservers": case "bootstrapServers": target.setBootstrapServers(property(camelContext, java.lang.String.class, value)); return true; + case "groupid": + case "groupId": target.setGroupId(property(camelContext, java.lang.String.class, value)); return true; case "maxcachesize": case "maxCacheSize": target.setMaxCacheSize(property(camelContext, int.class, value)); return true; case "polldurationms": case "pollDurationMs": target.setPollDurationMs(property(camelContext, int.class, value)); return true; + case "startuponly": + case "startupOnly": target.setStartupOnly(property(camelContext, boolean.class, value)); return true; case "topic": target.setTopic(property(camelContext, java.lang.String.class, value)); return true; default: return false; } @@ -39,10 +43,14 @@ public class KafkaIdempotentRepositoryConfigurer extends org.apache.camel.suppor switch (ignoreCase ? name.toLowerCase() : name) { case "bootstrapservers": case "bootstrapServers": return java.lang.String.class; + case "groupid": + case "groupId": return java.lang.String.class; case "maxcachesize": case "maxCacheSize": return int.class; case "polldurationms": case "pollDurationMs": return int.class; + case "startuponly": + case "startupOnly": return boolean.class; case "topic": return java.lang.String.class; default: return null; } @@ -54,10 +62,14 @@ public class KafkaIdempotentRepositoryConfigurer extends org.apache.camel.suppor switch (ignoreCase ? name.toLowerCase() : name) { case "bootstrapservers": case "bootstrapServers": return target.getBootstrapServers(); + case "groupid": + case "groupId": return target.getGroupId(); case "maxcachesize": case "maxCacheSize": return target.getMaxCacheSize(); case "polldurationms": case "pollDurationMs": return target.getPollDurationMs(); + case "startuponly": + case "startupOnly": return target.isStartupOnly(); case "topic": return target.getTopic(); default: return null; } diff --git a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/bean/KafkaIdempotentRepository.json b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/bean/KafkaIdempotentRepository.json index 78ec3900e64..de825e33c41 100644 --- a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/bean/KafkaIdempotentRepository.json +++ b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/bean/KafkaIdempotentRepository.json @@ -10,7 +10,7 @@ "groupId": "org.apache.camel", "artifactId": "camel-kafka", "version": "4.7.0-SNAPSHOT", - "properties": { "topic": { "index": 0, "kind": "property", "displayName": "Topic", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the name of the Kafka topic used by this idempotent repository. Each functionally-separate repository should use a different topic." }, "bootstrapServers": { "index": 1, "kind": "property", "displayName": "Bootstrap Servers", "required": true, "type": "strin [...] + "properties": { "topic": { "index": 0, "kind": "property", "displayName": "Topic", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the name of the Kafka topic used by this idempotent repository. Each functionally-separate repository should use a different topic." }, "bootstrapServers": { "index": 1, "kind": "property", "displayName": "Bootstrap Servers", "required": true, "type": "strin [...] } } diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc index a00f0e78863..4968467e4af 100644 --- a/components/camel-kafka/src/main/docs/kafka-component.adoc +++ b/components/camel-kafka/src/main/docs/kafka-component.adoc @@ -250,17 +250,18 @@ On startup, the instance subscribes to the topic, rewinds the offset to the begi Be mindful of the format of the header used for the uniqueness check. By default, it uses Strings as the data types. When using primitive numeric formats, the header must be deserialized accordingly. Check the samples below for examples. A `KafkaIdempotentRepository` has the following properties: -[width="100%",cols="2m,5",options="header"] +[width="100%",cols="2m,2m,5",options="header"] |=== -| Property | Description -| topic | The name of the Kafka topic to use to broadcast changes. (required) -| bootstrapServers | The `bootstrap.servers` property on the internal Kafka producer and consumer. Use this as shorthand if not setting `consumerConfig` and `producerConfig`. If used, this component will apply sensible default configurations for the producer and consumer. -| producerConfig | Sets the properties that will be used by the Kafka producer that broadcasts changes. Overrides `bootstrapServers`, so must define the Kafka `bootstrap.servers` property itself -| consumerConfig | Sets the properties that will be used by the Kafka consumer that populates the cache from the topic. Overrides `bootstrapServers`, so must define the Kafka `bootstrap.servers` property itself -| maxCacheSize | How many of the most recently used keys should be stored in memory (default 1000). -| pollDurationMs | The poll duration of the Kafka consumer. The local caches are updated immediately. This value will affect how far behind other peers that update their caches from the topic are relative to the idempotent consumer instance that sent the cache action message. The default value of this is 100 ms. + +| Property | Default | Description +| topic | | *Required* The name of the Kafka topic to use to broadcast changes. (required) +| bootstrapServers | | *Required* The `bootstrap.servers` property on the internal Kafka producer and consumer. Use this as shorthand if not setting `consumerConfig` and `producerConfig`. If used, this component will apply sensible default configurations for the producer and consumer. +| groupId | | The groupId to assign to the idempotent consumer. +| startupOnly | false | Whether to sync on startup only, or to continue syncing while Camel is running. +| maxCacheSize | 1000 | How many of the most recently used keys should be stored in memory (default 1000). +| pollDurationMs | 100 | The poll duration of the Kafka consumer. The local caches are updated immediately. This value will affect how far behind other peers that update their caches from the topic are relative to the idempotent consumer instance that sent the cache action message. The default value of this is 100 ms. If setting this value explicitly, be aware that there is a tradeoff between the remote cache liveness and the volume of network traffic between this repository's consumer and the Kafka brokers. The cache warmup process also depends on there being one poll that fetches nothing - this indicates that the stream has been consumed up to the current point. If the poll duration is excessively long for the rate at which messages are sent on the topic, there exists a possibility that the cache ca [...] -| groupId | The groupId to assign to the idempotent consumer. If not specified, it will be randomized. +| producerConfig | | Sets the properties that will be used by the Kafka producer that broadcasts changes. Overrides `bootstrapServers`, so must define the Kafka `bootstrap.servers` property itself +| consumerConfig | | Sets the properties that will be used by the Kafka consumer that populates the cache from the topic. Overrides `bootstrapServers`, so must define the Kafka `bootstrap.servers` property itself |=== The repository can be instantiated by defining the `topic` and `bootstrapServers`, or the `producerConfig` and `consumerConfig` property sets can be explicitly defined to enable features such as SSL/SASL. diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java index b5ce20da233..6a9e05db116 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java @@ -22,7 +22,9 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; @@ -33,10 +35,13 @@ import org.apache.camel.spi.Configurer; import org.apache.camel.spi.IdempotentRepository; import org.apache.camel.spi.Metadata; import org.apache.camel.support.LRUCacheFactory; +import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.support.service.ServiceSupport; import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.StopWatch; import org.apache.camel.util.StringHelper; +import org.apache.camel.util.TimeUtils; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -63,8 +68,7 @@ import org.slf4j.LoggerFactory; * instance that uses the topic (e.g. typically on different machines running in parallel) controls its own consumer * group, so in a cluster of 10 Camel processes using the same topic each will control its own offset. On startup, the * instance consumes the full content of the topic, rebuilding the cache to the latest state. To use, this repository - * must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is - * CamelContext aware. + * must be placed in the Camel registry. */ @Metadata(label = "bean", description = "Idempotent repository that uses Kafka to store message ids. Uses a local cache of previously seen Message IDs." @@ -81,6 +85,9 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot private static final int DEFAULT_POLL_DURATION_MS = 100; private CamelContext camelContext; + private ExecutorService executorService; + private TopicPoller poller; + private final AtomicLong cacheCounter = new AtomicLong(); // internal properties private Map<String, Object> cache; private Consumer<String, String> consumer; @@ -88,7 +95,6 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot private Properties producerConfig; private Properties consumerConfig; - private String groupId; // not in use // configurable @Metadata(description = "Sets the name of the Kafka topic used by this idempotent repository." @@ -97,6 +103,9 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot private String topic; @Metadata(description = "The URL for the kafka brokers to use", required = true) private String bootstrapServers; + @Metadata(description = "A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the" + + " same group id, multiple processes can indicate that they are all part of the same consumer group.") + private String groupId; @Metadata(description = "Sets the maximum size of the local key cache.", defaultValue = "" + DEFAULT_MAXIMUM_CACHE_SIZE) private int maxCacheSize = DEFAULT_MAXIMUM_CACHE_SIZE; @Metadata(description = "Sets the poll duration of the Kafka consumer. The local caches are updated immediately; this value will affect" @@ -110,6 +119,8 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot + " operate in an inconsistent state relative to its peers until it catches up.", defaultValue = "" + DEFAULT_POLL_DURATION_MS) private int pollDurationMs = DEFAULT_POLL_DURATION_MS; + @Metadata(description = "Whether to sync on startup only, or to continue syncing while Camel is running.") + private boolean startupOnly; enum CacheAction { add, @@ -124,10 +135,6 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS); } - /** - * @deprecated Use the constructor without groupId; the parameter groupId is ignored. - */ - @Deprecated public KafkaIdempotentRepository(String topic, String bootstrapServers, String groupId) { this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, groupId); } @@ -143,9 +150,6 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS); } - /** - * @deprecated Use the constructor without groupId; the parameter groupId is ignored. - */ @Deprecated public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, String groupId) { this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, groupId); @@ -160,10 +164,6 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot this.pollDurationMs = pollDurationMs; } - /** - * @deprecated Use the constructor without groupId; the parameter groupId is ignored. - */ - @Deprecated public KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs, String groupId) { this.topic = topic; @@ -173,10 +173,6 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot this.groupId = groupId; } - /** - * @deprecated Use the constructor without groupId; the parameter groupId is ignored. - */ - @Deprecated public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize, int pollDurationMs, String groupId) { this.topic = topic; @@ -214,6 +210,17 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot this.bootstrapServers = bootstrapServers; } + public boolean isStartupOnly() { + return startupOnly; + } + + /** + * Whether to sync on startup only, or to continue syncing while Camel is running. + */ + public void setStartupOnly(boolean startupOnly) { + this.startupOnly = startupOnly; + } + public Properties getProducerConfig() { return producerConfig; } @@ -225,7 +232,7 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot * <pre> * bootstrap.servers * </pre> - * + * <p> * property itself. Prefer using {@link #bootstrapServers} for default configuration unless you specifically need * non-standard configuration options such as SSL/SASL. * @@ -246,7 +253,7 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot * <pre> * bootstrap.servers * </pre> - * + * <p> * property itself. Prefer using {@link #bootstrapServers} for default configuration unless you specifically need * non-standard configuration options such as SSL/SASL. * @@ -290,21 +297,14 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot this.pollDurationMs = pollDurationMs; } - /** - * @deprecated The parameter groupId is ignored. - */ - @Deprecated public String getGroupId() { return groupId; } /** - * Sets the group id of the Kafka consumer. - * - * @param groupId The poll duration in milliseconds. - * @deprecated The parameter groupId is ignored. + * A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the + * same group id, multiple processes can indicate that they are all part of the same consumer group. */ - @Deprecated public void setGroupId(String groupId) { this.groupId = groupId; } @@ -330,6 +330,9 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot consumerConfig = new Properties(); StringHelper.notEmpty(bootstrapServers, "bootstrapServers"); consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + if (groupId != null) { + consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + } } if (producerConfig == null) { @@ -355,7 +358,34 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot producerConfig.putIfAbsent(ProducerConfig.BATCH_SIZE_CONFIG, "0"); producer = new KafkaProducer<>(producerConfig); - populateCache(); + poller = new TopicPoller(); + ServiceHelper.startService(poller); + // populate cache on startup to be ready + StopWatch watch = new StopWatch(); + LOG.info("Syncing KafkaIdempotentRepository from topic: {} starting", topic); + poller.run(); + LOG.info("Syncing KafkaIdempotentRepository from topic: {} complete: {}", topic, + TimeUtils.printDuration(watch.taken(), true)); + + if (!startupOnly) { + // continue sync job in background + executorService + = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "KafkaIdempotentRepositorySync"); + LOG.info("Syncing KafkaIdempotentRepository from topic: {} continuously using background thread", topic); + executorService.submit(poller); + } + } + + @Override + protected void doStop() throws Exception { + if (executorService != null && camelContext != null) { + camelContext.getExecutorServiceManager().shutdown(executorService); + executorService = null; + } + ServiceHelper.stopService(poller); + IOHelper.close(consumer, "consumer", LOG); + IOHelper.close(producer, "producer", LOG); + LOG.debug("Stopped KafkaIdempotentRepository. Cache counter: {}", cacheCounter.get()); } private void populateCache() { @@ -363,7 +393,7 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); Collection<TopicPartition> partitions = partitionInfos.stream() .map(pi -> new TopicPartition(pi.topic(), pi.partition())) - .collect(Collectors.toUnmodifiableList()); + .toList(); LOG.debug("Assigning consumer to partitions {}", partitions); consumer.assign(partitions); @@ -379,38 +409,60 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot addToCache(consumerRecord); } } + } + private class TopicPoller extends ServiceSupport implements Runnable { + + private final AtomicBoolean init = new AtomicBoolean(); + + @Override + public void run() { + if (init.compareAndSet(false, true)) { + // sync cache on startup + LOG.debug("TopicPoller populating cache on startup"); + populateCache(); + LOG.debug("TopicPoller populated cache on startup complete"); + return; + } + + LOG.debug("TopicPoller running"); + while (isRunAllowed()) { + try { + ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(pollDurationMs)); + for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { + addToCache(consumerRecord); + } + } catch (Exception e) { + LOG.warn("TopicPoller error syncing due to: " + e.getMessage() + ". This exception is ignored.", e); + } + } + LOG.debug("TopicPoller stopping"); + } } private void addToCache(ConsumerRecord<String, String> consumerRecord) { - CacheAction action = null; + cacheCounter.incrementAndGet(); + CacheAction action; try { action = CacheAction.valueOf(consumerRecord.value()); + String messageId = consumerRecord.key(); + if (action == CacheAction.add) { + LOG.debug("Adding to cache messageId:{}", messageId); + cache.put(messageId, messageId); + } else if (action == CacheAction.remove) { + LOG.debug("Removing from cache messageId:{}", messageId); + cache.remove(messageId); + } else if (action == CacheAction.clear) { + cache.clear(); + } else { + throw new IllegalArgumentException("Unknown action"); + } } catch (IllegalArgumentException iax) { - LOG.error( - "Unexpected action value:\"{}\" received on [topic:{}, partition:{}, offset:{}]. Shutting down.", + LOG.warn( + "Unexpected action value:\"{}\" received on [topic:{}, partition:{}, offset:{}]. Ignoring.", consumerRecord.key(), consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset()); } - String messageId = consumerRecord.key(); - if (action == CacheAction.add) { - LOG.debug("Adding to cache messageId:{}", messageId); - cache.put(messageId, messageId); - } else if (action == CacheAction.remove) { - LOG.debug("Removing from cache messageId:{}", messageId); - cache.remove(messageId); - } else if (action == CacheAction.clear) { - cache.clear(); - } else { - // this should never happen - throw new RuntimeException("Illegal action " + action + " for key " + consumerRecord.key()); - } - } - - @Override - protected void doStop() { - IOHelper.close(consumer, "consumer", LOG); - IOHelper.close(producer, "producer", LOG); } @Override @@ -444,7 +496,6 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot @Override @ManagedOperation(description = "Does the store contain the given key") public boolean contains(String key) { - LOG.debug("Checking cache for key:{}", key); return cache.containsKey(key); } @@ -468,4 +519,14 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot public void clear() { broadcastAction(null, CacheAction.clear); } + + @ManagedOperation(description = "Number of sync events received from the kafka topic") + public long getCacheCounter() { + return cacheCounter.get(); + } + + @ManagedOperation(description = "Number of elements currently in the cache") + public long getCacheSize() { + return cache != null ? cache.size() : 0; + } } diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_8.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_8.adoc new file mode 100644 index 00000000000..367e1bb595a --- /dev/null +++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_8.adoc @@ -0,0 +1,13 @@ += Apache Camel 4.x Upgrade Guide + +This document is for helping you upgrade your Apache Camel application +from Camel 4.x to 4.y. For example, if you are upgrading Camel 4.0 to 4.2, then you should follow the guides +from both 4.0 to 4.1 and 4.1 to 4.2. + +== Upgrading Camel 4.7 to 4.8 + +=== camel-kafka + +The `KafkaIdempotentRepository` will now continue to sync cache updates after Camel has been started. +You can configure `startupOnly=true` to only sync the cache once on startup +(however then the cache is not synced with other Camel nodes in a cluster). diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide.adoc index f3eea1fec9b..8dfad84fd20 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide.adoc @@ -16,4 +16,5 @@ You can find upgrade guide for each release in the following pages: - xref:camel-4x-upgrade-guide-4_5.adoc[Upgrade guide 4.4 -> 4.5] - xref:camel-4x-upgrade-guide-4_6.adoc[Upgrade guide 4.5 -> 4.6] - xref:camel-4x-upgrade-guide-4_7.adoc[Upgrade guide 4.6 -> 4.7] +- xref:camel-4x-upgrade-guide-4_8.adoc[Upgrade guide 4.7 -> 4.8]