/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// Origin: components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/TimedLeaderNotifierTest.java
// (added as a new file by commit f0b00ab9).
package org.apache.camel.component.kubernetes.ha;

import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

import org.apache.camel.component.kubernetes.ha.lock.KubernetesClusterEvent;
import org.apache.camel.component.kubernetes.ha.lock.TimedLeaderNotifier;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
 * Exercises {@link TimedLeaderNotifier} by feeding it leadership refreshes and
 * checking the payloads of the events it hands to the registered handler.
 */
public class TimedLeaderNotifierTest {

    /** Notifier under test; created and started before each test, stopped afterwards. */
    private TimedLeaderNotifier notifier;

    /** Payload of the most recent leader-changed event seen by the handler. */
    private volatile Optional<String> currentLeader;

    /** Payload of the most recent member-list-changed event seen by the handler. */
    private volatile Set<String> currentMembers;

    /**
     * Builds a notifier whose handler stores the data of leader-changed and
     * member-list-changed events in the fields above, then starts it.
     */
    @Before
    public void init() throws Exception {
        notifier = new TimedLeaderNotifier(event -> {
            if (event instanceof KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) {
                KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent leaderChange =
                    (KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) event;
                currentLeader = leaderChange.getData();
                return;
            }
            if (event instanceof KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent) {
                KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent memberChange =
                    (KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent) event;
                currentMembers = memberChange.getData();
            }
        });
        notifier.start();
    }

    /** Stops the notifier created by {@link #init()}. */
    @After
    public void destroy() throws Exception {
        notifier.stop();
    }

    /**
     * Three refreshes are issued back to back ("one", "two", "three"); after
     * 80 ms the recorded leader must be "three" and the member set unchanged.
     */
    @Test
    public void testMultipleCalls() throws Exception {
        Set<String> cluster = sortedSet("one", "two", "three");

        announce("one", 50L, cluster);
        announce("two", 50L, cluster);
        announce("three", 5000L, cluster);
        Thread.sleep(80);

        assertObserved(Optional.of("three"), cluster);
    }

    /**
     * Two refreshes with the value 50 are followed by a 160 ms pause, after
     * which the recorded leader must be empty; a subsequent refresh for
     * "three" must then be reported as leader.
     */
    @Test
    public void testExpiration() throws Exception {
        Set<String> cluster = sortedSet("one", "two", "three");

        announce("one", 50L, cluster);
        announce("two", 50L, cluster);
        Thread.sleep(160);
        assertObserved(Optional.empty(), cluster);

        announce("three", 5000L, cluster);
        Thread.sleep(80);
        assertObserved(Optional.of("three"), cluster);
    }

    /**
     * Refreshes first with a single-member set and then with a two-member
     * set; the second leader and the second set must be the ones recorded.
     */
    @Test
    public void testMemberChanging() throws Exception {
        Set<String> onlyOne = Collections.singleton("one");
        Set<String> oneAndTwo = sortedSet("one", "two");

        announce("one", 50L, onlyOne);
        announce("two", 5000L, oneAndTwo);
        Thread.sleep(80);

        assertObserved(Optional.of("two"), oneAndTwo);
    }

    /**
     * After a refresh for "one", a refresh for "two" is sent whose timestamp
     * lies one second in the past with a value of 900; the recorded leader
     * must be empty afterwards.
     */
    @Test
    public void testOldData() throws Exception {
        Set<String> cluster = sortedSet("one", "two", "three");

        announce("one", 1000L, cluster);
        Thread.sleep(80);

        long staleTimestamp = System.currentTimeMillis() - 1000;
        notifier.refreshLeadership(Optional.of("two"), staleTimestamp, 900L, cluster);
        Thread.sleep(80);

        assertEquals(Optional.empty(), currentLeader);
    }

    /**
     * After a refresh for "one", a refresh carrying an empty leader and
     * {@code null} timing arguments must leave the recorded leader empty.
     */
    @Test
    public void testNewLeaderEmpty() throws Exception {
        Set<String> cluster = sortedSet("one", "two", "three");

        announce("one", 1000L, cluster);
        Thread.sleep(80);

        notifier.refreshLeadership(Optional.empty(), null, null, cluster);
        Thread.sleep(80);

        assertEquals(Optional.empty(), currentLeader);
    }

