CAMEL-11331: Adding tests and fixing impl
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/881b9331 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/881b9331 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/881b9331 Branch: refs/heads/master Commit: 881b9331f0712b4e611087eb11e623016dd3de72 Parents: debfeed Author: Nicola Ferraro <ni.ferr...@gmail.com> Authored: Fri Jul 14 11:21:50 2017 +0200 Committer: Nicola Ferraro <ni.ferr...@gmail.com> Committed: Tue Aug 8 16:39:43 2017 +0200 ---------------------------------------------------------------------- components/camel-kubernetes/pom.xml | 6 + .../kubernetes/ha/lock/ConfigMapLockUtils.java | 2 +- ...ubernetesLeaseBasedLeadershipController.java | 15 +- .../ha/lock/KubernetesLockConfiguration.java | 6 +- .../ha/KubernetesClusterServiceTest.java | 291 +++++++++++++++++++ .../ha/utils/ConfigMapLockSimulator.java | 83 ++++++ .../kubernetes/ha/utils/LeaderRecorder.java | 115 ++++++++ .../kubernetes/ha/utils/LockTestServer.java | 175 +++++++++++ .../kubernetes/ha/utils/LockTestServerTest.java | 97 +++++++ parent/pom.xml | 1 + 10 files changed, 784 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/pom.xml b/components/camel-kubernetes/pom.xml index c444068..38fa037 100644 --- a/components/camel-kubernetes/pom.xml +++ b/components/camel-kubernetes/pom.xml @@ -67,6 +67,12 @@ <scope>test</scope> </dependency> <dependency> + <groupId>io.fabric8</groupId> + <artifactId>mockwebserver</artifactId> + <version>${mockwebserver-version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-test-spring</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java index 84718f3..70fa860 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java @@ -26,7 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * + * Utilities for managing ConfigMaps that contain lock information. */ public final class ConfigMapLockUtils { http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java index 8e96a72..42be2e7 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java @@ -163,8 +163,9 @@ public class KubernetesLeaseBasedLeadershipController implements Service { long time = Math.min(timeRetry, timeDeadline); long delay = Math.max(0, time - System.currentTimeMillis()); - LOG.debug("Next renewal timeout event will be fired in {} seconds", delay / 1000); - return delay; + long delayJittered = jitter(delay, lockConfiguration.getJitterFactor()); + LOG.debug("Next renewal timeout event will be fired in {} seconds", delayJittered / 1000); + return delayJittered; } @@ -340,10 +341,18 @@ public class KubernetesLeaseBasedLeadershipController implements Service { private void updateLatestLeaderInfo(ConfigMap configMap) { LOG.debug("Updating internal status about the current leader"); this.latestLeaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap, this.lockConfiguration.getGroupName()); + + // Notify about changes in current leader if any + this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader); + if (this.latestLeaderInfo.isLeader(this.lockConfiguration.getPodName())) { + this.eventDispatcherExecutor.schedule(this::checkAndNotifyNewLeader, this.lockConfiguration.getRenewDeadlineSeconds() * 1000 + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS); + } else if (this.latestLeaderInfo.getLeader() != null) { + this.eventDispatcherExecutor.schedule(this::checkAndNotifyNewLeader, this.lockConfiguration.getLeaseDurationSeconds() * 1000 + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS); + } } private void checkAndNotifyNewLeader() { - LOG.debug("Checking if the current leader has changed to notify the event handler..."); + LOG.info("Checking if the current leader has changed to notify the event handler..."); LeaderInfo newLeaderInfo = this.latestLeaderInfo; if (newLeaderInfo == null) { return; http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java index 37e0251..6461708 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java @@ -30,9 +30,9 @@ public class KubernetesLockConfiguration implements Cloneable { public static final double DEFAULT_JITTER_FACTOR = 1.2; - public static final long DEFAULT_LEASE_DURATION_SECONDS = 20; - public static final long DEFAULT_RENEW_DEADLINE_SECONDS = 15; - public static final long DEFAULT_RETRY_PERIOD_SECONDS = 6; + public static final long DEFAULT_LEASE_DURATION_SECONDS = 60; + public static final long DEFAULT_RENEW_DEADLINE_SECONDS = 45; + public static final long DEFAULT_RETRY_PERIOD_SECONDS = 9; public static final long DEFAULT_RETRY_ON_ERROR_INTERVAL_SECONDS = 5; public static final long DEFAULT_WATCH_REFRESH_INTERVAL_SECONDS = 1800; http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java new file mode 100644 index 0000000..4baebc6 --- /dev/null +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.ha; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import io.fabric8.kubernetes.api.model.ConfigMapBuilder; + +import org.apache.camel.component.kubernetes.KubernetesConfiguration; +import org.apache.camel.component.kubernetes.ha.utils.ConfigMapLockSimulator; +import org.apache.camel.component.kubernetes.ha.utils.LeaderRecorder; +import org.apache.camel.component.kubernetes.ha.utils.LockTestServer; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Test leader election scenarios using a mock server. + */ +public class KubernetesClusterServiceTest extends CamelTestSupport { + + private static final int LEASE_TIME_SECONDS = 5; + + private ConfigMapLockSimulator lockSimulator; + + private Map<String, LockTestServer> lockServers; + + @Before + public void prepareLock() { + this.lockSimulator = new ConfigMapLockSimulator("leaders"); + this.lockServers = new HashMap<>(); + } + + @After + public void shutdownLock() { + for (LockTestServer server : this.lockServers.values()) { + try { + server.destroy(); + } catch (Exception e) { + // can happen in case of delay + } + } + } + + @Test + public void testSimpleLeaderElection() throws Exception { + LeaderRecorder mypod1 = addMember("mypod1"); + LeaderRecorder mypod2 = addMember("mypod2"); + context.start(); + + mypod1.waitForAnyLeader(2, TimeUnit.SECONDS); + mypod2.waitForAnyLeader(2, TimeUnit.SECONDS); + + String leader = mypod1.getCurrentLeader(); + assertTrue(leader.startsWith("mypod")); + assertEquals("Leaders should be equals", mypod2.getCurrentLeader(), leader); + } + + @Test + public void testMultipleMembersLeaderElection() throws Exception { + int number = 5; + List<LeaderRecorder> members = IntStream.range(0, number).mapToObj(i -> addMember("mypod" + i)).collect(Collectors.toList()); + context.start(); + + for (LeaderRecorder member : members) { + member.waitForAnyLeader(2, TimeUnit.SECONDS); + } + + Set<String> leaders = members.stream().map(LeaderRecorder::getCurrentLeader).collect(Collectors.toSet()); + assertEquals(1, leaders.size()); + String leader = leaders.iterator().next(); + assertTrue(leader.startsWith("mypod")); + } + + @Test + public void testSimpleLeaderElectionWithExistingConfigMap() throws Exception { + lockSimulator.setConfigMap(new ConfigMapBuilder() + .withNewMetadata() + .withName("leaders") + .and().build(), true); + + LeaderRecorder mypod1 = addMember("mypod1"); + LeaderRecorder mypod2 = addMember("mypod2"); + context.start(); + + mypod1.waitForAnyLeader(2, TimeUnit.SECONDS); + mypod2.waitForAnyLeader(2, TimeUnit.SECONDS); + + String leader = mypod1.getCurrentLeader(); + assertTrue(leader.startsWith("mypod")); + assertEquals("Leaders should be equals", mypod2.getCurrentLeader(), leader); + } + + @Test + public void testLeadershipLoss() throws Exception { + LeaderRecorder mypod1 = addMember("mypod1"); + LeaderRecorder mypod2 = addMember("mypod2"); + context.start(); + + mypod1.waitForAnyLeader(2, TimeUnit.SECONDS); + mypod2.waitForAnyLeader(2, TimeUnit.SECONDS); + + String firstLeader = mypod1.getCurrentLeader(); + + LeaderRecorder formerLeaderRecorder = firstLeader.equals("mypod1") ? mypod1 : mypod2; + LeaderRecorder formerLoserRecorder = firstLeader.equals("mypod1") ? mypod2 : mypod1; + + refuseRequestsFromPod(firstLeader); + + formerLeaderRecorder.waitForALeaderChange(7, TimeUnit.SECONDS); + formerLoserRecorder.waitForANewLeader(firstLeader, 7, TimeUnit.SECONDS); + + String secondLeader = formerLoserRecorder.getCurrentLeader(); + assertNotEquals("The firstLeader should be different from the new one", firstLeader, secondLeader); + + Long lossTimestamp = formerLeaderRecorder.getLastTimeOf(l -> l == null); + Long gainTimestamp = formerLoserRecorder.getLastTimeOf(secondLeader::equals); + + assertTrue("At least 2 seconds must elapse from leadership loss and regain (see renewDeadlineSeconds)", gainTimestamp >= lossTimestamp + 2000); + checkLeadershipChangeDistance(LEASE_TIME_SECONDS, TimeUnit.SECONDS, mypod1, mypod2); + } + + @Test + public void testSlowLeaderLosingLeadership() throws Exception { + LeaderRecorder mypod1 = addMember("mypod1"); + LeaderRecorder mypod2 = addMember("mypod2"); + context.start(); + + mypod1.waitForAnyLeader(2, TimeUnit.SECONDS); + mypod2.waitForAnyLeader(2, TimeUnit.SECONDS); + + String firstLeader = mypod1.getCurrentLeader(); + + LeaderRecorder formerLeaderRecorder = firstLeader.equals("mypod1") ? mypod1 : mypod2; + LeaderRecorder formerLoserRecorder = firstLeader.equals("mypod1") ? mypod2 : mypod1; + + delayRequestsFromPod(firstLeader, 10, TimeUnit.SECONDS); + + formerLeaderRecorder.waitForALeaderChange(7, TimeUnit.SECONDS); + formerLoserRecorder.waitForANewLeader(firstLeader, 7, TimeUnit.SECONDS); + + String secondLeader = formerLoserRecorder.getCurrentLeader(); + assertNotEquals("The firstLeader should be different from the new one", firstLeader, secondLeader); + + Long lossTimestamp = formerLeaderRecorder.getLastTimeOf(l -> l == null); + Long gainTimestamp = formerLoserRecorder.getLastTimeOf(secondLeader::equals); + + assertTrue("At least 2 seconds must elapse from leadership loss and regain (see renewDeadlineSeconds)", gainTimestamp >= lossTimestamp + 2000); + checkLeadershipChangeDistance(LEASE_TIME_SECONDS, TimeUnit.SECONDS, mypod1, mypod2); + } + + @Test + public void testRecoveryAfterFailure() throws Exception { + LeaderRecorder mypod1 = addMember("mypod1"); + LeaderRecorder mypod2 = addMember("mypod2"); + context.start(); + + mypod1.waitForAnyLeader(2, TimeUnit.SECONDS); + mypod2.waitForAnyLeader(2, TimeUnit.SECONDS); + + String firstLeader = mypod1.getCurrentLeader(); + + for (int i = 0; i < 3; i++) { + refuseRequestsFromPod(firstLeader); + Thread.sleep(1000); + allowRequestsFromPod(firstLeader); + Thread.sleep(2000); + } + + assertEquals(firstLeader, mypod1.getCurrentLeader()); + assertEquals(firstLeader, mypod2.getCurrentLeader()); + } + + @Test + public void testSharedConfigMap() throws Exception { + LeaderRecorder a1 = addMember("a1"); + LeaderRecorder a2 = addMember("a2"); + LeaderRecorder b1 = addMember("b1", "app2"); + LeaderRecorder b2 = addMember("b2", "app2"); + context.start(); + + a1.waitForAnyLeader(2, TimeUnit.SECONDS); + a2.waitForAnyLeader(2, TimeUnit.SECONDS); + b1.waitForAnyLeader(2, TimeUnit.SECONDS); + b1.waitForAnyLeader(2, TimeUnit.SECONDS); + + assertNotNull(a1.getCurrentLeader()); + assertTrue(a1.getCurrentLeader().startsWith("a")); + assertEquals(a1.getCurrentLeader(), a2.getCurrentLeader()); + assertNotNull(b1.getCurrentLeader()); + assertTrue(b1.getCurrentLeader().startsWith("b")); + assertEquals(b1.getCurrentLeader(), b2.getCurrentLeader()); + + assertNotEquals(a1.getCurrentLeader(), b2.getCurrentLeader()); + } + + private void delayRequestsFromPod(String pod, long delay, TimeUnit unit) { + this.lockServers.get(pod).setDelayRequests(TimeUnit.MILLISECONDS.convert(delay, unit)); + } + + private void refuseRequestsFromPod(String pod) { + this.lockServers.get(pod).setRefuseRequests(true); + } + + private void allowRequestsFromPod(String pod) { + this.lockServers.get(pod).setRefuseRequests(false); + } + + private void checkLeadershipChangeDistance(long minimum, TimeUnit unit, LeaderRecorder... recorders) { + List<LeaderRecorder.LeadershipInfo> infos = Arrays.stream(recorders) + .flatMap(lr -> lr.getLeadershipInfo().stream()) + .sorted((li1, li2) -> Long.compare(li1.getChangeTimestamp(), li2.getChangeTimestamp())) + .collect(Collectors.toList()); + + LeaderRecorder.LeadershipInfo currentLeaderLastSeen = null; + for (LeaderRecorder.LeadershipInfo info : infos) { + if (currentLeaderLastSeen == null || currentLeaderLastSeen.getLeader() == null) { + currentLeaderLastSeen = info; + } else { + if (Objects.equals(info.getLeader(), currentLeaderLastSeen.getLeader())) { + currentLeaderLastSeen = info; + } else if (info.getLeader() != null && !info.getLeader().equals(currentLeaderLastSeen.getLeader())) { + // switch + long delay = info.getChangeTimestamp() - currentLeaderLastSeen.getChangeTimestamp(); + assertTrue("Lease time not elapsed between switch", delay >= TimeUnit.MILLISECONDS.convert(minimum, unit)); + currentLeaderLastSeen = info; + } + } + } + } + + private LeaderRecorder addMember(String name) { + return addMember(name, "app"); + } + + private LeaderRecorder addMember(String name, String namespace) { + assertNull(this.lockServers.get(name)); + + LockTestServer lockServer = new LockTestServer(lockSimulator); + this.lockServers.put(name, lockServer); + + KubernetesConfiguration configuration = new KubernetesConfiguration(); + configuration.setKubernetesClient(lockServer.createClient()); + + KubernetesClusterService member = new KubernetesClusterService(configuration); + member.setKubernetesNamespace("test"); + member.setPodName(name); + member.setLeaseDurationSeconds(LEASE_TIME_SECONDS); + member.setRenewDeadlineSeconds(3); // 5-3 = at least 2 seconds for switching on leadership loss + member.setRetryPeriodSeconds(1); + member.setRetryOnErrorIntervalSeconds(1); + member.setJitterFactor(1.2); + + LeaderRecorder recorder = new LeaderRecorder(); + try { + member.getView(namespace).addEventListener(recorder); + context().addService(member); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + return recorder; + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/ConfigMapLockSimulator.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/ConfigMapLockSimulator.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/ConfigMapLockSimulator.java new file mode 100644 index 0000000..1c3d7d0 --- /dev/null +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/ConfigMapLockSimulator.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.ha.utils; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMapBuilder; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Central lock for testing leader election. + */ +public class ConfigMapLockSimulator { + + private static final Logger LOG = LoggerFactory.getLogger(ConfigMapLockSimulator.class); + + private String configMapName; + + private ConfigMap currentMap; + + private long versionCounter = 1000000; + + public ConfigMapLockSimulator(String configMapName) { + this.configMapName = configMapName; + } + + public String getConfigMapName() { + return configMapName; + } + + public synchronized boolean setConfigMap(ConfigMap map, boolean insert) { + // Insert + if (insert && currentMap != null) { + LOG.error("Current map should have been null"); + return false; + } + + // Update + if (!insert && currentMap == null) { + LOG.error("Current map should not have been null"); + return false; + } + String version = map.getMetadata() != null ? map.getMetadata().getResourceVersion() : null; + if (version != null) { + long versionLong = Long.parseLong(version); + if (versionLong != versionCounter) { + LOG.warn("Current resource version is {} while the update is related to version {}", versionCounter, versionLong); + return false; + } + } + + this.currentMap = new ConfigMapBuilder(map) + .editOrNewMetadata() + .withResourceVersion(String.valueOf(++versionCounter)) + .endMetadata() + .build(); + return true; + } + + public synchronized ConfigMap getConfigMap() { + if (currentMap == null) { + return null; + } + + return new ConfigMapBuilder(currentMap).build(); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java new file mode 100644 index 0000000..6670f37 --- /dev/null +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.ha.utils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; + +import org.apache.camel.ha.CamelClusterEventListener; +import org.apache.camel.ha.CamelClusterMember; +import org.apache.camel.ha.CamelClusterView; +import org.junit.Assert; + +/** + * Records leadership changes and allow to do assertions. + */ +public class LeaderRecorder implements CamelClusterEventListener.Leadership { + + private List<LeadershipInfo> leaderships = new CopyOnWriteArrayList<>(); + + @Override + public void leadershipChanged(CamelClusterView view, CamelClusterMember leader) { + this.leaderships.add(new LeadershipInfo(leader != null ? leader.getId() : null, System.currentTimeMillis())); + } + + public List<LeadershipInfo> getLeadershipInfo() { + return leaderships; + } + + public void waitForAnyLeader(long time, TimeUnit unit) { + waitForLeader(leader -> leader != null, time, unit); + } + + public void waitForALeaderChange(long time, TimeUnit unit) { + String current = getCurrentLeader(); + waitForLeader(leader -> !Objects.equals(current, leader), time, unit); + } + + public void waitForANewLeader(String current, long time, TimeUnit unit) { + waitForLeader(leader -> leader != null && !Objects.equals(current, leader), time, unit); + } + + public void waitForLeader(Predicate<String> as, long time, TimeUnit unit) { + long start = System.currentTimeMillis(); + while (!as.test(getCurrentLeader())) { + if (System.currentTimeMillis() - start > TimeUnit.MILLISECONDS.convert(time, unit)) { + Assert.fail("Timeout while waiting for condition"); + } + doWait(50); + } + } + + private void doWait(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + public String getCurrentLeader() { + if (leaderships.size() > 0) { + return leaderships.get(leaderships.size() - 1).getLeader(); + } + return null; + } + + public Long getLastTimeOf(Predicate<String> p) { + List<LeadershipInfo> lst = new ArrayList<>(leaderships); + Collections.reverse(lst); + for (LeadershipInfo info : lst) { + if (p.test(info.getLeader())) { + return info.getChangeTimestamp(); + } + } + return null; + } + + public static class LeadershipInfo { + private String leader; + private long changeTimestamp; + + public LeadershipInfo(String leader, long changeTimestamp) { + this.leader = leader; + this.changeTimestamp = changeTimestamp; + } + + public String getLeader() { + return leader; + } + + public long getChangeTimestamp() { + return changeTimestamp; + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java new file mode 100644 index 0000000..6422e35 --- /dev/null +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.ha.utils; + +import java.io.IOException; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.PodListBuilder; +import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; +import io.fabric8.mockwebserver.utils.ResponseProvider; + +import okhttp3.mockwebserver.RecordedRequest; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Test server to interact with Kubernetes for locking on a ConfigMap. + */ +public class LockTestServer extends KubernetesMockServer { + + private static final Logger LOG = LoggerFactory.getLogger(LockTestServer.class); + + private boolean refuseRequests; + + private Long delayRequests; + + public LockTestServer(ConfigMapLockSimulator lockSimulator) { + + expect().get().withPath("/api/v1/namespaces/test/configmaps/" + lockSimulator.getConfigMapName()).andReply(new ResponseProvider<Object>() { + ThreadLocal<Integer> responseCode = new ThreadLocal<>(); + + @Override + public int getStatusCode() { + return responseCode.get(); + } + + @Override + public Object getBody(RecordedRequest recordedRequest) { + delayIfNecessary(); + if (refuseRequests) { + responseCode.set(500); + return ""; + } + + ConfigMap map = lockSimulator.getConfigMap(); + if (map != null) { + responseCode.set(200); + return map; + } else { + responseCode.set(404); + return ""; + } + } + }).always(); + + expect().post().withPath("/api/v1/namespaces/test/configmaps").andReply(new ResponseProvider<Object>() { + ThreadLocal<Integer> responseCode = new ThreadLocal<>(); + + @Override + public int getStatusCode() { + return responseCode.get(); + } + + @Override + public Object getBody(RecordedRequest recordedRequest) { + delayIfNecessary(); + if (refuseRequests) { + responseCode.set(500); + return ""; + } + + ConfigMap map = convert(recordedRequest); + if (map == null || map.getMetadata() == null || !lockSimulator.getConfigMapName().equals(map.getMetadata().getName())) { + throw new IllegalArgumentException("Illegal configMap received"); + } + + boolean done = lockSimulator.setConfigMap(map, true); + if (done) { + responseCode.set(201); + return lockSimulator.getConfigMap(); + } else { + responseCode.set(500); + return ""; + } + } + }).always(); + + expect().put().withPath("/api/v1/namespaces/test/configmaps/" + lockSimulator.getConfigMapName()).andReply(new ResponseProvider<Object>() { + ThreadLocal<Integer> responseCode = new ThreadLocal<>(); + + @Override + public int getStatusCode() { + return responseCode.get(); + } + + @Override + public Object getBody(RecordedRequest recordedRequest) { + delayIfNecessary(); + if (refuseRequests) { + responseCode.set(500); + return ""; + } + + ConfigMap map = convert(recordedRequest); + + boolean done = lockSimulator.setConfigMap(map, false); + if (done) { + responseCode.set(200); + return lockSimulator.getConfigMap(); + } else { + responseCode.set(409); + return ""; + } + } + }).always(); + + // Other resources + expect().get().withPath("/api/v1/namespaces/test/pods").andReturn(200, new PodListBuilder().withNewMetadata().withResourceVersion("1").and().build()).always(); + expect().get().withPath("/api/v1/namespaces/test/pods?resourceVersion=1&watch=true").andUpgradeToWebSocket().open().done().always(); + } + + + public boolean isRefuseRequests() { + return refuseRequests; + } + + public void setRefuseRequests(boolean refuseRequests) { + this.refuseRequests = refuseRequests; + } + + public Long getDelayRequests() { + return delayRequests; + } + + public void setDelayRequests(Long delayRequests) { + this.delayRequests = delayRequests; + } + + private void delayIfNecessary() { + if (delayRequests != null) { + try { + Thread.sleep(delayRequests); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + private ConfigMap convert(RecordedRequest request) { + try { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(request.getBody().readByteArray(), ConfigMap.class); + } catch (IOException e) { + throw new IllegalArgumentException("Erroneous data", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServerTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServerTest.java new file mode 100644 index 0000000..282b83f --- /dev/null +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServerTest.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.ha.utils; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMapBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; + +import org.junit.Assert; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** + * Basic tests on the lock test server. + */ +public class LockTestServerTest { + + @Test + public void test() { + ConfigMapLockSimulator lock = new ConfigMapLockSimulator("xxx"); + LockTestServer server = new LockTestServer(lock); + KubernetesClient client = server.createClient(); + + assertNull(client.configMaps().withName("xxx").get()); + + client.configMaps().withName("xxx").createNew() + .withNewMetadata() + .withName("xxx") + .and().done(); + + try { + client.configMaps().withName("xxx").createNew() + .withNewMetadata() + .withName("xxx") + .and().done(); + Assert.fail("Should have failed for duplicate insert"); + } catch (Exception e) { + } + + client.configMaps().withName("xxx") + .createOrReplaceWithNew() + .editOrNewMetadata() + .withName("xxx") + .addToLabels("a", "b") + .and().done(); + + ConfigMap map = client.configMaps().withName("xxx").get(); + assertEquals("b", map.getMetadata().getLabels().get("a")); + + + client.configMaps().withName("xxx") + .lockResourceVersion(map.getMetadata().getResourceVersion()) + .replace(new ConfigMapBuilder(map) + .editOrNewMetadata() + .withName("xxx") + .addToLabels("c", "d") + .and() + .build()); + + ConfigMap newMap = client.configMaps().withName("xxx").get(); + assertEquals("d", newMap.getMetadata().getLabels().get("c")); + + try { + client.configMaps().withName("xxx") + .lockResourceVersion(map.getMetadata().getResourceVersion()) + .replace(new ConfigMapBuilder(map) + .editOrNewMetadata() + .withName("xxx") + .addToLabels("e", "f") + .and() + .build()); + Assert.fail("Should have failed for wrong version"); + } catch (Exception ex) { + } + + ConfigMap newMap2 = client.configMaps().withName("xxx").get(); + assertNull(newMap2.getMetadata().getLabels().get("e")); + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index 8030a2a..516033f 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -491,6 +491,7 @@ <mina2-version>2.0.16</mina2-version> <minimal-json-version>0.9.4</minimal-json-version> <mock-javamail-version>1.9</mock-javamail-version> + <mockwebserver-version>0.0.13</mockwebserver-version> <mockito-version>1.10.19</mockito-version> <mongo-java-driver-version>3.5.0</mongo-java-driver-version> <mongo-java-driver32-version>3.2.2</mongo-java-driver32-version>