This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 7931726e0eb CAMEL-18174: allow intermittent mode in the resume API
7931726e0eb is described below

commit 7931726e0ebdf3d995b11ef2a7413c3cba68cc96
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Wed Jun 8 13:16:24 2022 +0200

    CAMEL-18174: allow intermittent mode in the resume API
---
 .../org/apache/camel/catalog/models/resumable.json  |  1 +
 .../apache/camel/catalog/schemas/camel-spring.xsd   |  8 ++++++++
 .../docs/modules/eips/pages/resume-strategies.adoc  | 10 ++++++++++
 .../resources/org/apache/camel/model/resumable.json |  1 +
 .../org/apache/camel/model/ResumableDefinition.java | 21 +++++++++++++++++++++
 .../camel/processor/resume/ResumableCompletion.java | 10 +++++++---
 .../camel/processor/resume/ResumableProcessor.java  |  7 +++++--
 .../org/apache/camel/reifier/ResumableReifier.java  |  4 +++-
 .../FileConsumerResumeFromOffsetStrategyTest.java   | 21 +++++++++++++++++++++
 .../java/org/apache/camel/xml/in/ModelParser.java   |  9 +++++----
 .../dsl/yaml/deserializers/ModelDeserializers.java  |  6 ++++++
 .../src/generated/resources/camel-yaml-dsl.json     |  3 +++
 .../src/generated/resources/camelYamlDsl.json       |  3 +++
 13 files changed, 94 insertions(+), 10 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json
index 03a0a440e46..7ff91cd1275 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json
@@ -13,6 +13,7 @@
   },
   "properties": {
     "resumeStrategy": { "kind": "attribute", "displayName": "Resume Strategy", 
"required": true, "type": "object", "javaType": 
"org.apache.camel.resume.ResumeStrategy", "deprecated": false, "autowired": 
false, "secret": false, "description": "Sets the resume strategy to use" },
+    "intermittent": { "kind": "attribute", "displayName": "Intermittent", 
"label": "advanced", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Sets whether the offsets will be 
intermittently present or whether they must be present in every exchange" },
     "id": { "kind": "attribute", "displayName": "Id", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Sets the id of this node" 
},
     "description": { "kind": "element", "displayName": "Description", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.model.DescriptionDefinition", "deprecated": false, 
"autowired": false, "secret": false, "description": "Sets the description of 
this node" }
   }
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
index 7148e8b0b05..6095aaa734a 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
@@ -10515,6 +10515,14 @@ Sets the resume strategy to use.
             ]]></xs:documentation>
           </xs:annotation>
         </xs:attribute>
+        <xs:attribute name="intermittent" type="xs:string">
+          <xs:annotation>
+            <xs:documentation xml:lang="en"><![CDATA[
+Sets whether the offsets will be intermittently present or whether they must be
+present in every exchange. Default value: false
+            ]]></xs:documentation>
+          </xs:annotation>
+        </xs:attribute>
       </xs:extension>
     </xs:complexContent>
   </xs:complexType>
diff --git 
a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
 
b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
index fb3a637cca5..1cd95bdcfb5 100644
--- 
a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
+++ 
b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
@@ -59,6 +59,16 @@ In some circumstances, such as when dealing with File I/O, 
it may be necessary t
 * `org.apache.camel.support.Resumables` - resumables handling support
 * `org.apache.camel.support.Offsets` - offset handling support
 
+=== Intermittent Mode
+
+In some cases it may be necessary to avoid updating the offset for every 
exchange. You can enable the intermittent mode to modify the route behavior so 
that missing offsets will not cause an exception:
+
+[source,java]
+----
+from("some:component")
+.resumable(new MyTestResumeStrategy()).intermittent(true)
+.process(this::process)
+----
 
 == Builtin Resume Strategies
 
