davsclaus commented on a change in pull request #6947:
URL: https://github.com/apache/camel/pull/6947#discussion_r816473494



##########
File path: core/camel-api/src/main/java/org/apache/camel/ResumeAware.java
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel;
+
+public interface ResumeAware<T extends ResumeStrategy> {

Review comment:
       We should add javadoc to all our classes in camel-api

##########
File path: core/camel-api/src/main/java/org/apache/camel/ResumeAware.java
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel;
+
+public interface ResumeAware<T extends ResumeStrategy> {

Review comment:
       xxxAware interfaces in Camel also have the getter

##########
File path: core/camel-api/src/main/java/org/apache/camel/Route.java
##########
@@ -373,4 +373,9 @@
      */
     void addErrorHandlerFactoryReference(ErrorHandlerFactory source, 
ErrorHandlerFactory target);
 
+    /**
+     * Adds a resume strategy to the route
+     */
+    void setResumeStrategy(ResumeStrategy resumeStrategy);

Review comment:
       add or set ?

##########
File path: core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
##########
@@ -21,12 +21,19 @@
  * Defines a strategy for handling resume operations. Implementations can 
define different ways to handle how to resume
  * processing records.
  */
-public interface ResumeStrategy<T> {
+public interface ResumeStrategy {
 
     /**
      * A consumer, iterator or value class that can be used to set the index 
position from which to resume from. The
      * type is specific to the component.
      *
      */
     void resume();
+
+    /**
+     * Starts the resume strategy
+     * 
+     * @throws Exception
+     */
+    void start() throws Exception;

Review comment:
       Its IMHO better to extend Service that comes with lifecycle apis 
start/stop etc and use that, instead of your own "start" apis

##########
File path: 
core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
##########
@@ -0,0 +1,179 @@
+= Resume Strategies
+:doctitle: Resume Strategies
+:shortname: resume
+:description: Provide strategies to allow consuming data from specific offsets
+:since:
+:supportlevel: Experimental
+
+The resume strategies allow users to implement strategies that point the 
consumer part of the routes to the last point of consumption. This allows Camel 
to skip reading and processing data that has already been consumed.
+
+The resume strategies can be used to allow quicker stop and resume operations 
when consuming large data sources. For instance, imagine a scenario where the 
file consumer is reading a large file. Without a resume strategy, stopping and 
starting Camel would cause the consumer in the File component to read all the 
bytes of the given file at the initial offset (offset 0). The resume strategy 
allow integrations can point the consumer to the exact offset to resume the 
operations.
+
+The resume strategies comes in 3 parts:
+
+* A DSL method that marks the route as supporting resume operations and points 
to an instance of a strategy implementation.
+* A set of core infrastructure that allow integrations to implement different 
types of strategies
+* Basic strategies implementations that can be extended to implement the 
specific resume strategies required by the integrations
+
+=== The DSL method
+
+The route needs to use the `resumable()` method followed by a 
`resumableStrategyRef` to point to an instance of the resume strategy in use. 
This instance needs to be bound in the Context registry.
+
+[source,java]
+----
+getCamelContext().getRegistry().bind("testResumeStrategy", testResumeStrategy);
+
+from("some:component")
+    .process(this::process)
+    .resumable().resumableStrategyRef("testResumeStrategy");

Review comment:
       resumable should be first, before the process

##########
File path: 
core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
##########
@@ -0,0 +1,179 @@
+= Resume Strategies

Review comment:
       Is this an EIP? I think this doc should be moved to docs/ where we have 
the general docs. This dir is only for EIPs

##########
File path: 
core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.model;
+
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.ResumeStrategy;
+
+public class ResumableDefinition extends OutputExpressionNode {
+    @XmlAttribute(required = true)
+    private String resumeStrategyRef;

Review comment:
       Rename to `resumeStrategy`

##########
File path: 
core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/NoOffsetException.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.resume;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeCamelException;
+
+/**
+ * An exception thrown if no camel offset header could be found on the message.
+ */
+public class NoOffsetException extends RuntimeCamelException {
+
+    private final Exchange exchange;
+
+    public NoOffsetException(Exchange exchange) {
+        super("There were not " + Exchange.OFFSET + " header defined on the 
message exchange: " + exchange);

Review comment:
       Not -> no

##########
File path: 
core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.resume;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Resumable;
+import org.apache.camel.ResumeStrategy;
+import org.apache.camel.UpdatableConsumerResumeStrategy;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ExchangeHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResumableCompletion implements Synchronization {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ResumableCompletion.class);
+
+    private final ResumeStrategy resumeStrategy;
+    private final Resumable<?, ?> resumable;
+
+    public ResumableCompletion(ResumeStrategy resumeStrategy, Resumable<?, ?> 
resumable) {
+        this.resumeStrategy = resumeStrategy;
+        this.resumable = resumable;
+    }
+
+    @Override
+    public void onComplete(Exchange exchange) {
+        if (!ExchangeHelper.isFailureHandled(exchange)) {
+            if (resumeStrategy instanceof UpdatableConsumerResumeStrategy) {
+                UpdatableConsumerResumeStrategy updatableConsumerResumeStrategy
+                        = (UpdatableConsumerResumeStrategy) resumeStrategy;
+                try {
+                    
updatableConsumerResumeStrategy.updateLastOffset(resumable);
+                } catch (Exception e) {
+                    LOG.error("Unable to update the offset: {}", 
e.getMessage(), e);
+                }
+            } else {
+                LOG.warn("Cannot perform an offset update because the strategy 
is not updatable");
+            }
+        }
+    }
+
+    @Override
+    public void onFailure(Exchange exchange) {
+        LOG.warn("Skipping offset update for {} due to failure in processing", 
resumable.getAddressable());

Review comment:
       I wonder if this gets very noisy as failing an exchange can be okay

##########
File path: 
core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.resume;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.Navigate;
+import org.apache.camel.Processor;
+import org.apache.camel.Resumable;
+import org.apache.camel.ResumeStrategy;
+import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.AsyncProcessorConverterHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResumableProcessor extends AsyncProcessorSupport
+        implements Navigate<Processor>, CamelContextAware, IdAware, 
RouteIdAware {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ResumableProcessor.class);
+    private CamelContext camelContext;
+    private ResumeStrategy resumeStrategy;
+    private AsyncProcessor processor;
+    private String id;
+    private String routeId;
+
+    private static class ResumableProcessorCallback implements AsyncCallback {
+
+        private final Exchange exchange;
+        private final Synchronization completion;
+        private final AsyncCallback callback;
+
+        public ResumableProcessorCallback(Exchange exchange, Synchronization 
completion, AsyncCallback callback) {
+            this.exchange = exchange;
+            this.completion = completion;
+            this.callback = callback;
+        }
+
+        @Override
+        public void done(boolean doneSync) {
+            try {
+                if (exchange.isFailed()) {
+                    completion.onFailure(exchange);
+                } else {
+                    completion.onComplete(exchange);
+                }
+            } finally {
+                callback.done(doneSync);
+            }
+        }
+    }
+
+    public ResumableProcessor(ResumeStrategy resumeStrategy, Processor 
processor) {
+        this.resumeStrategy = Objects.requireNonNull(resumeStrategy);
+        this.processor = AsyncProcessorConverterHelper.convert(processor);
+
+        LOG.info("Enabling the resumable strategy of type: {}", 
resumeStrategy.getClass().getSimpleName());

Review comment:
       Better to do this logging in doInit or doStart etc

##########
File path: 
core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
##########
@@ -0,0 +1,179 @@
+= Resume Strategies
+:doctitle: Resume Strategies
+:shortname: resume
+:description: Provide strategies to allow consuming data from specific offsets
+:since:
+:supportlevel: Experimental
+
+The resume strategies allow users to implement strategies that point the 
consumer part of the routes to the last point of consumption. This allows Camel 
to skip reading and processing data that has already been consumed.
+
+The resume strategies can be used to allow quicker stop and resume operations 
when consuming large data sources. For instance, imagine a scenario where the 
file consumer is reading a large file. Without a resume strategy, stopping and 
starting Camel would cause the consumer in the File component to read all the 
bytes of the given file at the initial offset (offset 0). The resume strategy 
allow integrations can point the consumer to the exact offset to resume the 
operations.
+
+The resume strategies comes in 3 parts:
+
+* A DSL method that marks the route as supporting resume operations and points 
to an instance of a strategy implementation.
+* A set of core infrastructure that allow integrations to implement different 
types of strategies
+* Basic strategies implementations that can be extended to implement the 
specific resume strategies required by the integrations
+
+=== The DSL method
+
+The route needs to use the `resumable()` method followed by a 
`resumableStrategyRef` to point to an instance of the resume strategy in use. 
This instance needs to be bound in the Context registry.

Review comment:
       Why does it need to be bound, in Java DSL you should IMHO also be able 
to use the instance object also, resumableStrategy(new MyStrategy())

##########
File path: 
core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.model;
+
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.ResumeStrategy;
+
+public class ResumableDefinition extends OutputExpressionNode {
+    @XmlAttribute(required = true)
+    private String resumeStrategyRef;
+
+    @XmlTransient
+    private ResumeStrategy resumeStrategy;
+
+    @Override
+    public String getShortName() {
+        return "resumable";
+    }
+
+    public String getResumeStrategyRef() {
+        return resumeStrategyRef;
+    }
+
+    public void setResumeStrategyRef(String resumeStrategyRef) {
+        this.resumeStrategyRef = resumeStrategyRef;
+    }
+
+    public ResumeStrategy getResumeStrategy() {
+        return resumeStrategy;
+    }
+
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
+        this.resumeStrategy = resumeStrategy;
+    }
+
+    // Fluent API
+    // 
-------------------------------------------------------------------------
+    public ResumableDefinition resumableStrategyRef(String resumeStrategyRef) {

Review comment:
       And add fluent method to set the strategy instance directly

##########
File path: 
core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.resume;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Resumable;
+import org.apache.camel.ResumeStrategy;
+import org.apache.camel.UpdatableConsumerResumeStrategy;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ExchangeHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResumableCompletion implements Synchronization {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ResumableCompletion.class);
+
+    private final ResumeStrategy resumeStrategy;
+    private final Resumable<?, ?> resumable;
+
+    public ResumableCompletion(ResumeStrategy resumeStrategy, Resumable<?, ?> 
resumable) {
+        this.resumeStrategy = resumeStrategy;
+        this.resumable = resumable;
+    }
+
+    @Override
+    public void onComplete(Exchange exchange) {
+        if (!ExchangeHelper.isFailureHandled(exchange)) {
+            if (resumeStrategy instanceof UpdatableConsumerResumeStrategy) {
+                UpdatableConsumerResumeStrategy updatableConsumerResumeStrategy
+                        = (UpdatableConsumerResumeStrategy) resumeStrategy;
+                try {
+                    
updatableConsumerResumeStrategy.updateLastOffset(resumable);
+                } catch (Exception e) {
+                    LOG.error("Unable to update the offset: {}", 
e.getMessage(), e);
+                }
+            } else {
+                LOG.warn("Cannot perform an offset update because the strategy 
is not updatable");

Review comment:
       This can be noisy log DEBUG instead. 
   
   Instead you should when starting up check if the strategy is updatable and 
then LOG only once about its limitation

##########
File path: 
core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
##########
@@ -0,0 +1,179 @@
+= Resume Strategies
+:doctitle: Resume Strategies
+:shortname: resume
+:description: Provide strategies to allow consuming data from specific offsets
+:since:
+:supportlevel: Experimental
+
+The resume strategies allow users to implement strategies that point the 
consumer part of the routes to the last point of consumption. This allows Camel 
to skip reading and processing data that has already been consumed.
+
+The resume strategies can be used to allow quicker stop and resume operations 
when consuming large data sources. For instance, imagine a scenario where the 
file consumer is reading a large file. Without a resume strategy, stopping and 
starting Camel would cause the consumer in the File component to read all the 
bytes of the given file at the initial offset (offset 0). The resume strategy 
allow integrations can point the consumer to the exact offset to resume the 
operations.
+
+The resume strategies comes in 3 parts:
+
+* A DSL method that marks the route as supporting resume operations and points 
to an instance of a strategy implementation.
+* A set of core infrastructure that allow integrations to implement different 
types of strategies
+* Basic strategies implementations that can be extended to implement the 
specific resume strategies required by the integrations
+
+=== The DSL method
+
+The route needs to use the `resumable()` method followed by a 
`resumableStrategyRef` to point to an instance of the resume strategy in use. 
This instance needs to be bound in the Context registry.

Review comment:
       xxxRef is old style, we are adjusting the DSL to be just xxx, so we 
should rename this

##########
File path: 
core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.model;
+
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.ResumeStrategy;
+
+public class ResumableDefinition extends OutputExpressionNode {
+    @XmlAttribute(required = true)
+    private String resumeStrategyRef;
+
+    @XmlTransient
+    private ResumeStrategy resumeStrategy;
+
+    @Override
+    public String getShortName() {
+        return "resumable";
+    }
+
+    public String getResumeStrategyRef() {
+        return resumeStrategyRef;
+    }
+
+    public void setResumeStrategyRef(String resumeStrategyRef) {
+        this.resumeStrategyRef = resumeStrategyRef;
+    }
+
+    public ResumeStrategy getResumeStrategy() {
+        return resumeStrategy;
+    }
+
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
+        this.resumeStrategy = resumeStrategy;
+    }
+
+    // Fluent API
+    // 
-------------------------------------------------------------------------
+    public ResumableDefinition resumableStrategyRef(String resumeStrategyRef) {

Review comment:
       resumableStrategy

##########
File path: 
core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
##########
@@ -0,0 +1,179 @@
+= Resume Strategies
+:doctitle: Resume Strategies
+:shortname: resume
+:description: Provide strategies to allow consuming data from specific offsets
+:since:
+:supportlevel: Experimental
+
+The resume strategies allow users to implement strategies that point the 
consumer part of the routes to the last point of consumption. This allows Camel 
to skip reading and processing data that has already been consumed.
+
+The resume strategies can be used to allow quicker stop and resume operations 
when consuming large data sources. For instance, imagine a scenario where the 
file consumer is reading a large file. Without a resume strategy, stopping and 
starting Camel would cause the consumer in the File component to read all the 
bytes of the given file at the initial offset (offset 0). The resume strategy 
allow integrations can point the consumer to the exact offset to resume the 
operations.
+
+The resume strategies comes in 3 parts:
+
+* A DSL method that marks the route as supporting resume operations and points 
to an instance of a strategy implementation.
+* A set of core infrastructure that allow integrations to implement different 
types of strategies
+* Basic strategies implementations that can be extended to implement the 
specific resume strategies required by the integrations
+
+=== The DSL method
+
+The route needs to use the `resumable()` method followed by a 
`resumableStrategyRef` to point to an instance of the resume strategy in use. 
This instance needs to be bound in the Context registry.
+
+[source,java]
+----
+getCamelContext().getRegistry().bind("testResumeStrategy", testResumeStrategy);
+
+from("some:component")
+    .process(this::process)
+    .resumable().resumableStrategyRef("testResumeStrategy");
+----
+
+
+=== The Core Interfaces
+
+These are the *core interfaces*:
+
+* org.apache.camel.ResumeStrategy: the basic resume strategy
+* org.apache.camel.UpdatableConsumerResumeStrategy: an extension to the resume 
strategy to allow updatable strategies
+* org.apache.camel.ResumeCache: an interface for local cache for resumable 
information
+
+These are the *core classes* supporting the strategies:
+
+* org.apache.camel.Resumable: an interface to allow users to work with 
abstract resumable entities (files, offsets, etc)
+* org.apache.camel.ResumableSet: an interface for resumables with a 1-to-many 
relationship
+* org.apache.camel.Offset: a generic offset without a concrete type (it may 
represent a long, a file name, etc)
+
+
+These are the *supporting classes*:
+
+* org.apache.camel.support.Resumables: resumables handling support
+* org.apache.camel.support.Offsets: offset handling support
+
+
+=== Basic Strategies
+
+The basic strategies offer a component-specific skeleton that can be used to 
implement strategies.
+
+* AbstractKafkaResumeStrategy: a resume strategy from the `camel-kafka` 
component that uses Kafka as the store for the offsets.
+
+
+[source,java]
+----
+public class KafkaResumeStrategy<K> extends AbstractKafkaResumeStrategy<K, 
Long> implements GenericFileResumeStrategy<File> {
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaResumeStrategy.class);
+public static final int CACHE_SIZE = 100;
+
+    private final String topic;
+    private final ResumeCache<K, Long> cache;
+
+    public KafkaResumeStrategy(String topic,
+                               ResumeCache<K, Long> cache,
+                               DefaultProducerPropertyFactory 
producerPropertyFactory,
+                               DefaultConsumerPropertyFactory 
consumerPropertyFactory)
+    {
+        super(topic, cache, producerPropertyFactory.getProperties(), 
consumerPropertyFactory.getProperties());
+        this.topic = topic;
+        this.cache = cache;
+    }
+
+
+    private Optional<Long> getLastOffset(GenericFileResumable<File> resumable) 
{
+        final File addressable = resumable.getAddressable();
+        return getLastOffset((K) addressable);
+    }
+
+    public Optional<Long> getLastOffset(K addressable) {
+        return cache.get(addressable);
+    }
+
+    @Override
+    public void subscribe() {
+        checkAndSubscribe(topic, 1);
+    }
+
+    @Override
+    public void resume(GenericFileResumable<File> resumable) {
+        final Optional<Long> lastOffsetOpt = getLastOffset(resumable);
+
+        if (!lastOffsetOpt.isPresent()) {
+            return;
+        }
+
+        final long lastOffset = lastOffsetOpt.get();
+        resumable.updateLastOffset(lastOffset);
+    }
+
+    @Override
+    public void resume() {
+        throw new UnsupportedOperationException("Cannot perform blind resume");
+    }
+}
+----
+
+
+=== Local Cache Support
+
+A sample local cache implemented using 
https://github.com/ben-manes/caffeine[Caffeine].
+
+[source,java]
+----
+public class SingleItemCache<K> implements ResumeCache<K, Long> {
+    public static final int CACHE_SIZE = 100;
+    private final Cache<K, Long> cache = Caffeine.newBuilder()
+            .maximumSize(CACHE_SIZE)
+            .build();
+
+    @Override
+    public void add(K key, Long offsetValue) {
+        cache.put(key, offsetValue);
+    }
+
+    @Override
+    public Optional<Long> get(K key) {
+        Long entry = cache.getIfPresent(key);
+
+        if (entry == null) {
+            return Optional.empty();
+        }
+
+        return Optional.of(entry.longValue());
+    }
+
+    @Override
+    public boolean isFull() {
+        if (cache.estimatedSize() < CACHE_SIZE) {
+            return true;
+        }
+
+        return false;
+    }
+}
+----
+
+
+
+=== Known Limitations, Bugs and Other Notes
+
+* When using the converters with the file component, beware of the differences 
in the behavior from `Reader` and `InputStream`:
+
+For instance, the behavior of:
+
+[source,java]
+----
+from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
+    .convertBodyTo(Reader.class)
+    .process(this::process)
+    .resumable().resumableStrategyRef("testResumeStrategy");

Review comment:
       Move up

##########
File path: 
core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.reifier;
+
+import org.apache.camel.Processor;
+import org.apache.camel.ResumeStrategy;
+import org.apache.camel.Route;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.ResumableDefinition;
+import org.apache.camel.processor.resume.ResumableProcessor;
+import org.apache.camel.util.ObjectHelper;
+
+public class ResumableReifier extends ExpressionReifier<ResumableDefinition> {
+    protected ResumableReifier(Route route, ProcessorDefinition<?> definition) 
{
+        super(route, ResumableDefinition.class.cast(definition));
+
+    }
+
+    @Override
+    public Processor createProcessor() throws Exception {
+        Processor childProcessor = createChildProcessor(false);
+
+        ResumeStrategy resumeStrategy = resolveResumeStrategy();
+        ObjectHelper.notNull(resumeStrategy, "resumeStrategy", definition);
+        resumeStrategy.start();

Review comment:
       Do not start in create, as reifier should ideally only create, and start 
is done later when Camel starts the routes

##########
File path: 
core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.model;
+
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.ResumeStrategy;
+
+public class ResumableDefinition extends OutputExpressionNode {

Review comment:
       Should be NoOutput, and why do you need an expression?

##########
File path: 
core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.reifier;
+
+import org.apache.camel.Processor;
+import org.apache.camel.ResumeStrategy;
+import org.apache.camel.Route;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.ResumableDefinition;
+import org.apache.camel.processor.resume.ResumableProcessor;
+import org.apache.camel.util.ObjectHelper;
+
+public class ResumableReifier extends ExpressionReifier<ResumableDefinition> {
+    protected ResumableReifier(Route route, ProcessorDefinition<?> definition) 
{
+        super(route, ResumableDefinition.class.cast(definition));
+
+    }
+
+    @Override
+    public Processor createProcessor() throws Exception {
+        Processor childProcessor = createChildProcessor(false);
+
+        ResumeStrategy resumeStrategy = resolveResumeStrategy();
+        ObjectHelper.notNull(resumeStrategy, "resumeStrategy", definition);
+        resumeStrategy.start();

Review comment:
       And let ResumableProcessor handle the lifecycle where it can start the 
strategy from its doInit / doStart methods

##########
File path: 
core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
##########
@@ -0,0 +1,179 @@
+= Resume Strategies
+:doctitle: Resume Strategies
+:shortname: resume
+:description: Provide strategies to allow consuming data from specific offsets
+:since:
+:supportlevel: Experimental
+
+The resume strategies allow users to implement strategies that point the 
consumer part of the routes to the last point of consumption. This allows Camel 
to skip reading and processing data that has already been consumed.
+
+The resume strategies can be used to allow quicker stop and resume operations 
when consuming large data sources. For instance, imagine a scenario where the 
file consumer is reading a large file. Without a resume strategy, stopping and 
starting Camel would cause the consumer in the File component to read all the 
bytes of the given file at the initial offset (offset 0). The resume strategy 
allow integrations can point the consumer to the exact offset to resume the 
operations.
+
+The resume strategies comes in 3 parts:
+
+* A DSL method that marks the route as supporting resume operations and points 
to an instance of a strategy implementation.
+* A set of core infrastructure that allow integrations to implement different 
types of strategies
+* Basic strategies implementations that can be extended to implement the 
specific resume strategies required by the integrations
+
+=== The DSL method
+
+The route needs to use the `resumable()` method followed by a 
`resumableStrategyRef` to point to an instance of the resume strategy in use. 
This instance needs to be bound in the Context registry.
+
+[source,java]
+----
+getCamelContext().getRegistry().bind("testResumeStrategy", testResumeStrategy);
+
+from("some:component")
+    .process(this::process)
+    .resumable().resumableStrategyRef("testResumeStrategy");
+----
+
+
+=== The Core Interfaces
+
+These are the *core interfaces*:
+
+* org.apache.camel.ResumeStrategy: the basic resume strategy
+* org.apache.camel.UpdatableConsumerResumeStrategy: an extension to the resume 
strategy to allow updatable strategies
+* org.apache.camel.ResumeCache: an interface for local cache for resumable 
information
+
+These are the *core classes* supporting the strategies:
+
+* org.apache.camel.Resumable: an interface to allow users to work with 
abstract resumable entities (files, offsets, etc)
+* org.apache.camel.ResumableSet: an interface for resumables with a 1-to-many 
relationship
+* org.apache.camel.Offset: a generic offset without a concrete type (it may 
represent a long, a file name, etc)
+
+
+These are the *supporting classes*:
+
+* org.apache.camel.support.Resumables: resumables handling support
+* org.apache.camel.support.Offsets: offset handling support
+
+
+=== Basic Strategies
+
+The basic strategies offer a component-specific skeleton that can be used to 
implement strategies.
+
+* AbstractKafkaResumeStrategy: a resume strategy from the `camel-kafka` 
component that uses Kafka as the store for the offsets.
+
+
+[source,java]
+----
+public class KafkaResumeStrategy<K> extends AbstractKafkaResumeStrategy<K, 
Long> implements GenericFileResumeStrategy<File> {
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaResumeStrategy.class);
+public static final int CACHE_SIZE = 100;
+
+    private final String topic;
+    private final ResumeCache<K, Long> cache;
+
+    public KafkaResumeStrategy(String topic,
+                               ResumeCache<K, Long> cache,
+                               DefaultProducerPropertyFactory 
producerPropertyFactory,
+                               DefaultConsumerPropertyFactory 
consumerPropertyFactory)
+    {
+        super(topic, cache, producerPropertyFactory.getProperties(), 
consumerPropertyFactory.getProperties());
+        this.topic = topic;
+        this.cache = cache;
+    }
+
+
+    private Optional<Long> getLastOffset(GenericFileResumable<File> resumable) 
{
+        final File addressable = resumable.getAddressable();
+        return getLastOffset((K) addressable);
+    }
+
+    public Optional<Long> getLastOffset(K addressable) {
+        return cache.get(addressable);
+    }
+
+    @Override
+    public void subscribe() {
+        checkAndSubscribe(topic, 1);
+    }
+
+    @Override
+    public void resume(GenericFileResumable<File> resumable) {
+        final Optional<Long> lastOffsetOpt = getLastOffset(resumable);
+
+        if (!lastOffsetOpt.isPresent()) {
+            return;
+        }
+
+        final long lastOffset = lastOffsetOpt.get();
+        resumable.updateLastOffset(lastOffset);
+    }
+
+    @Override
+    public void resume() {
+        throw new UnsupportedOperationException("Cannot perform blind resume");
+    }
+}
+----
+
+
+=== Local Cache Support
+
+A sample local cache implemented using 
https://github.com/ben-manes/caffeine[Caffeine].
+
+[source,java]
+----
+public class SingleItemCache<K> implements ResumeCache<K, Long> {
+    public static final int CACHE_SIZE = 100;
+    private final Cache<K, Long> cache = Caffeine.newBuilder()
+            .maximumSize(CACHE_SIZE)
+            .build();
+
+    @Override
+    public void add(K key, Long offsetValue) {
+        cache.put(key, offsetValue);
+    }
+
+    @Override
+    public Optional<Long> get(K key) {
+        Long entry = cache.getIfPresent(key);
+
+        if (entry == null) {
+            return Optional.empty();
+        }
+
+        return Optional.of(entry.longValue());
+    }
+
+    @Override
+    public boolean isFull() {
+        if (cache.estimatedSize() < CACHE_SIZE) {
+            return true;
+        }
+
+        return false;
+    }
+}
+----
+
+
+
+=== Known Limitations, Bugs and Other Notes
+
+* When using the converters with the file component, beware of the differences 
in the behavior from `Reader` and `InputStream`:
+
+For instance, the behavior of:
+
+[source,java]
+----
+from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
+    .convertBodyTo(Reader.class)
+    .process(this::process)
+    .resumable().resumableStrategyRef("testResumeStrategy");
+----
+
+Is different from the behavior of:
+
+[source,java]
+----
+from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
+    .convertBodyTo(InputStream.class)
+    .process(this::process)
+    .resumable().resumableStrategyRef("testResumeStrategy");

Review comment:
       Move up




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to