This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new f7932e4ec60 CAMEL-18148: cleanup the offset update interfaces f7932e4ec60 is described below commit f7932e4ec604febdb393577cc5a6f9e5fd77c820 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Mon Oct 10 13:45:28 2022 +0200 CAMEL-18148: cleanup the offset update interfaces This should simplify further work to improve serialization and reliability --- .../resume/kafka/KafkaResumeStrategy.java | 6 +--- .../kafka/SingleNodeKafkaResumeStrategy.java | 4 +-- .../KafkaConsumerWithResumeRouteStrategyIT.java | 4 +-- .../org/apache/camel/resume/ResumeStrategy.java | 8 +++++ .../resume/UpdatableConsumerResumeStrategy.java | 34 ---------------------- .../docs/modules/eips/pages/resume-strategies.adoc | 2 +- .../processor/resume/ResumableCompletion.java | 16 ++++------ .../processor/resume/TransientResumeStrategy.java | 6 ++++ .../FileConsumerResumeFromOffsetStrategyTest.java | 12 ++------ 9 files changed, 26 insertions(+), 66 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java index 9951e09303b..925ed2b2ba6 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java @@ -17,15 +17,11 @@ package org.apache.camel.processor.resume.kafka; -import org.apache.camel.resume.Resumable; import org.apache.camel.resume.ResumeStrategy; -import org.apache.camel.resume.UpdatableConsumerResumeStrategy; /** * Base interface for resume strategies that publish the offsets to a Kafka topic - * - * @param <T> the type of resumable */ -public interface KafkaResumeStrategy<T extends Resumable> extends UpdatableConsumerResumeStrategy<T>, ResumeStrategy { +public interface KafkaResumeStrategy extends ResumeStrategy { } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java index 9a60cce760f..8689eeed96e 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java @@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory; * A resume strategy that publishes offsets to a Kafka topic. This resume strategy is suitable for single node * integrations. */ -public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements KafkaResumeStrategy<T> { +public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements KafkaResumeStrategy { private static final Logger LOG = LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class); private Consumer<byte[], byte[]> consumer; @@ -124,7 +124,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka } @Override - public void updateLastOffset(T offset) throws Exception { + public <T extends Resumable> void updateLastOffset(T offset) throws Exception { OffsetKey<?> key = offset.getOffsetKey(); Offset<?> offsetValue = offset.getLastOffset(); diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java index 0f7ab577cbf..c6d903cd546 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java @@ -34,7 +34,6 @@ import org.apache.camel.resume.Offset; import org.apache.camel.resume.OffsetKey; import org.apache.camel.resume.Resumable; import org.apache.camel.resume.ResumeAdapter; -import org.apache.camel.resume.UpdatableConsumerResumeStrategy; import org.apache.camel.support.resume.Resumables; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -62,8 +61,7 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes private TestUpdateStrategy resumeStrategy; private CountDownLatch messagesLatch; - private static class TestUpdateStrategy extends TransientResumeStrategy - implements UpdatableConsumerResumeStrategy<Resumable> { + private static class TestUpdateStrategy extends TransientResumeStrategy { private final CountDownLatch messagesLatch; private boolean startCalled; private boolean offsetNull = true; diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java index 26039f712de..a9325b829b4 100644 --- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java +++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java @@ -57,4 +57,12 @@ public interface ResumeStrategy extends Service { default void loadCache() throws Exception { } + + /** + * Updates the last processed offset + * + * @param offset the offset to update + * @throws Exception if unable to update the offset + */ + <T extends Resumable> void updateLastOffset(T offset) throws Exception; } diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java deleted file mode 100644 index 52204789bc0..00000000000 --- a/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.resume; - -/** - * An updatable resume strategy - * - * @param <T> the type of the addressable value for the resumable object (for example, a file would use a Long value) - */ -public interface UpdatableConsumerResumeStrategy<T extends Resumable> { - - /** - * Updates the last processed offset - * - * @param offset the offset to update - * @throws Exception if unable to update the offset - */ - void updateLastOffset(T offset) throws Exception; -} diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc index e45a0f898d5..8328d99931c 100644 --- a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc +++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc @@ -93,7 +93,7 @@ available for each strategy. For instance, to configure either one of the Kafka === Implementing New Builtin Resume Strategies -New builtin resume strategies can be created by implementing the `UpdatableConsumerResumeStrategy` and the `ResumeStrategy` interfaces. Check the code for `SingleNodeKafkaResumeStrategy` for implementation details. +New builtin resume strategies can be created by implementing the `ResumeStrategy` interface. Check the code for `SingleNodeKafkaResumeStrategy` for implementation details. == Local Cache Support diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java index dfb75ee5b8f..35035215b59 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java @@ -21,7 +21,6 @@ import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; import org.apache.camel.resume.Resumable; import org.apache.camel.resume.ResumeStrategy; -import org.apache.camel.resume.UpdatableConsumerResumeStrategy; import org.apache.camel.spi.CamelLogger; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.ExchangeHelper; @@ -57,17 +56,12 @@ public class ResumableCompletion implements Synchronization { LOG.trace("Processing the resumable of type: {}", resumable.getLastOffset().getValue()); } - if (resumeStrategy instanceof UpdatableConsumerResumeStrategy) { - UpdatableConsumerResumeStrategy updatableConsumerResumeStrategy - = (UpdatableConsumerResumeStrategy) resumeStrategy; - try { - updatableConsumerResumeStrategy.updateLastOffset(resumable); - } catch (Exception e) { - LOG.error("Unable to update the offset: {}", e.getMessage(), e); - } - } else { - LOG.debug("Cannot perform an offset update because the strategy is not updatable"); + try { + resumeStrategy.updateLastOffset(resumable); + } catch (Exception e) { + LOG.error("Unable to update the offset: {}", e.getMessage(), e); } + } else { if (!intermittent) { exchange.setException(new NoOffsetException(exchange)); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java index 205612b17e9..0e926152536 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java @@ -17,6 +17,7 @@ package org.apache.camel.processor.resume; +import org.apache.camel.resume.Resumable; import org.apache.camel.resume.ResumeAdapter; import org.apache.camel.resume.ResumeStrategy; @@ -46,6 +47,11 @@ public class TransientResumeStrategy implements ResumeStrategy { return ResumeStrategy.super.getAdapter(clazz); } + @Override + public <T extends Resumable> void updateLastOffset(T offset) { + // this is NO-OP + } + @Override public void start() { // this is NO-OP diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java index e4828ec3b73..3eee355cbe5 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java @@ -32,8 +32,6 @@ import org.apache.camel.component.file.consumer.FileResumeAdapter; import org.apache.camel.component.file.consumer.adapters.DirectoryEntries; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.processor.resume.TransientResumeStrategy; -import org.apache.camel.resume.Resumable; -import org.apache.camel.resume.UpdatableConsumerResumeStrategy; import org.apache.camel.support.resume.Resumables; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DisplayName; @@ -75,8 +73,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport } private static class FailResumeAdapter - implements FileResumeAdapter, DirectoryEntriesResumeAdapter, UpdatableConsumerResumeStrategy<Resumable> { - private boolean called; + implements FileResumeAdapter, DirectoryEntriesResumeAdapter { @Override public void resume() { @@ -88,10 +85,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport DirectoryEntries.doResume(fileSet, f -> true); } - @Override - public void updateLastOffset(Resumable offset) { - called = true; - } + } private static final TransientResumeStrategy FAIL_RESUME_STRATEGY = new TransientResumeStrategy(new FailResumeAdapter()); @@ -125,7 +119,6 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport List<Exchange> exchangeList = mock.getExchanges(); Assertions.assertFalse(exchangeList.isEmpty(), "It should have received a few messages"); - Assertions.assertFalse(((FailResumeAdapter) FAIL_RESUME_STRATEGY.getAdapter()).called); } @DisplayName("Tests whether it a missing offset does not cause a failure when using intermittent mode") @@ -141,7 +134,6 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport List<Exchange> exchangeList = mock.getExchanges(); Assertions.assertFalse(exchangeList.isEmpty(), "It should have received a few messages"); - Assertions.assertFalse(((FailResumeAdapter) FAIL_RESUME_STRATEGY.getAdapter()).called); } @DisplayName("Tests whether we can start from the beginning (i.e.: no resume strategy)")