This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0d8117e794fff41df3759a6edd409cea2fdefa34 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun Aug 23 20:43:20 2020 +0200 CAMEL-15450: ClusteredRoutePolicyFactory throw NullPointerException when adding a route to CamelContext that has been started --- .../apache/camel/cluster/CamelClusterService.java | 4 +- .../camel/impl/cluster/ClusteredRoutePolicy.java | 33 +++- .../cluster/ClusteredRoutePolicyFactoryTest.java | 191 +++++++++++++++++++++ .../camel/cluster/ClusteredRoutePolicyTest.java | 190 ++++++++++++++++++++ 4 files changed, 407 insertions(+), 11 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/cluster/CamelClusterService.java b/core/camel-api/src/main/java/org/apache/camel/cluster/CamelClusterService.java index e833987..61e1b12 100644 --- a/core/camel-api/src/main/java/org/apache/camel/cluster/CamelClusterService.java +++ b/core/camel-api/src/main/java/org/apache/camel/cluster/CamelClusterService.java @@ -47,7 +47,7 @@ public interface CamelClusterService extends Service, CamelContextAware, IdAware /** * Release a view if it has no references. * - * @param view the view. + * @param view the view. */ void releaseView(CamelClusterView view) throws Exception; @@ -69,7 +69,7 @@ public interface CamelClusterService extends Service, CamelContextAware, IdAware /** * Check if the service is the leader on the given namespace. * - * @param namespace the namespace. + * @param namespace the namespace. */ boolean isLeader(String namespace); diff --git a/core/camel-cluster/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java b/core/camel-cluster/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java index 85a603a..3787b09 100644 --- a/core/camel-cluster/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java +++ b/core/camel-cluster/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java @@ -47,7 +47,7 @@ import org.apache.camel.util.ReferenceCount; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@ManagedResource(description = "Clustered Route policy using") +@ManagedResource(description = "Clustered Route policy") public final class ClusteredRoutePolicy extends RoutePolicySupport implements CamelContextAware { private static final Logger LOG = LoggerFactory.getLogger(ClusteredRoutePolicy.class); @@ -64,6 +64,7 @@ public final class ClusteredRoutePolicy extends RoutePolicySupport implements Ca private final CamelClusterService.Selector clusterServiceSelector; private CamelClusterService clusterService; private CamelClusterView clusterView; + private volatile boolean clusterViewAddListenerDone; private Duration initialDelay; private ScheduledExecutorService executorService; @@ -105,11 +106,13 @@ public final class ClusteredRoutePolicy extends RoutePolicySupport implements Ca try { // Remove event listener - clusterView.removeEventListener(leadershipEventListener); + if (clusterView != null) { + clusterView.removeEventListener(leadershipEventListener); - // If all the routes have been shut down then the view and its - // resources can eventually be released. - clusterView.getClusterService().releaseView(clusterView); + // If all the routes have been shut down then the view and its + // resources can eventually be released. + clusterView.getClusterService().releaseView(clusterView); + } } catch (Exception e) { throw new RuntimeException(e); } finally { @@ -193,6 +196,10 @@ public final class ClusteredRoutePolicy extends RoutePolicySupport implements Ca clusterService.getClass().getName()); clusterView = clusterService.getView(namespace); + if (!clusterViewAddListenerDone) { + clusterView.addEventListener(leadershipEventListener); + clusterViewAddListenerDone = true; + } } @Override @@ -288,11 +295,19 @@ public final class ClusteredRoutePolicy extends RoutePolicySupport implements Ca } private void onCamelContextStarted() { - LOG.debug("Apply cluster policy (stopped-routes='{}', started-routes='{}')", - stoppedRoutes.stream().map(Route::getId).collect(Collectors.joining(",")), - startedRoutes.stream().map(Route::getId).collect(Collectors.joining(","))); + if (LOG.isDebugEnabled()) { + LOG.debug("Apply cluster policy (stopped-routes='{}', started-routes='{}')", + stoppedRoutes.stream().map(Route::getId).collect(Collectors.joining(",")), + startedRoutes.stream().map(Route::getId).collect(Collectors.joining(","))); + } - clusterView.addEventListener(leadershipEventListener); + if (clusterView != null && !clusterViewAddListenerDone) { + clusterView.addEventListener(leadershipEventListener); + clusterViewAddListenerDone = true; + } else { + // cluster view is not initialized yet, so lets add its listener in doStart + clusterViewAddListenerDone = false; + } } // **************************************************** diff --git a/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyFactoryTest.java b/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyFactoryTest.java new file mode 100644 index 0000000..1c51166 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyFactoryTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.cluster; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.ServiceStatus; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.cluster.ClusteredRoutePolicyFactory; +import org.apache.camel.support.cluster.AbstractCamelClusterService; +import org.apache.camel.support.cluster.AbstractCamelClusterView; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ClusteredRoutePolicyFactoryTest extends ContextTestSupport { + + private ClusteredRoutePolicyFactory factory; + private TestClusterService cs; + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + + cs = new TestClusterService("my-cluster-service"); + context.addService(cs); + + factory = ClusteredRoutePolicyFactory.forNamespace("my-ns"); + context.addRoutePolicyFactory(factory); + + return context; + } + + @Test + public void testClusteredRoutePolicyFactory() throws Exception { + // route is stopped as we are not leader yet + assertEquals(ServiceStatus.Stopped, context.getRouteController().getRouteStatus("foo")); + + MockEndpoint mock = getMockEndpoint("mock:foo"); + mock.expectedBodiesReceived("Hello Foo"); + + cs.getView().setLeader(true); + + template.sendBody("seda:foo", "Hello Foo"); + + assertMockEndpointsSatisfied(); + + assertEquals(ServiceStatus.Started, context.getRouteController().getRouteStatus("foo")); + } + + @Test + public void testClusteredRoutePolicyFactoryAddRoute() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:bar").routeId("bar") + .to("mock:bar"); + } + }); + + // route is stopped as we are not leader yet + assertEquals(ServiceStatus.Stopped, context.getRouteController().getRouteStatus("foo")); + assertEquals(ServiceStatus.Stopped, context.getRouteController().getRouteStatus("bar")); + + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello Foo"); + getMockEndpoint("mock:bar").expectedBodiesReceived("Hello Bar"); + + cs.getView().setLeader(true); + + template.sendBody("seda:foo", "Hello Foo"); + template.sendBody("seda:bar", "Hello Bar"); + + assertMockEndpointsSatisfied(); + + assertEquals(ServiceStatus.Started, context.getRouteController().getRouteStatus("foo")); + assertEquals(ServiceStatus.Started, context.getRouteController().getRouteStatus("bar")); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:foo").routeId("foo") + .to("mock:foo"); + } + }; + } + + // ********************************* + // Helpers + // ********************************* + + private static class TestClusterView extends AbstractCamelClusterView { + private boolean leader; + + public TestClusterView(CamelClusterService cluster, String namespace) { + super(cluster, namespace); + } + + @Override + public Optional<CamelClusterMember> getLeader() { + return leader ? Optional.of(getLocalMember()) : Optional.empty(); + } + + @Override + public CamelClusterMember getLocalMember() { + return new CamelClusterMember() { + @Override + public boolean isLeader() { + return leader; + } + + @Override + public boolean isLocal() { + return true; + } + + @Override + public String getId() { + return getClusterService().getId(); + } + }; + } + + @Override + public List<CamelClusterMember> getMembers() { + return Collections.emptyList(); + } + + @Override + protected void doStart() throws Exception { + } + + @Override + protected void doStop() throws Exception { + } + + public boolean isLeader() { + return leader; + } + + public void setLeader(boolean leader) { + this.leader = leader; + + if (isRunAllowed()) { + fireLeadershipChangedEvent(getLeader()); + } + } + } + + private static class TestClusterService extends AbstractCamelClusterService<TestClusterView> { + + private TestClusterView view; + + public TestClusterService(String id) { + super(id); + } + + @Override + protected TestClusterView createView(String namespace) throws Exception { + if (view == null) { + view = new TestClusterView(this, namespace); + } + return view; + } + + public TestClusterView getView() { + return view; + } + } +} diff --git a/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java b/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java new file mode 100644 index 0000000..8714eef --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/cluster/ClusteredRoutePolicyTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.cluster; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.ServiceStatus; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.cluster.ClusteredRoutePolicy; +import org.apache.camel.support.cluster.AbstractCamelClusterService; +import org.apache.camel.support.cluster.AbstractCamelClusterView; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ClusteredRoutePolicyTest extends ContextTestSupport { + + private ClusteredRoutePolicy policy; + private TestClusterService cs; + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + + cs = new TestClusterService("my-cluster-service"); + context.addService(cs); + + policy = ClusteredRoutePolicy.forNamespace("my-ns"); + + return context; + } + + @Test + public void testClusteredRoutePolicyFactory() throws Exception { + // route is stopped as we are not leader yet + assertEquals(ServiceStatus.Stopped, context.getRouteController().getRouteStatus("foo")); + + MockEndpoint mock = getMockEndpoint("mock:foo"); + mock.expectedBodiesReceived("Hello Foo"); + + cs.getView().setLeader(true); + + template.sendBody("seda:foo", "Hello Foo"); + + assertMockEndpointsSatisfied(); + + assertEquals(ServiceStatus.Started, context.getRouteController().getRouteStatus("foo")); + } + + @Test + public void testClusteredRoutePolicyFactoryAddRoute() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:bar").routeId("bar").routePolicy(policy) + .to("mock:bar"); + } + }); + + // route is stopped as we are not leader yet + assertEquals(ServiceStatus.Stopped, context.getRouteController().getRouteStatus("foo")); + assertEquals(ServiceStatus.Stopped, context.getRouteController().getRouteStatus("bar")); + + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello Foo"); + getMockEndpoint("mock:bar").expectedBodiesReceived("Hello Bar"); + + cs.getView().setLeader(true); + + template.sendBody("seda:foo", "Hello Foo"); + template.sendBody("seda:bar", "Hello Bar"); + + assertMockEndpointsSatisfied(); + + assertEquals(ServiceStatus.Started, context.getRouteController().getRouteStatus("foo")); + assertEquals(ServiceStatus.Started, context.getRouteController().getRouteStatus("bar")); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:foo").routeId("foo").routePolicy(policy) + .to("mock:foo"); + } + }; + } + + // ********************************* + // Helpers + // ********************************* + + private static class TestClusterView extends AbstractCamelClusterView { + private boolean leader; + + public TestClusterView(CamelClusterService cluster, String namespace) { + super(cluster, namespace); + } + + @Override + public Optional<CamelClusterMember> getLeader() { + return leader ? Optional.of(getLocalMember()) : Optional.empty(); + } + + @Override + public CamelClusterMember getLocalMember() { + return new CamelClusterMember() { + @Override + public boolean isLeader() { + return leader; + } + + @Override + public boolean isLocal() { + return true; + } + + @Override + public String getId() { + return getClusterService().getId(); + } + }; + } + + @Override + public List<CamelClusterMember> getMembers() { + return Collections.emptyList(); + } + + @Override + protected void doStart() throws Exception { + } + + @Override + protected void doStop() throws Exception { + } + + public boolean isLeader() { + return leader; + } + + public void setLeader(boolean leader) { + this.leader = leader; + + if (isRunAllowed()) { + fireLeadershipChangedEvent(getLeader()); + } + } + } + + private static class TestClusterService extends AbstractCamelClusterService<TestClusterView> { + + private TestClusterView view; + + public TestClusterService(String id) { + super(id); + } + + @Override + protected TestClusterView createView(String namespace) throws Exception { + if (view == null) { + view = new TestClusterView(this, namespace); + } + return view; + } + + public TestClusterView getView() { + return view; + } + } +}