    /** Returns a new {@link TreeSet} holding the given names. */
    private static Set<String> sortedSet(String... names) {
        return new TreeSet<>(Arrays.asList(names));
    }

    /**
     * Calls {@code refreshLeadership} for the given leader, stamped with the
     * current wall-clock time.
     *
     * @param leader  identifier wrapped into the {@link Optional} leader argument
     * @param lease   third argument of the call (presumably a lease duration in
     *                milliseconds; confirm against {@link TimedLeaderNotifier})
     * @param members member set passed along unchanged
     */
    private void announce(String leader, long lease, Set<String> members) throws Exception {
        notifier.refreshLeadership(Optional.of(leader), System.currentTimeMillis(), lease, members);
    }

    /** Asserts the recorded leader first and the recorded member set second. */
    private void assertObserved(Optional<String> expectedLeader, Set<String> expectedMembers) {
        assertEquals(expectedLeader, currentLeader);
        assertEquals(expectedMembers, currentMembers);
    }

}
http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java index 6670f37..7d7147b 100644 --- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java @@ -28,16 +28,21 @@ import org.apache.camel.ha.CamelClusterEventListener; import org.apache.camel.ha.CamelClusterMember; import org.apache.camel.ha.CamelClusterView; import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Records leadership changes and allow to do assertions. */ public class LeaderRecorder implements CamelClusterEventListener.Leadership { + private static final Logger LOG = LoggerFactory.getLogger(LeaderRecorder.class); + private List<LeadershipInfo> leaderships = new CopyOnWriteArrayList<>(); @Override public void leadershipChanged(CamelClusterView view, CamelClusterMember leader) { + LOG.info("Cluster view {} - leader changed to: {}", view.getLocalMember(), leader); this.leaderships.add(new LeadershipInfo(leader != null ? 
leader.getId() : null, System.currentTimeMillis())); } http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java index 6422e35..3dc2423 100644 --- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java @@ -17,10 +17,16 @@ package org.apache.camel.component.kubernetes.ha.utils; import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; import com.fasterxml.jackson.databind.ObjectMapper; import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.PodListBuilder; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; import io.fabric8.mockwebserver.utils.ResponseProvider; @@ -41,7 +47,15 @@ public class LockTestServer extends KubernetesMockServer { private Long delayRequests; + private Set<String> pods; + public LockTestServer(ConfigMapLockSimulator lockSimulator) { + this(lockSimulator, Collections.emptySet()); + } + + public LockTestServer(ConfigMapLockSimulator lockSimulator, Collection<String> initialPods) { + + this.pods = new TreeSet<>(initialPods); expect().get().withPath("/api/v1/namespaces/test/configmaps/" + lockSimulator.getConfigMapName()).andReply(new ResponseProvider<Object>() { ThreadLocal<Integer> responseCode = new ThreadLocal<>(); @@ -132,8 +146,9 @@ public class 
LockTestServer extends KubernetesMockServer { }).always(); // Other resources - expect().get().withPath("/api/v1/namespaces/test/pods").andReturn(200, new PodListBuilder().withNewMetadata().withResourceVersion("1").and().build()).always(); - expect().get().withPath("/api/v1/namespaces/test/pods?resourceVersion=1&watch=true").andUpgradeToWebSocket().open().done().always(); + expect().get().withPath("/api/v1/namespaces/test/pods").andReply(200, request -> new PodListBuilder().withNewMetadata().withResourceVersion("1").and().withItems( + getCurrentPods().stream().map(name -> new PodBuilder().withNewMetadata().withName(name).and().build()).collect(Collectors.toList()) + ).build()).always(); } @@ -145,6 +160,18 @@ public class LockTestServer extends KubernetesMockServer { this.refuseRequests = refuseRequests; } + public synchronized Collection<String> getCurrentPods() { + return new TreeSet<>(this.pods); + } + + public synchronized void removePod(String pod) { + this.pods.remove(pod); + } + + public synchronized void addPod(String pod) { + this.pods.add(pod); + } + public Long getDelayRequests() { return delayRequests; }