This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0d8117e794fff41df3759a6edd409cea2fdefa34
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sun Aug 23 20:43:20 2020 +0200

    CAMEL-15450: ClusteredRoutePolicyFactory throw NullPointerException when 
adding a route to CamelContext that has been started
---
 .../apache/camel/cluster/CamelClusterService.java  |   4 +-
 .../camel/impl/cluster/ClusteredRoutePolicy.java   |  33 +++-
 .../cluster/ClusteredRoutePolicyFactoryTest.java   | 191 +++++++++++++++++++++
 .../camel/cluster/ClusteredRoutePolicyTest.java    | 190 ++++++++++++++++++++
 4 files changed, 407 insertions(+), 11 deletions(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/cluster/CamelClusterService.java
 
b/core/camel-api/src/main/java/org/apache/camel/cluster/CamelClusterService.java
index e833987..61e1b12 100644
--- 
a/core/camel-api/src/main/java/org/apache/camel/cluster/CamelClusterService.java
+++ 
b/core/camel-api/src/main/java/org/apache/camel/cluster/CamelClusterService.java
@@ -47,7 +47,7 @@ public interface CamelClusterService extends Service, 
CamelContextAware, IdAware
     /**
      * Release a view if it has no references.
      *
-     * @param  view      the view.
+     * @param view the view.
      */
     void releaseView(CamelClusterView view) throws Exception;
 
@@ -69,7 +69,7 @@ public interface CamelClusterService extends Service, 
CamelContextAware, IdAware
     /**
      * Check if the service is the leader on the given namespace.
      *
-     * @param  namespace the namespace.
+     * @param namespace the namespace.
      */
     boolean isLeader(String namespace);
 
diff --git 
a/core/camel-cluster/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java
 
b/core/camel-cluster/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java
index 85a603a..3787b09 100644
--- 
a/core/camel-cluster/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java
+++ 
b/core/camel-cluster/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java
@@ -47,7 +47,7 @@ import org.apache.camel.util.ReferenceCount;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@ManagedResource(description = "Clustered Route policy using")
+@ManagedResource(description = "Clustered Route policy")
 public final class ClusteredRoutePolicy extends RoutePolicySupport implements 
CamelContextAware {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ClusteredRoutePolicy.class);
@@ -64,6 +64,7 @@ public final class ClusteredRoutePolicy extends 
RoutePolicySupport implements Ca
     private final CamelClusterService.Selector clusterServiceSelector;
     private CamelClusterService clusterService;
     private CamelClusterView clusterView;
+    private volatile boolean clusterViewAddListenerDone;
 
     private Duration initialDelay;
     private ScheduledExecutorService executorService;
@@ -105,11 +106,13 @@ public final class ClusteredRoutePolicy extends 
RoutePolicySupport implements Ca
 
             try {
                 // Remove event listener
-                clusterView.removeEventListener(leadershipEventListener);
+                if (clusterView != null) {
+                    clusterView.removeEventListener(leadershipEventListener);
 
-                // If all the routes have been shut down then the view and its
-                // resources can eventually be released.
-                clusterView.getClusterService().releaseView(clusterView);
+                    // If all the routes have been shut down then the view and 
its
+                    // resources can eventually be released.
+                    clusterView.getClusterService().releaseView(clusterView);
+                }
             } catch (Exception e) {
                 throw new RuntimeException(e);
             } finally {
@@ -193,6 +196,10 @@ public final class ClusteredRoutePolicy extends 
RoutePolicySupport implements Ca
                 clusterService.getClass().getName());
 
         clusterView = clusterService.getView(namespace);
+        if (!clusterViewAddListenerDone) {
+            clusterView.addEventListener(leadershipEventListener);
+            clusterViewAddListenerDone = true;
+        }
     }
 
     @Override
@@ -288,11 +295,19 @@ public final class ClusteredRoutePolicy extends 
RoutePolicySupport implements Ca
     }
 
     private void onCamelContextStarted() {
-        LOG.debug("Apply cluster policy (stopped-routes='{}', 
started-routes='{}')",
-                
stoppedRoutes.stream().map(Route::getId).collect(Collectors.joining(",")),
-                
startedRoutes.stream().map(Route::getId).collect(Collectors.joining(",")));
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Apply cluster policy (stopped-routes='{}', 
started-routes='{}')",
+                    
stoppedRoutes.stream().map(Route::getId).collect(Collectors.joining(",")),
+                    
startedRoutes.stream().map(Route::getId).collect(Collectors.joining(",")));
+        }
 
