This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch sc-original in repository https://gitbox.apache.org/repos/asf/camel.git
commit a97ce4e299ee78e7f04fa427a5d11f0410ccc938 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Jul 31 17:16:19 2023 +0200 CAMEL-19670: camel-core - useOriginalMessage should behave like the stream-caching advice and make a defensive copy of the original message that is safe to re-read. Added unit test from Bartosz Popiela to camel-zipfile. --- .../camel/catalog/models/deadLetterChannel.json | 4 +- .../camel/catalog/models/defaultErrorHandler.json | 4 +- .../catalog/models/jtaTransactionErrorHandler.json | 4 +- .../apache/camel/catalog/models/onCompletion.json | 2 +- .../apache/camel/catalog/models/onException.json | 4 +- .../models/springTransactionErrorHandler.json | 4 +- .../zipfile/ZipSplitterUseOriginalMessageTest.java | 92 +++++++++++++++++++ .../camel/impl/engine/CamelInternalProcessor.java | 63 +------------ .../camel/impl/engine/DefaultUnitOfWork.java | 24 ++++- .../camel/impl/engine/StreamCachingHelper.java | 90 ++++++++++++++++++ .../model/errorhandler/deadLetterChannel.json | 4 +- .../model/errorhandler/defaultErrorHandler.json | 4 +- .../errorhandler/jtaTransactionErrorHandler.json | 4 +- .../springTransactionErrorHandler.json | 4 +- .../org/apache/camel/model/onCompletion.json | 2 +- .../org/apache/camel/model/onException.json | 4 +- .../apache/camel/model/OnCompletionDefinition.java | 54 ++++++++++- .../apache/camel/model/OnExceptionDefinition.java | 12 ++- .../DefaultErrorHandlerDefinition.java | 18 ++-- ...OnExceptionUseOriginalMessageStreamTwoTest.java | 102 +++++++++++++++++++++ 20 files changed, 402 insertions(+), 97 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/deadLetterChannel.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/deadLetterChannel.json index 9566fa14230..2b8b46b1d39 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/deadLetterChannel.json +++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/deadLetterChannel.json @@ -17,8 +17,8 @@ "loggerRef": { "index": 2, "kind": "attribute", "displayName": "Logger Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "References to a logger to use as logger for the error handler" }, "level": { "index": 3, "kind": "attribute", "displayName": "Level", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR", "description": "Logging level to use when using the logging error handler type." }, "logName": { "index": 4, "kind": "attribute", "displayName": "Log Name", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Name of the logger to use for the logging error handler" }, - "useOriginalMessage": { "index": 5, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attem [...] 
- "useOriginalBody": { "index": 6, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] + "useOriginalMessage": { "index": 5, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attem [...] + "useOriginalBody": { "index": 6, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] "onRedeliveryRef": { "index": 7, "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." 
}, "onExceptionOccurredRef": { "index": 8, "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any excep [...] "onPrepareFailureRef": { "index": 9, "kind": "attribute", "displayName": "On Prepare Failure Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor to prepare the org.apache.camel.Exchange before handled by the failure processor \/ dead letter channel. This allows for example to enrich the message before sending to a dead letter [...] diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/defaultErrorHandler.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/defaultErrorHandler.json index 4356476af29..177f147284f 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/defaultErrorHandler.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/defaultErrorHandler.json @@ -15,8 +15,8 @@ "loggerRef": { "index": 0, "kind": "attribute", "displayName": "Logger Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "References to a logger to use as logger for the error handler" }, "level": { "index": 1, "kind": "attribute", "displayName": "Level", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", 
"WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR", "description": "Logging level to use when using the logging error handler type." }, "logName": { "index": 2, "kind": "attribute", "displayName": "Log Name", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Name of the logger to use for the logging error handler" }, - "useOriginalMessage": { "index": 3, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attem [...] - "useOriginalBody": { "index": 4, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] + "useOriginalMessage": { "index": 3, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attem [...] 
+ "useOriginalBody": { "index": 4, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] "onRedeliveryRef": { "index": 5, "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." }, "onExceptionOccurredRef": { "index": 6, "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any excep [...] "onPrepareFailureRef": { "index": 7, "kind": "attribute", "displayName": "On Prepare Failure Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor to prepare the org.apache.camel.Exchange before handled by the failure processor \/ dead letter channel. This allows for example to enrich the message before sending to a dead letter [...] 
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/jtaTransactionErrorHandler.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/jtaTransactionErrorHandler.json index d7f79d23591..fec43a80fe0 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/jtaTransactionErrorHandler.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/jtaTransactionErrorHandler.json @@ -17,8 +17,8 @@ "loggerRef": { "index": 2, "kind": "attribute", "displayName": "Logger Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "References to a logger to use as logger for the error handler" }, "level": { "index": 3, "kind": "attribute", "displayName": "Level", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR", "description": "Logging level to use when using the logging error handler type." }, "logName": { "index": 4, "kind": "attribute", "displayName": "Log Name", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Name of the logger to use for the logging error handler" }, - "useOriginalMessage": { "index": 5, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. 
Notice: this only applies when all redeliveries attem [...] - "useOriginalBody": { "index": 6, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] + "useOriginalMessage": { "index": 5, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attem [...] + "useOriginalBody": { "index": 6, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] "onRedeliveryRef": { "index": 7, "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." 
}, "onExceptionOccurredRef": { "index": 8, "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any excep [...] "onPrepareFailureRef": { "index": 9, "kind": "attribute", "displayName": "On Prepare Failure Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor to prepare the org.apache.camel.Exchange before handled by the failure processor \/ dead letter channel. This allows for example to enrich the message before sending to a dead letter [...] diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/onCompletion.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/onCompletion.json index 8841728e19b..22dbfddc6b8 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/onCompletion.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/onCompletion.json @@ -17,7 +17,7 @@ "onFailureOnly": { "index": 2, "kind": "attribute", "displayName": "On Failure Only", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will only synchronize when the org.apache.camel.Exchange ended with failure (exception or FAULT message)." 
}, "parallelProcessing": { "index": 3, "kind": "attribute", "displayName": "Parallel Processing", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then the on completion process will run asynchronously by a separate thread from a thread pool. By default this is false, meaning the on completion process will run synchronously using the same [...] "executorService": { "index": 4, "kind": "attribute", "displayName": "Executor Service", "label": "advanced", "required": false, "type": "object", "javaType": "java.util.concurrent.ExecutorService", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom Thread Pool to be used for parallel processing. Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well." }, - "useOriginalMessage": { "index": 5, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input message body when an org.apache.camel.Exchange for this on completion. By default this feature is off." }, + "useOriginalMessage": { "index": 5, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input message body when an org.apache.camel.Exchange for this on completion. The original input message is defensively copied, and the copied message body is converted to org.apache [...] 
"onWhen": { "index": 6, "kind": "element", "displayName": "On When", "required": false, "type": "object", "javaType": "org.apache.camel.model.WhenDefinition", "deprecated": false, "autowired": false, "secret": false, "asPredicate": true, "description": "Sets an additional predicate that should be true before the onCompletion is triggered. To be used for fine grained controlling whether a completion callback should be invoked or not" }, "outputs": { "index": 7, "kind": "element", "displayName": "Outputs", "required": true, "type": "array", "javaType": "java.util.List", "oneOf": [ "aggregate", "bean", "choice", "circuitBreaker", "claimCheck", "convertBodyTo", "delay", "doCatch", "doFinally", "doTry", "dynamicRouter", "enrich", "filter", "idempotentConsumer", "intercept", "interceptFrom", "interceptSendToEndpoint", "kamelet", "loadBalance", "log", "loop", "marshal", "multicast", "onCompletion", "onException", "onFallb [...] "disabled": { "index": 8, "kind": "attribute", "displayName": "Disabled", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to disable this EIP from the route during build time. Once an EIP has been disabled then it cannot be enabled later at runtime." 
}, diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/onException.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/onException.json index b06d3c11a0f..8f0408d418d 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/onException.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/onException.json @@ -21,8 +21,8 @@ "continued": { "index": 6, "kind": "expression", "displayName": "Continued", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.model.ExpressionSubElementDefinition", "oneOf": [ "constant", "csimple", "datasonnet", "exchangeProperty", "groovy", "header", "hl7terser", "joor", "jq", "js", "jsonpath", "language", "method", "mvel", "ognl", "python", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize" ], "deprecated": false, "autowired" [...] "onRedeliveryRef": { "index": 7, "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." }, "onExceptionOccurredRef": { "index": 8, "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any exception throw [...] 
- "useOriginalMessage": { "index": 9, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attem [...] - "useOriginalBody": { "index": 10, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] + "useOriginalMessage": { "index": 9, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attem [...] + "useOriginalBody": { "index": 10, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] 
"outputs": { "index": 11, "kind": "element", "displayName": "Outputs", "required": true, "type": "array", "javaType": "java.util.List", "oneOf": [ "aggregate", "bean", "choice", "circuitBreaker", "claimCheck", "convertBodyTo", "delay", "doCatch", "doFinally", "doTry", "dynamicRouter", "enrich", "filter", "idempotentConsumer", "intercept", "interceptFrom", "interceptSendToEndpoint", "kamelet", "loadBalance", "log", "loop", "marshal", "multicast", "onCompletion", "onException", "onFall [...] "disabled": { "index": 12, "kind": "attribute", "displayName": "Disabled", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to disable this EIP from the route during build time. Once an EIP has been disabled then it cannot be enabled later at runtime." }, "id": { "index": 13, "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" }, diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/springTransactionErrorHandler.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/springTransactionErrorHandler.json index 4e000eaa676..d2263353c52 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/springTransactionErrorHandler.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/springTransactionErrorHandler.json @@ -17,8 +17,8 @@ "loggerRef": { "index": 2, "kind": "attribute", "displayName": "Logger Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "References to a logger to use as logger for the error handler" }, "level": { "index": 3, 
"kind": "attribute", "displayName": "Level", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR", "description": "Logging level to use when using the logging error handler type." }, "logName": { "index": 4, "kind": "attribute", "displayName": "Log Name", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Name of the logger to use for the logging error handler" }, - "useOriginalMessage": { "index": 5, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attem [...] - "useOriginalBody": { "index": 6, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] 
+ "useOriginalMessage": { "index": 5, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attem [...] + "useOriginalBody": { "index": 6, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] "onRedeliveryRef": { "index": 7, "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." }, "onExceptionOccurredRef": { "index": 8, "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any excep [...] 
"onPrepareFailureRef": { "index": 9, "kind": "attribute", "displayName": "On Prepare Failure Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor to prepare the org.apache.camel.Exchange before handled by the failure processor \/ dead letter channel. This allows for example to enrich the message before sending to a dead letter [...] diff --git a/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipSplitterUseOriginalMessageTest.java b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipSplitterUseOriginalMessageTest.java new file mode 100644 index 00000000000..48a6662f299 --- /dev/null +++ b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipSplitterUseOriginalMessageTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.dataformat.zipfile; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.model.dataformat.ZipFileDataFormat; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Test; + +public class ZipSplitterUseOriginalMessageTest extends CamelTestSupport { + + private List<String> list1 = new ArrayList<>(); + private List<String> list2 = new ArrayList<>(); + + @Test + public void testSplitter() throws InterruptedException { + MockEndpoint processZipEntry = getMockEndpoint("mock:processZipEntry"); + processZipEntry.expectedBodiesReceivedInAnyOrder("chau", "hi", "hola", "another_chiau", "another_hi"); + MockEndpoint.assertIsSatisfied(context); + + // should be the same + Arrays.deepEquals(list1.toArray(), list2.toArray()); + } + + private org.apache.camel.model.dataformat.ZipFileDataFormat multiEntryZipFormat() { + var zipFormat = new ZipFileDataFormat(); + zipFormat.setUsingIterator("true"); + return zipFormat; + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + onException(Exception.class) + // turn on original message which caused CAMEL-19670 + .useOriginalMessage(); + + // Unzip file and Split it according to FileEntry + from("file:src/test/resources/org/apache/camel/dataformat/zipfile/data?delay=1000&noop=true") + .log("Start processing big file: ${header.CamelFileName}") + .unmarshal(multiEntryZipFormat()) + .split(bodyAs(Iterator.class)).streaming() + .log("${body}") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + // should be able to read the stream + String s = exchange.getMessage().getBody(String.class); + 
list1.add(s); + } + }) + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + // should be able to read the stream again + String s = exchange.getMessage().getBody(String.class); + list2.add(s); + } + }) + .to("mock:processZipEntry") + .end() + .log("Done processing big file: ${header.CamelFileName}"); + } + }; + + } + +} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index 3b74ff97342..8c4291246af 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -38,7 +38,6 @@ import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.StatefulService; import org.apache.camel.StreamCache; -import org.apache.camel.StreamCacheException; import org.apache.camel.impl.debugger.BacklogDebugger; import org.apache.camel.impl.debugger.BacklogTracer; import org.apache.camel.impl.debugger.DefaultBacklogTracerEventMessage; @@ -73,7 +72,6 @@ import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.support.UnitOfWorkHelper; import org.apache.camel.support.processor.DelegateAsyncProcessor; import org.apache.camel.support.service.ServiceHelper; -import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -992,68 +990,13 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In @Override public StreamCache before(Exchange exchange) throws Exception { - final Message inMessage = exchange.getIn(); - - // check if body is already cached - try { - Object body = inMessage.getBody(); - if (body == null) { - return null; - } else if (body instanceof StreamCache) { - StreamCache sc = 
(StreamCache) body; - // reset so the cache is ready to be used before processing - sc.reset(); - return sc; - } - } catch (Exception e) { - // lets allow Camels error handler to deal with stream cache failures - StreamCacheException tce = new StreamCacheException(null, e); - exchange.setException(tce); - // because this is stream caching error then we cannot use redelivery as the message body is corrupt - // so mark as redelivery exhausted - exchange.getExchangeExtension().setRedeliveryExhausted(true); - } - // check if we somewhere failed due to a stream caching exception - Throwable cause = exchange.getException(); - if (cause == null) { - cause = exchange.getProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, Throwable.class); - } - return tryStreamCache(exchange, inMessage, cause); - } - - private StreamCache tryStreamCache(Exchange exchange, Message inMessage, Throwable cause) { - final boolean failed = cause != null && ObjectHelper.getException(StreamCacheException.class, cause) != null; - if (!failed) { - boolean disabled = exchange.getExchangeExtension().isStreamCacheDisabled(); - if (disabled) { - return null; - } - try { - // cache the body and if we could do that replace it as the new body - StreamCache sc = strategy.cache(exchange); - if (sc != null) { - inMessage.setBody(sc); - } - return sc; - } catch (Exception e) { - // lets allow Camels error handler to deal with stream cache failures - StreamCacheException tce = new StreamCacheException(exchange.getMessage().getBody(), e); - exchange.setException(tce); - // because this is stream caching error then we cannot use redelivery as the message body is corrupt - // so mark as redelivery exhausted - exchange.getExchangeExtension().setRedeliveryExhausted(true); - } - } - return null; + return StreamCachingHelper.convertToStreamCache(strategy, exchange, exchange.getIn()); } @Override public void after(Exchange exchange, StreamCache sc) throws Exception { - Object body = exchange.getMessage().getBody(); - if 
(body instanceof StreamCache) { - // reset so the cache is ready to be reused after processing - ((StreamCache) body).reset(); - } + // reset cached streams so they can be read again + MessageHelper.resetStreamCache(exchange.getMessage()); } @Override diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java index 002290086af..7e237e3d085 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java @@ -99,17 +99,35 @@ public class DefaultUnitOfWork implements UnitOfWork { } } + private boolean isStreamCacheInUse(Exchange exchange) { + boolean inUse = streamCachingStrategy.isEnabled(); + if (inUse) { + // the original route (from route) may have disabled stream caching + String rid = exchange.getFromRouteId(); + if (rid != null) { + Route route = exchange.getContext().getRoute(rid); + if (route != null) { + inUse = route.isStreamCaching() != null && route.isStreamCaching(); + } + } + } + return inUse; + } + private void doOnPrepare(Exchange exchange) { // unit of work is reused, so setup for this exchange this.exchange = exchange; if (allowUseOriginalMessage) { this.originalInMessage = exchange.getIn().copy(); - if (streamCachingStrategy.isEnabled()) { - // if the input body is streaming we need to cache it, so we can access the original input message - StreamCache cache = streamCachingStrategy.cache(this.originalInMessage); + if (isStreamCacheInUse(exchange)) { + // if the input body is streaming we need to cache it, so we can access the original input message (like stream caching advice does) + StreamCache cache + = StreamCachingHelper.convertToStreamCache(streamCachingStrategy, exchange, this.originalInMessage); if (cache != null) { this.originalInMessage.setBody(cache); + // replace original 
incoming message with stream cache + this.exchange.getIn().setBody(cache); } } } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java new file mode 100644 index 00000000000..63fba73c035 --- /dev/null +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.engine; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePropertyKey; +import org.apache.camel.Message; +import org.apache.camel.StreamCache; +import org.apache.camel.StreamCacheException; +import org.apache.camel.spi.StreamCachingStrategy; +import org.apache.camel.util.ObjectHelper; + +/** + * Helper for {@link org.apache.camel.StreamCache} in Camel route engine. 
+ */ +final class StreamCachingHelper { + + private StreamCachingHelper() { + } + + public static StreamCache convertToStreamCache(StreamCachingStrategy strategy, Exchange exchange, Message message) { + // check if body is already cached + try { + Object body = message.getBody(); + if (body == null) { + return null; + } else if (body instanceof StreamCache) { + StreamCache sc = (StreamCache) body; + // reset so the cache is ready to be used before processing + sc.reset(); + return sc; + } + } catch (Exception e) { + // lets allow Camels error handler to deal with stream cache failures + StreamCacheException tce = new StreamCacheException(null, e); + exchange.setException(tce); + // because this is stream caching error then we cannot use redelivery as the message body is corrupt + // so mark as redelivery exhausted + exchange.getExchangeExtension().setRedeliveryExhausted(true); + } + // check if we somewhere failed due to a stream caching exception + Throwable cause = exchange.getException(); + if (cause == null) { + cause = exchange.getProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, Throwable.class); + } + return tryStreamCache(strategy, exchange, message, cause); + } + + private static StreamCache tryStreamCache( + StreamCachingStrategy strategy, Exchange exchange, Message inMessage, Throwable cause) { + final boolean failed = cause != null && ObjectHelper.getException(StreamCacheException.class, cause) != null; + if (!failed) { + boolean disabled = exchange.getExchangeExtension().isStreamCacheDisabled(); + if (disabled) { + return null; + } + try { + // cache the body and if we could do that replace it as the new body + StreamCache sc = strategy.cache(exchange); + if (sc != null) { + inMessage.setBody(sc); + } + return sc; + } catch (Exception e) { + // lets allow Camels error handler to deal with stream cache failures + StreamCacheException tce = new StreamCacheException(exchange.getMessage().getBody(), e); + exchange.setException(tce); + // because this is 
stream caching error then we cannot use redelivery as the message body is corrupt + // so mark as redelivery exhausted + exchange.getExchangeExtension().setRedeliveryExhausted(true); + } + } + return null; + } + +} diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/deadLetterChannel.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/deadLetterChannel.json index 9566fa14230..2b8b46b1d39 100644 --- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/deadLetterChannel.json +++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/deadLetterChannel.json @@ -17,8 +17,8 @@ "loggerRef": { "index": 2, "kind": "attribute", "displayName": "Logger Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "References to a logger to use as logger for the error handler" }, "level": { "index": 3, "kind": "attribute", "displayName": "Level", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR", "description": "Logging level to use when using the logging error handler type." 
}, "logName": { "index": 4, "kind": "attribute", "displayName": "Log Name", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Name of the logger to use for the logging error handler" }, - "useOriginalMessage": { "index": 5, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attem [...] - "useOriginalBody": { "index": 6, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] + "useOriginalMessage": { "index": 5, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attem [...] 
+ "useOriginalBody": { "index": 6, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] "onRedeliveryRef": { "index": 7, "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." }, "onExceptionOccurredRef": { "index": 8, "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any excep [...] "onPrepareFailureRef": { "index": 9, "kind": "attribute", "displayName": "On Prepare Failure Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor to prepare the org.apache.camel.Exchange before handled by the failure processor \/ dead letter channel. This allows for example to enrich the message before sending to a dead letter [...] 
diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/defaultErrorHandler.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/defaultErrorHandler.json index 4356476af29..177f147284f 100644 --- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/defaultErrorHandler.json +++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/defaultErrorHandler.json @@ -15,8 +15,8 @@ "loggerRef": { "index": 0, "kind": "attribute", "displayName": "Logger Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "References to a logger to use as logger for the error handler" }, "level": { "index": 1, "kind": "attribute", "displayName": "Level", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR", "description": "Logging level to use when using the logging error handler type." }, "logName": { "index": 2, "kind": "attribute", "displayName": "Log Name", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Name of the logger to use for the logging error handler" }, - "useOriginalMessage": { "index": 3, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. 
Notice: this only applies when all redeliveries attem [...] - "useOriginalBody": { "index": 4, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] + "useOriginalMessage": { "index": 3, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attem [...] + "useOriginalBody": { "index": 4, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] "onRedeliveryRef": { "index": 5, "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." 
}, "onExceptionOccurredRef": { "index": 6, "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any excep [...] "onPrepareFailureRef": { "index": 7, "kind": "attribute", "displayName": "On Prepare Failure Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor to prepare the org.apache.camel.Exchange before handled by the failure processor \/ dead letter channel. This allows for example to enrich the message before sending to a dead letter [...] diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/jtaTransactionErrorHandler.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/jtaTransactionErrorHandler.json index d7f79d23591..fec43a80fe0 100644 --- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/jtaTransactionErrorHandler.json +++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/jtaTransactionErrorHandler.json @@ -17,8 +17,8 @@ "loggerRef": { "index": 2, "kind": "attribute", "displayName": "Logger Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "References to a logger to use as logger for the error handler" }, "level": { "index": 3, "kind": "attribute", "displayName": "Level", "label": "advanced", "required": false, "type": "enum", "javaType": 
"org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR", "description": "Logging level to use when using the logging error handler type." }, "logName": { "index": 4, "kind": "attribute", "displayName": "Log Name", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Name of the logger to use for the logging error handler" }, - "useOriginalMessage": { "index": 5, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attem [...] - "useOriginalBody": { "index": 6, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] + "useOriginalMessage": { "index": 5, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. 
Notice: this only applies when all redeliveries attem [...] + "useOriginalBody": { "index": 6, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] "onRedeliveryRef": { "index": 7, "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." }, "onExceptionOccurredRef": { "index": 8, "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any excep [...] "onPrepareFailureRef": { "index": 9, "kind": "attribute", "displayName": "On Prepare Failure Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor to prepare the org.apache.camel.Exchange before handled by the failure processor \/ dead letter channel. This allows for example to enrich the message before sending to a dead letter [...] 
diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/springTransactionErrorHandler.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/springTransactionErrorHandler.json index 4e000eaa676..d2263353c52 100644 --- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/springTransactionErrorHandler.json +++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/springTransactionErrorHandler.json @@ -17,8 +17,8 @@ "loggerRef": { "index": 2, "kind": "attribute", "displayName": "Logger Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "References to a logger to use as logger for the error handler" }, "level": { "index": 3, "kind": "attribute", "displayName": "Level", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR", "description": "Logging level to use when using the logging error handler type." }, "logName": { "index": 4, "kind": "attribute", "displayName": "Log Name", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Name of the logger to use for the logging error handler" }, - "useOriginalMessage": { "index": 5, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. 
Notice: this only applies when all redeliveries attem [...] - "useOriginalBody": { "index": 6, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] + "useOriginalMessage": { "index": 5, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attem [...] + "useOriginalBody": { "index": 6, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] "onRedeliveryRef": { "index": 7, "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." 
}, "onExceptionOccurredRef": { "index": 8, "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any excep [...] "onPrepareFailureRef": { "index": 9, "kind": "attribute", "displayName": "On Prepare Failure Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor to prepare the org.apache.camel.Exchange before handled by the failure processor \/ dead letter channel. This allows for example to enrich the message before sending to a dead letter [...] diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/onCompletion.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/onCompletion.json index 8841728e19b..22dbfddc6b8 100644 --- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/onCompletion.json +++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/onCompletion.json @@ -17,7 +17,7 @@ "onFailureOnly": { "index": 2, "kind": "attribute", "displayName": "On Failure Only", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will only synchronize when the org.apache.camel.Exchange ended with failure (exception or FAULT message)." 
}, "parallelProcessing": { "index": 3, "kind": "attribute", "displayName": "Parallel Processing", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled then the on completion process will run asynchronously by a separate thread from a thread pool. By default this is false, meaning the on completion process will run synchronously using the same [...] "executorService": { "index": 4, "kind": "attribute", "displayName": "Executor Service", "label": "advanced", "required": false, "type": "object", "javaType": "java.util.concurrent.ExecutorService", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom Thread Pool to be used for parallel processing. Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well." }, - "useOriginalMessage": { "index": 5, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input message body when an org.apache.camel.Exchange for this on completion. By default this feature is off." }, + "useOriginalMessage": { "index": 5, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input message body when an org.apache.camel.Exchange for this on completion. The original input message is defensively copied, and the copied message body is converted to org.apache [...] 
"onWhen": { "index": 6, "kind": "element", "displayName": "On When", "required": false, "type": "object", "javaType": "org.apache.camel.model.WhenDefinition", "deprecated": false, "autowired": false, "secret": false, "asPredicate": true, "description": "Sets an additional predicate that should be true before the onCompletion is triggered. To be used for fine grained controlling whether a completion callback should be invoked or not" }, "outputs": { "index": 7, "kind": "element", "displayName": "Outputs", "required": true, "type": "array", "javaType": "java.util.List", "oneOf": [ "aggregate", "bean", "choice", "circuitBreaker", "claimCheck", "convertBodyTo", "delay", "doCatch", "doFinally", "doTry", "dynamicRouter", "enrich", "filter", "idempotentConsumer", "intercept", "interceptFrom", "interceptSendToEndpoint", "kamelet", "loadBalance", "log", "loop", "marshal", "multicast", "onCompletion", "onException", "onFallb [...] "disabled": { "index": 8, "kind": "attribute", "displayName": "Disabled", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to disable this EIP from the route during build time. Once an EIP has been disabled then it cannot be enabled later at runtime." 
}, diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/onException.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/onException.json index b06d3c11a0f..8f0408d418d 100644 --- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/onException.json +++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/onException.json @@ -21,8 +21,8 @@ "continued": { "index": 6, "kind": "expression", "displayName": "Continued", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.model.ExpressionSubElementDefinition", "oneOf": [ "constant", "csimple", "datasonnet", "exchangeProperty", "groovy", "header", "hl7terser", "joor", "jq", "js", "jsonpath", "language", "method", "mvel", "ognl", "python", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize" ], "deprecated": false, "autowired" [...] "onRedeliveryRef": { "index": 7, "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." }, "onExceptionOccurredRef": { "index": 8, "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any exception throw [...] 
- "useOriginalMessage": { "index": 9, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attem [...] - "useOriginalBody": { "index": 10, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] + "useOriginalMessage": { "index": 9, "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attem [...] + "useOriginalBody": { "index": 10, "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have [...] 
"outputs": { "index": 11, "kind": "element", "displayName": "Outputs", "required": true, "type": "array", "javaType": "java.util.List", "oneOf": [ "aggregate", "bean", "choice", "circuitBreaker", "claimCheck", "convertBodyTo", "delay", "doCatch", "doFinally", "doTry", "dynamicRouter", "enrich", "filter", "idempotentConsumer", "intercept", "interceptFrom", "interceptSendToEndpoint", "kamelet", "loadBalance", "log", "loop", "marshal", "multicast", "onCompletion", "onException", "onFall [...] "disabled": { "index": 12, "kind": "attribute", "displayName": "Disabled", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to disable this EIP from the route during build time. Once an EIP has been disabled then it cannot be enabled later at runtime." }, "id": { "index": 13, "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" }, diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java index b664cd7f9fa..ce660728732 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java @@ -215,8 +215,10 @@ public class OnCompletionDefinition extends OutputDefinition<OnCompletionDefinit * Will use the original input message body when an {@link org.apache.camel.Exchange} for this on completion. * <p/> * The original input message is defensively copied, and the copied message body is converted to - * {@link org.apache.camel.StreamCache} if possible, to ensure the body can be read when the original message is - * being used later. 
If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be + * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route), + * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache} + * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body. + * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be * able to re-read when accessed later. * <p/> * <b>Important:</b> The original input means the input message that are bounded by the current @@ -231,12 +233,42 @@ public class OnCompletionDefinition extends OutputDefinition<OnCompletionDefinit * By default this feature is off. * * @return the builder + * @deprecated use {@link #useOriginalMessage()} */ + @Deprecated public OnCompletionDefinition useOriginalBody() { setUseOriginalMessage(Boolean.toString(true)); return this; } + /** + * Will use the original input message when an {@link org.apache.camel.Exchange} for this on completion. + * <p/> + * The original input message is defensively copied, and the copied message body is converted to + * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route), + * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache} + * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body. + * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be + * able to re-read when accessed later. 
+ * <p/> + * <b>Important:</b> The original input means the input message that is bounded by the current + * {@link org.apache.camel.spi.UnitOfWork}. A unit of work typically spans one route, or multiple routes if they + * are connected using internal endpoints such as direct or seda. When messages are passed via external endpoints + * such as JMS or HTTP then the consumer will create a new unit of work, with the message it received as input as + * the original input. Also some EIP patterns such as splitter, multicast, will create a new unit of work boundary + * for the messages in their sub-route (eg the split message); however these EIPs have an option named + * <tt>shareUnitOfWork</tt> which allows to combine with the parent unit of work in regard to error handling and + * therefore use the parent original message. + * <p/> + * By default this feature is off. + * + * @return the builder + */ + public OnCompletionDefinition useOriginalMessage() { + setUseOriginalMessage(Boolean.toString(true)); + return this; + } + /** * To use a custom Thread Pool to be used for parallel processing. Notice if you set this option, then parallel * processing is automatic implied, and you do not have to enable that option as well. @@ -346,7 +378,25 @@ public class OnCompletionDefinition extends OutputDefinition<OnCompletionDefinit /** * Will use the original input message body when an {@link org.apache.camel.Exchange} for this on completion. * <p/> + * The original input message is defensively copied, and the copied message body is converted to + * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route), + * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache} + * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body.
+ * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be + * able to be re-read when accessed later. + * <p/> + * <b>Important:</b> The original input means the input message that is bounded by the current + * {@link org.apache.camel.spi.UnitOfWork}. A unit of work typically spans one route, or multiple routes if they + * are connected using internal endpoints such as direct or seda. When messages are passed via external endpoints + * such as JMS or HTTP then the consumer will create a new unit of work, with the message it received as input as + * the original input. Also some EIP patterns such as splitter, multicast, will create a new unit of work boundary + * for the messages in their sub-route (eg the split message); however these EIPs have an option named + * <tt>shareUnitOfWork</tt> which allows to combine with the parent unit of work in regard to error handling and + * therefore use the parent original message. + * <p/> * By default this feature is off. + * + * @param useOriginalMessage whether to use the original input message */ public void setUseOriginalMessage(String useOriginalMessage) { this.useOriginalMessage = useOriginalMessage; diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/OnExceptionDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/OnExceptionDefinition.java index bd7e1b07556..592b76a10af 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/OnExceptionDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/OnExceptionDefinition.java @@ -668,8 +668,10 @@ public class OnExceptionDefinition extends OutputDefinition<OnExceptionDefinitio * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody.
* <p/> * The original input message is defensively copied, and the copied message body is converted to - * {@link org.apache.camel.StreamCache} if possible, to ensure the body can be read when the original message is - * being used later. If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be + * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route), + * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache} + * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body. + * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be * able to re-read when accessed later. * <p/> * <b>Important:</b> The original input means the input message that are bounded by the current @@ -710,8 +712,10 @@ public class OnExceptionDefinition extends OutputDefinition<OnExceptionDefinitio * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody. * <p/> * The original input message is defensively copied, and the copied message body is converted to - * {@link org.apache.camel.StreamCache} if possible, to ensure the body can be read when the original message is - * being used later. If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be + * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route), + * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache} + * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body. 
+ * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be * able to re-read when accessed later. * <p/> * <b>Important:</b> The original input means the input message that are bounded by the current diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java index d1d3b69df8c..722ea738d84 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java @@ -199,8 +199,10 @@ public class DefaultErrorHandlerDefinition extends BaseErrorHandlerDefinition { * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody. * <p/> * The original input message is defensively copied, and the copied message body is converted to - * {@link org.apache.camel.StreamCache} if possible, to ensure the body can be read when the original message is - * being used later. If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be + * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route), + * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache} + * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body. + * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be * able to re-read when accessed later. 
* <p/> * <b>Important:</b> The original input means the input message that are bounded by the current @@ -241,8 +243,10 @@ public class DefaultErrorHandlerDefinition extends BaseErrorHandlerDefinition { * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody. * <p/> * The original input message is defensively copied, and the copied message body is converted to - * {@link org.apache.camel.StreamCache} if possible, to ensure the body can be read when the original message is - * being used later. If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be + * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route), + * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache} + * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body. + * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be * able to re-read when accessed later. * <p/> * <b>Important:</b> The original input means the input message that are bounded by the current @@ -697,8 +701,10 @@ public class DefaultErrorHandlerDefinition extends BaseErrorHandlerDefinition { * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody. * <p/> * The original input message is defensively copied, and the copied message body is converted to - * {@link org.apache.camel.StreamCache} if possible, to ensure the body can be read when the original message is - * being used later. 
If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be + * {@link org.apache.camel.StreamCache} if possible (stream caching is enabled, can be disabled globally or on the original route), + * to ensure the body can be read when the original message is being used later. If the body is converted to {@link org.apache.camel.StreamCache} + * then the message body on the current {@link org.apache.camel.Exchange} is replaced with the {@link org.apache.camel.StreamCache} body. + * If the body is not converted to {@link org.apache.camel.StreamCache} then the body will not be * able to re-read when accessed later. * <p/> * <b>Important:</b> The original input means the input message that are bounded by the current diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageStreamTwoTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageStreamTwoTest.java new file mode 100644 index 00000000000..86045242b3d --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageStreamTwoTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.onexception; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.StreamCache; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.converter.IOConverter; +import org.apache.camel.spi.DataFormat; +import org.apache.camel.support.service.ServiceSupport; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class OnExceptionUseOriginalMessageStreamTwoTest extends ContextTestSupport { + + private final List<String> list1 = new ArrayList<>(); + private final List<String> list2 = new ArrayList<>(); + + @Test + void convertUseOriginalMessage() { + String data = "data"; + InputStream is = new ByteArrayInputStream(data.getBytes()); + template.sendBody("direct:start", is); + + Assertions.assertEquals(list1.get(0), list2.get(0)); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(Exception.class) + .useOriginalMessage() + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + Assertions.assertTrue(exchange.getMessage().getBody() instanceof StreamCache); + String s = exchange.getMessage().getBody(String.class); + list1.add(s); + } + }) + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + Assertions.assertTrue(exchange.getMessage().getBody() instanceof StreamCache); + String s = exchange.getMessage().getBody(String.class); + list2.add(s); + } + }) + .handled(true); + + from("direct:start") + .unmarshal(new 
MyDataFormat()); + } + }; + } + + public static class MyDataFormatException extends Exception { + + public MyDataFormatException(String message) { + super(message); + } + } + + public class MyDataFormat extends ServiceSupport implements DataFormat { + + @Override + public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception { + // noop + } + + @Override + public Object unmarshal(Exchange exchange, InputStream stream) throws Exception { + // simulate reading the entire stream so it's not re-readable later + String s = IOConverter.toString(stream, exchange); + throw new MyDataFormatException(s); + } + } +}