davsclaus commented on a change in pull request #6947:
URL: https://github.com/apache/camel/pull/6947#discussion_r816473494
########## File path: core/camel-api/src/main/java/org/apache/camel/ResumeAware.java ########## @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel; + +public interface ResumeAware<T extends ResumeStrategy> { Review comment: We should add javadoc to all our classes in camel-api ########## File path: core/camel-api/src/main/java/org/apache/camel/ResumeAware.java ########## @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel; + +public interface ResumeAware<T extends ResumeStrategy> { Review comment: xxxAware interfaces in Camel also have the getter ########## File path: core/camel-api/src/main/java/org/apache/camel/Route.java ########## @@ -373,4 +373,9 @@ */ void addErrorHandlerFactoryReference(ErrorHandlerFactory source, ErrorHandlerFactory target); + /** + * Adds a resume strategy to the route + */ + void setResumeStrategy(ResumeStrategy resumeStrategy); Review comment: add or set ? ########## File path: core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java ########## @@ -21,12 +21,19 @@ * Defines a strategy for handling resume operations. Implementations can define different ways to handle how to resume * processing records. */ -public interface ResumeStrategy<T> { +public interface ResumeStrategy { /** * A consumer, iterator or value class that can be used to set the index position from which to resume from. The * type is specific to the component. * */ void resume(); + + /** + * Starts the resume strategy + * + * @throws Exception + */ + void start() throws Exception; Review comment: Its IMHO better to extend Service that comes with lifecycle apis start/stop etc and use that, instead of your own "start" apis ########## File path: core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc ########## @@ -0,0 +1,179 @@ += Resume Strategies +:doctitle: Resume Strategies +:shortname: resume +:description: Provide strategies to allow consuming data from specific offsets +:since: +:supportlevel: Experimental + +The resume strategies allow users to implement strategies that point the consumer part of the routes to the last point of consumption. This allows Camel to skip reading and processing data that has already been consumed. 
+ +The resume strategies can be used to allow quicker stop and resume operations when consuming large data sources. For instance, imagine a scenario where the file consumer is reading a large file. Without a resume strategy, stopping and starting Camel would cause the consumer in the File component to read all the bytes of the given file at the initial offset (offset 0). The resume strategy allow integrations can point the consumer to the exact offset to resume the operations. + +The resume strategies comes in 3 parts: + +* A DSL method that marks the route as supporting resume operations and points to an instance of a strategy implementation. +* A set of core infrastructure that allow integrations to implement different types of strategies +* Basic strategies implementations that can be extended to implement the specific resume strategies required by the integrations + +=== The DSL method + +The route needs to use the `resumable()` method followed by a `resumableStrategyRef` to point to an instance of the resume strategy in use. This instance needs to be bound in the Context registry. + +[source,java] +---- +getCamelContext().getRegistry().bind("testResumeStrategy", testResumeStrategy); + +from("some:component") + .process(this::process) + .resumable().resumableStrategyRef("testResumeStrategy"); Review comment: resumable should be first, before the process ########## File path: core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc ########## @@ -0,0 +1,179 @@ += Resume Strategies Review comment: Is this an EIP? I think this doc should be moved to docs/ where we have the general docs. This dir is only for EIPs ########## File path: core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.model; + +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlTransient; + +import org.apache.camel.ResumeStrategy; + +public class ResumableDefinition extends OutputExpressionNode { + @XmlAttribute(required = true) + private String resumeStrategyRef; Review comment: Rename to `resumeStrategy` ########## File path: core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/NoOffsetException.java ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.processor.resume; + +import org.apache.camel.Exchange; +import org.apache.camel.RuntimeCamelException; + +/** + * An exception thrown if no camel offset header could be found on the message. + */ +public class NoOffsetException extends RuntimeCamelException { + + private final Exchange exchange; + + public NoOffsetException(Exchange exchange) { + super("There were not " + Exchange.OFFSET + " header defined on the message exchange: " + exchange); Review comment: Not -> no ########## File path: core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.processor.resume; + +import org.apache.camel.Exchange; +import org.apache.camel.Resumable; +import org.apache.camel.ResumeStrategy; +import org.apache.camel.UpdatableConsumerResumeStrategy; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.support.ExchangeHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ResumableCompletion implements Synchronization { + private static final Logger LOG = LoggerFactory.getLogger(ResumableCompletion.class); + + private final ResumeStrategy resumeStrategy; + private final Resumable<?, ?> resumable; + + public ResumableCompletion(ResumeStrategy resumeStrategy, Resumable<?, ?> resumable) { + this.resumeStrategy = resumeStrategy; + this.resumable = resumable; + } + + @Override + public void onComplete(Exchange exchange) { + if (!ExchangeHelper.isFailureHandled(exchange)) { + if (resumeStrategy instanceof UpdatableConsumerResumeStrategy) { + UpdatableConsumerResumeStrategy updatableConsumerResumeStrategy + = (UpdatableConsumerResumeStrategy) resumeStrategy; + try { + updatableConsumerResumeStrategy.updateLastOffset(resumable); + } catch (Exception e) { + LOG.error("Unable to update the offset: {}", e.getMessage(), e); + } + } else { + LOG.warn("Cannot perform an offset update because the strategy is not updatable"); + } + } + } + + @Override + public void onFailure(Exchange exchange) { + LOG.warn("Skipping offset update for {} due to failure in processing", resumable.getAddressable()); Review comment: I wonder if this gets very noisy as failing an exchange can be okay ########## File path: core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.processor.resume; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Exchange; +import org.apache.camel.Navigate; +import org.apache.camel.Processor; +import org.apache.camel.Resumable; +import org.apache.camel.ResumeStrategy; +import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.support.AsyncProcessorConverterHelper; +import org.apache.camel.support.AsyncProcessorSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ResumableProcessor extends AsyncProcessorSupport + implements Navigate<Processor>, CamelContextAware, IdAware, RouteIdAware { + private static final Logger LOG = LoggerFactory.getLogger(ResumableProcessor.class); + private CamelContext camelContext; + private ResumeStrategy resumeStrategy; + private AsyncProcessor processor; + private String id; + private String routeId; + + private static class ResumableProcessorCallback implements AsyncCallback { + + private final Exchange exchange; + private final Synchronization completion; + private final AsyncCallback callback; + + public 
ResumableProcessorCallback(Exchange exchange, Synchronization completion, AsyncCallback callback) { + this.exchange = exchange; + this.completion = completion; + this.callback = callback; + } + + @Override + public void done(boolean doneSync) { + try { + if (exchange.isFailed()) { + completion.onFailure(exchange); + } else { + completion.onComplete(exchange); + } + } finally { + callback.done(doneSync); + } + } + } + + public ResumableProcessor(ResumeStrategy resumeStrategy, Processor processor) { + this.resumeStrategy = Objects.requireNonNull(resumeStrategy); + this.processor = AsyncProcessorConverterHelper.convert(processor); + + LOG.info("Enabling the resumable strategy of type: {}", resumeStrategy.getClass().getSimpleName()); Review comment: Better to do this logging in doInit or doStart etc ########## File path: core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc ########## @@ -0,0 +1,179 @@ += Resume Strategies +:doctitle: Resume Strategies +:shortname: resume +:description: Provide strategies to allow consuming data from specific offsets +:since: +:supportlevel: Experimental + +The resume strategies allow users to implement strategies that point the consumer part of the routes to the last point of consumption. This allows Camel to skip reading and processing data that has already been consumed. + +The resume strategies can be used to allow quicker stop and resume operations when consuming large data sources. For instance, imagine a scenario where the file consumer is reading a large file. Without a resume strategy, stopping and starting Camel would cause the consumer in the File component to read all the bytes of the given file at the initial offset (offset 0). The resume strategy allow integrations can point the consumer to the exact offset to resume the operations. 
+ +The resume strategies comes in 3 parts: + +* A DSL method that marks the route as supporting resume operations and points to an instance of a strategy implementation. +* A set of core infrastructure that allow integrations to implement different types of strategies +* Basic strategies implementations that can be extended to implement the specific resume strategies required by the integrations + +=== The DSL method + +The route needs to use the `resumable()` method followed by a `resumableStrategyRef` to point to an instance of the resume strategy in use. This instance needs to be bound in the Context registry. Review comment: Why does it need to be bound, in Java DSL you should IMHO also be able to use the instance object also, resumableStrategy(new MyStrategy()) ########## File path: core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.model; + +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlTransient; + +import org.apache.camel.ResumeStrategy; + +public class ResumableDefinition extends OutputExpressionNode { + @XmlAttribute(required = true) + private String resumeStrategyRef; + + @XmlTransient + private ResumeStrategy resumeStrategy; + + @Override + public String getShortName() { + return "resumable"; + } + + public String getResumeStrategyRef() { + return resumeStrategyRef; + } + + public void setResumeStrategyRef(String resumeStrategyRef) { + this.resumeStrategyRef = resumeStrategyRef; + } + + public ResumeStrategy getResumeStrategy() { + return resumeStrategy; + } + + public void setResumeStrategy(ResumeStrategy resumeStrategy) { + this.resumeStrategy = resumeStrategy; + } + + // Fluent API + // ------------------------------------------------------------------------- + public ResumableDefinition resumableStrategyRef(String resumeStrategyRef) { Review comment: And add fluent method to set the strategy instance directly ########## File path: core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.processor.resume; + +import org.apache.camel.Exchange; +import org.apache.camel.Resumable; +import org.apache.camel.ResumeStrategy; +import org.apache.camel.UpdatableConsumerResumeStrategy; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.support.ExchangeHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ResumableCompletion implements Synchronization { + private static final Logger LOG = LoggerFactory.getLogger(ResumableCompletion.class); + + private final ResumeStrategy resumeStrategy; + private final Resumable<?, ?> resumable; + + public ResumableCompletion(ResumeStrategy resumeStrategy, Resumable<?, ?> resumable) { + this.resumeStrategy = resumeStrategy; + this.resumable = resumable; + } + + @Override + public void onComplete(Exchange exchange) { + if (!ExchangeHelper.isFailureHandled(exchange)) { + if (resumeStrategy instanceof UpdatableConsumerResumeStrategy) { + UpdatableConsumerResumeStrategy updatableConsumerResumeStrategy + = (UpdatableConsumerResumeStrategy) resumeStrategy; + try { + updatableConsumerResumeStrategy.updateLastOffset(resumable); + } catch (Exception e) { + LOG.error("Unable to update the offset: {}", e.getMessage(), e); + } + } else { + LOG.warn("Cannot perform an offset update because the strategy is not updatable"); Review comment: This can be noisy log DEBUG instead. 
Instead you should when starting up check if the strategy is updatable and then LOG only once about its limitation ########## File path: core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc ########## @@ -0,0 +1,179 @@ += Resume Strategies +:doctitle: Resume Strategies +:shortname: resume +:description: Provide strategies to allow consuming data from specific offsets +:since: +:supportlevel: Experimental + +The resume strategies allow users to implement strategies that point the consumer part of the routes to the last point of consumption. This allows Camel to skip reading and processing data that has already been consumed. + +The resume strategies can be used to allow quicker stop and resume operations when consuming large data sources. For instance, imagine a scenario where the file consumer is reading a large file. Without a resume strategy, stopping and starting Camel would cause the consumer in the File component to read all the bytes of the given file at the initial offset (offset 0). The resume strategy allow integrations can point the consumer to the exact offset to resume the operations. + +The resume strategies comes in 3 parts: + +* A DSL method that marks the route as supporting resume operations and points to an instance of a strategy implementation. +* A set of core infrastructure that allow integrations to implement different types of strategies +* Basic strategies implementations that can be extended to implement the specific resume strategies required by the integrations + +=== The DSL method + +The route needs to use the `resumable()` method followed by a `resumableStrategyRef` to point to an instance of the resume strategy in use. This instance needs to be bound in the Context registry. 
Review comment: xxxRef is old style, we are adjusting the DSL to be just xxx, so we should rename this ########## File path: core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.model; + +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlTransient; + +import org.apache.camel.ResumeStrategy; + +public class ResumableDefinition extends OutputExpressionNode { + @XmlAttribute(required = true) + private String resumeStrategyRef; + + @XmlTransient + private ResumeStrategy resumeStrategy; + + @Override + public String getShortName() { + return "resumable"; + } + + public String getResumeStrategyRef() { + return resumeStrategyRef; + } + + public void setResumeStrategyRef(String resumeStrategyRef) { + this.resumeStrategyRef = resumeStrategyRef; + } + + public ResumeStrategy getResumeStrategy() { + return resumeStrategy; + } + + public void setResumeStrategy(ResumeStrategy resumeStrategy) { + this.resumeStrategy = resumeStrategy; + } + + // Fluent API + // ------------------------------------------------------------------------- + public ResumableDefinition resumableStrategyRef(String resumeStrategyRef) { Review comment: resumableStrategy ########## File path: core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc ########## @@ -0,0 +1,179 @@ += Resume Strategies +:doctitle: Resume Strategies +:shortname: resume +:description: Provide strategies to allow consuming data from specific offsets +:since: +:supportlevel: Experimental + +The resume strategies allow users to implement strategies that point the consumer part of the routes to the last point of consumption. This allows Camel to skip reading and processing data that has already been consumed. + +The resume strategies can be used to allow quicker stop and resume operations when consuming large data sources. For instance, imagine a scenario where the file consumer is reading a large file. Without a resume strategy, stopping and starting Camel would cause the consumer in the File component to read all the bytes of the given file at the initial offset (offset 0). 
The resume strategy allow integrations can point the consumer to the exact offset to resume the operations. + +The resume strategies comes in 3 parts: + +* A DSL method that marks the route as supporting resume operations and points to an instance of a strategy implementation. +* A set of core infrastructure that allow integrations to implement different types of strategies +* Basic strategies implementations that can be extended to implement the specific resume strategies required by the integrations + +=== The DSL method + +The route needs to use the `resumable()` method followed by a `resumableStrategyRef` to point to an instance of the resume strategy in use. This instance needs to be bound in the Context registry. + +[source,java] +---- +getCamelContext().getRegistry().bind("testResumeStrategy", testResumeStrategy); + +from("some:component") + .process(this::process) + .resumable().resumableStrategyRef("testResumeStrategy"); +---- + + +=== The Core Interfaces + +These are the *core interfaces*: + +* org.apache.camel.ResumeStrategy: the basic resume strategy +* org.apache.camel.UpdatableConsumerResumeStrategy: an extension to the resume strategy to allow updatable strategies +* org.apache.camel.ResumeCache: an interface for local cache for resumable information + +These are the *core classes* supporting the strategies: + +* org.apache.camel.Resumable: an interface to allow users to work with abstract resumable entities (files, offsets, etc) +* org.apache.camel.ResumableSet: an interface for resumables with a 1-to-many relationship +* org.apache.camel.Offset: a generic offset without a concrete type (it may represent a long, a file name, etc) + + +These are the *supporting classes*: + +* org.apache.camel.support.Resumables: resumables handling support +* org.apache.camel.support.Offsets: offset handling support + + +=== Basic Strategies + +The basic strategies offer a component-specific skeleton that can be used to implement strategies. 
+ +* AbstractKafkaResumeStrategy: a resume strategy from the `camel-kafka` component that uses Kafka as the store for the offsets. + + +[source,java] +---- +public class KafkaResumeStrategy<K> extends AbstractKafkaResumeStrategy<K, Long> implements GenericFileResumeStrategy<File> { +private static final Logger LOG = LoggerFactory.getLogger(KafkaResumeStrategy.class); +public static final int CACHE_SIZE = 100; + + private final String topic; + private final ResumeCache<K, Long> cache; + + public KafkaResumeStrategy(String topic, + ResumeCache<K, Long> cache, + DefaultProducerPropertyFactory producerPropertyFactory, + DefaultConsumerPropertyFactory consumerPropertyFactory) + { + super(topic, cache, producerPropertyFactory.getProperties(), consumerPropertyFactory.getProperties()); + this.topic = topic; + this.cache = cache; + } + + + private Optional<Long> getLastOffset(GenericFileResumable<File> resumable) { + final File addressable = resumable.getAddressable(); + return getLastOffset((K) addressable); + } + + public Optional<Long> getLastOffset(K addressable) { + return cache.get(addressable); + } + + @Override + public void subscribe() { + checkAndSubscribe(topic, 1); + } + + @Override + public void resume(GenericFileResumable<File> resumable) { + final Optional<Long> lastOffsetOpt = getLastOffset(resumable); + + if (!lastOffsetOpt.isPresent()) { + return; + } + + final long lastOffset = lastOffsetOpt.get(); + resumable.updateLastOffset(lastOffset); + } + + @Override + public void resume() { + throw new UnsupportedOperationException("Cannot perform blind resume"); + } +} +---- + + +=== Local Cache Support + +A sample local cache implemented using https://github.com/ben-manes/caffeine[Caffeine]. 
+ +[source,java] +---- +public class SingleItemCache<K> implements ResumeCache<K, Long> { + public static final int CACHE_SIZE = 100; + private final Cache<K, Long> cache = Caffeine.newBuilder() + .maximumSize(CACHE_SIZE) + .build(); + + @Override + public void add(K key, Long offsetValue) { + cache.put(key, offsetValue); + } + + @Override + public Optional<Long> get(K key) { + Long entry = cache.getIfPresent(key); + + if (entry == null) { + return Optional.empty(); + } + + return Optional.of(entry.longValue()); + } + + @Override + public boolean isFull() { + if (cache.estimatedSize() < CACHE_SIZE) { + return true; + } + + return false; + } +} +---- + + + +=== Known Limitations, Bugs and Other Notes + +* When using the converters with the file component, beware of the differences in the behavior from `Reader` and `InputStream`: + +For instance, the behavior of: + +[source,java] +---- +from("file:{{input.dir}}?noop=true&fileName={{input.file}}") + .convertBodyTo(Reader.class) + .process(this::process) + .resumable().resumableStrategyRef("testResumeStrategy"); Review comment: Move up ########## File path: core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.reifier; + +import org.apache.camel.Processor; +import org.apache.camel.ResumeStrategy; +import org.apache.camel.Route; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.model.ResumableDefinition; +import org.apache.camel.processor.resume.ResumableProcessor; +import org.apache.camel.util.ObjectHelper; + +public class ResumableReifier extends ExpressionReifier<ResumableDefinition> { + protected ResumableReifier(Route route, ProcessorDefinition<?> definition) { + super(route, ResumableDefinition.class.cast(definition)); + + } + + @Override + public Processor createProcessor() throws Exception { + Processor childProcessor = createChildProcessor(false); + + ResumeStrategy resumeStrategy = resolveResumeStrategy(); + ObjectHelper.notNull(resumeStrategy, "resumeStrategy", definition); + resumeStrategy.start(); Review comment: Do not start in create, as reifier should ideally only create, and start is done later when Camel starts the routes ########## File path: core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.model;
+
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.ResumeStrategy;
+
+public class ResumableDefinition extends OutputExpressionNode {

Review comment:
Should be NoOutput, and why do you need an expression?

##########
File path: core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.reifier;
+
+import org.apache.camel.Processor;
+import org.apache.camel.ResumeStrategy;
+import org.apache.camel.Route;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.ResumableDefinition;
+import org.apache.camel.processor.resume.ResumableProcessor;
+import org.apache.camel.util.ObjectHelper;
+
+public class ResumableReifier extends ExpressionReifier<ResumableDefinition> {
+    protected ResumableReifier(Route route, ProcessorDefinition<?> definition) {
+        super(route, ResumableDefinition.class.cast(definition));
+
+    }
+
+    @Override
+    public Processor createProcessor() throws Exception {
+        Processor childProcessor = createChildProcessor(false);
+
+        ResumeStrategy resumeStrategy = resolveResumeStrategy();
+        ObjectHelper.notNull(resumeStrategy, "resumeStrategy", definition);
+        resumeStrategy.start();

Review comment:
And let ResumableProcessor handle the lifecycle where it can start the strategy from its doInit / doStart methods

##########
File path: core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
##########
@@ -0,0 +1,179 @@
+= Resume Strategies
+:doctitle: Resume Strategies
+:shortname: resume
+:description: Provide strategies to allow consuming data from specific offsets
+:since:
+:supportlevel: Experimental
+
+The resume strategies allow users to implement strategies that point the consumer part of the routes to the last point of consumption. This allows Camel to skip reading and processing data that has already been consumed.
+
+The resume strategies can be used to allow quicker stop and resume operations when consuming large data sources. For instance, imagine a scenario where the file consumer is reading a large file. Without a resume strategy, stopping and starting Camel would cause the consumer in the File component to read all the bytes of the given file at the initial offset (offset 0).
The resume strategy allows integrations to point the consumer to the exact offset from which to resume operations.
+
+The resume strategies come in three parts:
+
+* A DSL method that marks the route as supporting resume operations and points to an instance of a strategy implementation.
+* A set of core infrastructure that allows integrations to implement different types of strategies.
+* Basic strategy implementations that can be extended to implement the specific resume strategies required by the integrations.
+
+=== The DSL method
+
+The route needs to use the `resumable()` method followed by a `resumableStrategyRef` to point to an instance of the resume strategy in use. This instance needs to be bound in the Context registry.
+
+[source,java]
+----
+getCamelContext().getRegistry().bind("testResumeStrategy", testResumeStrategy);
+
+from("some:component")
+        .process(this::process)
+        .resumable().resumableStrategyRef("testResumeStrategy");
+----
+
+
+=== The Core Interfaces
+
+These are the *core interfaces*:
+
+* org.apache.camel.ResumeStrategy: the basic resume strategy
+* org.apache.camel.UpdatableConsumerResumeStrategy: an extension to the resume strategy to allow updatable strategies
+* org.apache.camel.ResumeCache: an interface for a local cache of resumable information
+
+These are the *core classes* supporting the strategies:
+
+* org.apache.camel.Resumable: an interface to allow users to work with abstract resumable entities (files, offsets, etc)
+* org.apache.camel.ResumableSet: an interface for resumables with a 1-to-many relationship
+* org.apache.camel.Offset: a generic offset without a concrete type (it may represent a long, a file name, etc)
+
+
+These are the *supporting classes*:
+
+* org.apache.camel.support.Resumables: resumables handling support
+* org.apache.camel.support.Offsets: offset handling support
+
+
+=== Basic Strategies
+
+The basic strategies offer a component-specific skeleton that can be used to implement strategies.
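Whatever the backing store, a resume strategy boils down to remembering the last consumed offset for each resource and moving the consumer there when the route restarts. The following is a minimal, self-contained sketch of that idea in plain Java, assuming an in-memory map in place of a real `ResumeCache` and a byte-oriented `InputStream` in place of a component consumer. The class `OffsetResumeSketch` and its `update`, `lastOffset` and `resume` methods are hypothetical illustrations, not part of the Camel API.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: these names are illustrative and are NOT the Camel ResumeStrategy API.
class OffsetResumeSketch {
    // Last consumed byte offset per resource (e.g. a file name); stands in for a ResumeCache.
    private final Map<String, Long> offsets = new HashMap<>();

    // Record how far the consumer has read for a resource.
    void update(String resource, long offset) {
        offsets.put(resource, offset);
    }

    // Look up the last recorded offset, if any.
    Optional<Long> lastOffset(String resource) {
        return Optional.ofNullable(offsets.get(resource));
    }

    // Move the stream to the last recorded offset (0 when none is known).
    // Returns the number of bytes actually skipped.
    long resume(String resource, InputStream in) throws IOException {
        long target = lastOffset(resource).orElse(0L);
        long skipped = 0;
        while (skipped < target) {
            long n = in.skip(target - skipped);
            if (n <= 0) {
                break; // the stream cannot skip further, e.g. it is shorter than the offset
            }
            skipped += n;
        }
        return skipped;
    }

    public static void main(String[] args) throws IOException {
        OffsetResumeSketch strategy = new OffsetResumeSketch();
        strategy.update("input.txt", 4L); // pretend 4 bytes were consumed before a restart

        InputStream in = new ByteArrayInputStream("0123456789".getBytes(StandardCharsets.US_ASCII));
        long skipped = strategy.resume("input.txt", in);

        // Prints "skipped=4 next=4": processing restarts at the fifth byte
        System.out.println("skipped=" + skipped + " next=" + (char) in.read());
    }
}
```

Because this sketch skips bytes, the offsets it records are byte positions. `Reader.skip` counts characters instead, so in a design like this an offset recorded through one stream type is not interchangeable with the other.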
+
+* AbstractKafkaResumeStrategy: a resume strategy from the `camel-kafka` component that uses Kafka as the store for the offsets.
+
+
+[source,java]
+----
+public class KafkaResumeStrategy<K> extends AbstractKafkaResumeStrategy<K, Long> implements GenericFileResumeStrategy<File> {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaResumeStrategy.class);
+    public static final int CACHE_SIZE = 100;
+
+    private final String topic;
+    private final ResumeCache<K, Long> cache;
+
+    public KafkaResumeStrategy(String topic,
+                               ResumeCache<K, Long> cache,
+                               DefaultProducerPropertyFactory producerPropertyFactory,
+                               DefaultConsumerPropertyFactory consumerPropertyFactory)
+    {
+        super(topic, cache, producerPropertyFactory.getProperties(), consumerPropertyFactory.getProperties());
+        this.topic = topic;
+        this.cache = cache;
+    }
+
+
+    private Optional<Long> getLastOffset(GenericFileResumable<File> resumable) {
+        final File addressable = resumable.getAddressable();
+        return getLastOffset((K) addressable);
+    }
+
+    public Optional<Long> getLastOffset(K addressable) {
+        return cache.get(addressable);
+    }
+
+    @Override
+    public void subscribe() {
+        checkAndSubscribe(topic, 1);
+    }
+
+    @Override
+    public void resume(GenericFileResumable<File> resumable) {
+        final Optional<Long> lastOffsetOpt = getLastOffset(resumable);
+
+        if (!lastOffsetOpt.isPresent()) {
+            return;
+        }
+
+        final long lastOffset = lastOffsetOpt.get();
+        resumable.updateLastOffset(lastOffset);
+    }
+
+    @Override
+    public void resume() {
+        throw new UnsupportedOperationException("Cannot perform blind resume");
+    }
+}
+----
+
+
+=== Local Cache Support
+
+A sample local cache implementation using https://github.com/ben-manes/caffeine[Caffeine]:
+
+[source,java]
+----
+public class SingleItemCache<K> implements ResumeCache<K, Long> {
+    public static final int CACHE_SIZE = 100;
+    private final Cache<K, Long> cache = Caffeine.newBuilder()
+            .maximumSize(CACHE_SIZE)
+            .build();
+
+    @Override
+    public void add(K key, Long offsetValue) {
+        cache.put(key, offsetValue);
+    }
+
+    @Override
+    public Optional<Long> get(K key) {
+        Long entry = cache.getIfPresent(key);
+
+        if (entry == null) {
+            return Optional.empty();
+        }
+
+        return Optional.of(entry.longValue());
+    }
+
+    @Override
+    public boolean isFull() {
+        // The cache is full once it holds CACHE_SIZE entries
+        return cache.estimatedSize() >= CACHE_SIZE;
+    }
+}
+----
+
+
+
+=== Known Limitations, Bugs and Other Notes
+
+* When using the converters with the file component, beware of the differences in behavior between `Reader` and `InputStream`:
+
+For instance, the behavior of:
+
+[source,java]
+----
+from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
+        .convertBodyTo(Reader.class)
+        .process(this::process)
+        .resumable().resumableStrategyRef("testResumeStrategy");
+----
+
+Is different from the behavior of:
+
+[source,java]
+----
+from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
+        .convertBodyTo(InputStream.class)
+        .process(this::process)
+        .resumable().resumableStrategyRef("testResumeStrategy");

Review comment:
Move up

-- 
This is an automated message from the Apache Git Service.