-        clusterView.addEventListener(leadershipEventListener);
+        if (clusterView != null && !clusterViewAddListenerDone) {
+            clusterView.addEventListener(leadershipEventListener);
+            clusterViewAddListenerDone = true;
+        } else {
+            // cluster view is not initialized yet, so lets add its listener 
in doStart
+            clusterViewAddListenerDone = false;
+        }
     }
 
     // ****************************************************
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyFactoryTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyFactoryTest.java
new file mode 100644
index 0000000..1c51166
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyFactoryTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.cluster;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.cluster.ClusteredRoutePolicyFactory;
+import org.apache.camel.support.cluster.AbstractCamelClusterService;
+import org.apache.camel.support.cluster.AbstractCamelClusterView;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ClusteredRoutePolicyFactoryTest extends ContextTestSupport {
+
+    private ClusteredRoutePolicyFactory factory;
+    private TestClusterService cs;
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+
+        cs = new TestClusterService("my-cluster-service");
+        context.addService(cs);
+
+        factory = ClusteredRoutePolicyFactory.forNamespace("my-ns");
+        context.addRoutePolicyFactory(factory);
+
+        return context;
+    }
+
+    @Test
+    public void testClusteredRoutePolicyFactory() throws Exception {
+        // route is stopped as we are not leader yet
+        assertEquals(ServiceStatus.Stopped, 
context.getRouteController().getRouteStatus("foo"));
+
+        MockEndpoint mock = getMockEndpoint("mock:foo");
+        mock.expectedBodiesReceived("Hello Foo");
+
+        cs.getView().setLeader(true);
+
+        template.sendBody("seda:foo", "Hello Foo");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(ServiceStatus.Started, 
context.getRouteController().getRouteStatus("foo"));
+    }
+
+    @Test
+    public void testClusteredRoutePolicyFactoryAddRoute() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:bar").routeId("bar")
+                        .to("mock:bar");
+            }
+        });
+
+        // route is stopped as we are not leader yet
+        assertEquals(ServiceStatus.Stopped, 
context.getRouteController().getRouteStatus("foo"));
+        assertEquals(ServiceStatus.Stopped, 
context.getRouteController().getRouteStatus("bar"));
+
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello Foo");
+        getMockEndpoint("mock:bar").expectedBodiesReceived("Hello Bar");
+
+        cs.getView().setLeader(true);
+
+        template.sendBody("seda:foo", "Hello Foo");
+        template.sendBody("seda:bar", "Hello Bar");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(ServiceStatus.Started, 
context.getRouteController().getRouteStatus("foo"));
+        assertEquals(ServiceStatus.Started, 
context.getRouteController().getRouteStatus("bar"));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:foo").routeId("foo")
+                        .to("mock:foo");
+            }
+        };
+    }
+
+    // *********************************
+    // Helpers
+    // *********************************
+
+    private static class TestClusterView extends AbstractCamelClusterView {
+        private boolean leader;
+
+        public TestClusterView(CamelClusterService cluster, String namespace) {
+            super(cluster, namespace);
+        }
+
+        @Override
+        public Optional<CamelClusterMember> getLeader() {
+            return leader ? Optional.of(getLocalMember()) : Optional.empty();
+        }
+
+        @Override
+        public CamelClusterMember getLocalMember() {
+            return new CamelClusterMember() {
+                @Override
+                public boolean isLeader() {
+                    return leader;
+                }
+
+                @Override
+                public boolean isLocal() {
+                    return true;
+                }
+
+                @Override
+                public String getId() {
+                    return getClusterService().getId();
+                }
+            };
+        }
+
+        @Override
+        public List<CamelClusterMember> getMembers() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+        }
+
+        public boolean isLeader() {
+            return leader;
+        }
+
+        public void setLeader(boolean leader) {
+            this.leader = leader;
+
+            if (isRunAllowed()) {
+                fireLeadershipChangedEvent(getLeader());
+            }
+        }
+    }
+
+    private static class TestClusterService extends 
AbstractCamelClusterService<TestClusterView> {
+
+        private TestClusterView view;
+
+        public TestClusterService(String id) {
+            super(id);
+        }
+
+        @Override
+        protected TestClusterView createView(String namespace) throws 
Exception {
+            if (view == null) {
+                view = new TestClusterView(this, namespace);
+            }
+            return view;
+        }
+
+        public TestClusterView getView() {
+            return view;
+        }
+    }
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java
new file mode 100644
index 0000000..8714eef
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.cluster;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.cluster.ClusteredRoutePolicy;
+import org.apache.camel.support.cluster.AbstractCamelClusterService;
+import org.apache.camel.support.cluster.AbstractCamelClusterView;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ClusteredRoutePolicyTest extends ContextTestSupport {
+
+    private ClusteredRoutePolicy policy;
+    private TestClusterService cs;
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+
+        cs = new TestClusterService("my-cluster-service");
+        context.addService(cs);
+
+        policy = ClusteredRoutePolicy.forNamespace("my-ns");
+
+        return context;
+    }
+
+    @Test
+    public void testClusteredRoutePolicyFactory() throws Exception {
+        // route is stopped as we are not leader yet
+        assertEquals(ServiceStatus.Stopped, 
context.getRouteController().getRouteStatus("foo"));
+
+        MockEndpoint mock = getMockEndpoint("mock:foo");
+        mock.expectedBodiesReceived("Hello Foo");
+
+        cs.getView().setLeader(true);
+
+        template.sendBody("seda:foo", "Hello Foo");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(ServiceStatus.Started, 
context.getRouteController().getRouteStatus("foo"));
+    }
+
+    @Test
+    public void testClusteredRoutePolicyFactoryAddRoute() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:bar").routeId("bar").routePolicy(policy)
+                        .to("mock:bar");
+            }
+        });
+
+        // route is stopped as we are not leader yet
+        assertEquals(ServiceStatus.Stopped, 
context.getRouteController().getRouteStatus("foo"));
+        assertEquals(ServiceStatus.Stopped, 
context.getRouteController().getRouteStatus("bar"));
+
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello Foo");
+        getMockEndpoint("mock:bar").expectedBodiesReceived("Hello Bar");
+
+        cs.getView().setLeader(true);
+
+        template.sendBody("seda:foo", "Hello Foo");
+        template.sendBody("seda:bar", "Hello Bar");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(ServiceStatus.Started, 
context.getRouteController().getRouteStatus("foo"));
+        assertEquals(ServiceStatus.Started, 
context.getRouteController().getRouteStatus("bar"));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:foo").routeId("foo").routePolicy(policy)
+                        .to("mock:foo");
+            }
+        };
+    }
+
+    // *********************************
+    // Helpers
+    // *********************************
+
+    private static class TestClusterView extends AbstractCamelClusterView {
+        private boolean leader;
+
+        public TestClusterView(CamelClusterService cluster, String namespace) {
+            super(cluster, namespace);
+        }
+
+        @Override
+        public Optional<CamelClusterMember> getLeader() {
+            return leader ? Optional.of(getLocalMember()) : Optional.empty();
+        }
+
+        @Override
+        public CamelClusterMember getLocalMember() {
+            return new CamelClusterMember() {
+                @Override
+                public boolean isLeader() {
+                    return leader;
+                }
+
+                @Override
+                public boolean isLocal() {
+                    return true;
+                }
+
+                @Override
+                public String getId() {
+                    return getClusterService().getId();
+                }
+            };
+        }
+
+        @Override
+        public List<CamelClusterMember> getMembers() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+        }
+
+        public boolean isLeader() {
+            return leader;
+        }
+
+        public void setLeader(boolean leader) {
+            this.leader = leader;
+
+            if (isRunAllowed()) {
+                fireLeadershipChangedEvent(getLeader());
+            }
+        }
+    }
+
+    private static class TestClusterService extends 
AbstractCamelClusterService<TestClusterView> {
+
+        private TestClusterView view;
+
+        public TestClusterService(String id) {
+            super(id);
+        }
+
+        @Override
+        protected TestClusterView createView(String namespace) throws 
Exception {
+            if (view == null) {
+                view = new TestClusterView(this, namespace);
+            }
+            return view;
+        }
+
+        public TestClusterView getView() {
+            return view;
+        }
+    }
+}

Reply via email to