This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4b86fcd88241de1550ba2fa1c1fffec161c984a8 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Jul 30 06:01:01 2019 +0200 camel3 - Remove sub unit of work as its no longer needed/in-use as the EIPs has been refactored to handle this better. --- .../java/org/apache/camel/spi/SubUnitOfWork.java | 45 ----------- .../apache/camel/spi/SubUnitOfWorkCallback.java | 56 -------------- .../main/java/org/apache/camel/spi/UnitOfWork.java | 32 +------- .../camel/impl/engine/DefaultSubUnitOfWork.java | 71 ----------------- .../camel/impl/engine/DefaultUnitOfWork.java | 88 ---------------------- .../camel/processor/CamelInternalProcessor.java | 19 ----- .../errorhandler/RedeliveryErrorHandler.java | 21 +----- .../apache/camel/model/MulticastDefinition.java | 1 - .../camel/model/RecipientListDefinition.java | 1 - .../org/apache/camel/model/SplitDefinition.java | 1 - 10 files changed, 3 insertions(+), 332 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/SubUnitOfWork.java b/core/camel-api/src/main/java/org/apache/camel/spi/SubUnitOfWork.java deleted file mode 100644 index cf6a7a3..0000000 --- a/core/camel-api/src/main/java/org/apache/camel/spi/SubUnitOfWork.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.spi; - -import java.util.List; - -/** - * A sub unit of work is a way of implement sub-transactions in Camel routing. - * This is needed by some EIPs where you can have sub routes such as the Splitter. - * The Camel end user may want to indicate that the Splitter should act as a - * <b>single combined</b> unit of work. - * - * @see SubUnitOfWorkCallback - */ -public interface SubUnitOfWork { - - /** - * Is the {@link SubUnitOfWork} marked as failed. - * - * @return <tt>true</tt> to indicate this sub unit of work is failed. - */ - boolean isFailed(); - - /** - * If failed then a number of exceptions could have occurred, causing the {@link SubUnitOfWork} to fail. - * - * @return the caused exceptions. - */ - List<Exception> getExceptions(); - -} diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/SubUnitOfWorkCallback.java b/core/camel-api/src/main/java/org/apache/camel/spi/SubUnitOfWorkCallback.java deleted file mode 100644 index 20e1458..0000000 --- a/core/camel-api/src/main/java/org/apache/camel/spi/SubUnitOfWorkCallback.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.spi; - -import org.apache.camel.Exchange; - -/** - * To allow unit of work for the {@link UnitOfWork} while processing a number of {@link Exchange}s. - * <p/> - * A sub unit of work is a way of implement sub-transactions in Camel routing. - * This is needed by some EIPs where you can have sub routes such as the Splitter. - * The Camel end user may want to indicate that the Splitter should act as a - * <b>single combined</b> unit of work. - * <p/> - * To implement this, we use this {@link SubUnitOfWorkCallback} - * which allows us to have the sub routes participate in a {@link SubUnitOfWork} - * And then the outcome of the {@link SubUnitOfWork} will be a single atomic commit or rollback. - * <p/> - * When using a {@link SubUnitOfWork} we need to tap into the sub routes, and ensure they callback with the progress - * of the sub {@link Exchange} being processed. For example the error handler, we need to tap into, and - * ensure that any exhausted sub {@link Exchange} is propagated into the result of the {@link SubUnitOfWork}. - * This {@link SubUnitOfWorkCallback} allows us to do that. - * - * @see SubUnitOfWork - */ -public interface SubUnitOfWorkCallback { - - /** - * The exchange is exhausted, by a redeliverable error handler. - * - * @param exchange the exchange - */ - void onExhausted(Exchange exchange); - - /** - * The exchange is done. - * - * @param exchange the exchange. - */ - void onDone(Exchange exchange); - -} diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java index c36f003..42bac62 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java @@ -190,7 +190,7 @@ public interface UnitOfWork extends Service { /** * Strategy for optional work to be execute before processing * <p/> - * For example the {@link org.apache.camel.impl.MDCUnitOfWork} leverages this + * For example the MDCUnitOfWork leverages this * to ensure MDC is handled correctly during routing exchanges using the * asynchronous routing engine. * @@ -214,15 +214,13 @@ public interface UnitOfWork extends Service { /** * Create a child unit of work, which is associated to this unit of work as its parent. * <p/> - * This is often used when EIPs need to support {@link SubUnitOfWork}s. For example a splitter, + * This is often used when EIPs need to support child unit of works. For example a splitter, * where the sub messages of the splitter all participate in the same sub unit of work. * That sub unit of work then decides whether the Splitter (in general) is failed or a * processed successfully. * * @param childExchange the child exchange * @return the created child unit of work - * @see SubUnitOfWork - * @see SubUnitOfWorkCallback */ UnitOfWork createChildUnitOfWork(Exchange childExchange); @@ -233,30 +231,4 @@ public interface UnitOfWork extends Service { */ void setParentUnitOfWork(UnitOfWork parentUnitOfWork); - /** - * Gets the {@link SubUnitOfWorkCallback} if this unit of work participates in a sub unit of work. - * - * @return the callback, or <tt>null</tt> if this unit of work is not part of a sub unit of work. - * @see #beginSubUnitOfWork(org.apache.camel.Exchange) - */ - SubUnitOfWorkCallback getSubUnitOfWorkCallback(); - - /** - * Begins a {@link SubUnitOfWork}, where sub (child) unit of works participate in a parent unit of work. - * The {@link SubUnitOfWork} will callback to the parent unit of work using {@link SubUnitOfWorkCallback}s. - * - * @param exchange the exchange - */ - void beginSubUnitOfWork(Exchange exchange); - - /** - * Ends a {@link SubUnitOfWork}. - * <p/> - * The {@link #beginSubUnitOfWork(org.apache.camel.Exchange)} must have been invoked - * prior to this operation. - * - * @param exchange the exchange - */ - void endSubUnitOfWork(Exchange exchange); - } diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSubUnitOfWork.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSubUnitOfWork.java deleted file mode 100644 index 6fa8ee0..0000000 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSubUnitOfWork.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.impl.engine; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.camel.Exchange; -import org.apache.camel.spi.SubUnitOfWork; -import org.apache.camel.spi.SubUnitOfWorkCallback; - -/** - * A default implementation of {@link org.apache.camel.spi.SubUnitOfWork} combined - * with a {@link SubUnitOfWorkCallback} to gather callbacks into this {@link SubUnitOfWork} state - */ -public class DefaultSubUnitOfWork implements SubUnitOfWork, SubUnitOfWorkCallback { - - private List<Exception> failedExceptions; - private boolean failed; - - @Override - public void onExhausted(Exchange exchange) { - if (exchange.getException() != null) { - addFailedException(exchange.getException()); - failed = true; - } - } - - @Override - public void onDone(Exchange exchange) { - if (exchange.getException() != null) { - addFailedException(exchange.getException()); - failed = true; - } - } - - @Override - public boolean isFailed() { - return failed; - } - - @Override - public List<Exception> getExceptions() { - return failedExceptions; - } - - private void addFailedException(Exception exception) { - if (failedExceptions == null) { - failedExceptions = new ArrayList<>(); - } - if (!failedExceptions.contains(exception)) { - // avoid adding the same exception multiple times - failedExceptions.add(exception); - } - } - -} diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java index 78a24f2..75508ac 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java @@ -23,21 +23,17 @@ import java.util.Deque; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; -import java.util.NoSuchElementException; import java.util.Set; import java.util.function.Predicate; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; -import org.apache.camel.CamelUnitOfWorkException; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.Service; import org.apache.camel.spi.RouteContext; -import org.apache.camel.spi.SubUnitOfWork; -import org.apache.camel.spi.SubUnitOfWorkCallback; import org.apache.camel.spi.Synchronization; import org.apache.camel.spi.SynchronizationVetoable; import org.apache.camel.spi.UnitOfWork; @@ -68,7 +64,6 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { private Message originalInMessage; private Set<Object> transactedBy; private final Deque<RouteContext> routeContextStack = new ArrayDeque<>(); - private Deque<DefaultSubUnitOfWork> subUnitOfWorks; private final transient Logger log; public DefaultUnitOfWork(Exchange exchange) { @@ -168,9 +163,6 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { transactedBy.clear(); } routeContextStack.clear(); - if (subUnitOfWorks != null) { - subUnitOfWorks.clear(); - } originalInMessage = null; parent = null; id = null; @@ -233,17 +225,6 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { // at first done the synchronizations UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, log); - // notify uow callback if in use - try { - SubUnitOfWorkCallback uowCallback = getSubUnitOfWorkCallback(); - if (uowCallback != null) { - uowCallback.onDone(exchange); - } - } catch (Throwable e) { - // must catch exceptions to ensure synchronizations is also invoked - log.warn("Exception occurred during savepoint onDone. This exception will be ignored.", e); - } - // unregister from inflight registry, before signalling we are done if (exchange.getContext() != null) { exchange.getContext().getInflightRepository().remove(exchange); @@ -328,75 +309,6 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { public void afterProcess(Processor processor, Exchange exchange, AsyncCallback callback, boolean doneSync) { } - @Override - public void beginSubUnitOfWork(Exchange exchange) { - if (log.isTraceEnabled()) { - log.trace("beginSubUnitOfWork exchangeId: {}", exchange.getExchangeId()); - } - - if (subUnitOfWorks == null) { - subUnitOfWorks = new ArrayDeque<>(); - } - subUnitOfWorks.push(new DefaultSubUnitOfWork()); - } - - @Override - public void endSubUnitOfWork(Exchange exchange) { - if (log.isTraceEnabled()) { - log.trace("endSubUnitOfWork exchangeId: {}", exchange.getExchangeId()); - } - - if (subUnitOfWorks == null || subUnitOfWorks.isEmpty()) { - return; - } - - // pop last sub unit of work as its now ended - SubUnitOfWork subUoW = null; - try { - subUoW = subUnitOfWorks.pop(); - } catch (NoSuchElementException e) { - // ignore - } - if (subUoW != null && subUoW.isFailed()) { - // the sub unit of work failed so set an exception containing all the caused exceptions - // and mark the exchange for rollback only - - // if there are multiple exceptions then wrap those into another exception with them all - Exception cause; - List<Exception> list = subUoW.getExceptions(); - if (list != null) { - if (list.size() == 1) { - cause = list.get(0); - } else { - cause = new CamelUnitOfWorkException(exchange, list); - } - exchange.setException(cause); - } - // mark it as rollback and that the unit of work is exhausted. This ensures that we do not try - // to redeliver this exception (again) - exchange.setProperty(Exchange.ROLLBACK_ONLY, true); - exchange.setProperty(Exchange.UNIT_OF_WORK_EXHAUSTED, true); - // and remove any indications of error handled which will prevent this exception to be noticed - // by the error handler which we want to react with the result of the sub unit of work - exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, null); - exchange.setProperty(Exchange.FAILURE_HANDLED, null); - if (log.isTraceEnabled()) { - log.trace("endSubUnitOfWork exchangeId: {} with {} caused exceptions.", exchange.getExchangeId(), list != null ? list.size() : 0); - } - } - } - - @Override - public SubUnitOfWorkCallback getSubUnitOfWorkCallback() { - // if there is a parent-child relationship between unit of works - // then we should use the callback strategies from the parent - if (parent != null) { - return parent.getSubUnitOfWorkCallback(); - } - - return subUnitOfWorks != null ? subUnitOfWorks.peek() : null; - } - private Set<Object> getTransactedBy() { if (transactedBy == null) { transactedBy = new LinkedHashSet<>(); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index 19d21dd..766e0c6 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -627,25 +627,6 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { } /** - * Advice when an EIP uses the <tt>shareUnitOfWork</tt> functionality. - */ - public static class SubUnitOfWorkProcessorAdvice implements CamelInternalProcessorAdvice<UnitOfWork> { - - @Override - public UnitOfWork before(Exchange exchange) throws Exception { - // begin savepoint - exchange.getUnitOfWork().beginSubUnitOfWork(exchange); - return exchange.getUnitOfWork(); - } - - @Override - public void after(Exchange exchange, UnitOfWork unitOfWork) throws Exception { - // end sub unit of work - unitOfWork.endSubUnitOfWork(exchange); - } - } - - /** * Advice when Message History has been enabled. */ @SuppressWarnings("unchecked") diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index ec43a0a..2f89916 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -40,7 +40,6 @@ import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.CamelLogger; import org.apache.camel.spi.ExchangeFormatter; import org.apache.camel.spi.ShutdownPrepared; -import org.apache.camel.spi.SubUnitOfWorkCallback; import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter; import org.apache.camel.support.AsyncProcessorConverterHelper; @@ -409,25 +408,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // if we are exhausted or redelivery is not allowed, then deliver to failure processor (eg such as DLC) if (!redeliverAllowed || exhausted) { - Processor target = null; - boolean deliver = true; - - // the unit of work may have an optional callback associated we need to leverage - UnitOfWork uow = exchange.getUnitOfWork(); - if (uow != null) { - SubUnitOfWorkCallback uowCallback = uow.getSubUnitOfWorkCallback(); - if (uowCallback != null) { - // signal to the callback we are exhausted - uowCallback.onExhausted(exchange); - // do not deliver to the failure processor as its been handled by the callback instead - deliver = false; - } - } - - if (deliver) { - // should deliver to failure processor (either from onException or the dead letter channel) - target = failureProcessor != null ? failureProcessor : deadLetter; - } + Processor target = failureProcessor != null ? failureProcessor : deadLetter; // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint) boolean isDeadLetterChannel = isDeadLetterChannel() && target == deadLetter; diff --git a/core/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java index a7cc497..70bc7d4 100644 --- a/core/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java +++ b/core/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java @@ -297,7 +297,6 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i * This means each sub exchange has its own individual unit of work. * * @return the builder. - * @see org.apache.camel.spi.SubUnitOfWork */ public MulticastDefinition shareUnitOfWork() { setShareUnitOfWork(true); diff --git a/core/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java index 100bf72..95eec6a 100644 --- a/core/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java +++ b/core/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java @@ -329,7 +329,6 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext * This means each sub exchange has its own individual unit of work. * * @return the builder. - * @see org.apache.camel.spi.SubUnitOfWork */ public RecipientListDefinition<Type> shareUnitOfWork() { setShareUnitOfWork(true); diff --git a/core/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java index aeb5141..3743d5b 100644 --- a/core/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java +++ b/core/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java @@ -292,7 +292,6 @@ public class SplitDefinition extends OutputExpressionNode implements ExecutorSer * This means each splitted exchange has its own individual unit of work. * * @return the builder. - * @see org.apache.camel.spi.SubUnitOfWork */ public SplitDefinition shareUnitOfWork() { setShareUnitOfWork(true);