diff --git 
a/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json
 
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json
index 03a0a440e46..7ff91cd1275 100644
--- 
a/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json
+++ 
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json
@@ -13,6 +13,7 @@
   },
   "properties": {
     "resumeStrategy": { "kind": "attribute", "displayName": "Resume Strategy", 
"required": true, "type": "object", "javaType": 
"org.apache.camel.resume.ResumeStrategy", "deprecated": false, "autowired": 
false, "secret": false, "description": "Sets the resume strategy to use" },
+    "intermittent": { "kind": "attribute", "displayName": "Intermittent", 
"label": "advanced", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Sets whether the offsets will be 
intermittently present or whether they must be present in every exchange" },
     "id": { "kind": "attribute", "displayName": "Id", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Sets the id of this node" 
},
     "description": { "kind": "element", "displayName": "Description", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.model.DescriptionDefinition", "deprecated": false, 
"autowired": false, "secret": false, "description": "Sets the description of 
this node" }
   }
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
index bc68161ae30..96d07951092 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
@@ -44,6 +44,10 @@ public class ResumableDefinition extends 
NoOutputDefinition<ResumableDefinition>
               enums = "TRACE,DEBUG,INFO,WARN,ERROR,OFF")
     private String loggingLevel;
 
+    @XmlAttribute
+    @Metadata(label = "advanced", javaType = "java.lang.Boolean", defaultValue 
= "false")
+    private String intermittent;
+
     @Override
     public String getShortName() {
         return "resumable";
@@ -78,6 +82,14 @@ public class ResumableDefinition extends 
NoOutputDefinition<ResumableDefinition>
         this.loggingLevel = loggingLevelRef;
     }
 
+    public String getIntermittent() {
+        return intermittent;
+    }
+
+    public void setIntermittent(String intermitent) {
+        this.intermittent = intermitent;
+    }
+
     // Fluent API
     // 
-------------------------------------------------------------------------
 
@@ -114,4 +126,13 @@ public class ResumableDefinition extends 
NoOutputDefinition<ResumableDefinition>
         setLoggingLevel(loggingLevelRef);
         return this;
     }
