This is an automated email from the ASF dual-hosted git repository.

klease pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ec3d8a9d35 CAMEL-18275: prevent completion in beforeConsumer mode 
from being handed over (#8105)
0ec3d8a9d35 is described below

commit 0ec3d8a9d35cb7e3b0f162b1a92ad1553546a446
Author: klease <38634989+kle...@users.noreply.github.com>
AuthorDate: Thu Aug 4 18:08:56 2022 +0200

    CAMEL-18275: prevent completion in beforeConsumer mode from being handed 
over (#8105)
    
    * CAMEL-18275: Address issue of completions not being run in SEDA pipeline
    When synchronizations are handed over, add a method to allow some 
housekeeping
    to be performed. In the 
OnCompletionProcessor.OnCompletionSynchronizationAfterConsumer
    this method stores the routeId on the Exchange.
    
    * CAMEL-18275: prevent completions in "beforeConsumer" mode from being 
handed over
    This ensures they run before the message is forwarded to the next route.
    Add 2 unit tests based on the example from Eduard Gomoliako.
---
 .../camel/processor/OnCompletionProcessor.java     |  6 ++
 .../OnCompletionAfterChainedSedaRoutes.java        | 64 ++++++++++++++++++++++
 .../OnCompletionBeforeChainedSedaRoutes.java       | 64 ++++++++++++++++++++++
 3 files changed, 134 insertions(+)

diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
index 8607957db2e..b7de659df48 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
@@ -408,6 +408,7 @@ public class OnCompletionProcessor extends 
AsyncProcessorSupport implements Trac
 
         @Override
         public void onAfterRoute(Route route, Exchange exchange) {
+            LOG.debug("onAfterRoute from Route {}", route.getRouteId());
             // route scope = should be from this route
             if (routeScoped && !route.getRouteId().equals(routeId)) {
                 return;
@@ -449,6 +450,11 @@ public class OnCompletionProcessor extends 
AsyncProcessorSupport implements Trac
             }
         }
 
+        @Override
+        public boolean allowHandover() {
+            return false;
+        }
+
         @Override
         public String toString() {
             return "onAfterRoute";
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAfterChainedSedaRoutes.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAfterChainedSedaRoutes.java
new file mode 100644
index 00000000000..b5a39b7af40
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAfterChainedSedaRoutes.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+
+public class OnCompletionAfterChainedSedaRoutes extends ContextTestSupport {
+
+    @Test
+    public void testOnCompletionChained() throws Exception {
+        String body = "<body>";
+
+        getMockEndpoint("mock:end").expectedBodiesReceived(body);
+        getMockEndpoint("mock:adone").expectedBodiesReceived(body);
+        getMockEndpoint("mock:bdone").expectedBodiesReceived(body);
+        getMockEndpoint("mock:cdone").expectedBodiesReceived(body);
+        getMockEndpoint("mock:ddone").expectedBodiesReceived(body);
+
+        template.sendBody("direct:a", body);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:a")
+                .onCompletion().log("a - done").to("mock:adone").end()
+                .to("seda:b");
+
+                from("seda:b")
+                .onCompletion().log("b - done").to("mock:bdone").end()
+                .to("seda:c");
+
+                from("seda:c")
+                .onCompletion().log("c - done").to("mock:cdone").end()
+                .to("seda:d");
+
+                from("seda:d")
+                .onCompletion().log("d - done").to("mock:ddone").end()
+                .to("mock:end");
+            }
+
+        };
+    }
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionBeforeChainedSedaRoutes.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionBeforeChainedSedaRoutes.java
new file mode 100644
index 00000000000..a51e5afb55e
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionBeforeChainedSedaRoutes.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+
+public class OnCompletionBeforeChainedSedaRoutes extends ContextTestSupport {
+
+    @Test
+    public void testOnCompletionChained() throws Exception {
+        String body = "<body>";
+
+        getMockEndpoint("mock:end").expectedBodiesReceived(body);
+        getMockEndpoint("mock:adone").expectedBodiesReceived(body);
+        getMockEndpoint("mock:bdone").expectedBodiesReceived(body);
+        getMockEndpoint("mock:cdone").expectedBodiesReceived(body);
+        getMockEndpoint("mock:ddone").expectedBodiesReceived(body);
+
+        template.sendBody("direct:a", body);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:a")
+                .onCompletion().modeBeforeConsumer().log("a - 
done").to("mock:adone").end()
+                .to("seda:b");
+
+                from("seda:b")
+                .onCompletion().modeBeforeConsumer().log("b - 
done").to("mock:bdone").end()
+                .to("seda:c");
+
+                from("seda:c")
+                .onCompletion().modeBeforeConsumer().log("c - 
done").to("mock:cdone").end()
+                .to("seda:d");
+
+                from("seda:d")
+                .onCompletion().modeBeforeConsumer().log("d - 
done").to("mock:ddone").end()
+                .to("mock:end");
+            }
+
+        };
+    }
+}

Reply via email to