This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 7931726e0eb CAMEL-18174: allow intermittent mode in the resume API 7931726e0eb is described below commit 7931726e0ebdf3d995b11ef2a7413c3cba68cc96 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Wed Jun 8 13:16:24 2022 +0200 CAMEL-18174: allow intermittent mode in the resume API --- .../org/apache/camel/catalog/models/resumable.json | 1 + .../apache/camel/catalog/schemas/camel-spring.xsd | 8 ++++++++ .../docs/modules/eips/pages/resume-strategies.adoc | 10 ++++++++++ .../resources/org/apache/camel/model/resumable.json | 1 + .../org/apache/camel/model/ResumableDefinition.java | 21 +++++++++++++++++++++ .../camel/processor/resume/ResumableCompletion.java | 10 +++++++--- .../camel/processor/resume/ResumableProcessor.java | 7 +++++-- .../org/apache/camel/reifier/ResumableReifier.java | 4 +++- .../FileConsumerResumeFromOffsetStrategyTest.java | 21 +++++++++++++++++++++ .../java/org/apache/camel/xml/in/ModelParser.java | 9 +++++---- .../dsl/yaml/deserializers/ModelDeserializers.java | 6 ++++++ .../src/generated/resources/camel-yaml-dsl.json | 3 +++ .../src/generated/resources/camelYamlDsl.json | 3 +++ 13 files changed, 94 insertions(+), 10 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json index 03a0a440e46..7ff91cd1275 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json @@ -13,6 +13,7 @@ }, "properties": { "resumeStrategy": { "kind": "attribute", "displayName": "Resume Strategy", "required": true, "type": "object", "javaType": "org.apache.camel.resume.ResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the resume strategy to use" }, + "intermittent": { "kind": "attribute", "displayName": "Intermittent", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether the offsets will be intermittently present or whether they must be present in every exchange" }, "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" }, "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" } } diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd index 7148e8b0b05..6095aaa734a 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd @@ -10515,6 +10515,14 @@ Sets the resume strategy to use. ]]></xs:documentation> </xs:annotation> </xs:attribute> + <xs:attribute name="intermittent" type="xs:string"> + <xs:annotation> + <xs:documentation xml:lang="en"><![CDATA[ +Sets whether the offsets will be intermittently present or whether they must be +present in every exchange. Default value: false + ]]></xs:documentation> + </xs:annotation> + </xs:attribute> </xs:extension> </xs:complexContent> </xs:complexType> diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc index fb3a637cca5..1cd95bdcfb5 100644 --- a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc +++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc @@ -59,6 +59,16 @@ In some circumstances, such as when dealing with File I/O, it may be necessary t * `org.apache.camel.support.Resumables` - resumables handling support * `org.apache.camel.support.Offsets` - offset handling support +=== Intermittent Mode + +In some cases it may be necessary to avoid updating the offset for every exchange. You can enable the intermittent mode to modify the route behavior so that missing offsets will not cause an exception: + +[source,java] +---- +from("some:component") +.resumable(new MyTestResumeStrategy()).intermittent(true) +.process(this::process) +---- == Builtin Resume Strategies diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json index 03a0a440e46..7ff91cd1275 100644 --- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json +++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json @@ -13,6 +13,7 @@ }, "properties": { "resumeStrategy": { "kind": "attribute", "displayName": "Resume Strategy", "required": true, "type": "object", "javaType": "org.apache.camel.resume.ResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the resume strategy to use" }, + "intermittent": { "kind": "attribute", "displayName": "Intermittent", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether the offsets will be intermittently present or whether they must be present in every exchange" }, "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" }, "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" } } diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java index bc68161ae30..96d07951092 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java @@ -44,6 +44,10 @@ public class ResumableDefinition extends NoOutputDefinition<ResumableDefinition> enums = "TRACE,DEBUG,INFO,WARN,ERROR,OFF") private String loggingLevel; + @XmlAttribute + @Metadata(label = "advanced", javaType = "java.lang.Boolean", defaultValue = "false") + private String intermittent; + @Override public String getShortName() { return "resumable"; @@ -78,6 +82,14 @@ public class ResumableDefinition extends NoOutputDefinition<ResumableDefinition> this.loggingLevel = loggingLevelRef; } + public String getIntermittent() { + return intermittent; + } + + public void setIntermittent(String intermitent) { + this.intermittent = intermitent; + } + // Fluent API // ------------------------------------------------------------------------- @@ -114,4 +126,13 @@ public class ResumableDefinition extends NoOutputDefinition<ResumableDefinition> setLoggingLevel(loggingLevelRef); return this; } + + /** + * Sets whether the offsets will be intermittently present or whether they must be present in every exchange + */ + public ResumableDefinition intermittent(boolean intermittent) { + setIntermittent(Boolean.toString(intermittent)); + + return this; + } } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java index c4675683332..dfb75ee5b8f 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java @@ -33,10 +33,12 @@ public class ResumableCompletion implements Synchronization { private final ResumeStrategy resumeStrategy; private final LoggingLevel loggingLevel; + private final boolean intermittent; - public ResumableCompletion(ResumeStrategy resumeStrategy, LoggingLevel loggingLevel) { + public ResumableCompletion(ResumeStrategy resumeStrategy, LoggingLevel loggingLevel, boolean intermittent) { this.resumeStrategy = resumeStrategy; this.loggingLevel = loggingLevel; + this.intermittent = intermittent; } @Override @@ -67,8 +69,10 @@ public class ResumableCompletion implements Synchronization { LOG.debug("Cannot perform an offset update because the strategy is not updatable"); } } else { - exchange.setException(new NoOffsetException(exchange)); - LOG.warn("Cannot update the last offset because it's not available"); + if (!intermittent) { + exchange.setException(new NoOffsetException(exchange)); + LOG.warn("Cannot update the last offset because it's not available"); + } } } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java index 34b2b475c7e..a41ee86d96d 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java @@ -50,13 +50,16 @@ public class ResumableProcessor extends AsyncProcessorSupport private final ResumeStrategy resumeStrategy; private final AsyncProcessor processor; private final LoggingLevel loggingLevel; + private final boolean intermittent; private String id; private String routeId; - public ResumableProcessor(ResumeStrategy resumeStrategy, Processor processor, LoggingLevel loggingLevel) { + public ResumableProcessor(ResumeStrategy resumeStrategy, Processor processor, LoggingLevel loggingLevel, + boolean intermittent) { this.resumeStrategy = Objects.requireNonNull(resumeStrategy); this.processor = AsyncProcessorConverterHelper.convert(processor); this.loggingLevel = loggingLevel; + this.intermittent = intermittent; } @Override @@ -76,7 +79,7 @@ public class ResumableProcessor extends AsyncProcessorSupport @Override public boolean process(final Exchange exchange, final AsyncCallback callback) { - final Synchronization onCompletion = new ResumableCompletion(resumeStrategy, loggingLevel); + final Synchronization onCompletion = new ResumableCompletion(resumeStrategy, loggingLevel, intermittent); exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java index da5c21e9dc1..be76af8c7ae 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java @@ -40,7 +40,9 @@ public class ResumableReifier extends ProcessorReifier<ResumableDefinition> { route.setResumeStrategy(resumeStrategy); LoggingLevel loggingLevel = resolveLoggingLevel(); - return new ResumableProcessor(resumeStrategy, childProcessor, loggingLevel); + + boolean intermittent = parseBoolean(definition.getIntermittent(), false); + return new ResumableProcessor(resumeStrategy, childProcessor, loggingLevel, intermittent); } protected ResumeStrategy resolveResumeStrategy() { diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java index 1c038f4a639..ae10d306ea5 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java @@ -128,6 +128,22 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport Assertions.assertFalse(((FailResumeAdapter) FAIL_RESUME_STRATEGY.getAdapter()).called); } + @DisplayName("Tests whether it a missing offset does not cause a failure when using intermittent mode") + @Test + public void testMissingOffsetWithIntermittentMode() throws InterruptedException { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceivedInAnyOrder("01234567890"); + + template.sendBodyAndHeader(fileUri("resumeMissingOffsetIntermittent"), "01234567890", Exchange.FILE_NAME, + "resume-from-offset.txt"); + + assertMockEndpointsSatisfied(); + + List<Exchange> exchangeList = mock.getExchanges(); + Assertions.assertFalse(exchangeList.isEmpty(), "It should have received a few messages"); + Assertions.assertFalse(((FailResumeAdapter) FAIL_RESUME_STRATEGY.getAdapter()).called); + } + @DisplayName("Tests whether we can start from the beginning (i.e.: no resume strategy)") @Test public void testNoResume() throws Exception { @@ -161,6 +177,11 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport .log("${body}") .convertBodyTo(String.class).to("mock:result"); + from(fileUri("resumeMissingOffsetIntermittent?noop=true&recursive=true")) + .resumable().resumeStrategy("resumeNotToBeCalledStrategy").intermittent(true) + .log("${body}") + .convertBodyTo(String.class).to("mock:result"); + from(fileUri("resumeNone?noop=true&recursive=true")) .convertBodyTo(String.class).to("mock:result"); } diff --git a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java index 61758a0af9e..4cae265b4f6 100644 --- a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java +++ b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java @@ -927,11 +927,12 @@ public class ModelParser extends BaseParser { } protected ResumableDefinition doParseResumableDefinition() throws IOException, XmlPullParserException { return doParse(new ResumableDefinition(), (def, key, val) -> { - if ("resumeStrategy".equals(key)) { - def.setResumeStrategy(val); - return true; + switch (key) { + case "intermittent": def.setIntermittent(val); break; + case "resumeStrategy": def.setResumeStrategy(val); break; + default: return processorDefinitionAttributeHandler().accept(def, key, val); } - return processorDefinitionAttributeHandler().accept(def, key, val); + return true; }, (def, key) -> { if ("loggingLevel".equals(key)) { def.setLoggingLevel(doParseText()); diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java index 25eed181695..dad10658748 100644 --- a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java +++ b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java @@ -12476,6 +12476,7 @@ public final class ModelDeserializers extends YamlDeserializerSupport { @YamlProperty(name = "description", type = "string"), @YamlProperty(name = "id", type = "string"), @YamlProperty(name = "inherit-error-handler", type = "boolean"), + @YamlProperty(name = "intermittent", type = "boolean"), @YamlProperty(name = "resume-strategy", type = "string", required = true) } ) @@ -12498,6 +12499,11 @@ public final class ModelDeserializers extends YamlDeserializerSupport { target.setInheritErrorHandler(java.lang.Boolean.valueOf(val)); break; } + case "intermittent": { + String val = asText(node); + target.setIntermittent(val); + break; + } case "resume-strategy": { String val = asText(node); target.setResumeStrategy(val); diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json index d05a9b3af39..edf0ecd0ca8 100644 --- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json +++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json @@ -2259,6 +2259,9 @@ "inherit-error-handler" : { "type" : "boolean" }, + "intermittent" : { + "type" : "boolean" + }, "resume-strategy" : { "type" : "string" } diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camelYamlDsl.json b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camelYamlDsl.json index 998852dfac8..61e7b809d70 100644 --- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camelYamlDsl.json +++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camelYamlDsl.json @@ -2163,6 +2163,9 @@ "inheritErrorHandler" : { "type" : "boolean" }, + "intermittent" : { + "type" : "boolean" + }, "resumeStrategy" : { "type" : "string" }