+
+    /**
+     * Sets whether the offsets will be intermittently present or whether they 
must be present in every exchange
+     */
+    public ResumableDefinition intermittent(boolean intermittent) {
+        setIntermittent(Boolean.toString(intermittent));
+
+        return this;
+    }
 }
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
index c4675683332..dfb75ee5b8f 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
@@ -33,10 +33,12 @@ public class ResumableCompletion implements Synchronization 
{
 
     private final ResumeStrategy resumeStrategy;
     private final LoggingLevel loggingLevel;
+    private final boolean intermittent;
 
-    public ResumableCompletion(ResumeStrategy resumeStrategy, LoggingLevel 
loggingLevel) {
+    public ResumableCompletion(ResumeStrategy resumeStrategy, LoggingLevel 
loggingLevel, boolean intermittent) {
         this.resumeStrategy = resumeStrategy;
         this.loggingLevel = loggingLevel;
+        this.intermittent = intermittent;
     }
 
     @Override
@@ -67,8 +69,10 @@ public class ResumableCompletion implements Synchronization {
                 LOG.debug("Cannot perform an offset update because the 
strategy is not updatable");
             }
         } else {
-            exchange.setException(new NoOffsetException(exchange));
-            LOG.warn("Cannot update the last offset because it's not 
available");
+            if (!intermittent) {
+                exchange.setException(new NoOffsetException(exchange));
+                LOG.warn("Cannot update the last offset because it's not 
available");
+            }
         }
     }
 
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
index 34b2b475c7e..a41ee86d96d 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
@@ -50,13 +50,16 @@ public class ResumableProcessor extends 
AsyncProcessorSupport
     private final ResumeStrategy resumeStrategy;
     private final AsyncProcessor processor;
     private final LoggingLevel loggingLevel;
+    private final boolean intermittent;
     private String id;
     private String routeId;
 
-    public ResumableProcessor(ResumeStrategy resumeStrategy, Processor 
processor, LoggingLevel loggingLevel) {
+    public ResumableProcessor(ResumeStrategy resumeStrategy, Processor 
processor, LoggingLevel loggingLevel,
+                              boolean intermittent) {
         this.resumeStrategy = Objects.requireNonNull(resumeStrategy);
         this.processor = AsyncProcessorConverterHelper.convert(processor);
         this.loggingLevel = loggingLevel;
+        this.intermittent = intermittent;
     }
 
     @Override
@@ -76,7 +79,7 @@ public class ResumableProcessor extends AsyncProcessorSupport
 
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
-        final Synchronization onCompletion = new 
ResumableCompletion(resumeStrategy, loggingLevel);
+        final Synchronization onCompletion = new 
ResumableCompletion(resumeStrategy, loggingLevel, intermittent);
 
         exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
 
diff --git 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
index da5c21e9dc1..be76af8c7ae 100644
--- 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
+++ 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
@@ -40,7 +40,9 @@ public class ResumableReifier extends 
ProcessorReifier<ResumableDefinition> {
 
         route.setResumeStrategy(resumeStrategy);
         LoggingLevel loggingLevel = resolveLoggingLevel();
-        return new ResumableProcessor(resumeStrategy, childProcessor, 
loggingLevel);
+
+        boolean intermittent = parseBoolean(definition.getIntermittent(), 
false);
+        return new ResumableProcessor(resumeStrategy, childProcessor, 
loggingLevel, intermittent);
     }
 
     protected ResumeStrategy resolveResumeStrategy() {
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
index 1c038f4a639..ae10d306ea5 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
@@ -128,6 +128,22 @@ public class FileConsumerResumeFromOffsetStrategyTest 
extends ContextTestSupport
         Assertions.assertFalse(((FailResumeAdapter) 
FAIL_RESUME_STRATEGY.getAdapter()).called);
     }
 
+    @DisplayName("Tests whether it a missing offset does not cause a failure 
when using intermittent mode")
+    @Test
+    public void testMissingOffsetWithIntermittentMode() throws 
InterruptedException {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceivedInAnyOrder("01234567890");
+
+        template.sendBodyAndHeader(fileUri("resumeMissingOffsetIntermittent"), 
"01234567890", Exchange.FILE_NAME,
+                "resume-from-offset.txt");
+
+        assertMockEndpointsSatisfied();
+
+        List<Exchange> exchangeList = mock.getExchanges();
+        Assertions.assertFalse(exchangeList.isEmpty(), "It should have 
received a few messages");
+        Assertions.assertFalse(((FailResumeAdapter) 
FAIL_RESUME_STRATEGY.getAdapter()).called);
+    }
+
     @DisplayName("Tests whether we can start from the beginning (i.e.: no 
resume strategy)")
     @Test
     public void testNoResume() throws Exception {
@@ -161,6 +177,11 @@ public class FileConsumerResumeFromOffsetStrategyTest 
extends ContextTestSupport
                         .log("${body}")
                         .convertBodyTo(String.class).to("mock:result");
 
+                
from(fileUri("resumeMissingOffsetIntermittent?noop=true&recursive=true"))
+                        
.resumable().resumeStrategy("resumeNotToBeCalledStrategy").intermittent(true)
+                        .log("${body}")
+                        .convertBodyTo(String.class).to("mock:result");
+
                 from(fileUri("resumeNone?noop=true&recursive=true"))
                         .convertBodyTo(String.class).to("mock:result");
             }
diff --git 
a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java 
b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
index 61758a0af9e..4cae265b4f6 100644
--- 
a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
+++ 
b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
@@ -927,11 +927,12 @@ public class ModelParser extends BaseParser {
     }
     protected ResumableDefinition doParseResumableDefinition() throws 
IOException, XmlPullParserException {
         return doParse(new ResumableDefinition(), (def, key, val) -> {
-            if ("resumeStrategy".equals(key)) {
-                def.setResumeStrategy(val);
-                return true;
+            switch (key) {
+                case "intermittent": def.setIntermittent(val); break;
+                case "resumeStrategy": def.setResumeStrategy(val); break;
+                default: return 
processorDefinitionAttributeHandler().accept(def, key, val);
             }
-            return processorDefinitionAttributeHandler().accept(def, key, val);
+            return true;
         }, (def, key) -> {
             if ("loggingLevel".equals(key)) {
                 def.setLoggingLevel(doParseText());
diff --git 
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
 
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
index 25eed181695..dad10658748 100644
--- 
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
+++ 
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
@@ -12476,6 +12476,7 @@ public final class ModelDeserializers extends 
YamlDeserializerSupport {
                     @YamlProperty(name = "description", type = "string"),
                     @YamlProperty(name = "id", type = "string"),
                     @YamlProperty(name = "inherit-error-handler", type = 
"boolean"),
+                    @YamlProperty(name = "intermittent", type = "boolean"),
                     @YamlProperty(name = "resume-strategy", type = "string", 
required = true)
             }
     )
@@ -12498,6 +12499,11 @@ public final class ModelDeserializers extends 
YamlDeserializerSupport {
                     
target.setInheritErrorHandler(java.lang.Boolean.valueOf(val));
                     break;
                 }
+                case "intermittent": {
+                    String val = asText(node);
+                    target.setIntermittent(val);
+                    break;
+                }
                 case "resume-strategy": {
                     String val = asText(node);
                     target.setResumeStrategy(val);
diff --git 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json
index d05a9b3af39..edf0ecd0ca8 100644
--- 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json
+++ 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json
@@ -2259,6 +2259,9 @@
           "inherit-error-handler" : {
             "type" : "boolean"
           },
+          "intermittent" : {
+            "type" : "boolean"
+          },
           "resume-strategy" : {
             "type" : "string"
           }
diff --git 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camelYamlDsl.json 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camelYamlDsl.json
index 998852dfac8..61e7b809d70 100644
--- 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camelYamlDsl.json
+++ 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camelYamlDsl.json
@@ -2163,6 +2163,9 @@
           "inheritErrorHandler" : {
             "type" : "boolean"
           },
+          "intermittent" : {
+            "type" : "boolean"
+          },
           "resumeStrategy" : {
             "type" : "string"
           }

Reply via email to