This is an automated email from the ASF dual-hosted git repository. nferraro pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 8cc227b Rebalancing cluster service 8cc227b is described below commit 8cc227b65b0e5808cb4db7ea30ffcb2cb4e9cdfe Author: nicolaferraro <ni.ferr...@gmail.com> AuthorDate: Mon Nov 23 17:32:23 2020 +0100 Rebalancing cluster service --- components/camel-kubernetes/pom.xml | 5 + .../cluster/KubernetesClusterService.java | 54 +++- .../kubernetes/cluster/KubernetesClusterView.java | 18 +- .../kubernetes/cluster/LeaseResourceType.java | 29 +++ .../cluster/lock/ConfigMapLockUtils.java | 101 -------- .../lock/KubernetesLeadershipController.java | 229 +++++++++++++---- .../lock/KubernetesLeaseResourceManager.java | 79 ++++++ .../cluster/lock/KubernetesLockConfiguration.java | 61 ++++- .../kubernetes/cluster/lock/LeaderInfo.java | 14 +- .../cluster/lock/TimedLeaderNotifier.java | 16 +- .../lock/impl/ConfigMapLeaseResourceManager.java | 144 +++++++++++ .../lock/impl/NativeLeaseResourceManager.java | 164 +++++++++++++ .../cluster/KubernetesClusterServiceTest.java | 273 ++++++++++++++++----- .../cluster/utils/ConfigMapLockSimulator.java | 68 ++--- .../cluster/utils/LeaseLockSimulator.java | 56 +++++ .../kubernetes/cluster/utils/LockTestServer.java | 200 +++++++++------ ...ckSimulator.java => ResourceLockSimulator.java} | 50 ++-- .../cluster/CamelPreemptiveClusterService.java | 28 +++ .../camel/cluster/CamelPreemptiveClusterView.java | 35 +++ .../cluster/RebalancingCamelClusterService.java | 267 ++++++++++++++++++++ 20 files changed, 1506 insertions(+), 385 deletions(-) diff --git a/components/camel-kubernetes/pom.xml b/components/camel-kubernetes/pom.xml index 6da12a5..01e48c5 100644 --- a/components/camel-kubernetes/pom.xml +++ b/components/camel-kubernetes/pom.xml @@ -111,6 +111,11 @@ <version>${commons-codec-version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> <!-- logging --> 
<dependency> <groupId>org.apache.logging.log4j</groupId> diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterService.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterService.java index 9b24b7e..d9e1ace 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterService.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterService.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.cluster.CamelPreemptiveClusterService; import org.apache.camel.component.kubernetes.KubernetesConfiguration; import org.apache.camel.component.kubernetes.cluster.lock.KubernetesLockConfiguration; import org.apache.camel.support.cluster.AbstractCamelClusterService; @@ -29,11 +30,12 @@ import org.apache.camel.util.ObjectHelper; /** * A Kubernetes based cluster service leveraging Kubernetes optimistic locks on resources (specifically ConfigMaps). 
*/ -public class KubernetesClusterService extends AbstractCamelClusterService<KubernetesClusterView> { +public class KubernetesClusterService extends AbstractCamelClusterService<KubernetesClusterView> + implements CamelPreemptiveClusterService { - private KubernetesConfiguration configuration; + protected KubernetesConfiguration configuration; - private KubernetesLockConfiguration lockConfiguration; + protected KubernetesLockConfiguration lockConfiguration; public KubernetesClusterService() { this.configuration = new KubernetesConfiguration(); @@ -58,6 +60,11 @@ public class KubernetesClusterService extends AbstractCamelClusterService<Kubern return new KubernetesClusterView(getCamelContext(), this, config, lockConfig); } + @Override + public KubernetesClusterView getView(String namespace) throws Exception { + return (KubernetesClusterView) super.getView(namespace); + } + protected KubernetesConfiguration setConfigDefaults( KubernetesConfiguration configuration, KubernetesLockConfiguration lockConfiguration) { if (configuration.getConnectionTimeout() == null) { @@ -106,8 +113,8 @@ public class KubernetesClusterService extends AbstractCamelClusterService<Kubern } if (config.getLeaseDurationMillis() <= config.getRenewDeadlineMillis()) { throw new IllegalStateException( - "leaseDurationMillis must be greater than renewDeadlineMillis " + "(" + config.getLeaseDurationMillis() - + " is not greater than " + "leaseDurationMillis must be greater than renewDeadlineMillis (" + + config.getLeaseDurationMillis() + " is not greater than " + config.getRenewDeadlineMillis() + ")"); } if (config.getRenewDeadlineMillis() <= config.getJitterFactor() * config.getRetryPeriodMillis()) { @@ -154,15 +161,47 @@ public class KubernetesClusterService extends AbstractCamelClusterService<Kubern this.lockConfiguration.setKubernetesResourcesNamespace(kubernetesNamespace); } + /** + * @return the resource name + * @deprecated Use {@link #getKubernetesResourceName()} + */ + @Deprecated public 
String getConfigMapName() { return this.lockConfiguration.getConfigMapName(); } /** * Set the name of the ConfigMap used to do optimistic locking (defaults to 'leaders'). + * + * @param kubernetesResourceName the resource name + * @deprecated Use {@link #setKubernetesResourceName(String)} + */ + @Deprecated + public void setConfigMapName(String kubernetesResourceName) { + this.lockConfiguration.setConfigMapName(kubernetesResourceName); + } + + public LeaseResourceType getLeaseResourceType() { + return this.lockConfiguration.getLeaseResourceType(); + } + + /** + * Set the lease resource type used in Kubernetes (defaults to 'Lease', from coordination.k8s.io). */ - public void setConfigMapName(String configMapName) { - this.lockConfiguration.setConfigMapName(configMapName); + public void setLeaseResourceType(LeaseResourceType type) { + this.lockConfiguration.setLeaseResourceType(type); + } + + public String getKubernetesResourceName() { + return this.lockConfiguration.getKubernetesResourceName(); + } + + /** + * Set the name of the lease resource used to do optimistic locking (defaults to 'leaders'). Resource name is used + * as prefix when the underlying Kubernetes resource can only manage a single lock. 
+ */ + public void setKubernetesResourceName(String kubernetesResourceName) { + this.lockConfiguration.setKubernetesResourceName(kubernetesResourceName); } public String getPodName() { @@ -235,4 +274,5 @@ public class KubernetesClusterService extends AbstractCamelClusterService<Kubern public void setRetryPeriodMillis(long retryPeriodMillis) { lockConfiguration.setRetryPeriodMillis(retryPeriodMillis); } + } diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterView.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterView.java index a2e80db..f684110 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterView.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterView.java @@ -28,6 +28,7 @@ import java.util.stream.Collectors; import io.fabric8.kubernetes.client.KubernetesClient; import org.apache.camel.CamelContext; import org.apache.camel.cluster.CamelClusterMember; +import org.apache.camel.cluster.CamelPreemptiveClusterView; import org.apache.camel.component.kubernetes.KubernetesConfiguration; import org.apache.camel.component.kubernetes.KubernetesHelper; import org.apache.camel.component.kubernetes.cluster.lock.KubernetesClusterEvent; @@ -40,7 +41,7 @@ import org.apache.camel.util.ObjectHelper; * The cluster view on a specific Camel cluster namespace (not to be confused with Kubernetes namespaces). Namespaces * are represented as keys in a Kubernetes ConfigMap (values are the current leader pods). 
*/ -public class KubernetesClusterView extends AbstractCamelClusterView { +public class KubernetesClusterView extends AbstractCamelClusterView implements CamelPreemptiveClusterView { private CamelContext camelContext; @@ -60,6 +61,8 @@ public class KubernetesClusterView extends AbstractCamelClusterView { private KubernetesLeadershipController controller; + private boolean disabled; + public KubernetesClusterView(CamelContext camelContext, KubernetesClusterService cluster, KubernetesConfiguration configuration, KubernetesLockConfiguration lockConfiguration) { @@ -69,6 +72,7 @@ public class KubernetesClusterView extends AbstractCamelClusterView { this.lockConfiguration = ObjectHelper.notNull(lockConfiguration, "lockConfiguration"); this.localMember = new KubernetesClusterMember(lockConfiguration.getPodName()); this.memberCache = new HashMap<>(); + this.disabled = false; } @Override @@ -86,6 +90,17 @@ public class KubernetesClusterView extends AbstractCamelClusterView { return currentMembers; } + public boolean isDisabled() { + return disabled; + } + + public void setDisabled(boolean disabled) { + this.disabled = disabled; + if (this.controller != null) { + this.controller.setDisabled(disabled); + } + } + @Override protected void doStart() throws Exception { if (controller == null) { @@ -121,6 +136,7 @@ public class KubernetesClusterView extends AbstractCamelClusterView { } }); + this.controller.setDisabled(disabled); controller.start(); } } diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/LeaseResourceType.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/LeaseResourceType.java new file mode 100644 index 0000000..0561f76 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/LeaseResourceType.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.cluster; + +public enum LeaseResourceType { + /** + * A Kubernetes ConfigMap. + */ + ConfigMap, + + /** + * A Kubernetes Lease (coordination.k8s.io). + */ + Lease +} diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/ConfigMapLockUtils.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/ConfigMapLockUtils.java deleted file mode 100644 index ee14a84..0000000 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/ConfigMapLockUtils.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.kubernetes.cluster.lock; - -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Set; - -import io.fabric8.kubernetes.api.model.ConfigMap; -import io.fabric8.kubernetes.api.model.ConfigMapBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Utilities for managing ConfigMaps that contain lock information. - */ -public final class ConfigMapLockUtils { - - private static final Logger LOG = LoggerFactory.getLogger(ConfigMapLockUtils.class); - - private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ssX"; - - private static final String LEADER_PREFIX = "leader.pod."; - - private static final String LOCAL_TIMESTAMP_PREFIX = "leader.local.timestamp."; - - private ConfigMapLockUtils() { - } - - public static ConfigMap createNewConfigMap(String configMapName, LeaderInfo leaderInfo) { - return new ConfigMapBuilder().withNewMetadata().withName(configMapName).addToLabels("provider", "camel") - .addToLabels("kind", "locks").endMetadata() - .addToData(LEADER_PREFIX + leaderInfo.getGroupName(), leaderInfo.getLeader()) - .addToData(LOCAL_TIMESTAMP_PREFIX + leaderInfo.getGroupName(), formatDate(leaderInfo.getLocalTimestamp())) - .build(); - } - - public static ConfigMap getConfigMapWithNewLeader(ConfigMap configMap, LeaderInfo leaderInfo) { - return new ConfigMapBuilder(configMap).addToData(LEADER_PREFIX + leaderInfo.getGroupName(), leaderInfo.getLeader()) - .addToData(LOCAL_TIMESTAMP_PREFIX + leaderInfo.getGroupName(), formatDate(leaderInfo.getLocalTimestamp())) - 
.build(); - } - - public static LeaderInfo getLeaderInfo(ConfigMap configMap, Set<String> members, String group) { - return new LeaderInfo(group, getLeader(configMap, group), getLocalTimestamp(configMap, group), members); - } - - private static String getLeader(ConfigMap configMap, String group) { - return getConfigMapValue(configMap, LEADER_PREFIX + group); - } - - private static String formatDate(Date date) { - if (date == null) { - return null; - } - try { - return new SimpleDateFormat(DATE_TIME_FORMAT).format(date); - } catch (Exception e) { - LOG.warn("Unable to format date '{}' using format {}", date, DATE_TIME_FORMAT, e); - } - - return null; - } - - private static Date getLocalTimestamp(ConfigMap configMap, String group) { - String timestamp = getConfigMapValue(configMap, LOCAL_TIMESTAMP_PREFIX + group); - if (timestamp == null) { - return null; - } - - try { - return new SimpleDateFormat(DATE_TIME_FORMAT).parse(timestamp); - } catch (Exception e) { - LOG.warn("Unable to parse time string '{}' using format {}", timestamp, DATE_TIME_FORMAT, e); - } - - return null; - } - - private static String getConfigMapValue(ConfigMap configMap, String key) { - if (configMap == null || configMap.getData() == null) { - return null; - } - return configMap.getData().get(key); - } - -} diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeadershipController.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeadershipController.java index 8c6c017..69fa5e4 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeadershipController.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeadershipController.java @@ -26,7 +26,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import 
java.util.stream.Collectors; -import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.client.KubernetesClient; import org.apache.camel.CamelContext; @@ -45,16 +45,18 @@ public class KubernetesLeadershipController implements Service { private enum State { NOT_LEADER, BECOMING_LEADER, - LEADER + LEADER, + LOSING_LEADERSHIP, + LEADERSHIP_LOST } - private CamelContext camelContext; + private final CamelContext camelContext; - private KubernetesClient kubernetesClient; + private final KubernetesClient kubernetesClient; - private KubernetesLockConfiguration lockConfiguration; + private final KubernetesLockConfiguration lockConfiguration; - private KubernetesClusterEventHandler eventHandler; + private final KubernetesClusterEventHandler eventHandler; private State currentState = State.NOT_LEADER; @@ -62,10 +64,14 @@ public class KubernetesLeadershipController implements Service { private TimedLeaderNotifier leaderNotifier; + private final KubernetesLeaseResourceManager<HasMetadata> leaseManager; + private volatile LeaderInfo latestLeaderInfo; - private volatile ConfigMap latestConfigMap; + private volatile HasMetadata latestLeaseResource; private volatile Set<String> latestMembers; + private boolean disabled; + public KubernetesLeadershipController(CamelContext camelContext, KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration, KubernetesClusterEventHandler eventHandler) { @@ -73,6 +79,8 @@ public class KubernetesLeadershipController implements Service { this.kubernetesClient = kubernetesClient; this.lockConfiguration = lockConfiguration; this.eventHandler = eventHandler; + this.disabled = false; + this.leaseManager = KubernetesLeaseResourceManager.create(lockConfiguration.getLeaseResourceType()); } @Override @@ -102,6 +110,18 @@ public class KubernetesLeadershipController implements Service { leaderNotifier = null; } + public 
boolean isDisabled() { + return disabled; + } + + public void setDisabled(boolean disabled) { + boolean oldState = this.disabled; + this.disabled = disabled; + if (oldState != disabled && serializedExecutor != null) { + serializedExecutor.execute(this::refreshStatus); + } + } + private void refreshStatus() { switch (currentState) { case NOT_LEADER: @@ -113,6 +133,12 @@ public class KubernetesLeadershipController implements Service { case LEADER: refreshStatusLeader(); break; + case LOSING_LEADERSHIP: + refreshStatusLosingLeadership(); + break; + case LEADERSHIP_LOST: + refreshStatusLeadershipLost(); + break; default: throw new RuntimeException("Unsupported state " + currentState); } @@ -132,7 +158,8 @@ public class KubernetesLeadershipController implements Service { if (this.latestLeaderInfo.hasEmptyLeader()) { // There is no previous leader - LOG.info("{} The cluster has no leaders. Trying to acquire the leadership...", logPrefix()); + LOG.info("{} The cluster has no leaders for group {}. Trying to acquire the leadership...", logPrefix(), + this.lockConfiguration.getGroupName()); boolean acquired = tryAcquireLeadership(); if (acquired) { LOG.info("{} Leadership acquired by current pod with immediate effect", logPrefix()); @@ -191,7 +218,55 @@ public class KubernetesLeadershipController implements Service { this.serializedExecutor.execute(this::refreshStatus); } + /** + * This pod is going to manually lose the leadership. It should shut down activities and wait for a full lease duration + * before giving up the lease.
+ */ + private void refreshStatusLosingLeadership() { + // Wait always the same amount of time before giving up the leadership + long delay = this.lockConfiguration.getLeaseDurationMillis(); + LOG.info("{} Current pod owns the leadership, but it will be lost in {} seconds...", logPrefix(), + new BigDecimal(delay).divide(BigDecimal.valueOf(1000), 2, BigDecimal.ROUND_HALF_UP)); + + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + LOG.warn("Thread interrupted", e); + } + + LOG.info("{} Current pod is losing leadership now...", logPrefix()); + this.currentState = State.LEADERSHIP_LOST; + this.serializedExecutor.execute(this::refreshStatus); + } + + /** + * Functions are stopped, now lost leadership should be communicated by freeing up the lease. + */ + private void refreshStatusLeadershipLost() { + boolean pulled = lookupNewLeaderInfo(); + if (!pulled) { + rescheduleAfterDelay(); + return; + } + + if (!this.yieldLeadership()) { + rescheduleAfterDelay(); + return; + } + + LOG.info("{} Current pod has lost leadership", logPrefix()); + this.currentState = State.NOT_LEADER; + this.serializedExecutor.execute(this::refreshStatus); + } + private void refreshStatusLeader() { + if (this.disabled) { + LOG.debug("{} Leadership disabled, pod is going to lose leadership", logPrefix()); + this.currentState = State.LOSING_LEADERSHIP; + this.serializedExecutor.execute(this::refreshStatus); + return; + } + LOG.debug("{} Pod should be the leader, pulling new data from the cluster", logPrefix()); long timeBeforePulling = System.currentTimeMillis(); boolean pulled = lookupNewLeaderInfo(); @@ -202,9 +277,15 @@ public class KubernetesLeadershipController implements Service { if (this.latestLeaderInfo.isValidLeader(this.lockConfiguration.getPodName())) { LOG.debug("{} Current Pod is still the leader", logPrefix()); + this.leaderNotifier.refreshLeadership(Optional.of(this.lockConfiguration.getPodName()), timeBeforePulling, this.lockConfiguration.getRenewDeadlineMillis(), 
this.latestLeaderInfo.getMembers()); + + HasMetadata newLease = this.leaseManager.refreshLeaseRenewTime(kubernetesClient, this.latestLeaseResource, + this.lockConfiguration.getRenewDeadlineSeconds()); + updateLatestLeaderInfo(newLease, this.latestMembers); + rescheduleAfterDelay(); return; } else { @@ -228,13 +309,17 @@ public class KubernetesLeadershipController implements Service { private boolean lookupNewLeaderInfo() { LOG.debug("{} Looking up leadership information...", logPrefix()); - ConfigMap configMap; + HasMetadata leaseResource; try { - configMap = pullConfigMap(); + leaseResource = leaseManager.fetchLeaseResource(kubernetesClient, + this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient), + this.lockConfiguration.getKubernetesResourceName(), + this.lockConfiguration.getGroupName()); } catch (Throwable e) { - LOG.warn(logPrefix() + " Unable to retrieve the current ConfigMap " + this.lockConfiguration.getConfigMapName() - + " from Kubernetes"); - LOG.debug(logPrefix() + " Exception thrown during ConfigMap lookup", e); + LOG.warn(logPrefix() + " Unable to retrieve the current lease resource " + + this.lockConfiguration.getKubernetesResourceName() + + " for group " + this.lockConfiguration.getGroupName() + " from Kubernetes"); + LOG.debug(logPrefix() + " Exception thrown during lease resource lookup", e); return false; } @@ -247,14 +332,62 @@ public class KubernetesLeadershipController implements Service { return false; } - updateLatestLeaderInfo(configMap, members); + updateLatestLeaderInfo(leaseResource, members); return true; } + private boolean yieldLeadership() { + LOG.debug("{} Trying to yield the leadership...", logPrefix()); + + HasMetadata leaseResource = this.latestLeaseResource; + Set<String> members = this.latestMembers; + LeaderInfo latestLeaderInfo = this.latestLeaderInfo; + + if (latestLeaderInfo == null || members == null) { + LOG.warn(logPrefix() + " Unexpected condition. 
Latest leader info or list of members is empty."); + return false; + } else if (!members.contains(this.lockConfiguration.getPodName())) { + LOG.warn(logPrefix() + " The list of cluster members " + latestLeaderInfo.getMembers() + + " does not contain the current Pod. Cannot yield the leadership."); + return false; + } + + if (leaseResource == null) { + // Already yielded + return true; + } + + LOG.debug("{} Lock lease resource already present in the Kubernetes namespace. Checking...", logPrefix()); + LeaderInfo leaderInfo = leaseManager.decodeLeaderInfo(leaseResource, members, this.lockConfiguration.getGroupName()); + if (!leaderInfo.isValidLeader(this.lockConfiguration.getPodName())) { + // Already yielded + return true; + } + + try { + HasMetadata updatedLeaseResource = leaseManager.optimisticDeleteLeaderInfo(kubernetesClient, leaseResource, + this.lockConfiguration.getGroupName()); + + LOG.debug("{} Lease resource {} for group {} successfully updated", logPrefix(), + this.lockConfiguration.getKubernetesResourceName(), this.lockConfiguration.getGroupName()); + updateLatestLeaderInfo(updatedLeaseResource, members); + return true; + } catch (Exception ex) { + LOG.warn(logPrefix() + " Unable to update the lock on the lease resource to remove leadership information"); + LOG.debug(logPrefix() + " Error received during resource lock replace", ex); + return false; + } + } + private boolean tryAcquireLeadership() { + if (this.disabled) { + LOG.debug("{} Won't try to acquire the leadership because it's disabled...", logPrefix()); + return false; + } + LOG.debug("{} Trying to acquire the leadership...", logPrefix()); - ConfigMap configMap = this.latestConfigMap; + HasMetadata leaseResource = this.latestLeaseResource; Set<String> members = this.latestMembers; LeaderInfo latestLeaderInfo = this.latestLeaderInfo; @@ -267,53 +400,53 @@ public class KubernetesLeadershipController implements Service { return false; } - // Info we would set set in the configmap to become leaders + 
// Info we would set set in the lease resource to become leaders LeaderInfo newLeaderInfo = new LeaderInfo( - this.lockConfiguration.getGroupName(), this.lockConfiguration.getPodName(), new Date(), members); + this.lockConfiguration.getGroupName(), this.lockConfiguration.getPodName(), new Date(), members, + this.lockConfiguration.getLeaseDurationSeconds()); - if (configMap == null) { - // No ConfigMap created so far - LOG.debug("{} Lock configmap is not present in the Kubernetes namespace. A new ConfigMap will be created", + if (leaseResource == null) { + // No leaseResource created so far + LOG.debug("{} Lock lease resource is not present in the Kubernetes namespace. A new lease resource will be created", logPrefix()); - ConfigMap newConfigMap - = ConfigMapLockUtils.createNewConfigMap(this.lockConfiguration.getConfigMapName(), newLeaderInfo); try { - kubernetesClient.configMaps() - .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) - .create(newConfigMap); - - LOG.debug("{} ConfigMap {} successfully created", logPrefix(), this.lockConfiguration.getConfigMapName()); - updateLatestLeaderInfo(newConfigMap, members); + HasMetadata newLeaseResource = leaseManager.createNewLeaseResource(kubernetesClient, + this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient), + this.lockConfiguration.getKubernetesResourceName(), + newLeaderInfo); + + LOG.debug("{} Lease resource {} successfully created for group {}", logPrefix(), + this.lockConfiguration.getKubernetesResourceName(), newLeaderInfo.getGroupName()); + updateLatestLeaderInfo(newLeaseResource, members); return true; } catch (Exception ex) { // Suppress exception LOG.warn(logPrefix() - + " Unable to create the ConfigMap, it may have been created by other cluster members concurrently. If the problem persists, check if the service account has " + + " Unable to create the lease resource, it may have been created by other cluster members concurrently. 
If the problem persists, check if the service account has " + "the right " + "permissions to create it"); - LOG.debug(logPrefix() + " Exception while trying to create the ConfigMap", ex); + LOG.debug(logPrefix() + " Exception while trying to create the lease resource", ex); return false; } } else { - LOG.debug("{} Lock configmap already present in the Kubernetes namespace. Checking...", logPrefix()); - LeaderInfo leaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap, members, this.lockConfiguration.getGroupName()); + LOG.debug("{} Lock lease resource already present in the Kubernetes namespace. Checking...", logPrefix()); + LeaderInfo leaderInfo + = leaseManager.decodeLeaderInfo(leaseResource, members, this.lockConfiguration.getGroupName()); boolean canAcquire = !leaderInfo.hasValidLeader(); if (canAcquire) { // Try to be the new leader try { - ConfigMap updatedConfigMap = ConfigMapLockUtils.getConfigMapWithNewLeader(configMap, newLeaderInfo); - kubernetesClient.configMaps() - .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) - .withName(this.lockConfiguration.getConfigMapName()) - .lockResourceVersion(configMap.getMetadata().getResourceVersion()).replace(updatedConfigMap); - - LOG.debug("{} ConfigMap {} successfully updated", logPrefix(), this.lockConfiguration.getConfigMapName()); - updateLatestLeaderInfo(updatedConfigMap, members); + HasMetadata updatedLeaseResource + = leaseManager.optimisticAcquireLeadership(kubernetesClient, leaseResource, newLeaderInfo); + + LOG.debug("{} Lease resource {} successfully updated for group {}", logPrefix(), + this.lockConfiguration.getKubernetesResourceName(), newLeaderInfo.getGroupName()); + updateLatestLeaderInfo(updatedLeaseResource, members); return true; } catch (Exception ex) { - LOG.warn(logPrefix() + " Unable to update the lock ConfigMap to set leadership information"); - LOG.debug(logPrefix() + " Error received during configmap lock replace", ex); + LOG.warn(logPrefix() + 
" Unable to update the lock lease resource to set leadership information"); + LOG.debug(logPrefix() + " Error received during lease resource lock replace", ex); return false; } } else { @@ -325,20 +458,14 @@ public class KubernetesLeadershipController implements Service { } } - private void updateLatestLeaderInfo(ConfigMap configMap, Set<String> members) { + private void updateLatestLeaderInfo(HasMetadata leaseResource, Set<String> members) { LOG.debug("{} Updating internal status about the current leader", logPrefix()); - this.latestConfigMap = configMap; + this.latestLeaseResource = leaseResource; this.latestMembers = members; - this.latestLeaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap, members, this.lockConfiguration.getGroupName()); + this.latestLeaderInfo = leaseManager.decodeLeaderInfo(leaseResource, members, this.lockConfiguration.getGroupName()); LOG.debug("{} Current leader info: {}", logPrefix(), this.latestLeaderInfo); } - private ConfigMap pullConfigMap() { - return kubernetesClient.configMaps() - .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) - .withName(this.lockConfiguration.getConfigMapName()).get(); - } - private Set<String> pullClusterMembers() { List<Pod> pods = kubernetesClient.pods() .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeaseResourceManager.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeaseResourceManager.java new file mode 100644 index 0000000..0ab7b7f --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeaseResourceManager.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.cluster.lock; + +import java.util.Set; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.kubernetes.cluster.LeaseResourceType; +import org.apache.camel.component.kubernetes.cluster.lock.impl.ConfigMapLeaseResourceManager; +import org.apache.camel.component.kubernetes.cluster.lock.impl.NativeLeaseResourceManager; + +/** + * Handles the actual interaction with Kubernetes resources, allowing different implementation to be plugged. + */ +public interface KubernetesLeaseResourceManager<T extends HasMetadata> { + + /** + * Create a new {@link KubernetesLeaseResourceManager} of the given {@link LeaseResourceType}. 
+ */ + @SuppressWarnings("unchecked") + static <S extends HasMetadata> KubernetesLeaseResourceManager<S> create(LeaseResourceType type) { + switch (type) { + case ConfigMap: + return (KubernetesLeaseResourceManager<S>) new ConfigMapLeaseResourceManager(); + case Lease: + return (KubernetesLeaseResourceManager<S>) new NativeLeaseResourceManager(); + default: + throw new RuntimeCamelException("Unsupported lease resource type " + type); + } + } + + /** + * Return a {@link LeaderInfo} object from the underlying Kubernetes resource. + */ + LeaderInfo decodeLeaderInfo(T leaseResource, Set<String> members, String group); + + /** + * Fetch the lease resource for the given name and group. + */ + T fetchLeaseResource(KubernetesClient client, String namespace, String leaseResourceName, String group); + + /** + * Delete leadership information for the given lease resource and group. + */ + T optimisticDeleteLeaderInfo(KubernetesClient client, T leaseResource, String group); + + /** + * Set the leadership information on the lease resource to match the given {@link LeaderInfo}. + */ + T optimisticAcquireLeadership(KubernetesClient client, T leaseResource, LeaderInfo newLeaderInfo); + + /** + * Create a new lease resource matching the given {@link LeaderInfo}. + */ + T createNewLeaseResource(KubernetesClient client, String namespace, String leaseResourceName, LeaderInfo leaderInfo); + + /** + * Update information on the lease resource to increase the renew time (if last renewal has occurred more than + * minUpdateIntervalSeconds seconds ago). 
+ */ + T refreshLeaseRenewTime(KubernetesClient client, T leaseResource, int minUpdateIntervalSeconds); + +} diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLockConfiguration.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLockConfiguration.java index de9999b..70c52f8 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLockConfiguration.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLockConfiguration.java @@ -20,18 +20,25 @@ import java.util.HashMap; import java.util.Map; import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.camel.component.kubernetes.cluster.LeaseResourceType; /** * Configuration for Kubernetes Lock. */ public class KubernetesLockConfiguration implements Cloneable { - public static final String DEFAULT_CONFIGMAP_NAME = "leaders"; + public static final LeaseResourceType DEFAULT_LEASE_RESOURCE_TYPE = LeaseResourceType.Lease; + public static final String DEFAULT_RESOURCE_NAME = "leaders"; public static final double DEFAULT_JITTER_FACTOR = 1.2; - public static final long DEFAULT_LEASE_DURATION_MILLIS = 30000; - public static final long DEFAULT_RENEW_DEADLINE_MILLIS = 20000; - public static final long DEFAULT_RETRY_PERIOD_MILLIS = 5000; + public static final long DEFAULT_LEASE_DURATION_MILLIS = 15000; + public static final long DEFAULT_RENEW_DEADLINE_MILLIS = 10000; + public static final long DEFAULT_RETRY_PERIOD_MILLIS = 2000; + + /** + * Kubernetes resource type used to hold the leases. + */ + private LeaseResourceType leaseResourceType = DEFAULT_LEASE_RESOURCE_TYPE; /** * Kubernetes namespace containing the pods and the ConfigMap used for locking. 
@@ -39,9 +46,9 @@ public class KubernetesLockConfiguration implements Cloneable { private String kubernetesResourcesNamespace; /** - * Name of the ConfigMap used for locking. + * Name of the resource used for locking (or prefix, in case multiple ones are used). */ - private String configMapName = DEFAULT_CONFIGMAP_NAME; + private String kubernetesResourceName = DEFAULT_RESOURCE_NAME; /** * Name of the lock group (or namespace according to the Camel cluster convention) within the chosen ConfigMap. @@ -82,6 +89,14 @@ public class KubernetesLockConfiguration implements Cloneable { public KubernetesLockConfiguration() { } + public LeaseResourceType getLeaseResourceType() { + return leaseResourceType; + } + + public void setLeaseResourceType(LeaseResourceType leaseResourceType) { + this.leaseResourceType = leaseResourceType; + } + public String getKubernetesResourcesNamespaceOrDefault(KubernetesClient kubernetesClient) { if (kubernetesResourcesNamespace != null) { return kubernetesResourcesNamespace; @@ -97,12 +112,30 @@ public class KubernetesLockConfiguration implements Cloneable { this.kubernetesResourcesNamespace = kubernetesResourcesNamespace; } + /** + * @return the resource name + * @deprecated Use {@link #getKubernetesResourceName()} + */ + @Deprecated public String getConfigMapName() { - return configMapName; + return kubernetesResourceName; } - public void setConfigMapName(String configMapName) { - this.configMapName = configMapName; + /** + * @param kubernetesResourceName the resource name + * @deprecated Use {@link #setKubernetesResourceName(String)} + */ + @Deprecated + public void setConfigMapName(String kubernetesResourceName) { + this.kubernetesResourceName = kubernetesResourceName; + } + + public String getKubernetesResourceName() { + return kubernetesResourceName; + } + + public void setKubernetesResourceName(String kubernetesResourceName) { + this.kubernetesResourceName = kubernetesResourceName; } public String getGroupName() { @@ -141,6 +174,10 @@ 
public class KubernetesLockConfiguration implements Cloneable { this.jitterFactor = jitterFactor; } + public int getLeaseDurationSeconds() { + return (int) (getLeaseDurationMillis() / 1000); + } + public long getLeaseDurationMillis() { return leaseDurationMillis; } @@ -149,6 +186,10 @@ public class KubernetesLockConfiguration implements Cloneable { this.leaseDurationMillis = leaseDurationMillis; } + public int getRenewDeadlineSeconds() { + return (int) (getRenewDeadlineMillis() / 1000); + } + public long getRenewDeadlineMillis() { return renewDeadlineMillis; } @@ -178,7 +219,7 @@ public class KubernetesLockConfiguration implements Cloneable { public String toString() { final StringBuilder sb = new StringBuilder("KubernetesLockConfiguration{"); sb.append("kubernetesResourcesNamespace='").append(kubernetesResourcesNamespace).append('\''); - sb.append(", configMapName='").append(configMapName).append('\''); + sb.append(", kubernetesResourceName='").append(kubernetesResourceName).append('\''); sb.append(", groupName='").append(groupName).append('\''); sb.append(", podName='").append(podName).append('\''); sb.append(", clusterLabels=").append(clusterLabels); diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/LeaderInfo.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/LeaderInfo.java index f19a075..97b2b47 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/LeaderInfo.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/LeaderInfo.java @@ -34,14 +34,17 @@ public class LeaderInfo { private Set<String> members; + private Integer leaseDurationSeconds; + public LeaderInfo() { } - public LeaderInfo(String groupName, String leader, Date timestamp, Set<String> members) { + public LeaderInfo(String groupName, String leader, Date timestamp, Set<String> members, Integer 
leaseDurationSeconds) { this.groupName = groupName; this.leader = leader; this.localTimestamp = timestamp; this.members = members; + this.leaseDurationSeconds = leaseDurationSeconds; } public boolean hasEmptyLeader() { @@ -89,6 +92,14 @@ public class LeaderInfo { this.members = members; } + public Integer getLeaseDurationSeconds() { + return leaseDurationSeconds; + } + + public void setLeaseDurationSeconds(Integer leaseDurationSeconds) { + this.leaseDurationSeconds = leaseDurationSeconds; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder("LeaderInfo{"); @@ -96,6 +107,7 @@ public class LeaderInfo { sb.append(", leader='").append(leader).append('\''); sb.append(", localTimestamp=").append(localTimestamp); sb.append(", members=").append(members); + sb.append(", leaseDurationSeconds=").append(leaseDurationSeconds); sb.append('}'); return sb.toString(); } diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/TimedLeaderNotifier.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/TimedLeaderNotifier.java index 5b49fef..650d592 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/TimedLeaderNotifier.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/TimedLeaderNotifier.java @@ -165,12 +165,8 @@ public class TimedLeaderNotifier implements Service { lastCommunicatedLeader = newLeader; LOG.info("The cluster has a new leader: {}", newLeader); try { - handler.onKubernetesClusterEvent(new KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent() { - @Override - public Optional<String> getData() { - return newLeader; - } - }); + handler.onKubernetesClusterEvent( + (KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) () -> newLeader); } catch (Throwable t) { LOG.warn("Error while communicating the new leader to the handler", t); } 
@@ -181,12 +177,8 @@ public class TimedLeaderNotifier implements Service { lastCommunicatedMembers = newMembers; LOG.info("The list of cluster members has changed: {}", newMembers); try { - handler.onKubernetesClusterEvent(new KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent() { - @Override - public Set<String> getData() { - return newMembers; - } - }); + handler.onKubernetesClusterEvent( + (KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent) () -> newMembers); } catch (Throwable t) { LOG.warn("Error while communicating the cluster members to the handler", t); } diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/impl/ConfigMapLeaseResourceManager.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/impl/ConfigMapLeaseResourceManager.java new file mode 100644 index 0000000..66e6b2e --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/impl/ConfigMapLeaseResourceManager.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kubernetes.cluster.lock.impl; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Set; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMapBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.camel.component.kubernetes.cluster.lock.KubernetesLeaseResourceManager; +import org.apache.camel.component.kubernetes.cluster.lock.LeaderInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConfigMapLeaseResourceManager implements KubernetesLeaseResourceManager<ConfigMap> { + + private static final Logger LOG = LoggerFactory.getLogger(ConfigMapLeaseResourceManager.class); + + private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ssX"; + + private static final String LEADER_PREFIX = "leader.pod."; + + private static final String LOCAL_TIMESTAMP_PREFIX = "leader.local.timestamp."; + + @Override + public LeaderInfo decodeLeaderInfo(ConfigMap configMap, Set<String> members, String group) { + return new LeaderInfo(group, getLeader(configMap, group), getLocalTimestamp(configMap, group), members, null); + } + + @Override + public ConfigMap fetchLeaseResource(KubernetesClient client, String namespace, String name, String group) { + return client.configMaps() + .inNamespace(namespace) + .withName(name).get(); + } + + @Override + public ConfigMap optimisticDeleteLeaderInfo(KubernetesClient client, ConfigMap leaseResource, String group) { + ConfigMap updatedConfigMap = getConfigMapWithoutLeader(leaseResource, group); + return client.configMaps() + .inNamespace(leaseResource.getMetadata().getNamespace()) + .withName(leaseResource.getMetadata().getName()) + .lockResourceVersion(leaseResource.getMetadata().getResourceVersion()).replace(updatedConfigMap); + } + + @Override + public ConfigMap optimisticAcquireLeadership(KubernetesClient client, ConfigMap leaseResource, LeaderInfo newLeaderInfo) { + 
ConfigMap updatedConfigMap = getConfigMapWithNewLeader(leaseResource, newLeaderInfo); + return client.configMaps() + .inNamespace(leaseResource.getMetadata().getNamespace()) + .withName(leaseResource.getMetadata().getName()) + .lockResourceVersion(leaseResource.getMetadata().getResourceVersion()).replace(updatedConfigMap); + } + + @Override + public ConfigMap createNewLeaseResource( + KubernetesClient client, String namespace, String leaseResourceName, LeaderInfo leaderInfo) { + ConfigMap newConfigMap + = new ConfigMapBuilder().withNewMetadata().withName(leaseResourceName).addToLabels("provider", "camel") + .addToLabels("kind", "locks").endMetadata() + .addToData(LEADER_PREFIX + leaderInfo.getGroupName(), leaderInfo.getLeader()) + .addToData(LOCAL_TIMESTAMP_PREFIX + leaderInfo.getGroupName(), + formatDate(leaderInfo.getLocalTimestamp())) + .build(); + + return client.configMaps() + .inNamespace(namespace) + .create(newConfigMap); + } + + @Override + public ConfigMap refreshLeaseRenewTime(KubernetesClient client, ConfigMap leaseResource, int minUpdateIntervalSeconds) { + // Configmap does not store renew information + return leaseResource; + } + + private static ConfigMap getConfigMapWithNewLeader(ConfigMap configMap, LeaderInfo leaderInfo) { + return new ConfigMapBuilder(configMap).addToData(LEADER_PREFIX + leaderInfo.getGroupName(), leaderInfo.getLeader()) + .addToData(LOCAL_TIMESTAMP_PREFIX + leaderInfo.getGroupName(), formatDate(leaderInfo.getLocalTimestamp())) + .build(); + } + + private static ConfigMap getConfigMapWithoutLeader(ConfigMap configMap, String group) { + return new ConfigMapBuilder(configMap).removeFromData(LEADER_PREFIX + group) + .removeFromData(LOCAL_TIMESTAMP_PREFIX + group) + .build(); + } + + private static Date getLocalTimestamp(ConfigMap configMap, String group) { + String timestamp = getConfigMapValue(configMap, LOCAL_TIMESTAMP_PREFIX + group); + if (timestamp == null) { + return null; + } + + try { + return new 
SimpleDateFormat(DATE_TIME_FORMAT).parse(timestamp); + } catch (Exception e) { + LOG.warn("Unable to parse time string '" + timestamp + "' using format " + DATE_TIME_FORMAT, e); + } + + return null; + } + + private static String getLeader(ConfigMap configMap, String group) { + return getConfigMapValue(configMap, LEADER_PREFIX + group); + } + + private static String getConfigMapValue(ConfigMap configMap, String key) { + if (configMap == null || configMap.getData() == null) { + return null; + } + return configMap.getData().get(key); + } + + private static String formatDate(Date date) { + if (date == null) { + return null; + } + try { + return new SimpleDateFormat(DATE_TIME_FORMAT).format(date); + } catch (Exception e) { + LOG.warn("Unable to format date '" + date + "' using format " + DATE_TIME_FORMAT, e); + } + + return null; + } + +} diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/impl/NativeLeaseResourceManager.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/impl/NativeLeaseResourceManager.java new file mode 100644 index 0000000..1400208 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/impl/NativeLeaseResourceManager.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.cluster.lock.impl; + +import java.time.ZonedDateTime; +import java.util.Date; +import java.util.Set; + +import io.fabric8.kubernetes.api.model.coordination.v1.Lease; +import io.fabric8.kubernetes.api.model.coordination.v1.LeaseBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.camel.component.kubernetes.cluster.lock.KubernetesLeaseResourceManager; +import org.apache.camel.component.kubernetes.cluster.lock.LeaderInfo; + +public class NativeLeaseResourceManager implements KubernetesLeaseResourceManager<Lease> { + + @Override + public LeaderInfo decodeLeaderInfo(Lease lease, Set<String> members, String group) { + return new LeaderInfo(group, getLeader(lease), getLocalTimestamp(lease), members, getLeaseDurationSeconds(lease)); + } + + @Override + public Lease fetchLeaseResource(KubernetesClient client, String namespace, String name, String group) { + return client.leases() + .inNamespace(namespace) + .withName(leaseResourceName(name, group)).get(); + } + + @Override + public Lease optimisticDeleteLeaderInfo(KubernetesClient client, Lease leaseResource, String group) { + Lease updatedLease = getLeaseWithoutLeader(leaseResource); + return client.leases() + .inNamespace(leaseResource.getMetadata().getNamespace()) + .withName(leaseResource.getMetadata().getName()) + .lockResourceVersion(leaseResource.getMetadata().getResourceVersion()).replace(updatedLease); + } + + @Override + public Lease optimisticAcquireLeadership(KubernetesClient client, Lease leaseResource, LeaderInfo 
newLeaderInfo) { + Lease updatedLease = getLeaseWithNewLeader(leaseResource, newLeaderInfo); + return client.leases() + .inNamespace(leaseResource.getMetadata().getNamespace()) + .withName(leaseResource.getMetadata().getName()) + .lockResourceVersion(leaseResource.getMetadata().getResourceVersion()).replace(updatedLease); + } + + @Override + public Lease refreshLeaseRenewTime(KubernetesClient client, Lease leaseResource, int minUpdateIntervalSeconds) { + ZonedDateTime lastRenew = leaseResource.getSpec() != null ? leaseResource.getSpec().getRenewTime() : null; + if (lastRenew == null || lastRenew.plusSeconds(minUpdateIntervalSeconds).isBefore(ZonedDateTime.now())) { + Lease updatedLease = new LeaseBuilder(leaseResource) + .editOrNewSpec() + .withRenewTime(ZonedDateTime.now()) + .endSpec() + .build(); + return client.leases() + .inNamespace(leaseResource.getMetadata().getNamespace()) + .withName(leaseResource.getMetadata().getName()) + .lockResourceVersion(leaseResource.getMetadata().getResourceVersion()).replace(updatedLease); + } + return leaseResource; + } + + @Override + public Lease createNewLeaseResource(KubernetesClient client, String namespace, String prefix, LeaderInfo leaderInfo) { + ZonedDateTime now = ZonedDateTime.now(); + Lease newLease = new LeaseBuilder().withNewMetadata() + .withName(leaseResourceName(prefix, leaderInfo.getGroupName())) + .addToLabels("provider", "camel") + .endMetadata() + .withNewSpec() + .withNewHolderIdentity(leaderInfo.getLeader()) + .withAcquireTime(now) + .withLeaseDurationSeconds(leaderInfo.getLeaseDurationSeconds()) + .withRenewTime(now) + .endSpec() + .build(); + + return client.leases() + .inNamespace(namespace) + .create(newLease); + } + + private static Lease getLeaseWithNewLeader(Lease lease, LeaderInfo leaderInfo) { + Integer transitions = lease.getSpec() != null ? 
lease.getSpec().getLeaseTransitions() : null; + if (transitions == null) { + transitions = 0; + } + ZonedDateTime now = ZonedDateTime.now(); + return new LeaseBuilder(lease) + .editOrNewSpec() + .withNewHolderIdentity(leaderInfo.getLeader()) + .withAcquireTime(now) + .withLeaseDurationSeconds(leaderInfo.getLeaseDurationSeconds()) + .withRenewTime(now) + .withLeaseTransitions(transitions + 1) + .endSpec() + .build(); + } + + private static Lease getLeaseWithoutLeader(Lease lease) { + return new LeaseBuilder(lease).editOrNewSpec() + .withHolderIdentity(null) + .withAcquireTime(null) + .withRenewTime(null) + .withLeaseDurationSeconds(null) + .endSpec() + .build(); + } + + private static Date getLocalTimestamp(Lease lease) { + if (lease == null || lease.getSpec() == null || lease.getSpec().getAcquireTime() == null) { + return null; + } + return Date.from(lease.getSpec().getAcquireTime().toInstant()); + } + + private static Integer getLeaseDurationSeconds(Lease lease) { + if (lease == null || lease.getSpec() == null) { + return null; + } + return lease.getSpec().getLeaseDurationSeconds(); + } + + private static String getLeader(Lease lease) { + if (lease == null || lease.getSpec() == null) { + return null; + } + return lease.getSpec().getHolderIdentity(); + } + + private static String leaseResourceName(String prefix, String group) { + return toValidKubernetesID(prefix + "-" + group); + } + + private static String toValidKubernetesID(String id) { + id = id.toLowerCase().replaceAll("[^a-z0-9-.]", "-"); + while (id.length() > 0 && isNonAlphanumeric(id, 0)) { + id = id.substring(1); + } + while (id.length() > 0 && isNonAlphanumeric(id, id.length() - 1)) { + id = id.substring(0, id.length() - 1); + } + return id; + } + + private static boolean isNonAlphanumeric(String id, int pos) { + return !Character.isAlphabetic(id.charAt(pos)) && !Character.isDigit(id.charAt(pos)); + } +} diff --git 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterServiceTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterServiceTest.java index e58d4d1..cdaedc9 100644 --- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterServiceTest.java +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterServiceTest.java @@ -16,25 +16,40 @@ */ package org.apache.camel.component.kubernetes.cluster; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import io.fabric8.kubernetes.api.model.ConfigMapBuilder; +import io.fabric8.kubernetes.api.model.coordination.v1.LeaseBuilder; +import org.apache.camel.cluster.CamelPreemptiveClusterService; import org.apache.camel.component.kubernetes.KubernetesConfiguration; import org.apache.camel.component.kubernetes.cluster.utils.ConfigMapLockSimulator; import org.apache.camel.component.kubernetes.cluster.utils.LeaderRecorder; +import org.apache.camel.component.kubernetes.cluster.utils.LeaseLockSimulator; import org.apache.camel.component.kubernetes.cluster.utils.LockTestServer; +import org.apache.camel.component.kubernetes.cluster.utils.ResourceLockSimulator; +import org.apache.camel.support.cluster.RebalancingCamelClusterService; import org.apache.camel.test.junit5.CamelTestSupport; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import 
org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -52,31 +67,33 @@ public class KubernetesClusterServiceTest extends CamelTestSupport { private static final int RETRY_PERIOD_MILLIS = 200; private static final double JITTER_FACTOR = 1.1; - private ConfigMapLockSimulator lockSimulator; + private ConfigMapLockSimulator configMapLockSimulator; + private Map<String, LeaseLockSimulator> leaseLockSimulators = new HashMap<>(); - private Map<String, LockTestServer> lockServers; + private Map<String, LockTestServer<?>> lockServers = new HashMap<>(); - @BeforeEach - public void prepareLock() { - this.lockSimulator = new ConfigMapLockSimulator("leaders"); - this.lockServers = new HashMap<>(); - } + private Map<String, CamelPreemptiveClusterService> clusterServices = new HashMap<>(); @AfterEach public void shutdownLock() { - for (LockTestServer server : this.lockServers.values()) { + for (LockTestServer<?> server : this.lockServers.values()) { try { server.destroy(); } catch (Exception e) { // can happen in case of delay } } + this.lockServers = new HashMap<>(); + configMapLockSimulator = null; + leaseLockSimulators = new HashMap<>(); + clusterServices = new HashMap<>(); } - @Test - public void testSimpleLeaderElection() throws Exception { - LeaderRecorder mypod1 = addMember("mypod1"); - LeaderRecorder mypod2 = addMember("mypod2"); + @ParameterizedTest + @EnumSource(LeaseResourceType.class) + public void testSimpleLeaderElection(LeaseResourceType type) throws Exception { + LeaderRecorder mypod1 = addMember("mypod1", type); + LeaderRecorder mypod2 = addMember("mypod2", type); context.start(); mypod1.waitForAnyLeader(5, TimeUnit.SECONDS); @@ -88,11 +105,12 @@ 
public class KubernetesClusterServiceTest extends CamelTestSupport { assertEquals(mypod2.getCurrentLeader(), leader, "Leaders should be equals"); } - @Test - public void testMultipleMembersLeaderElection() throws Exception { + @ParameterizedTest + @EnumSource(LeaseResourceType.class) + public void testMultipleMembersLeaderElection(LeaseResourceType type) throws Exception { int number = 5; List<LeaderRecorder> members - = IntStream.range(0, number).mapToObj(i -> addMember("mypod" + i)).collect(Collectors.toList()); + = IntStream.range(0, number).mapToObj(i -> addMember("mypod" + i, type)).collect(Collectors.toList()); context.start(); for (LeaderRecorder member : members) { @@ -107,10 +125,11 @@ public class KubernetesClusterServiceTest extends CamelTestSupport { @Test public void testSimpleLeaderElectionWithExistingConfigMap() throws Exception { - lockSimulator.setConfigMap(new ConfigMapBuilder().withNewMetadata().withName("leaders").and().build(), true); + this.configMapLockSimulator = new ConfigMapLockSimulator("leaders"); + configMapLockSimulator.setResource(new ConfigMapBuilder().withNewMetadata().withName("leaders").and().build(), true); - LeaderRecorder mypod1 = addMember("mypod1"); - LeaderRecorder mypod2 = addMember("mypod2"); + LeaderRecorder mypod1 = addMember("mypod1", LeaseResourceType.ConfigMap); + LeaderRecorder mypod2 = addMember("mypod2", LeaseResourceType.ConfigMap); context.start(); mypod1.waitForAnyLeader(10, TimeUnit.SECONDS); @@ -122,9 +141,31 @@ public class KubernetesClusterServiceTest extends CamelTestSupport { } @Test - public void testLeadershipLoss() throws Exception { - LeaderRecorder mypod1 = addMember("mypod1"); - LeaderRecorder mypod2 = addMember("mypod2"); + public void testSimpleLeaderElectionWithExistingLeases() throws Exception { + LeaseLockSimulator simulator = new LeaseLockSimulator("leaders-mygroup"); + simulator.setResource(new LeaseBuilder() + .withNewMetadata().withName("leaders-mygroup") + .and() + .build(), true); + 
this.leaseLockSimulators.put("mygroup", simulator); + + LeaderRecorder mypod1 = addMember("mypod1", "mygroup", LeaseResourceType.Lease); + LeaderRecorder mypod2 = addMember("mypod2", "mygroup", LeaseResourceType.Lease); + context.start(); + + mypod1.waitForAnyLeader(10, TimeUnit.SECONDS); + mypod2.waitForAnyLeader(10, TimeUnit.SECONDS); + + String leader = mypod1.getCurrentLeader(); + assertTrue(leader.startsWith("mypod")); + assertEquals(mypod2.getCurrentLeader(), leader, "Leaders should be equals"); + } + + @ParameterizedTest + @EnumSource(LeaseResourceType.class) + public void testLeadershipLoss(LeaseResourceType type) throws Exception { + LeaderRecorder mypod1 = addMember("mypod1", type); + LeaderRecorder mypod2 = addMember("mypod2", type); context.start(); mypod1.waitForAnyLeader(5, TimeUnit.SECONDS); @@ -152,10 +193,11 @@ public class KubernetesClusterServiceTest extends CamelTestSupport { checkLeadershipChangeDistance((LEASE_TIME_MILLIS - RENEW_DEADLINE_MILLIS) / 2, TimeUnit.MILLISECONDS, mypod1, mypod2); } - @Test - public void testSlowLeaderLosingLeadershipOnlyInternally() throws Exception { - LeaderRecorder mypod1 = addMember("mypod1"); - LeaderRecorder mypod2 = addMember("mypod2"); + @ParameterizedTest + @EnumSource(LeaseResourceType.class) + public void testSlowLeaderLosingLeadershipOnlyInternally(LeaseResourceType type) throws Exception { + LeaderRecorder mypod1 = addMember("mypod1", type); + LeaderRecorder mypod2 = addMember("mypod2", type); context.start(); mypod1.waitForAnyLeader(5, TimeUnit.SECONDS); @@ -173,10 +215,11 @@ public class KubernetesClusterServiceTest extends CamelTestSupport { assertEquals(firstLeader, formerLoserRecorder.getCurrentLeader()); } - @Test - public void testRecoveryAfterFailure() throws Exception { - LeaderRecorder mypod1 = addMember("mypod1"); - LeaderRecorder mypod2 = addMember("mypod2"); + @ParameterizedTest + @EnumSource(LeaseResourceType.class) + public void testRecoveryAfterFailure(LeaseResourceType type) throws 
Exception { + LeaderRecorder mypod1 = addMember("mypod1", type); + LeaderRecorder mypod2 = addMember("mypod2", type); context.start(); mypod1.waitForAnyLeader(5, TimeUnit.SECONDS); @@ -197,10 +240,10 @@ public class KubernetesClusterServiceTest extends CamelTestSupport { @Test public void testSharedConfigMap() throws Exception { - LeaderRecorder a1 = addMember("a1"); - LeaderRecorder a2 = addMember("a2"); - LeaderRecorder b1 = addMember("b1", "app2"); - LeaderRecorder b2 = addMember("b2", "app2"); + LeaderRecorder a1 = addMember("a1", LeaseResourceType.ConfigMap); + LeaderRecorder a2 = addMember("a2", LeaseResourceType.ConfigMap); + LeaderRecorder b1 = addMember("b1", "app2", LeaseResourceType.ConfigMap); + LeaderRecorder b2 = addMember("b2", "app2", LeaseResourceType.ConfigMap); context.start(); a1.waitForAnyLeader(5, TimeUnit.SECONDS); @@ -218,33 +261,110 @@ public class KubernetesClusterServiceTest extends CamelTestSupport { assertNotEquals(a1.getCurrentLeader(), b2.getCurrentLeader()); } + static Stream<Arguments> rebalancingProvider() { + return Stream.of( + // LeaseResourceType, pods, partitions, expected partitions owned, tolerance on owned partitions + Arguments.of(LeaseResourceType.Lease, 4, 2, 0, 1), + Arguments.of(LeaseResourceType.Lease, 1, 2, 2, 0), + Arguments.of(LeaseResourceType.Lease, 2, 2, 1, 0), + Arguments.of(LeaseResourceType.ConfigMap, 3, 10, 3, 1), + Arguments.of(LeaseResourceType.Lease, 3, 10, 3, 1), + Arguments.of(LeaseResourceType.ConfigMap, 6, 23, 3, 1), + Arguments.of(LeaseResourceType.Lease, 6, 23, 3, 1)); + } + + @ParameterizedTest + @MethodSource("rebalancingProvider") + public void testRebalancing(LeaseResourceType type, int pods, int partitions, int expectedPartitionsPerPod, int tolerance) + throws Exception { + Map<String, List<LeaderRecorder>> recorders = createCluster(type, pods, partitions); + context.start(); + + waitForAllLeaders(recorders, leaders -> { + Map<String, Long> counts = leaders.values().stream() + 
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); + + for (Long count : counts.values()) { + if (count < expectedPartitionsPerPod || count > expectedPartitionsPerPod + tolerance) { + return false; + } + } + return true; + }, 30, TimeUnit.SECONDS); + } + + private Map<String, List<LeaderRecorder>> createCluster(LeaseResourceType type, int pods, int partitions) { + Map<String, List<LeaderRecorder>> recorders = new HashMap<>(); + for (int i = 0; i < partitions; i++) { + String partitionName = "partition-" + i; + recorders.put(partitionName, new ArrayList<>()); + for (int j = 0; j < pods; j++) { + recorders.get(partitionName).add(addMember("mypod-" + j, partitionName, type, true)); + } + } + return recorders; + } + + private void waitForAllLeaders( + Map<String, List<LeaderRecorder>> partitionRecorders, + Predicate<Map<String, String>> condition, long time, TimeUnit unit) { + Awaitility.waitAtMost(time, unit).until(() -> { + Map<String, String> leaders = new HashMap<>(); + for (String partition : partitionRecorders.keySet()) { + String leader = null; + for (LeaderRecorder recorder : partitionRecorders.get(partition)) { + String partitionLeader = recorder.getCurrentLeader(); + if (partitionLeader == null || (leader != null && !leader.equals(partitionLeader))) { + return false; + } + leader = partitionLeader; + } + if (leader == null) { + return false; + } + leaders.put(partition, leader); + } + return condition.test(leaders); + }); + } + + private void withLockServer(String pod, Consumer<LockTestServer<?>> consumer) { + consumer.accept(this.lockServers.get(pod)); + } + private void delayRequestsFromPod(String pod, long delay, TimeUnit unit) { - this.lockServers.get(pod).setDelayRequests(TimeUnit.MILLISECONDS.convert(delay, unit)); + withLockServer(pod, server -> server.setDelayRequests(TimeUnit.MILLISECONDS.convert(delay, unit))); } private void refuseRequestsFromPod(String pod) { - this.lockServers.get(pod).setRefuseRequests(true); + 
withLockServer(pod, server -> server.setRefuseRequests(true)); } private void allowRequestsFromPod(String pod) { - this.lockServers.get(pod).setRefuseRequests(false); + withLockServer(pod, server -> server.setRefuseRequests(false)); } private void disconnectPod(String pod) { - for (LockTestServer server : this.lockServers.values()) { + for (LockTestServer<?> server : this.lockServers.values()) { server.removePod(pod); } } private void connectPod(String pod) { - for (LockTestServer server : this.lockServers.values()) { + for (LockTestServer<?> server : this.lockServers.values()) { server.addPod(pod); } } + private void connectSimulator(ResourceLockSimulator<?> lockSimulator) { + for (LockTestServer<?> server : this.lockServers.values()) { + server.addSimulator(lockSimulator); + } + } + private void checkLeadershipChangeDistance(long minimum, TimeUnit unit, LeaderRecorder... recorders) { List<LeaderRecorder.LeadershipInfo> infos = Arrays.stream(recorders).flatMap(lr -> lr.getLeadershipInfo().stream()) - .sorted((li1, li2) -> Long.compare(li1.getChangeTimestamp(), li2.getChangeTimestamp())) + .sorted(Comparator.comparingLong(LeaderRecorder.LeadershipInfo::getChangeTimestamp)) .collect(Collectors.toList()); LeaderRecorder.LeadershipInfo currentLeaderLastSeen = null; @@ -266,30 +386,69 @@ public class KubernetesClusterServiceTest extends CamelTestSupport { } } - private LeaderRecorder addMember(String name) { - return addMember(name, "app"); + private LeaderRecorder addMember(String name, LeaseResourceType type) { + return addMember(name, "app", type); } - private LeaderRecorder addMember(String name, String namespace) { - assertNull(this.lockServers.get(name)); + private LeaderRecorder addMember(String name, String namespace, LeaseResourceType type) { + return addMember(name, namespace, type, false); + } - LockTestServer lockServer = new LockTestServer(lockSimulator); - this.lockServers.put(name, lockServer); + private LeaderRecorder addMember(String name, String 
namespace, LeaseResourceType type, boolean rebalancing) { + ResourceLockSimulator<?> lockSimulator; + switch (type) { + case ConfigMap: + if (this.configMapLockSimulator == null) { + this.configMapLockSimulator = new ConfigMapLockSimulator("leaders"); + } + lockSimulator = this.configMapLockSimulator; + break; + case Lease: + if (!this.leaseLockSimulators.containsKey(namespace)) { + this.leaseLockSimulators.put(namespace, new LeaseLockSimulator("leaders-" + namespace)); + } + lockSimulator = this.leaseLockSimulators.get(namespace); + break; + default: + throw new IllegalArgumentException("Unsupported LeaseResourceType " + type); + } - KubernetesConfiguration configuration = new KubernetesConfiguration(); - configuration.setKubernetesClient(lockServer.createClient()); + if (!this.lockServers.containsKey(name)) { + this.lockServers.put(name, new LockTestServer<>()); + } + LockTestServer<?> lockServer = this.lockServers.get(name); + + CamelPreemptiveClusterService member = clusterServices.get(name); + if (member == null) { + KubernetesConfiguration configuration = new KubernetesConfiguration(); + configuration.setKubernetesClient(lockServer.createClient()); + + KubernetesClusterService service = new KubernetesClusterService(configuration); + service.setKubernetesNamespace("test"); + service.setPodName(name); + service.setLeaseDurationMillis(LEASE_TIME_MILLIS); + service.setRenewDeadlineMillis(RENEW_DEADLINE_MILLIS); + service.setRetryPeriodMillis(RETRY_PERIOD_MILLIS); + service.setJitterFactor(JITTER_FACTOR); + service.setLeaseResourceType(type); + + if (rebalancing) { + member = new RebalancingCamelClusterService(service, RETRY_PERIOD_MILLIS); + } else { + member = service; + } + + try { + context().addService(member); + } catch (Exception ex) { + throw new RuntimeException(ex); + } - KubernetesClusterService member = new KubernetesClusterService(configuration); - member.setKubernetesNamespace("test"); - member.setPodName(name); - 
member.setLeaseDurationMillis(LEASE_TIME_MILLIS); - member.setRenewDeadlineMillis(RENEW_DEADLINE_MILLIS); - member.setRetryPeriodMillis(RETRY_PERIOD_MILLIS); - member.setJitterFactor(JITTER_FACTOR); + clusterServices.put(name, member); + } LeaderRecorder recorder = new LeaderRecorder(); try { - context().addService(member); member.getView(namespace).addEventListener(recorder); } catch (Exception ex) { throw new RuntimeException(ex); @@ -297,7 +456,9 @@ public class KubernetesClusterServiceTest extends CamelTestSupport { for (String pod : this.lockServers.keySet()) { connectPod(pod); + connectSimulator(lockSimulator); } + return recorder; } diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java index f4e6e21..66dbe31 100644 --- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java @@ -18,63 +18,39 @@ package org.apache.camel.component.kubernetes.cluster.utils; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.ConfigMapBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * Central lock for testing leader election. + * Central lock for testing leader election based on ConfigMap. 
*/ -public class ConfigMapLockSimulator { +public class ConfigMapLockSimulator extends ResourceLockSimulator<ConfigMap> { - private static final Logger LOG = LoggerFactory.getLogger(ConfigMapLockSimulator.class); - - private String configMapName; - - private ConfigMap currentMap; - - private long versionCounter = 1000000; - - public ConfigMapLockSimulator(String configMapName) { - this.configMapName = configMapName; + public ConfigMapLockSimulator(String resourceName) { + super(resourceName); } - public String getConfigMapName() { - return configMapName; + @Override + protected ConfigMap withNewResourceVersion(ConfigMap resource, String newResourceVersion) { + return new ConfigMapBuilder(resource).editOrNewMetadata().withResourceVersion(newResourceVersion) + .endMetadata().build(); } - public synchronized boolean setConfigMap(ConfigMap map, boolean insert) { - // Insert - if (insert && currentMap != null) { - LOG.error("Current map should have been null"); - return false; - } - - // Update - if (!insert && currentMap == null) { - LOG.error("Current map should not have been null"); - return false; - } - String version = map.getMetadata() != null ? 
map.getMetadata().getResourceVersion() : null; - if (version != null) { - long versionLong = Long.parseLong(version); - if (versionLong != versionCounter) { - LOG.warn("Current resource version is {} while the update is related to version {}", versionCounter, - versionLong); - return false; - } - } - - this.currentMap = new ConfigMapBuilder(map).editOrNewMetadata().withResourceVersion(String.valueOf(++versionCounter)) - .endMetadata().build(); - return true; + @Override + protected ConfigMap copyOf(ConfigMap resource) { + return new ConfigMapBuilder(resource).build(); } - public synchronized ConfigMap getConfigMap() { - if (currentMap == null) { - return null; - } + @Override + public String getResourcePath() { + return "configmaps"; + } - return new ConfigMapBuilder(currentMap).build(); + @Override + public String getAPIPath() { + return "/api/v1"; } + @Override + public Class<ConfigMap> getResourceClass() { + return ConfigMap.class; + } } diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LeaseLockSimulator.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LeaseLockSimulator.java new file mode 100644 index 0000000..3d99656 --- /dev/null +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LeaseLockSimulator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.cluster.utils; + +import io.fabric8.kubernetes.api.model.coordination.v1.Lease; +import io.fabric8.kubernetes.api.model.coordination.v1.LeaseBuilder; + +/** + * Central lock for testing leader election based on Lease. + */ +public class LeaseLockSimulator extends ResourceLockSimulator<Lease> { + + public LeaseLockSimulator(String resourceName) { + super(resourceName); + } + + @Override + protected Lease withNewResourceVersion(Lease resource, String newResourceVersion) { + return new LeaseBuilder(resource).editOrNewMetadata().withResourceVersion(newResourceVersion) + .endMetadata().build(); + } + + @Override + protected Lease copyOf(Lease resource) { + return new LeaseBuilder(resource).build(); + } + + @Override + public String getResourcePath() { + return "leases"; + } + + @Override + public String getAPIPath() { + return "/apis/coordination.k8s.io/v1"; + } + + @Override + public Class<Lease> getResourceClass() { + return Lease.class; + } +} diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LockTestServer.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LockTestServer.java index bd14acc..1a5bd7a 100644 --- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LockTestServer.java +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LockTestServer.java @@ -19,12 +19,15 @@ package 
org.apache.camel.component.kubernetes.cluster.utils; import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; import com.fasterxml.jackson.databind.ObjectMapper; -import io.fabric8.kubernetes.api.model.ConfigMap; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.PodListBuilder; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; @@ -37,7 +40,7 @@ import org.slf4j.LoggerFactory; /** * A Test server to interact with Kubernetes for locking on a ConfigMap. */ -public class LockTestServer extends KubernetesMockServer { +public class LockTestServer<T extends HasMetadata> extends KubernetesMockServer { private static final Logger LOG = LoggerFactory.getLogger(LockTestServer.class); @@ -47,15 +50,106 @@ public class LockTestServer extends KubernetesMockServer { private Set<String> pods; - public LockTestServer(ConfigMapLockSimulator lockSimulator) { - this(lockSimulator, Collections.emptySet()); + private Map<String, ResourceLockSimulator<T>> simulators; + + public LockTestServer() { + this(Collections.emptySet()); } - public LockTestServer(ConfigMapLockSimulator lockSimulator, Collection<String> initialPods) { + public LockTestServer(Collection<String> initialPods) { this.pods = new TreeSet<>(initialPods); + this.simulators = new HashMap<>(); + + // Other resources + expect().get().withPath("/api/v1/namespaces/test/pods") + .andReply(200, + request -> new PodListBuilder().withNewMetadata().withResourceVersion("1").and() + .withItems(getCurrentPods().stream() + .map(name -> new PodBuilder().withNewMetadata().withName(name).and().build()) + .collect(Collectors.toList())) + .build()) + .always(); - 
expect().get().withPath("/api/v1/namespaces/test/configmaps/" + lockSimulator.getConfigMapName()) + } + + public void addSimulator(ResourceLockSimulator<?> paramLockSimulator) { + ResourceLockSimulator<T> lockSimulator = (ResourceLockSimulator<T>) paramLockSimulator; + if (this.simulators.containsKey(lockSimulator.getResourceName())) { + return; + } + this.simulators.put(lockSimulator.getResourceName(), lockSimulator); + + if (this.simulators.size() == 1) { + // Global methods defined once + expect().post().withPath(lockSimulator.getAPIPath() + "/namespaces/test/" + lockSimulator.getResourcePath()) + .andReply(new ResponseProvider<Object>() { + + private Headers headers = new Headers.Builder().build(); + private Map<Integer, String> lockNames = new HashMap<>(); + + @Override + public int getStatusCode(RecordedRequest request) { + if (refuseRequests) { + return 500; + } + + T resource; + try { + resource = convert(request, lockSimulator.getResourceClass()); + } catch (Exception e) { + LOG.error("Error during resource conversion", e); + return 500; + } + + if (resource == null) { + LOG.error("No resource received"); + return 500; + } + ResourceLockSimulator<T> lockSimulator = simulators.get(resource.getMetadata().getName()); + if (resource.getMetadata() == null + || !lockSimulator.getResourceName().equals(resource.getMetadata().getName())) { + LOG.error("Illegal resource received"); + return 500; + } + + boolean done = lockSimulator.setResource(resource, true); + if (done) { + lockNames.put(request.getSequenceNumber(), lockSimulator.getResourceName()); + return 201; + } + return 500; + } + + @Override + public Object getBody(RecordedRequest recordedRequest) { + delayIfNecessary(); + + if (lockNames.containsKey(recordedRequest.getSequenceNumber())) { + T resource = simulators.get(lockNames.get(recordedRequest.getSequenceNumber())).getResource(); + if (resource != null) { + return resource; + } + } + + return ""; + } + + @Override + public Headers getHeaders() { + 
return headers; + } + + @Override + public void setHeaders(Headers headers) { + this.headers = headers; + } + }).always(); + } + + expect().get() + .withPath(lockSimulator.getAPIPath() + "/namespaces/test/" + lockSimulator.getResourcePath() + "/" + + lockSimulator.getResourceName()) .andReply(new ResponseProvider<Object>() { private Headers headers = new Headers.Builder().build(); @@ -66,7 +160,7 @@ public class LockTestServer extends KubernetesMockServer { return 500; } - if (lockSimulator.getConfigMap() != null) { + if (lockSimulator.getResource() != null) { return 200; } @@ -76,9 +170,9 @@ public class LockTestServer extends KubernetesMockServer { @Override public Object getBody(RecordedRequest recordedRequest) { delayIfNecessary(); - ConfigMap map = lockSimulator.getConfigMap(); - if (map != null) { - return map; + T resource = lockSimulator.getResource(); + if (resource != null) { + return resource; } return ""; } @@ -94,53 +188,9 @@ public class LockTestServer extends KubernetesMockServer { } }).always(); - expect().post().withPath("/api/v1/namespaces/test/configmaps").andReply(new ResponseProvider<Object>() { - - private Headers headers = new Headers.Builder().build(); - - @Override - public int getStatusCode(RecordedRequest request) { - if (refuseRequests) { - return 500; - } - - ConfigMap map = convert(request); - if (map == null || map.getMetadata() == null - || !lockSimulator.getConfigMapName().equals(map.getMetadata().getName())) { - throw new IllegalArgumentException("Illegal configMap received"); - } - - boolean done = lockSimulator.setConfigMap(map, true); - if (done) { - return 201; - } - return 500; - } - - @Override - public Object getBody(RecordedRequest recordedRequest) { - delayIfNecessary(); - - ConfigMap map = lockSimulator.getConfigMap(); - if (map != null) { - return map; - } - - return ""; - } - - @Override - public Headers getHeaders() { - return headers; - } - - @Override - public void setHeaders(Headers headers) { - this.headers = 
headers; - } - }).always(); - - expect().put().withPath("/api/v1/namespaces/test/configmaps/" + lockSimulator.getConfigMapName()) + expect().put() + .withPath(lockSimulator.getAPIPath() + "/namespaces/test/" + lockSimulator.getResourcePath() + "/" + + lockSimulator.getResourceName()) .andReply(new ResponseProvider<Object>() { private Headers headers = new Headers.Builder().build(); @@ -151,9 +201,15 @@ public class LockTestServer extends KubernetesMockServer { return 500; } - ConfigMap map = convert(request); + T resource; + try { + resource = convert(request, lockSimulator.getResourceClass()); + } catch (Exception e) { + LOG.error("Error during resource conversion", e); + return 500; + } - boolean done = lockSimulator.setConfigMap(map, false); + boolean done = lockSimulator.setResource(resource, false); if (done) { return 200; } @@ -163,9 +219,9 @@ public class LockTestServer extends KubernetesMockServer { @Override public Object getBody(RecordedRequest recordedRequest) { delayIfNecessary(); - ConfigMap map = lockSimulator.getConfigMap(); - if (map != null) { - return map; + T resource = lockSimulator.getResource(); + if (resource != null) { + return resource; } return ""; @@ -181,16 +237,6 @@ public class LockTestServer extends KubernetesMockServer { this.headers = headers; } }).always(); - - // Other resources - expect().get().withPath("/api/v1/namespaces/test/pods") - .andReply(200, - request -> new PodListBuilder().withNewMetadata().withResourceVersion("1").and() - .withItems(getCurrentPods().stream() - .map(name -> new PodBuilder().withNewMetadata().withName(name).and().build()) - .collect(Collectors.toList())) - .build()) - .always(); } public boolean isRefuseRequests() { @@ -231,13 +277,9 @@ public class LockTestServer extends KubernetesMockServer { } } - private ConfigMap convert(RecordedRequest request) { - try { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(request.getBody().readByteArray(), ConfigMap.class); - } catch 
(IOException e) { - throw new IllegalArgumentException("Erroneous data", e); - } + private T convert(RecordedRequest request, Class<T> targetClass) throws IOException { + ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule()); + return mapper.readValue(request.getBody().readByteArray(), targetClass); } } diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ResourceLockSimulator.java similarity index 53% copy from components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java copy to components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ResourceLockSimulator.java index f4e6e21..558f0f8 100644 --- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ResourceLockSimulator.java @@ -16,45 +16,44 @@ */ package org.apache.camel.component.kubernetes.cluster.utils; -import io.fabric8.kubernetes.api.model.ConfigMap; -import io.fabric8.kubernetes.api.model.ConfigMapBuilder; +import io.fabric8.kubernetes.api.model.HasMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Central lock for testing leader election. 
*/ -public class ConfigMapLockSimulator { +public abstract class ResourceLockSimulator<T extends HasMetadata> { - private static final Logger LOG = LoggerFactory.getLogger(ConfigMapLockSimulator.class); + private static final Logger LOG = LoggerFactory.getLogger(ResourceLockSimulator.class); - private String configMapName; + private String resourceName; - private ConfigMap currentMap; + private T currentResource; private long versionCounter = 1000000; - public ConfigMapLockSimulator(String configMapName) { - this.configMapName = configMapName; + public ResourceLockSimulator(String resourceName) { + this.resourceName = resourceName; } - public String getConfigMapName() { - return configMapName; + public String getResourceName() { + return resourceName; } - public synchronized boolean setConfigMap(ConfigMap map, boolean insert) { + public synchronized boolean setResource(T resource, boolean insert) { // Insert - if (insert && currentMap != null) { - LOG.error("Current map should have been null"); + if (insert && currentResource != null) { + LOG.error("Current resource should have been null"); return false; } // Update - if (!insert && currentMap == null) { - LOG.error("Current map should not have been null"); + if (!insert && currentResource == null) { + LOG.error("Current resource should not have been null"); return false; } - String version = map.getMetadata() != null ? map.getMetadata().getResourceVersion() : null; + String version = resource.getMetadata() != null ? 
resource.getMetadata().getResourceVersion() : null; if (version != null) { long versionLong = Long.parseLong(version); if (versionLong != versionCounter) { @@ -64,17 +63,26 @@ public class ConfigMapLockSimulator { } } - this.currentMap = new ConfigMapBuilder(map).editOrNewMetadata().withResourceVersion(String.valueOf(++versionCounter)) - .endMetadata().build(); + this.currentResource = withNewResourceVersion(resource, String.valueOf(++versionCounter)); return true; } - public synchronized ConfigMap getConfigMap() { - if (currentMap == null) { + public synchronized T getResource() { + if (currentResource == null) { return null; } - return new ConfigMapBuilder(currentMap).build(); + return copyOf(currentResource); } + protected abstract T withNewResourceVersion(T resource, String newResourceVersion); + + protected abstract T copyOf(T resource); + + public abstract String getAPIPath(); + + public abstract String getResourcePath(); + + public abstract Class<T> getResourceClass(); + } diff --git a/core/camel-api/src/main/java/org/apache/camel/cluster/CamelPreemptiveClusterService.java b/core/camel-api/src/main/java/org/apache/camel/cluster/CamelPreemptiveClusterService.java new file mode 100644 index 0000000..652ba67 --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/cluster/CamelPreemptiveClusterService.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.cluster; + +/** + * A {@link CamelPreemptiveClusterService} is a {@link CamelClusterService} that manages + * {@link CamelPreemptiveClusterView}s. + */ +public interface CamelPreemptiveClusterService extends CamelClusterService { + + @Override + CamelPreemptiveClusterView getView(String namespace) throws Exception; + +} diff --git a/core/camel-api/src/main/java/org/apache/camel/cluster/CamelPreemptiveClusterView.java b/core/camel-api/src/main/java/org/apache/camel/cluster/CamelPreemptiveClusterView.java new file mode 100644 index 0000000..c8a2136 --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/cluster/CamelPreemptiveClusterView.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.cluster; + +/** + * A {@link CamelPreemptiveClusterView} is a {@link CamelClusterView} that can be externally disabled by another + * controller. + */ +public interface CamelPreemptiveClusterView extends CamelClusterView { + + /** + * Enables or disables this view. + * + * @param disabled {@code true} to disable the view, {@code false} to enable it again + */ + void setDisabled(boolean disabled); + + /** + * Checks whether this view is currently disabled. + * + * @return {@code true} if the view has been disabled via {@link #setDisabled(boolean)} + */ + boolean isDisabled(); + +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cluster/RebalancingCamelClusterService.java b/core/camel-support/src/main/java/org/apache/camel/support/cluster/RebalancingCamelClusterService.java new file mode 100644 index 0000000..372734f --- /dev/null +++ b/core/camel-support/src/main/java/org/apache/camel/support/cluster/RebalancingCamelClusterService.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.camel.support.cluster; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.camel.CamelContext; +import org.apache.camel.cluster.CamelClusterMember; +import org.apache.camel.cluster.CamelClusterView; +import org.apache.camel.cluster.CamelPreemptiveClusterService; +import org.apache.camel.cluster.CamelPreemptiveClusterView; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link RebalancingCamelClusterService} adds rebalancing capabilities to an underlying + * {@link CamelPreemptiveClusterService}. Each view is treated as a partition by this cluster service and it makes sure + * that all services belonging to the cluster own a balanced number of partitions (same number or difference at most 1 + * when not possible). 
+ */ +public class RebalancingCamelClusterService implements CamelPreemptiveClusterService { + + private static final Logger LOG = LoggerFactory.getLogger(RebalancingCamelClusterService.class); + + protected ScheduledExecutorService serializedExecutor; + + protected CamelPreemptiveClusterService delegate; + + protected CamelContext camelContext; + + protected long periodMillis; + + public RebalancingCamelClusterService(CamelPreemptiveClusterService delegate, long periodMillis) { + this.delegate = ObjectHelper.notNull(delegate, "delegate"); + this.periodMillis = periodMillis; + } + + public RebalancingCamelClusterService(CamelContext camelContext, CamelPreemptiveClusterService delegate, + long periodMillis) { + this.camelContext = ObjectHelper.notNull(camelContext, "camelContext"); + this.delegate = ObjectHelper.notNull(delegate, "delegate"); + this.periodMillis = periodMillis; + } + + @Override + public void start() { + delegate.start(); + if (serializedExecutor == null) { + serializedExecutor = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this, + "RebalancingClusterService"); + serializedExecutor.execute(this::reconcile); + } + } + + @Override + public void stop() { + if (serializedExecutor != null) { + serializedExecutor.shutdownNow(); + } + serializedExecutor = null; + + delegate.stop(); + } + + public CamelPreemptiveClusterService getDelegate() { + return delegate; + } + + public long getPeriodMillis() { + return periodMillis; + } + + public void setDelegate(CamelPreemptiveClusterService delegate) { + this.delegate = delegate; + } + + protected void reconcile() { + Integer n = members(); + List<String> partitions = partitionList(); + int k = partitions.size(); + + if (n == null || n == 0 || k == 0) { + rescheduleAfterDelay(); + return; + } + + int threshold = 0; + while (threshold <= k) { + threshold += n; + } + threshold -= n; + + int quota = threshold / n; + + List<String> main = new ArrayList<>(); + List<String> remaining = 
new ArrayList<>(); + for (int i = 0; i < threshold; i++) { + main.add(partitions.get(i)); + } + for (int i = threshold; i < partitions.size(); i++) { + remaining.add(partitions.get(i)); + } + + rebalanceGroup(main, quota); + rebalanceGroup(remaining, 1); + rescheduleAfterDelay(); + } + + protected void rebalanceGroup(List<String> partitions, int quota) { + List<String> owned = owned(partitions); + if (owned == null) { + return; + } + + if (owned.size() < quota) { + // Open all (to let the controller choose which ones) + for (String partition : partitions) { + setDisabled(partition, false); + } + } else if (owned.size() > quota) { + for (int i = 0; i < owned.size() - quota; i++) { + setDisabled(owned.get(i), true); + } + } else { + // We're fine, but we prevent this instance from stealing locks that are not needed + Set<String> ownedSet = new HashSet<>(owned); + for (String partition : partitions) { + if (!ownedSet.contains(partition)) { + setDisabled(partition, true); + } + } + } + } + + protected void setDisabled(String partition, boolean disabled) { + try { + LOG.debug("Setting partition {} to disabled={}...", partition, disabled); + CamelPreemptiveClusterView view = delegate.getView(partition); + if (view.isDisabled() != disabled) { + view.setDisabled(disabled); + } + } catch (Exception ex) { + LOG.warn("Could not get view " + partition, ex); + } + } + + protected List<String> owned(List<String> partitions) { + List<String> owned = new ArrayList<>(partitions.size()); + for (String partition : partitions) { + try { + CamelPreemptiveClusterView view = delegate.getView(partition); + if (!view.isDisabled() && view.getLocalMember().isLeader()) { + owned.add(partition); + } + } catch (Exception ex) { + LOG.warn("Could not get view " + partition, ex); + return null; + } + } + return owned; + } + + protected List<String> partitionList() { + ArrayList<String> partitions = new ArrayList<>(this.getNamespaces()); + Collections.sort(partitions); + return partitions; + } + + 
protected Integer members() { + Set<String> members = null; + for (String group : this.getNamespaces()) { + try { + CamelPreemptiveClusterView view = delegate.getView(group); + Set<String> viewMembers = view.getMembers().stream().map(CamelClusterMember::getId).collect(Collectors.toSet()); + if (members != null && !members.equals(viewMembers)) { + LOG.debug("View members don't match: {} vs {}", members, viewMembers); + return null; + } + members = viewMembers; + } catch (Exception ex) { + LOG.warn("Could not get view " + group, ex); + return null; + } + } + return members != null ? members.size() : 0; + } + + private void rescheduleAfterDelay() { + this.serializedExecutor.schedule(this::reconcile, + this.periodMillis, + TimeUnit.MILLISECONDS); + } + + @Override + public CamelPreemptiveClusterView getView(String namespace) throws Exception { + return delegate.getView(namespace); + } + + @Override + public void releaseView(CamelClusterView view) throws Exception { + delegate.releaseView(view); + } + + @Override + public Collection<String> getNamespaces() { + return delegate.getNamespaces(); + } + + @Override + public void startView(String namespace) throws Exception { + delegate.startView(namespace); + } + + @Override + public void stopView(String namespace) throws Exception { + delegate.stopView(namespace); + } + + @Override + public boolean isLeader(String namespace) { + return delegate.isLeader(namespace); + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + delegate.setCamelContext(camelContext); + } + + @Override + public CamelContext getCamelContext() { + return this.camelContext; + } + + @Override + public void setId(String id) { + delegate.setId(id); + } + + @Override + public String getId() { + return delegate.getId(); + } +}