This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.11.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 01d5882849b6e58f5c456fc4b5917d08fefa71c5 Author: Marc Eiro <marc.e...@sap.com> AuthorDate: Fri Nov 12 12:38:10 2021 +0100 CAMEL-17174: fixed issue where exchange UnitOfWork was not being cleared upon completion. --- .../camel/impl/engine/DefaultUnitOfWork.java | 11 +++ .../camel/processor/UnitOfWorkHelperTest.java | 96 ++++++++++++++++++++++ .../org/apache/camel/support/UnitOfWorkHelper.java | 1 + 3 files changed, 108 insertions(+) diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java index 10b4d21..80b08c3 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java @@ -256,15 +256,26 @@ public class DefaultUnitOfWork implements UnitOfWork { // the exchange is now done if (exchange instanceof PooledExchange) { + // pooled exchange has its own done logic which will reset this uow for reuse + // so do not call onDone try { ((PooledExchange) exchange).done(false); } catch (Throwable e) { // must catch exceptions to ensure synchronizations is also invoked log.warn("Exception occurred during exchange done. This exception will be ignored.", e); } + } else { + onDone(); } } + protected void onDone() { + // MUST clear and set uow to null on exchange after done + // in case the same exchange is manually reused by Camel end users (should happen seldom) + ExtendedExchange ee = (ExtendedExchange) exchange; + ee.setUnitOfWork(null); + } + @Override public void beforeRoute(Exchange exchange, Route route) { if (log.isTraceEnabled()) { diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkHelperTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkHelperTest.java new file mode 100644 index 0000000..23f36b9 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/UnitOfWorkHelperTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; +import org.apache.camel.Message; +import org.apache.camel.component.direct.DirectEndpoint; +import org.apache.camel.component.file.FileComponent; +import org.apache.camel.component.file.GenericFile; +import org.apache.camel.component.file.GenericFileMessage; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spi.CamelEvent; +import org.apache.camel.spi.CamelEvent.ExchangeCreatedEvent; +import org.apache.camel.support.DefaultExchange; +import org.apache.camel.support.EventNotifierSupport; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class UnitOfWorkHelperTest extends ContextTestSupport { + + private static final String FILE_CONTENT = "Lorem ipsum dolor sit amet"; + + @EndpointInject(value = "mock:result") + protected MockEndpoint resultEndpoint; + private DirectEndpoint fromEndpoint; + + private CustomEventNotifier eventNotifier; + private int numberOfExchangeCreatedEvents; + + @Test + void testUoWShouldBeClearedOnJobDone() throws Exception { + eventNotifier = new CustomEventNotifier(); + context.getManagementStrategy().addEventNotifier(eventNotifier); + Exchange testExchange = createExchange("testFile"); + + template.send("direct:from", testExchange); + template.send("direct:from", testExchange); + + assertEquals(2, numberOfExchangeCreatedEvents); + } + + private Exchange createExchange(String fileName) { + Exchange testExchange = new DefaultExchange(context); + + GenericFile<String> testFile = createFile(fileName); + Message testMessage = new GenericFileMessage<String>(testExchange, testFile); + testMessage.setBody(testFile); + + testExchange.setIn(testMessage); + ExtendedExchange extExchange = testExchange.adapt(ExtendedExchange.class); + extExchange.setFromEndpoint(fromEndpoint); + testExchange.setProperty(FileComponent.FILE_EXCHANGE_FILE, testFile); + + return testExchange; + } + + private GenericFile<String> createFile(final String fileName) { + GenericFile<String> testFile = new GenericFile<String>(); + + testFile.setFile(FILE_CONTENT); + testFile.setAbsoluteFilePath(fileName); + testFile.setBody(FILE_CONTENT); + + return testFile; + } + + public class CustomEventNotifier extends EventNotifierSupport { + + @Override + public void notify(final CamelEvent event) { + if (event instanceof ExchangeCreatedEvent) { + numberOfExchangeCreatedEvents += 1; + } + } + } + +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java index 47309a4..34b5d32 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.List; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Route; import org.apache.camel.spi.Synchronization; import org.apache.camel.spi.SynchronizationRouteAware;