This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cc227b  Rebalancing cluster service
8cc227b is described below

commit 8cc227b65b0e5808cb4db7ea30ffcb2cb4e9cdfe
Author: nicolaferraro <ni.ferr...@gmail.com>
AuthorDate: Mon Nov 23 17:32:23 2020 +0100

    Rebalancing cluster service
---
 components/camel-kubernetes/pom.xml                |   5 +
 .../cluster/KubernetesClusterService.java          |  54 +++-
 .../kubernetes/cluster/KubernetesClusterView.java  |  18 +-
 .../kubernetes/cluster/LeaseResourceType.java      |  29 +++
 .../cluster/lock/ConfigMapLockUtils.java           | 101 --------
 .../lock/KubernetesLeadershipController.java       | 229 +++++++++++++----
 .../lock/KubernetesLeaseResourceManager.java       |  79 ++++++
 .../cluster/lock/KubernetesLockConfiguration.java  |  61 ++++-
 .../kubernetes/cluster/lock/LeaderInfo.java        |  14 +-
 .../cluster/lock/TimedLeaderNotifier.java          |  16 +-
 .../lock/impl/ConfigMapLeaseResourceManager.java   | 144 +++++++++++
 .../lock/impl/NativeLeaseResourceManager.java      | 164 +++++++++++++
 .../cluster/KubernetesClusterServiceTest.java      | 273 ++++++++++++++++-----
 .../cluster/utils/ConfigMapLockSimulator.java      |  68 ++---
 .../cluster/utils/LeaseLockSimulator.java          |  56 +++++
 .../kubernetes/cluster/utils/LockTestServer.java   | 200 +++++++++------
 ...ckSimulator.java => ResourceLockSimulator.java} |  50 ++--
 .../cluster/CamelPreemptiveClusterService.java     |  28 +++
 .../camel/cluster/CamelPreemptiveClusterView.java  |  35 +++
 .../cluster/RebalancingCamelClusterService.java    | 267 ++++++++++++++++++++
 20 files changed, 1506 insertions(+), 385 deletions(-)

diff --git a/components/camel-kubernetes/pom.xml 
b/components/camel-kubernetes/pom.xml
index 6da12a5..01e48c5 100644
--- a/components/camel-kubernetes/pom.xml
+++ b/components/camel-kubernetes/pom.xml
@@ -111,6 +111,11 @@
             <version>${commons-codec-version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
         <!-- logging -->
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterService.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterService.java
index 9b24b7e..d9e1ace 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterService.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterService.java
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.cluster.CamelPreemptiveClusterService;
 import org.apache.camel.component.kubernetes.KubernetesConfiguration;
 import 
org.apache.camel.component.kubernetes.cluster.lock.KubernetesLockConfiguration;
 import org.apache.camel.support.cluster.AbstractCamelClusterService;
@@ -29,11 +30,12 @@ import org.apache.camel.util.ObjectHelper;
 /**
  * A Kubernetes based cluster service leveraging Kubernetes optimistic locks 
on resources (specifically ConfigMaps).
  */
-public class KubernetesClusterService extends 
AbstractCamelClusterService<KubernetesClusterView> {
+public class KubernetesClusterService extends 
AbstractCamelClusterService<KubernetesClusterView>
+        implements CamelPreemptiveClusterService {
 
-    private KubernetesConfiguration configuration;
+    protected KubernetesConfiguration configuration;
 
-    private KubernetesLockConfiguration lockConfiguration;
+    protected KubernetesLockConfiguration lockConfiguration;
 
     public KubernetesClusterService() {
         this.configuration = new KubernetesConfiguration();
@@ -58,6 +60,11 @@ public class KubernetesClusterService extends 
AbstractCamelClusterService<Kubern
         return new KubernetesClusterView(getCamelContext(), this, config, 
lockConfig);
     }
 
+    @Override
+    public KubernetesClusterView getView(String namespace) throws Exception {
+        return (KubernetesClusterView) super.getView(namespace);
+    }
+
     protected KubernetesConfiguration setConfigDefaults(
             KubernetesConfiguration configuration, KubernetesLockConfiguration 
lockConfiguration) {
         if (configuration.getConnectionTimeout() == null) {
@@ -106,8 +113,8 @@ public class KubernetesClusterService extends 
AbstractCamelClusterService<Kubern
         }
         if (config.getLeaseDurationMillis() <= 
config.getRenewDeadlineMillis()) {
             throw new IllegalStateException(
-                    "leaseDurationMillis must be greater than 
renewDeadlineMillis " + "(" + config.getLeaseDurationMillis()
-                                            + " is not greater than "
+                    "leaseDurationMillis must be greater than 
renewDeadlineMillis ("
+                                            + config.getLeaseDurationMillis() 
+ " is not greater than "
                                             + config.getRenewDeadlineMillis() 
+ ")");
         }
         if (config.getRenewDeadlineMillis() <= config.getJitterFactor() * 
config.getRetryPeriodMillis()) {
@@ -154,15 +161,47 @@ public class KubernetesClusterService extends 
AbstractCamelClusterService<Kubern
         
this.lockConfiguration.setKubernetesResourcesNamespace(kubernetesNamespace);
     }
 
+    /**
+     * @return     the resource name
+     * @deprecated Use {@link #getKubernetesResourceName()}
+     */
+    @Deprecated
     public String getConfigMapName() {
         return this.lockConfiguration.getConfigMapName();
     }
 
     /**
      * Set the name of the ConfigMap used to do optimistic locking (defaults 
to 'leaders').
+     *
+     * @param      kubernetesResourceName the resource name
+     * @deprecated                        Use {@link 
#setKubernetesResourceName(String)}
+     */
+    @Deprecated
+    public void setConfigMapName(String kubernetesResourceName) {
+        this.lockConfiguration.setConfigMapName(kubernetesResourceName);
+    }
+
+    public LeaseResourceType getLeaseResourceType() {
+        return this.lockConfiguration.getLeaseResourceType();
+    }
+
+    /**
+     * Set the lease resource type used in Kubernetes (defaults to 'Lease', 
from coordination.k8s.io).
      */
-    public void setConfigMapName(String configMapName) {
-        this.lockConfiguration.setConfigMapName(configMapName);
+    public void setLeaseResourceType(LeaseResourceType type) {
+        this.lockConfiguration.setLeaseResourceType(type);
+    }
+
+    public String getKubernetesResourceName() {
+        return this.lockConfiguration.getKubernetesResourceName();
+    }
+
+    /**
+     * Set the name of the lease resource used to do optimistic locking 
(defaults to 'leaders'). Resource name is used
+     * as prefix when the underlying Kubernetes resource can mange a single 
lock.
+     */
+    public void setKubernetesResourceName(String kubernetesResourceName) {
+        
this.lockConfiguration.setKubernetesResourceName(kubernetesResourceName);
     }
 
     public String getPodName() {
@@ -235,4 +274,5 @@ public class KubernetesClusterService extends 
AbstractCamelClusterService<Kubern
     public void setRetryPeriodMillis(long retryPeriodMillis) {
         lockConfiguration.setRetryPeriodMillis(retryPeriodMillis);
     }
+
 }
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterView.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterView.java
index a2e80db..f684110 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterView.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterView.java
@@ -28,6 +28,7 @@ import java.util.stream.Collectors;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import org.apache.camel.CamelContext;
 import org.apache.camel.cluster.CamelClusterMember;
+import org.apache.camel.cluster.CamelPreemptiveClusterView;
 import org.apache.camel.component.kubernetes.KubernetesConfiguration;
 import org.apache.camel.component.kubernetes.KubernetesHelper;
 import 
org.apache.camel.component.kubernetes.cluster.lock.KubernetesClusterEvent;
@@ -40,7 +41,7 @@ import org.apache.camel.util.ObjectHelper;
  * The cluster view on a specific Camel cluster namespace (not to be confused 
with Kubernetes namespaces). Namespaces
  * are represented as keys in a Kubernetes ConfigMap (values are the current 
leader pods).
  */
-public class KubernetesClusterView extends AbstractCamelClusterView {
+public class KubernetesClusterView extends AbstractCamelClusterView implements 
CamelPreemptiveClusterView {
 
     private CamelContext camelContext;
 
@@ -60,6 +61,8 @@ public class KubernetesClusterView extends 
AbstractCamelClusterView {
 
     private KubernetesLeadershipController controller;
 
+    private boolean disabled;
+
     public KubernetesClusterView(CamelContext camelContext, 
KubernetesClusterService cluster,
                                  KubernetesConfiguration configuration,
                                  KubernetesLockConfiguration 
lockConfiguration) {
@@ -69,6 +72,7 @@ public class KubernetesClusterView extends 
AbstractCamelClusterView {
         this.lockConfiguration = ObjectHelper.notNull(lockConfiguration, 
"lockConfiguration");
         this.localMember = new 
KubernetesClusterMember(lockConfiguration.getPodName());
         this.memberCache = new HashMap<>();
+        this.disabled = false;
     }
 
     @Override
@@ -86,6 +90,17 @@ public class KubernetesClusterView extends 
AbstractCamelClusterView {
         return currentMembers;
     }
 
+    public boolean isDisabled() {
+        return disabled;
+    }
+
+    public void setDisabled(boolean disabled) {
+        this.disabled = disabled;
+        if (this.controller != null) {
+            this.controller.setDisabled(disabled);
+        }
+    }
+
     @Override
     protected void doStart() throws Exception {
         if (controller == null) {
@@ -121,6 +136,7 @@ public class KubernetesClusterView extends 
AbstractCamelClusterView {
                 }
             });
 
+            this.controller.setDisabled(disabled);
             controller.start();
         }
     }
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/LeaseResourceType.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/LeaseResourceType.java
new file mode 100644
index 0000000..0561f76
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/LeaseResourceType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.cluster;
+
+public enum LeaseResourceType {
+    /**
+     * A Kubernetes ConfigMap.
+     */
+    ConfigMap,
+
+    /**
+     * A Kubernetes Lease (coordination.k8s.io).
+     */
+    Lease
+}
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/ConfigMapLockUtils.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/ConfigMapLockUtils.java
deleted file mode 100644
index ee14a84..0000000
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/ConfigMapLockUtils.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kubernetes.cluster.lock;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Set;
-
-import io.fabric8.kubernetes.api.model.ConfigMap;
-import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utilities for managing ConfigMaps that contain lock information.
- */
-public final class ConfigMapLockUtils {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(ConfigMapLockUtils.class);
-
-    private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ssX";
-
-    private static final String LEADER_PREFIX = "leader.pod.";
-
-    private static final String LOCAL_TIMESTAMP_PREFIX = 
"leader.local.timestamp.";
-
-    private ConfigMapLockUtils() {
-    }
-
-    public static ConfigMap createNewConfigMap(String configMapName, 
LeaderInfo leaderInfo) {
-        return new 
ConfigMapBuilder().withNewMetadata().withName(configMapName).addToLabels("provider",
 "camel")
-                .addToLabels("kind", "locks").endMetadata()
-                .addToData(LEADER_PREFIX + leaderInfo.getGroupName(), 
leaderInfo.getLeader())
-                .addToData(LOCAL_TIMESTAMP_PREFIX + leaderInfo.getGroupName(), 
formatDate(leaderInfo.getLocalTimestamp()))
-                .build();
-    }
-
-    public static ConfigMap getConfigMapWithNewLeader(ConfigMap configMap, 
LeaderInfo leaderInfo) {
-        return new ConfigMapBuilder(configMap).addToData(LEADER_PREFIX + 
leaderInfo.getGroupName(), leaderInfo.getLeader())
-                .addToData(LOCAL_TIMESTAMP_PREFIX + leaderInfo.getGroupName(), 
formatDate(leaderInfo.getLocalTimestamp()))
-                .build();
-    }
-
-    public static LeaderInfo getLeaderInfo(ConfigMap configMap, Set<String> 
members, String group) {
-        return new LeaderInfo(group, getLeader(configMap, group), 
getLocalTimestamp(configMap, group), members);
-    }
-
-    private static String getLeader(ConfigMap configMap, String group) {
-        return getConfigMapValue(configMap, LEADER_PREFIX + group);
-    }
-
-    private static String formatDate(Date date) {
-        if (date == null) {
-            return null;
-        }
-        try {
-            return new SimpleDateFormat(DATE_TIME_FORMAT).format(date);
-        } catch (Exception e) {
-            LOG.warn("Unable to format date '{}' using format {}", date, 
DATE_TIME_FORMAT, e);
-        }
-
-        return null;
-    }
-
-    private static Date getLocalTimestamp(ConfigMap configMap, String group) {
-        String timestamp = getConfigMapValue(configMap, LOCAL_TIMESTAMP_PREFIX 
+ group);
-        if (timestamp == null) {
-            return null;
-        }
-
-        try {
-            return new SimpleDateFormat(DATE_TIME_FORMAT).parse(timestamp);
-        } catch (Exception e) {
-            LOG.warn("Unable to parse time string '{}' using format {}", 
timestamp, DATE_TIME_FORMAT, e);
-        }
-
-        return null;
-    }
-
-    private static String getConfigMapValue(ConfigMap configMap, String key) {
-        if (configMap == null || configMap.getData() == null) {
-            return null;
-        }
-        return configMap.getData().get(key);
-    }
-
-}
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeadershipController.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeadershipController.java
index 8c6c017..69fa5e4 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeadershipController.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeadershipController.java
@@ -26,7 +26,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import org.apache.camel.CamelContext;
@@ -45,16 +45,18 @@ public class KubernetesLeadershipController implements 
Service {
     private enum State {
         NOT_LEADER,
         BECOMING_LEADER,
-        LEADER
+        LEADER,
+        LOSING_LEADERSHIP,
+        LEADERSHIP_LOST
     }
 
-    private CamelContext camelContext;
+    private final CamelContext camelContext;
 
-    private KubernetesClient kubernetesClient;
+    private final KubernetesClient kubernetesClient;
 
-    private KubernetesLockConfiguration lockConfiguration;
+    private final KubernetesLockConfiguration lockConfiguration;
 
-    private KubernetesClusterEventHandler eventHandler;
+    private final KubernetesClusterEventHandler eventHandler;
 
     private State currentState = State.NOT_LEADER;
 
@@ -62,10 +64,14 @@ public class KubernetesLeadershipController implements 
Service {
 
     private TimedLeaderNotifier leaderNotifier;
 
+    private final KubernetesLeaseResourceManager<HasMetadata> leaseManager;
+
     private volatile LeaderInfo latestLeaderInfo;
-    private volatile ConfigMap latestConfigMap;
+    private volatile HasMetadata latestLeaseResource;
     private volatile Set<String> latestMembers;
 
+    private boolean disabled;
+
     public KubernetesLeadershipController(CamelContext camelContext, 
KubernetesClient kubernetesClient,
                                           KubernetesLockConfiguration 
lockConfiguration,
                                           KubernetesClusterEventHandler 
eventHandler) {
@@ -73,6 +79,8 @@ public class KubernetesLeadershipController implements 
Service {
         this.kubernetesClient = kubernetesClient;
         this.lockConfiguration = lockConfiguration;
         this.eventHandler = eventHandler;
+        this.disabled = false;
+        this.leaseManager = 
KubernetesLeaseResourceManager.create(lockConfiguration.getLeaseResourceType());
     }
 
     @Override
@@ -102,6 +110,18 @@ public class KubernetesLeadershipController implements 
Service {
         leaderNotifier = null;
     }
 
+    public boolean isDisabled() {
+        return disabled;
+    }
+
+    public void setDisabled(boolean disabled) {
+        boolean oldState = this.disabled;
+        this.disabled = disabled;
+        if (oldState != disabled && serializedExecutor != null) {
+            serializedExecutor.execute(this::refreshStatus);
+        }
+    }
+
     private void refreshStatus() {
         switch (currentState) {
             case NOT_LEADER:
@@ -113,6 +133,12 @@ public class KubernetesLeadershipController implements 
Service {
             case LEADER:
                 refreshStatusLeader();
                 break;
+            case LOSING_LEADERSHIP:
+                refreshStatusLosingLeadership();
+                break;
+            case LEADERSHIP_LOST:
+                refreshStatusLeadershipLost();
+                break;
             default:
                 throw new RuntimeException("Unsupported state " + 
currentState);
         }
@@ -132,7 +158,8 @@ public class KubernetesLeadershipController implements 
Service {
 
         if (this.latestLeaderInfo.hasEmptyLeader()) {
             // There is no previous leader
-            LOG.info("{} The cluster has no leaders. Trying to acquire the 
leadership...", logPrefix());
+            LOG.info("{} The cluster has no leaders for group {}. Trying to 
acquire the leadership...", logPrefix(),
+                    this.lockConfiguration.getGroupName());
             boolean acquired = tryAcquireLeadership();
             if (acquired) {
                 LOG.info("{} Leadership acquired by current pod with immediate 
effect", logPrefix());
@@ -191,7 +218,55 @@ public class KubernetesLeadershipController implements 
Service {
         this.serializedExecutor.execute(this::refreshStatus);
     }
 
+    /**
+     * This pod is going to manually lose the leadership. It should shutdown 
activities and wait a lease amount of time
+     * before giving up the lease.
+     */
+    private void refreshStatusLosingLeadership() {
+        // Wait always the same amount of time before giving up the leadership
+        long delay = this.lockConfiguration.getLeaseDurationMillis();
+        LOG.info("{} Current pod owns the leadership, but it will be lost in 
{} seconds...", logPrefix(),
+                new BigDecimal(delay).divide(BigDecimal.valueOf(1000), 2, 
BigDecimal.ROUND_HALF_UP));
+
+        try {
+            Thread.sleep(delay);
+        } catch (InterruptedException e) {
+            LOG.warn("Thread interrupted", e);
+        }
+
+        LOG.info("{} Current pod is losing leadership now...", logPrefix());
+        this.currentState = State.LEADERSHIP_LOST;
+        this.serializedExecutor.execute(this::refreshStatus);
+    }
+
+    /**
+     * Functions are stopped, now lost leadership should be communicated by 
freeing up the lease.
+     */
+    private void refreshStatusLeadershipLost() {
+        boolean pulled = lookupNewLeaderInfo();
+        if (!pulled) {
+            rescheduleAfterDelay();
+            return;
+        }
+
+        if (!this.yieldLeadership()) {
+            rescheduleAfterDelay();
+            return;
+        }
+
+        LOG.info("{} Current pod has lost leadership", logPrefix());
+        this.currentState = State.NOT_LEADER;
+        this.serializedExecutor.execute(this::refreshStatus);
+    }
+
     private void refreshStatusLeader() {
+        if (this.disabled) {
+            LOG.debug("{} Leadership disabled, pod is going to lose 
leadership", logPrefix());
+            this.currentState = State.LOSING_LEADERSHIP;
+            this.serializedExecutor.execute(this::refreshStatus);
+            return;
+        }
+
         LOG.debug("{} Pod should be the leader, pulling new data from the 
cluster", logPrefix());
         long timeBeforePulling = System.currentTimeMillis();
         boolean pulled = lookupNewLeaderInfo();
@@ -202,9 +277,15 @@ public class KubernetesLeadershipController implements 
Service {
 
         if 
(this.latestLeaderInfo.isValidLeader(this.lockConfiguration.getPodName())) {
             LOG.debug("{} Current Pod is still the leader", logPrefix());
+
             
this.leaderNotifier.refreshLeadership(Optional.of(this.lockConfiguration.getPodName()),
 timeBeforePulling,
                     this.lockConfiguration.getRenewDeadlineMillis(),
                     this.latestLeaderInfo.getMembers());
+
+            HasMetadata newLease = 
this.leaseManager.refreshLeaseRenewTime(kubernetesClient, 
this.latestLeaseResource,
+                    this.lockConfiguration.getRenewDeadlineSeconds());
+            updateLatestLeaderInfo(newLease, this.latestMembers);
+
             rescheduleAfterDelay();
             return;
         } else {
@@ -228,13 +309,17 @@ public class KubernetesLeadershipController implements 
Service {
     private boolean lookupNewLeaderInfo() {
         LOG.debug("{} Looking up leadership information...", logPrefix());
 
-        ConfigMap configMap;
+        HasMetadata leaseResource;
         try {
-            configMap = pullConfigMap();
+            leaseResource = leaseManager.fetchLeaseResource(kubernetesClient,
+                    
this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient),
+                    this.lockConfiguration.getKubernetesResourceName(),
+                    this.lockConfiguration.getGroupName());
         } catch (Throwable e) {
-            LOG.warn(logPrefix() + " Unable to retrieve the current ConfigMap 
" + this.lockConfiguration.getConfigMapName()
-                     + " from Kubernetes");
-            LOG.debug(logPrefix() + " Exception thrown during ConfigMap 
lookup", e);
+            LOG.warn(logPrefix() + " Unable to retrieve the current lease 
resource "
+                     + this.lockConfiguration.getKubernetesResourceName()
+                     + " for group " + this.lockConfiguration.getGroupName() + 
" from Kubernetes");
+            LOG.debug(logPrefix() + " Exception thrown during lease resource 
lookup", e);
             return false;
         }
 
@@ -247,14 +332,62 @@ public class KubernetesLeadershipController implements 
Service {
             return false;
         }
 
-        updateLatestLeaderInfo(configMap, members);
+        updateLatestLeaderInfo(leaseResource, members);
         return true;
     }
 
+    private boolean yieldLeadership() {
+        LOG.debug("{} Trying to yield the leadership...", logPrefix());
+
+        HasMetadata leaseResource = this.latestLeaseResource;
+        Set<String> members = this.latestMembers;
+        LeaderInfo latestLeaderInfo = this.latestLeaderInfo;
+
+        if (latestLeaderInfo == null || members == null) {
+            LOG.warn(logPrefix() + " Unexpected condition. Latest leader info 
or list of members is empty.");
+            return false;
+        } else if (!members.contains(this.lockConfiguration.getPodName())) {
+            LOG.warn(logPrefix() + " The list of cluster members " + 
latestLeaderInfo.getMembers()
+                     + " does not contain the current Pod. Cannot yield the 
leadership.");
+            return false;
+        }
+
+        if (leaseResource == null) {
+            // Already yielded
+            return true;
+        }
+
+        LOG.debug("{} Lock lease resource already present in the Kubernetes 
namespace. Checking...", logPrefix());
+        LeaderInfo leaderInfo = leaseManager.decodeLeaderInfo(leaseResource, 
members, this.lockConfiguration.getGroupName());
+        if (!leaderInfo.isValidLeader(this.lockConfiguration.getPodName())) {
+            // Already yielded
+            return true;
+        }
+
+        try {
+            HasMetadata updatedLeaseResource = 
leaseManager.optimisticDeleteLeaderInfo(kubernetesClient, leaseResource,
+                    this.lockConfiguration.getGroupName());
+
+            LOG.debug("{} Lease resource {} for group {} successfully 
updated", logPrefix(),
+                    this.lockConfiguration.getKubernetesResourceName(), 
this.lockConfiguration.getGroupName());
+            updateLatestLeaderInfo(updatedLeaseResource, members);
+            return true;
+        } catch (Exception ex) {
+            LOG.warn(logPrefix() + " Unable to update the lock on the lease 
resource to remove leadership information");
+            LOG.debug(logPrefix() + " Error received during resource lock 
replace", ex);
+            return false;
+        }
+    }
+
     private boolean tryAcquireLeadership() {
+        if (this.disabled) {
+            LOG.debug("{} Won't try to acquire the leadership because it's 
disabled...", logPrefix());
+            return false;
+        }
+
         LOG.debug("{} Trying to acquire the leadership...", logPrefix());
 
-        ConfigMap configMap = this.latestConfigMap;
+        HasMetadata leaseResource = this.latestLeaseResource;
         Set<String> members = this.latestMembers;
         LeaderInfo latestLeaderInfo = this.latestLeaderInfo;
 
@@ -267,53 +400,53 @@ public class KubernetesLeadershipController implements 
Service {
             return false;
         }
 
-        // Info we would set set in the configmap to become leaders
+        // Info we would set set in the lease resource to become leaders
         LeaderInfo newLeaderInfo = new LeaderInfo(
-                this.lockConfiguration.getGroupName(), 
this.lockConfiguration.getPodName(), new Date(), members);
+                this.lockConfiguration.getGroupName(), 
this.lockConfiguration.getPodName(), new Date(), members,
+                this.lockConfiguration.getLeaseDurationSeconds());
 
-        if (configMap == null) {
-            // No ConfigMap created so far
-            LOG.debug("{} Lock configmap is not present in the Kubernetes 
namespace. A new ConfigMap will be created",
+        if (leaseResource == null) {
+            // No leaseResource created so far
+            LOG.debug("{} Lock lease resource is not present in the Kubernetes 
namespace. A new lease resource will be created",
                     logPrefix());
-            ConfigMap newConfigMap
-                    = 
ConfigMapLockUtils.createNewConfigMap(this.lockConfiguration.getConfigMapName(),
 newLeaderInfo);
 
             try {
-                kubernetesClient.configMaps()
-                        
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
-                        .create(newConfigMap);
-
-                LOG.debug("{} ConfigMap {} successfully created", logPrefix(), 
this.lockConfiguration.getConfigMapName());
-                updateLatestLeaderInfo(newConfigMap, members);
+                HasMetadata newLeaseResource = 
leaseManager.createNewLeaseResource(kubernetesClient,
+                        
this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient),
+                        this.lockConfiguration.getKubernetesResourceName(),
+                        newLeaderInfo);
+
+                LOG.debug("{} Lease resource {} successfully created for group 
{}", logPrefix(),
+                        this.lockConfiguration.getKubernetesResourceName(), 
newLeaderInfo.getGroupName());
+                updateLatestLeaderInfo(newLeaseResource, members);
                 return true;
             } catch (Exception ex) {
                 // Suppress exception
                 LOG.warn(logPrefix()
-                         + " Unable to create the ConfigMap, it may have been 
created by other cluster members concurrently. If the problem persists, check 
if the service account has "
+                         + " Unable to create the lease resource, it may have 
been created by other cluster members concurrently. If the problem persists, 
check if the service account has "
                          + "the right " + "permissions to create it");
-                LOG.debug(logPrefix() + " Exception while trying to create the 
ConfigMap", ex);
+                LOG.debug(logPrefix() + " Exception while trying to create the 
lease resource", ex);
                 return false;
             }
         } else {
-            LOG.debug("{} Lock configmap already present in the Kubernetes 
namespace. Checking...", logPrefix());
-            LeaderInfo leaderInfo = 
ConfigMapLockUtils.getLeaderInfo(configMap, members, 
this.lockConfiguration.getGroupName());
+            LOG.debug("{} Lock lease resource already present in the 
Kubernetes namespace. Checking...", logPrefix());
+            LeaderInfo leaderInfo
+                    = leaseManager.decodeLeaderInfo(leaseResource, members, 
this.lockConfiguration.getGroupName());
 
             boolean canAcquire = !leaderInfo.hasValidLeader();
             if (canAcquire) {
                 // Try to be the new leader
                 try {
-                    ConfigMap updatedConfigMap = 
ConfigMapLockUtils.getConfigMapWithNewLeader(configMap, newLeaderInfo);
-                    kubernetesClient.configMaps()
-                            
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
-                            
.withName(this.lockConfiguration.getConfigMapName())
-                            
.lockResourceVersion(configMap.getMetadata().getResourceVersion()).replace(updatedConfigMap);
-
-                    LOG.debug("{} ConfigMap {} successfully updated", 
logPrefix(), this.lockConfiguration.getConfigMapName());
-                    updateLatestLeaderInfo(updatedConfigMap, members);
+                    HasMetadata updatedLeaseResource
+                            = 
leaseManager.optimisticAcquireLeadership(kubernetesClient, leaseResource, 
newLeaderInfo);
+
+                    LOG.debug("{} Lease resource {} successfully updated for 
group {}", logPrefix(),
+                            
this.lockConfiguration.getKubernetesResourceName(), 
newLeaderInfo.getGroupName());
+                    updateLatestLeaderInfo(updatedLeaseResource, members);
                     return true;
                 } catch (Exception ex) {
-                    LOG.warn(logPrefix() + " Unable to update the lock 
ConfigMap to set leadership information");
-                    LOG.debug(logPrefix() + " Error received during configmap 
lock replace", ex);
+                    LOG.warn(logPrefix() + " Unable to update the lock lease 
resource to set leadership information");
+                    LOG.debug(logPrefix() + " Error received during lease 
resource lock replace", ex);
                     return false;
                 }
             } else {
@@ -325,20 +458,14 @@ public class KubernetesLeadershipController implements 
Service {
         }
     }
 
-    private void updateLatestLeaderInfo(ConfigMap configMap, Set<String> 
members) {
+    private void updateLatestLeaderInfo(HasMetadata leaseResource, Set<String> 
members) {
         LOG.debug("{} Updating internal status about the current leader", 
logPrefix());
-        this.latestConfigMap = configMap;
+        this.latestLeaseResource = leaseResource;
         this.latestMembers = members;
-        this.latestLeaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap, 
members, this.lockConfiguration.getGroupName());
+        this.latestLeaderInfo = leaseManager.decodeLeaderInfo(leaseResource, 
members, this.lockConfiguration.getGroupName());
         LOG.debug("{} Current leader info: {}", logPrefix(), 
this.latestLeaderInfo);
     }
 
-    private ConfigMap pullConfigMap() {
-        return kubernetesClient.configMaps()
-                
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
-                .withName(this.lockConfiguration.getConfigMapName()).get();
-    }
-
     private Set<String> pullClusterMembers() {
         List<Pod> pods = kubernetesClient.pods()
                 
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeaseResourceManager.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeaseResourceManager.java
new file mode 100644
index 0000000..0ab7b7f
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeaseResourceManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.cluster.lock;
+
+import java.util.Set;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.kubernetes.cluster.LeaseResourceType;
+import 
org.apache.camel.component.kubernetes.cluster.lock.impl.ConfigMapLeaseResourceManager;
+import 
org.apache.camel.component.kubernetes.cluster.lock.impl.NativeLeaseResourceManager;
+
+/**
+ * Handles the actual interaction with Kubernetes resources, allowing 
different implementation to be plugged.
+ */
+public interface KubernetesLeaseResourceManager<T extends HasMetadata> {
+
+    /**
+     * Create a new {@link KubernetesLeaseResourceManager} of the given {@link 
LeaseResourceType}.
+     */
+    @SuppressWarnings("unchecked")
+    static <S extends HasMetadata> KubernetesLeaseResourceManager<S> 
create(LeaseResourceType type) {
+        switch (type) {
+            case ConfigMap:
+                return (KubernetesLeaseResourceManager<S>) new 
ConfigMapLeaseResourceManager();
+            case Lease:
+                return (KubernetesLeaseResourceManager<S>) new 
NativeLeaseResourceManager();
+            default:
+                throw new RuntimeCamelException("Unsupported lease resource 
type " + type);
+        }
+    }
+
+    /**
+     * Return a {@link LeaderInfo} object from the underlying Kubernetes 
resource.
+     */
+    LeaderInfo decodeLeaderInfo(T leaseResource, Set<String> members, String 
group);
+
+    /**
+     * Fetch the lease resource for the given name and group.
+     */
+    T fetchLeaseResource(KubernetesClient client, String namespace, String 
leaseResourceName, String group);
+
+    /**
+     * Delete leadership information for the given lease resource and group.
+     */
+    T optimisticDeleteLeaderInfo(KubernetesClient client, T leaseResource, 
String group);
+
+    /**
+     * Set the leadership information on the lease resource to match the given 
{@link LeaderInfo}.
+     */
+    T optimisticAcquireLeadership(KubernetesClient client, T leaseResource, 
LeaderInfo newLeaderInfo);
+
+    /**
+     * Create a new lease resource matching the given {@link LeaderInfo}.
+     */
+    T createNewLeaseResource(KubernetesClient client, String namespace, String 
leaseResourceName, LeaderInfo leaderInfo);
+
+    /**
+     * Update information on the lease resource to increase the renew time (if 
last renewal has occurred more than
+     * minUpdateIntervalSeconds seconds ago).
+     */
+    T refreshLeaseRenewTime(KubernetesClient client, T leaseResource, int 
minUpdateIntervalSeconds);
+
+}
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLockConfiguration.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLockConfiguration.java
index de9999b..70c52f8 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLockConfiguration.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLockConfiguration.java
@@ -20,18 +20,25 @@ import java.util.HashMap;
 import java.util.Map;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.camel.component.kubernetes.cluster.LeaseResourceType;
 
 /**
  * Configuration for Kubernetes Lock.
  */
 public class KubernetesLockConfiguration implements Cloneable {
 
-    public static final String DEFAULT_CONFIGMAP_NAME = "leaders";
+    public static final LeaseResourceType DEFAULT_LEASE_RESOURCE_TYPE = 
LeaseResourceType.Lease;
+    public static final String DEFAULT_RESOURCE_NAME = "leaders";
 
     public static final double DEFAULT_JITTER_FACTOR = 1.2;
-    public static final long DEFAULT_LEASE_DURATION_MILLIS = 30000;
-    public static final long DEFAULT_RENEW_DEADLINE_MILLIS = 20000;
-    public static final long DEFAULT_RETRY_PERIOD_MILLIS = 5000;
+    public static final long DEFAULT_LEASE_DURATION_MILLIS = 15000;
+    public static final long DEFAULT_RENEW_DEADLINE_MILLIS = 10000;
+    public static final long DEFAULT_RETRY_PERIOD_MILLIS = 2000;
+
+    /**
+     * Kubernetes resource type used to hold the leases.
+     */
+    private LeaseResourceType leaseResourceType = DEFAULT_LEASE_RESOURCE_TYPE;
 
     /**
      * Kubernetes namespace containing the pods and the ConfigMap used for 
locking.
@@ -39,9 +46,9 @@ public class KubernetesLockConfiguration implements Cloneable 
{
     private String kubernetesResourcesNamespace;
 
     /**
-     * Name of the ConfigMap used for locking.
+     * Name of the resource used for locking (or prefix, in case multiple ones 
are used).
      */
-    private String configMapName = DEFAULT_CONFIGMAP_NAME;
+    private String kubernetesResourceName = DEFAULT_RESOURCE_NAME;
 
     /**
      * Name of the lock group (or namespace according to the Camel cluster 
convention) within the chosen ConfigMap.
@@ -82,6 +89,14 @@ public class KubernetesLockConfiguration implements 
Cloneable {
     public KubernetesLockConfiguration() {
     }
 
+    public LeaseResourceType getLeaseResourceType() {
+        return leaseResourceType;
+    }
+
+    public void setLeaseResourceType(LeaseResourceType leaseResourceType) {
+        this.leaseResourceType = leaseResourceType;
+    }
+
     public String getKubernetesResourcesNamespaceOrDefault(KubernetesClient 
kubernetesClient) {
         if (kubernetesResourcesNamespace != null) {
             return kubernetesResourcesNamespace;
@@ -97,12 +112,30 @@ public class KubernetesLockConfiguration implements 
Cloneable {
         this.kubernetesResourcesNamespace = kubernetesResourcesNamespace;
     }
 
+    /**
+     * @return     the resource name
+     * @deprecated Use {@link #getKubernetesResourceName()}
+     */
+    @Deprecated
     public String getConfigMapName() {
-        return configMapName;
+        return kubernetesResourceName;
     }
 
-    public void setConfigMapName(String configMapName) {
-        this.configMapName = configMapName;
+    /**
+     * @param      kubernetesResourceName the resource name
+     * @deprecated                        Use {@link 
#setKubernetesResourceName(String)}
+     */
+    @Deprecated
+    public void setConfigMapName(String kubernetesResourceName) {
+        this.kubernetesResourceName = kubernetesResourceName;
+    }
+
+    public String getKubernetesResourceName() {
+        return kubernetesResourceName;
+    }
+
+    public void setKubernetesResourceName(String kubernetesResourceName) {
+        this.kubernetesResourceName = kubernetesResourceName;
     }
 
     public String getGroupName() {
@@ -141,6 +174,10 @@ public class KubernetesLockConfiguration implements 
Cloneable {
         this.jitterFactor = jitterFactor;
     }
 
+    public int getLeaseDurationSeconds() {
+        return (int) (getLeaseDurationMillis() / 1000);
+    }
+
     public long getLeaseDurationMillis() {
         return leaseDurationMillis;
     }
@@ -149,6 +186,10 @@ public class KubernetesLockConfiguration implements 
Cloneable {
         this.leaseDurationMillis = leaseDurationMillis;
     }
 
+    public int getRenewDeadlineSeconds() {
+        return (int) (getRenewDeadlineMillis() / 1000);
+    }
+
     public long getRenewDeadlineMillis() {
         return renewDeadlineMillis;
     }
@@ -178,7 +219,7 @@ public class KubernetesLockConfiguration implements 
Cloneable {
     public String toString() {
         final StringBuilder sb = new 
StringBuilder("KubernetesLockConfiguration{");
         
sb.append("kubernetesResourcesNamespace='").append(kubernetesResourcesNamespace).append('\'');
-        sb.append(", configMapName='").append(configMapName).append('\'');
+        sb.append(", 
kubernetesResourceName='").append(kubernetesResourceName).append('\'');
         sb.append(", groupName='").append(groupName).append('\'');
         sb.append(", podName='").append(podName).append('\'');
         sb.append(", clusterLabels=").append(clusterLabels);
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/LeaderInfo.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/LeaderInfo.java
index f19a075..97b2b47 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/LeaderInfo.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/LeaderInfo.java
@@ -34,14 +34,17 @@ public class LeaderInfo {
 
     private Set<String> members;
 
+    private Integer leaseDurationSeconds;
+
     public LeaderInfo() {
     }
 
-    public LeaderInfo(String groupName, String leader, Date timestamp, 
Set<String> members) {
+    public LeaderInfo(String groupName, String leader, Date timestamp, 
Set<String> members, Integer leaseDurationSeconds) {
         this.groupName = groupName;
         this.leader = leader;
         this.localTimestamp = timestamp;
         this.members = members;
+        this.leaseDurationSeconds = leaseDurationSeconds;
     }
 
     public boolean hasEmptyLeader() {
@@ -89,6 +92,14 @@ public class LeaderInfo {
         this.members = members;
     }
 
+    public Integer getLeaseDurationSeconds() {
+        return leaseDurationSeconds;
+    }
+
+    public void setLeaseDurationSeconds(Integer leaseDurationSeconds) {
+        this.leaseDurationSeconds = leaseDurationSeconds;
+    }
+
     @Override
     public String toString() {
         final StringBuilder sb = new StringBuilder("LeaderInfo{");
@@ -96,6 +107,7 @@ public class LeaderInfo {
         sb.append(", leader='").append(leader).append('\'');
         sb.append(", localTimestamp=").append(localTimestamp);
         sb.append(", members=").append(members);
+        sb.append(", leaseDurationSeconds=").append(leaseDurationSeconds);
         sb.append('}');
         return sb.toString();
     }
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/TimedLeaderNotifier.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/TimedLeaderNotifier.java
index 5b49fef..650d592 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/TimedLeaderNotifier.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/TimedLeaderNotifier.java
@@ -165,12 +165,8 @@ public class TimedLeaderNotifier implements Service {
             lastCommunicatedLeader = newLeader;
             LOG.info("The cluster has a new leader: {}", newLeader);
             try {
-                handler.onKubernetesClusterEvent(new 
KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent() {
-                    @Override
-                    public Optional<String> getData() {
-                        return newLeader;
-                    }
-                });
+                handler.onKubernetesClusterEvent(
+                        
(KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) () -> newLeader);
             } catch (Throwable t) {
                 LOG.warn("Error while communicating the new leader to the 
handler", t);
             }
@@ -181,12 +177,8 @@ public class TimedLeaderNotifier implements Service {
             lastCommunicatedMembers = newMembers;
             LOG.info("The list of cluster members has changed: {}", 
newMembers);
             try {
-                handler.onKubernetesClusterEvent(new 
KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent() {
-                    @Override
-                    public Set<String> getData() {
-                        return newMembers;
-                    }
-                });
+                handler.onKubernetesClusterEvent(
+                        
(KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent) () -> 
newMembers);
             } catch (Throwable t) {
                 LOG.warn("Error while communicating the cluster members to the 
handler", t);
             }
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/impl/ConfigMapLeaseResourceManager.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/impl/ConfigMapLeaseResourceManager.java
new file mode 100644
index 0000000..66e6b2e
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/impl/ConfigMapLeaseResourceManager.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.cluster.lock.impl;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Set;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import 
org.apache.camel.component.kubernetes.cluster.lock.KubernetesLeaseResourceManager;
+import org.apache.camel.component.kubernetes.cluster.lock.LeaderInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConfigMapLeaseResourceManager implements 
KubernetesLeaseResourceManager<ConfigMap> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ConfigMapLeaseResourceManager.class);
+
+    private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ssX";
+
+    private static final String LEADER_PREFIX = "leader.pod.";
+
+    private static final String LOCAL_TIMESTAMP_PREFIX = 
"leader.local.timestamp.";
+
+    @Override
+    public LeaderInfo decodeLeaderInfo(ConfigMap configMap, Set<String> 
members, String group) {
+        return new LeaderInfo(group, getLeader(configMap, group), 
getLocalTimestamp(configMap, group), members, null);
+    }
+
+    @Override
+    public ConfigMap fetchLeaseResource(KubernetesClient client, String 
namespace, String name, String group) {
+        return client.configMaps()
+                .inNamespace(namespace)
+                .withName(name).get();
+    }
+
+    @Override
+    public ConfigMap optimisticDeleteLeaderInfo(KubernetesClient client, 
ConfigMap leaseResource, String group) {
+        ConfigMap updatedConfigMap = getConfigMapWithoutLeader(leaseResource, 
group);
+        return client.configMaps()
+                .inNamespace(leaseResource.getMetadata().getNamespace())
+                .withName(leaseResource.getMetadata().getName())
+                
.lockResourceVersion(leaseResource.getMetadata().getResourceVersion()).replace(updatedConfigMap);
+    }
+
+    @Override
+    public ConfigMap optimisticAcquireLeadership(KubernetesClient client, 
ConfigMap leaseResource, LeaderInfo newLeaderInfo) {
+        ConfigMap updatedConfigMap = getConfigMapWithNewLeader(leaseResource, 
newLeaderInfo);
+        return client.configMaps()
+                .inNamespace(leaseResource.getMetadata().getNamespace())
+                .withName(leaseResource.getMetadata().getName())
+                
.lockResourceVersion(leaseResource.getMetadata().getResourceVersion()).replace(updatedConfigMap);
+    }
+
+    @Override
+    public ConfigMap createNewLeaseResource(
+            KubernetesClient client, String namespace, String 
leaseResourceName, LeaderInfo leaderInfo) {
+        ConfigMap newConfigMap
+                = new 
ConfigMapBuilder().withNewMetadata().withName(leaseResourceName).addToLabels("provider",
 "camel")
+                        .addToLabels("kind", "locks").endMetadata()
+                        .addToData(LEADER_PREFIX + leaderInfo.getGroupName(), 
leaderInfo.getLeader())
+                        .addToData(LOCAL_TIMESTAMP_PREFIX + 
leaderInfo.getGroupName(),
+                                formatDate(leaderInfo.getLocalTimestamp()))
+                        .build();
+
+        return client.configMaps()
+                .inNamespace(namespace)
+                .create(newConfigMap);
+    }
+
+    @Override
+    public ConfigMap refreshLeaseRenewTime(KubernetesClient client, ConfigMap 
leaseResource, int minUpdateIntervalSeconds) {
+        // Configmap does not store renew information
+        return leaseResource;
+    }
+
+    private static ConfigMap getConfigMapWithNewLeader(ConfigMap configMap, 
LeaderInfo leaderInfo) {
+        return new ConfigMapBuilder(configMap).addToData(LEADER_PREFIX + 
leaderInfo.getGroupName(), leaderInfo.getLeader())
+                .addToData(LOCAL_TIMESTAMP_PREFIX + leaderInfo.getGroupName(), 
formatDate(leaderInfo.getLocalTimestamp()))
+                .build();
+    }
+
+    private static ConfigMap getConfigMapWithoutLeader(ConfigMap configMap, 
String group) {
+        return new ConfigMapBuilder(configMap).removeFromData(LEADER_PREFIX + 
group)
+                .removeFromData(LOCAL_TIMESTAMP_PREFIX + group)
+                .build();
+    }
+
+    private static Date getLocalTimestamp(ConfigMap configMap, String group) {
+        String timestamp = getConfigMapValue(configMap, LOCAL_TIMESTAMP_PREFIX 
+ group);
+        if (timestamp == null) {
+            return null;
+        }
+
+        try {
+            return new SimpleDateFormat(DATE_TIME_FORMAT).parse(timestamp);
+        } catch (Exception e) {
+            LOG.warn("Unable to parse time string '" + timestamp + "' using 
format " + DATE_TIME_FORMAT, e);
+        }
+
+        return null;
+    }
+
+    private static String getLeader(ConfigMap configMap, String group) {
+        return getConfigMapValue(configMap, LEADER_PREFIX + group);
+    }
+
+    private static String getConfigMapValue(ConfigMap configMap, String key) {
+        if (configMap == null || configMap.getData() == null) {
+            return null;
+        }
+        return configMap.getData().get(key);
+    }
+
+    private static String formatDate(Date date) {
+        if (date == null) {
+            return null;
+        }
+        try {
+            return new SimpleDateFormat(DATE_TIME_FORMAT).format(date);
+        } catch (Exception e) {
+            LOG.warn("Unable to format date '" + date + "' using format " + 
DATE_TIME_FORMAT, e);
+        }
+
+        return null;
+    }
+
+}
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/impl/NativeLeaseResourceManager.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/impl/NativeLeaseResourceManager.java
new file mode 100644
index 0000000..1400208
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/impl/NativeLeaseResourceManager.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.cluster.lock.impl;
+
+import java.time.ZonedDateTime;
+import java.util.Date;
+import java.util.Set;
+
+import io.fabric8.kubernetes.api.model.coordination.v1.Lease;
+import io.fabric8.kubernetes.api.model.coordination.v1.LeaseBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import 
org.apache.camel.component.kubernetes.cluster.lock.KubernetesLeaseResourceManager;
+import org.apache.camel.component.kubernetes.cluster.lock.LeaderInfo;
+
+public class NativeLeaseResourceManager implements 
KubernetesLeaseResourceManager<Lease> {
+
+    @Override
+    public LeaderInfo decodeLeaderInfo(Lease lease, Set<String> members, 
String group) {
+        return new LeaderInfo(group, getLeader(lease), 
getLocalTimestamp(lease), members, getLeaseDurationSeconds(lease));
+    }
+
+    @Override
+    public Lease fetchLeaseResource(KubernetesClient client, String namespace, 
String name, String group) {
+        return client.leases()
+                .inNamespace(namespace)
+                .withName(leaseResourceName(name, group)).get();
+    }
+
+    @Override
+    public Lease optimisticDeleteLeaderInfo(KubernetesClient client, Lease 
leaseResource, String group) {
+        Lease updatedLease = getLeaseWithoutLeader(leaseResource);
+        return client.leases()
+                .inNamespace(leaseResource.getMetadata().getNamespace())
+                .withName(leaseResource.getMetadata().getName())
+                
.lockResourceVersion(leaseResource.getMetadata().getResourceVersion()).replace(updatedLease);
+    }
+
+    @Override
+    public Lease optimisticAcquireLeadership(KubernetesClient client, Lease 
leaseResource, LeaderInfo newLeaderInfo) {
+        Lease updatedLease = getLeaseWithNewLeader(leaseResource, 
newLeaderInfo);
+        return client.leases()
+                .inNamespace(leaseResource.getMetadata().getNamespace())
+                .withName(leaseResource.getMetadata().getName())
+                
.lockResourceVersion(leaseResource.getMetadata().getResourceVersion()).replace(updatedLease);
+    }
+
+    @Override
+    public Lease refreshLeaseRenewTime(KubernetesClient client, Lease 
leaseResource, int minUpdateIntervalSeconds) {
+        ZonedDateTime lastRenew = leaseResource.getSpec() != null ? 
leaseResource.getSpec().getRenewTime() : null;
+        if (lastRenew == null || 
lastRenew.plusSeconds(minUpdateIntervalSeconds).isBefore(ZonedDateTime.now())) {
+            Lease updatedLease = new LeaseBuilder(leaseResource)
+                    .editOrNewSpec()
+                    .withRenewTime(ZonedDateTime.now())
+                    .endSpec()
+                    .build();
+            return client.leases()
+                    .inNamespace(leaseResource.getMetadata().getNamespace())
+                    .withName(leaseResource.getMetadata().getName())
+                    
.lockResourceVersion(leaseResource.getMetadata().getResourceVersion()).replace(updatedLease);
+        }
+        return leaseResource;
+    }
+
+    @Override
+    public Lease createNewLeaseResource(KubernetesClient client, String 
namespace, String prefix, LeaderInfo leaderInfo) {
+        ZonedDateTime now = ZonedDateTime.now();
+        Lease newLease = new LeaseBuilder().withNewMetadata()
+                .withName(leaseResourceName(prefix, leaderInfo.getGroupName()))
+                .addToLabels("provider", "camel")
+                .endMetadata()
+                .withNewSpec()
+                .withNewHolderIdentity(leaderInfo.getLeader())
+                .withAcquireTime(now)
+                .withLeaseDurationSeconds(leaderInfo.getLeaseDurationSeconds())
+                .withRenewTime(now)
+                .endSpec()
+                .build();
+
+        return client.leases()
+                .inNamespace(namespace)
+                .create(newLease);
+    }
+
+    private static Lease getLeaseWithNewLeader(Lease lease, LeaderInfo 
leaderInfo) {
+        Integer transitions = lease.getSpec() != null ? 
lease.getSpec().getLeaseTransitions() : null;
+        if (transitions == null) {
+            transitions = 0;
+        }
+        ZonedDateTime now = ZonedDateTime.now();
+        return new LeaseBuilder(lease)
+                .editOrNewSpec()
+                .withNewHolderIdentity(leaderInfo.getLeader())
+                .withAcquireTime(now)
+                .withLeaseDurationSeconds(leaderInfo.getLeaseDurationSeconds())
+                .withRenewTime(now)
+                .withLeaseTransitions(transitions + 1)
+                .endSpec()
+                .build();
+    }
+
+    private static Lease getLeaseWithoutLeader(Lease lease) {
+        return new LeaseBuilder(lease).editOrNewSpec()
+                .withHolderIdentity(null)
+                .withAcquireTime(null)
+                .withRenewTime(null)
+                .withLeaseDurationSeconds(null)
+                .endSpec()
+                .build();
+    }
+
+    private static Date getLocalTimestamp(Lease lease) {
+        if (lease == null || lease.getSpec() == null || 
lease.getSpec().getAcquireTime() == null) {
+            return null;
+        }
+        return Date.from(lease.getSpec().getAcquireTime().toInstant());
+    }
+
+    private static Integer getLeaseDurationSeconds(Lease lease) {
+        if (lease == null || lease.getSpec() == null) {
+            return null;
+        }
+        return lease.getSpec().getLeaseDurationSeconds();
+    }
+
+    private static String getLeader(Lease lease) {
+        if (lease == null || lease.getSpec() == null) {
+            return null;
+        }
+        return lease.getSpec().getHolderIdentity();
+    }
+
+    private static String leaseResourceName(String prefix, String group) {
+        return toValidKubernetesID(prefix + "-" + group);
+    }
+
+    private static String toValidKubernetesID(String id) {
+        id = id.toLowerCase().replaceAll("[^a-z0-9-.]", "-");
+        while (id.length() > 0 && isNonAlphanumeric(id, 0)) {
+            id = id.substring(1);
+        }
+        while (id.length() > 0 && isNonAlphanumeric(id, id.length() - 1)) {
+            id = id.substring(0, id.length() - 1);
+        }
+        return id;
+    }
+
+    private static boolean isNonAlphanumeric(String id, int pos) {
+        return !Character.isAlphabetic(id.charAt(pos)) && 
!Character.isDigit(id.charAt(pos));
+    }
+}
diff --git 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterServiceTest.java
 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterServiceTest.java
index e58d4d1..cdaedc9 100644
--- 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterServiceTest.java
+++ 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterServiceTest.java
@@ -16,25 +16,40 @@
  */
 package org.apache.camel.component.kubernetes.cluster;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.api.model.coordination.v1.LeaseBuilder;
+import org.apache.camel.cluster.CamelPreemptiveClusterService;
 import org.apache.camel.component.kubernetes.KubernetesConfiguration;
 import 
org.apache.camel.component.kubernetes.cluster.utils.ConfigMapLockSimulator;
 import org.apache.camel.component.kubernetes.cluster.utils.LeaderRecorder;
+import org.apache.camel.component.kubernetes.cluster.utils.LeaseLockSimulator;
 import org.apache.camel.component.kubernetes.cluster.utils.LockTestServer;
+import 
org.apache.camel.component.kubernetes.cluster.utils.ResourceLockSimulator;
+import org.apache.camel.support.cluster.RebalancingCamelClusterService;
 import org.apache.camel.test.junit5.CamelTestSupport;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -52,31 +67,33 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
     private static final int RETRY_PERIOD_MILLIS = 200;
     private static final double JITTER_FACTOR = 1.1;
 
-    private ConfigMapLockSimulator lockSimulator;
+    private ConfigMapLockSimulator configMapLockSimulator;
+    private Map<String, LeaseLockSimulator> leaseLockSimulators = new 
HashMap<>();
 
-    private Map<String, LockTestServer> lockServers;
+    private Map<String, LockTestServer<?>> lockServers = new HashMap<>();
 
-    @BeforeEach
-    public void prepareLock() {
-        this.lockSimulator = new ConfigMapLockSimulator("leaders");
-        this.lockServers = new HashMap<>();
-    }
+    private Map<String, CamelPreemptiveClusterService> clusterServices = new 
HashMap<>();
 
     @AfterEach
     public void shutdownLock() {
-        for (LockTestServer server : this.lockServers.values()) {
+        for (LockTestServer<?> server : this.lockServers.values()) {
             try {
                 server.destroy();
             } catch (Exception e) {
                 // can happen in case of delay
             }
         }
+        this.lockServers = new HashMap<>();
+        configMapLockSimulator = null;
+        leaseLockSimulators = new HashMap<>();
+        clusterServices = new HashMap<>();
     }
 
-    @Test
-    public void testSimpleLeaderElection() throws Exception {
-        LeaderRecorder mypod1 = addMember("mypod1");
-        LeaderRecorder mypod2 = addMember("mypod2");
+    @ParameterizedTest
+    @EnumSource(LeaseResourceType.class)
+    public void testSimpleLeaderElection(LeaseResourceType type) throws 
Exception {
+        LeaderRecorder mypod1 = addMember("mypod1", type);
+        LeaderRecorder mypod2 = addMember("mypod2", type);
         context.start();
 
         mypod1.waitForAnyLeader(5, TimeUnit.SECONDS);
@@ -88,11 +105,12 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
         assertEquals(mypod2.getCurrentLeader(), leader, "Leaders should be 
equals");
     }
 
-    @Test
-    public void testMultipleMembersLeaderElection() throws Exception {
+    @ParameterizedTest
+    @EnumSource(LeaseResourceType.class)
+    public void testMultipleMembersLeaderElection(LeaseResourceType type) 
throws Exception {
         int number = 5;
         List<LeaderRecorder> members
-                = IntStream.range(0, number).mapToObj(i -> addMember("mypod" + 
i)).collect(Collectors.toList());
+                = IntStream.range(0, number).mapToObj(i -> addMember("mypod" + 
i, type)).collect(Collectors.toList());
         context.start();
 
         for (LeaderRecorder member : members) {
@@ -107,10 +125,11 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
 
     @Test
     public void testSimpleLeaderElectionWithExistingConfigMap() throws 
Exception {
-        lockSimulator.setConfigMap(new 
ConfigMapBuilder().withNewMetadata().withName("leaders").and().build(), true);
+        this.configMapLockSimulator = new ConfigMapLockSimulator("leaders");
+        configMapLockSimulator.setResource(new 
ConfigMapBuilder().withNewMetadata().withName("leaders").and().build(), true);
 
-        LeaderRecorder mypod1 = addMember("mypod1");
-        LeaderRecorder mypod2 = addMember("mypod2");
+        LeaderRecorder mypod1 = addMember("mypod1", 
LeaseResourceType.ConfigMap);
+        LeaderRecorder mypod2 = addMember("mypod2", 
LeaseResourceType.ConfigMap);
         context.start();
 
         mypod1.waitForAnyLeader(10, TimeUnit.SECONDS);
@@ -122,9 +141,31 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
     }
 
     @Test
-    public void testLeadershipLoss() throws Exception {
-        LeaderRecorder mypod1 = addMember("mypod1");
-        LeaderRecorder mypod2 = addMember("mypod2");
+    public void testSimpleLeaderElectionWithExistingLeases() throws Exception {
+        LeaseLockSimulator simulator = new 
LeaseLockSimulator("leaders-mygroup");
+        simulator.setResource(new LeaseBuilder()
+                .withNewMetadata().withName("leaders-mygroup")
+                .and()
+                .build(), true);
+        this.leaseLockSimulators.put("mygroup", simulator);
+
+        LeaderRecorder mypod1 = addMember("mypod1", "mygroup", 
LeaseResourceType.Lease);
+        LeaderRecorder mypod2 = addMember("mypod2", "mygroup", 
LeaseResourceType.Lease);
+        context.start();
+
+        mypod1.waitForAnyLeader(10, TimeUnit.SECONDS);
+        mypod2.waitForAnyLeader(10, TimeUnit.SECONDS);
+
+        String leader = mypod1.getCurrentLeader();
+        assertTrue(leader.startsWith("mypod"));
+        assertEquals(mypod2.getCurrentLeader(), leader, "Leaders should be 
equals");
+    }
+
+    @ParameterizedTest
+    @EnumSource(LeaseResourceType.class)
+    public void testLeadershipLoss(LeaseResourceType type) throws Exception {
+        LeaderRecorder mypod1 = addMember("mypod1", type);
+        LeaderRecorder mypod2 = addMember("mypod2", type);
         context.start();
 
         mypod1.waitForAnyLeader(5, TimeUnit.SECONDS);
@@ -152,10 +193,11 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
         checkLeadershipChangeDistance((LEASE_TIME_MILLIS - 
RENEW_DEADLINE_MILLIS) / 2, TimeUnit.MILLISECONDS, mypod1, mypod2);
     }
 
-    @Test
-    public void testSlowLeaderLosingLeadershipOnlyInternally() throws 
Exception {
-        LeaderRecorder mypod1 = addMember("mypod1");
-        LeaderRecorder mypod2 = addMember("mypod2");
+    @ParameterizedTest
+    @EnumSource(LeaseResourceType.class)
+    public void testSlowLeaderLosingLeadershipOnlyInternally(LeaseResourceType 
type) throws Exception {
+        LeaderRecorder mypod1 = addMember("mypod1", type);
+        LeaderRecorder mypod2 = addMember("mypod2", type);
         context.start();
 
         mypod1.waitForAnyLeader(5, TimeUnit.SECONDS);
@@ -173,10 +215,11 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
         assertEquals(firstLeader, formerLoserRecorder.getCurrentLeader());
     }
 
-    @Test
-    public void testRecoveryAfterFailure() throws Exception {
-        LeaderRecorder mypod1 = addMember("mypod1");
-        LeaderRecorder mypod2 = addMember("mypod2");
+    @ParameterizedTest
+    @EnumSource(LeaseResourceType.class)
+    public void testRecoveryAfterFailure(LeaseResourceType type) throws 
Exception {
+        LeaderRecorder mypod1 = addMember("mypod1", type);
+        LeaderRecorder mypod2 = addMember("mypod2", type);
         context.start();
 
         mypod1.waitForAnyLeader(5, TimeUnit.SECONDS);
@@ -197,10 +240,10 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
 
     @Test
     public void testSharedConfigMap() throws Exception {
-        LeaderRecorder a1 = addMember("a1");
-        LeaderRecorder a2 = addMember("a2");
-        LeaderRecorder b1 = addMember("b1", "app2");
-        LeaderRecorder b2 = addMember("b2", "app2");
+        LeaderRecorder a1 = addMember("a1", LeaseResourceType.ConfigMap);
+        LeaderRecorder a2 = addMember("a2", LeaseResourceType.ConfigMap);
+        LeaderRecorder b1 = addMember("b1", "app2", 
LeaseResourceType.ConfigMap);
+        LeaderRecorder b2 = addMember("b2", "app2", 
LeaseResourceType.ConfigMap);
         context.start();
 
         a1.waitForAnyLeader(5, TimeUnit.SECONDS);
@@ -218,33 +261,110 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
         assertNotEquals(a1.getCurrentLeader(), b2.getCurrentLeader());
     }
 
+    static Stream<Arguments> rebalancingProvider() {
+        return Stream.of(
+                // LeaseResourceType, pods, partitions, expected partitions 
owned, tolerance on owned partitions
+                Arguments.of(LeaseResourceType.Lease, 4, 2, 0, 1),
+                Arguments.of(LeaseResourceType.Lease, 1, 2, 2, 0),
+                Arguments.of(LeaseResourceType.Lease, 2, 2, 1, 0),
+                Arguments.of(LeaseResourceType.ConfigMap, 3, 10, 3, 1),
+                Arguments.of(LeaseResourceType.Lease, 3, 10, 3, 1),
+                Arguments.of(LeaseResourceType.ConfigMap, 6, 23, 3, 1),
+                Arguments.of(LeaseResourceType.Lease, 6, 23, 3, 1));
+    }
+
+    @ParameterizedTest
+    @MethodSource("rebalancingProvider")
+    public void testRebalancing(LeaseResourceType type, int pods, int 
partitions, int expectedPartitionsPerPod, int tolerance)
+            throws Exception {
+        Map<String, List<LeaderRecorder>> recorders = createCluster(type, 
pods, partitions);
+        context.start();
+
+        waitForAllLeaders(recorders, leaders -> {
+            Map<String, Long> counts = leaders.values().stream()
+                    .collect(Collectors.groupingBy(Function.identity(), 
Collectors.counting()));
+
+            for (Long count : counts.values()) {
+                if (count < expectedPartitionsPerPod || count > 
expectedPartitionsPerPod + tolerance) {
+                    return false;
+                }
+            }
+            return true;
+        }, 30, TimeUnit.SECONDS);
+    }
+
+    private Map<String, List<LeaderRecorder>> createCluster(LeaseResourceType 
type, int pods, int partitions) {
+        Map<String, List<LeaderRecorder>> recorders = new HashMap<>();
+        for (int i = 0; i < partitions; i++) {
+            String partitionName = "partition-" + i;
+            recorders.put(partitionName, new ArrayList<>());
+            for (int j = 0; j < pods; j++) {
+                recorders.get(partitionName).add(addMember("mypod-" + j, 
partitionName, type, true));
+            }
+        }
+        return recorders;
+    }
+
+    private void waitForAllLeaders(
+            Map<String, List<LeaderRecorder>> partitionRecorders,
+            Predicate<Map<String, String>> condition, long time, TimeUnit 
unit) {
+        Awaitility.waitAtMost(time, unit).until(() -> {
+            Map<String, String> leaders = new HashMap<>();
+            for (String partition : partitionRecorders.keySet()) {
+                String leader = null;
+                for (LeaderRecorder recorder : 
partitionRecorders.get(partition)) {
+                    String partitionLeader = recorder.getCurrentLeader();
+                    if (partitionLeader == null || (leader != null && 
!leader.equals(partitionLeader))) {
+                        return false;
+                    }
+                    leader = partitionLeader;
+                }
+                if (leader == null) {
+                    return false;
+                }
+                leaders.put(partition, leader);
+            }
+            return condition.test(leaders);
+        });
+    }
+
+    private void withLockServer(String pod, Consumer<LockTestServer<?>> 
consumer) {
+        consumer.accept(this.lockServers.get(pod));
+    }
+
     private void delayRequestsFromPod(String pod, long delay, TimeUnit unit) {
-        
this.lockServers.get(pod).setDelayRequests(TimeUnit.MILLISECONDS.convert(delay, 
unit));
+        withLockServer(pod, server -> 
server.setDelayRequests(TimeUnit.MILLISECONDS.convert(delay, unit)));
     }
 
     private void refuseRequestsFromPod(String pod) {
-        this.lockServers.get(pod).setRefuseRequests(true);
+        withLockServer(pod, server -> server.setRefuseRequests(true));
     }
 
     private void allowRequestsFromPod(String pod) {
-        this.lockServers.get(pod).setRefuseRequests(false);
+        withLockServer(pod, server -> server.setRefuseRequests(false));
     }
 
     private void disconnectPod(String pod) {
-        for (LockTestServer server : this.lockServers.values()) {
+        for (LockTestServer<?> server : this.lockServers.values()) {
             server.removePod(pod);
         }
     }
 
     private void connectPod(String pod) {
-        for (LockTestServer server : this.lockServers.values()) {
+        for (LockTestServer<?> server : this.lockServers.values()) {
             server.addPod(pod);
         }
     }
 
+    private void connectSimulator(ResourceLockSimulator<?> lockSimulator) {
+        for (LockTestServer<?> server : this.lockServers.values()) {
+            server.addSimulator(lockSimulator);
+        }
+    }
+
     private void checkLeadershipChangeDistance(long minimum, TimeUnit unit, 
LeaderRecorder... recorders) {
         List<LeaderRecorder.LeadershipInfo> infos = 
Arrays.stream(recorders).flatMap(lr -> lr.getLeadershipInfo().stream())
-                .sorted((li1, li2) -> Long.compare(li1.getChangeTimestamp(), 
li2.getChangeTimestamp()))
+                
.sorted(Comparator.comparingLong(LeaderRecorder.LeadershipInfo::getChangeTimestamp))
                 .collect(Collectors.toList());
 
         LeaderRecorder.LeadershipInfo currentLeaderLastSeen = null;
@@ -266,30 +386,69 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
         }
     }
 
-    private LeaderRecorder addMember(String name) {
-        return addMember(name, "app");
+    private LeaderRecorder addMember(String name, LeaseResourceType type) {
+        return addMember(name, "app", type);
     }
 
-    private LeaderRecorder addMember(String name, String namespace) {
-        assertNull(this.lockServers.get(name));
+    private LeaderRecorder addMember(String name, String namespace, 
LeaseResourceType type) {
+        return addMember(name, namespace, type, false);
+    }
 
-        LockTestServer lockServer = new LockTestServer(lockSimulator);
-        this.lockServers.put(name, lockServer);
+    private LeaderRecorder addMember(String name, String namespace, 
LeaseResourceType type, boolean rebalancing) {
+        ResourceLockSimulator<?> lockSimulator;
+        switch (type) {
+            case ConfigMap:
+                if (this.configMapLockSimulator == null) {
+                    this.configMapLockSimulator = new 
ConfigMapLockSimulator("leaders");
+                }
+                lockSimulator = this.configMapLockSimulator;
+                break;
+            case Lease:
+                if (!this.leaseLockSimulators.containsKey(namespace)) {
+                    this.leaseLockSimulators.put(namespace, new 
LeaseLockSimulator("leaders-" + namespace));
+                }
+                lockSimulator = this.leaseLockSimulators.get(namespace);
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported 
LeaseResourceType " + type);
+        }
 
-        KubernetesConfiguration configuration = new KubernetesConfiguration();
-        configuration.setKubernetesClient(lockServer.createClient());
+        if (!this.lockServers.containsKey(name)) {
+            this.lockServers.put(name, new LockTestServer<>());
+        }
+        LockTestServer<?> lockServer = this.lockServers.get(name);
+
+        CamelPreemptiveClusterService member = clusterServices.get(name);
+        if (member == null) {
+            KubernetesConfiguration configuration = new 
KubernetesConfiguration();
+            configuration.setKubernetesClient(lockServer.createClient());
+
+            KubernetesClusterService service = new 
KubernetesClusterService(configuration);
+            service.setKubernetesNamespace("test");
+            service.setPodName(name);
+            service.setLeaseDurationMillis(LEASE_TIME_MILLIS);
+            service.setRenewDeadlineMillis(RENEW_DEADLINE_MILLIS);
+            service.setRetryPeriodMillis(RETRY_PERIOD_MILLIS);
+            service.setJitterFactor(JITTER_FACTOR);
+            service.setLeaseResourceType(type);
+
+            if (rebalancing) {
+                member = new RebalancingCamelClusterService(service, 
RETRY_PERIOD_MILLIS);
+            } else {
+                member = service;
+            }
+
+            try {
+                context().addService(member);
+            } catch (Exception ex) {
+                throw new RuntimeException(ex);
+            }
 
-        KubernetesClusterService member = new 
KubernetesClusterService(configuration);
-        member.setKubernetesNamespace("test");
-        member.setPodName(name);
-        member.setLeaseDurationMillis(LEASE_TIME_MILLIS);
-        member.setRenewDeadlineMillis(RENEW_DEADLINE_MILLIS);
-        member.setRetryPeriodMillis(RETRY_PERIOD_MILLIS);
-        member.setJitterFactor(JITTER_FACTOR);
+            clusterServices.put(name, member);
+        }
 
         LeaderRecorder recorder = new LeaderRecorder();
         try {
-            context().addService(member);
             member.getView(namespace).addEventListener(recorder);
         } catch (Exception ex) {
             throw new RuntimeException(ex);
@@ -297,7 +456,9 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
 
         for (String pod : this.lockServers.keySet()) {
             connectPod(pod);
+            connectSimulator(lockSimulator);
         }
+
         return recorder;
     }
 
diff --git 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java
 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java
index f4e6e21..66dbe31 100644
--- 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java
+++ 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java
@@ -18,63 +18,39 @@ package org.apache.camel.component.kubernetes.cluster.utils;
 
 import io.fabric8.kubernetes.api.model.ConfigMap;
 import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * Central lock for testing leader election.
+ * Central lock for testing leader election based on ConfigMap.
  */
-public class ConfigMapLockSimulator {
+public class ConfigMapLockSimulator extends ResourceLockSimulator<ConfigMap> {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(ConfigMapLockSimulator.class);
-
-    private String configMapName;
-
-    private ConfigMap currentMap;
-
-    private long versionCounter = 1000000;
-
-    public ConfigMapLockSimulator(String configMapName) {
-        this.configMapName = configMapName;
+    public ConfigMapLockSimulator(String resourceName) {
+        super(resourceName);
     }
 
-    public String getConfigMapName() {
-        return configMapName;
+    @Override
+    protected ConfigMap withNewResourceVersion(ConfigMap resource, String 
newResourceVersion) {
+        return new 
ConfigMapBuilder(resource).editOrNewMetadata().withResourceVersion(newResourceVersion)
+                .endMetadata().build();
     }
 
-    public synchronized boolean setConfigMap(ConfigMap map, boolean insert) {
-        // Insert
-        if (insert && currentMap != null) {
-            LOG.error("Current map should have been null");
-            return false;
-        }
-
-        // Update
-        if (!insert && currentMap == null) {
-            LOG.error("Current map should not have been null");
-            return false;
-        }
-        String version = map.getMetadata() != null ? 
map.getMetadata().getResourceVersion() : null;
-        if (version != null) {
-            long versionLong = Long.parseLong(version);
-            if (versionLong != versionCounter) {
-                LOG.warn("Current resource version is {} while the update is 
related to version {}", versionCounter,
-                        versionLong);
-                return false;
-            }
-        }
-
-        this.currentMap = new 
ConfigMapBuilder(map).editOrNewMetadata().withResourceVersion(String.valueOf(++versionCounter))
-                .endMetadata().build();
-        return true;
+    @Override
+    protected ConfigMap copyOf(ConfigMap resource) {
+        return new ConfigMapBuilder(resource).build();
     }
 
-    public synchronized ConfigMap getConfigMap() {
-        if (currentMap == null) {
-            return null;
-        }
+    @Override
+    public String getResourcePath() {
+        return "configmaps";
+    }
 
-        return new ConfigMapBuilder(currentMap).build();
+    @Override
+    public String getAPIPath() {
+        return "/api/v1";
     }
 
+    @Override
+    public Class<ConfigMap> getResourceClass() {
+        return ConfigMap.class;
+    }
 }
diff --git 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LeaseLockSimulator.java
 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LeaseLockSimulator.java
new file mode 100644
index 0000000..3d99656
--- /dev/null
+++ 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LeaseLockSimulator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.cluster.utils;
+
+import io.fabric8.kubernetes.api.model.coordination.v1.Lease;
+import io.fabric8.kubernetes.api.model.coordination.v1.LeaseBuilder;
+
+/**
+ * Central lock for testing leader election based on Lease.
+ */
+public class LeaseLockSimulator extends ResourceLockSimulator<Lease> {
+
+    public LeaseLockSimulator(String resourceName) {
+        super(resourceName);
+    }
+
+    @Override
+    protected Lease withNewResourceVersion(Lease resource, String 
newResourceVersion) {
+        return new 
LeaseBuilder(resource).editOrNewMetadata().withResourceVersion(newResourceVersion)
+                .endMetadata().build();
+    }
+
+    @Override
+    protected Lease copyOf(Lease resource) {
+        return new LeaseBuilder(resource).build();
+    }
+
+    @Override
+    public String getResourcePath() {
+        return "leases";
+    }
+
+    @Override
+    public String getAPIPath() {
+        return "/apis/coordination.k8s.io/v1";
+    }
+
+    @Override
+    public Class<Lease> getResourceClass() {
+        return Lease.class;
+    }
+}
diff --git 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LockTestServer.java
 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LockTestServer.java
index bd14acc..1a5bd7a 100644
--- 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LockTestServer.java
+++ 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LockTestServer.java
@@ -19,12 +19,15 @@ package org.apache.camel.component.kubernetes.cluster.utils;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import io.fabric8.kubernetes.api.model.ConfigMap;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.api.model.PodListBuilder;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
@@ -37,7 +40,7 @@ import org.slf4j.LoggerFactory;
 /**
  * A Test server to interact with Kubernetes for locking on a ConfigMap.
  */
-public class LockTestServer extends KubernetesMockServer {
+public class LockTestServer<T extends HasMetadata> extends 
KubernetesMockServer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(LockTestServer.class);
 
@@ -47,15 +50,106 @@ public class LockTestServer extends KubernetesMockServer {
 
     private Set<String> pods;
 
-    public LockTestServer(ConfigMapLockSimulator lockSimulator) {
-        this(lockSimulator, Collections.emptySet());
+    private Map<String, ResourceLockSimulator<T>> simulators;
+
+    public LockTestServer() {
+        this(Collections.emptySet());
     }
 
-    public LockTestServer(ConfigMapLockSimulator lockSimulator, 
Collection<String> initialPods) {
+    public LockTestServer(Collection<String> initialPods) {
 
         this.pods = new TreeSet<>(initialPods);
+        this.simulators = new HashMap<>();
+
+        // Other resources
+        expect().get().withPath("/api/v1/namespaces/test/pods")
+                .andReply(200,
+                        request -> new 
PodListBuilder().withNewMetadata().withResourceVersion("1").and()
+                                .withItems(getCurrentPods().stream()
+                                        .map(name -> new 
PodBuilder().withNewMetadata().withName(name).and().build())
+                                        .collect(Collectors.toList()))
+                                .build())
+                .always();
 
-        expect().get().withPath("/api/v1/namespaces/test/configmaps/" + 
lockSimulator.getConfigMapName())
+    }
+
+    public void addSimulator(ResourceLockSimulator<?> paramLockSimulator) {
+        ResourceLockSimulator<T> lockSimulator = (ResourceLockSimulator<T>) 
paramLockSimulator;
+        if (this.simulators.containsKey(lockSimulator.getResourceName())) {
+            return;
+        }
+        this.simulators.put(lockSimulator.getResourceName(), lockSimulator);
+
+        if (this.simulators.size() == 1) {
+            // Global methods defined once
+            expect().post().withPath(lockSimulator.getAPIPath() + 
"/namespaces/test/" + lockSimulator.getResourcePath())
+                    .andReply(new ResponseProvider<Object>() {
+
+                        private Headers headers = new 
Headers.Builder().build();
+                        private Map<Integer, String> lockNames = new 
HashMap<>();
+
+                        @Override
+                        public int getStatusCode(RecordedRequest request) {
+                            if (refuseRequests) {
+                                return 500;
+                            }
+
+                            T resource;
+                            try {
+                                resource = convert(request, 
lockSimulator.getResourceClass());
+                            } catch (Exception e) {
+                                LOG.error("Error during resource conversion", 
e);
+                                return 500;
+                            }
+
+                            if (resource == null) {
+                                LOG.error("No resource received");
+                                return 500;
+                            }
+                            ResourceLockSimulator<T> lockSimulator = 
simulators.get(resource.getMetadata().getName());
+                            if (resource.getMetadata() == null
+                                    || 
!lockSimulator.getResourceName().equals(resource.getMetadata().getName())) {
+                                LOG.error("Illegal resource received");
+                                return 500;
+                            }
+
+                            boolean done = lockSimulator.setResource(resource, 
true);
+                            if (done) {
+                                lockNames.put(request.getSequenceNumber(), 
lockSimulator.getResourceName());
+                                return 201;
+                            }
+                            return 500;
+                        }
+
+                        @Override
+                        public Object getBody(RecordedRequest recordedRequest) 
{
+                            delayIfNecessary();
+
+                            if 
(lockNames.containsKey(recordedRequest.getSequenceNumber())) {
+                                T resource = 
simulators.get(lockNames.get(recordedRequest.getSequenceNumber())).getResource();
+                                if (resource != null) {
+                                    return resource;
+                                }
+                            }
+
+                            return "";
+                        }
+
+                        @Override
+                        public Headers getHeaders() {
+                            return headers;
+                        }
+
+                        @Override
+                        public void setHeaders(Headers headers) {
+                            this.headers = headers;
+                        }
+                    }).always();
+        }
+
+        expect().get()
+                .withPath(lockSimulator.getAPIPath() + "/namespaces/test/" + 
lockSimulator.getResourcePath() + "/"
+                          + lockSimulator.getResourceName())
                 .andReply(new ResponseProvider<Object>() {
 
                     private Headers headers = new Headers.Builder().build();
@@ -66,7 +160,7 @@ public class LockTestServer extends KubernetesMockServer {
                             return 500;
                         }
 
-                        if (lockSimulator.getConfigMap() != null) {
+                        if (lockSimulator.getResource() != null) {
                             return 200;
                         }
 
@@ -76,9 +170,9 @@ public class LockTestServer extends KubernetesMockServer {
                     @Override
                     public Object getBody(RecordedRequest recordedRequest) {
                         delayIfNecessary();
-                        ConfigMap map = lockSimulator.getConfigMap();
-                        if (map != null) {
-                            return map;
+                        T resource = lockSimulator.getResource();
+                        if (resource != null) {
+                            return resource;
                         }
                         return "";
                     }
@@ -94,53 +188,9 @@ public class LockTestServer extends KubernetesMockServer {
                     }
                 }).always();
 
-        
expect().post().withPath("/api/v1/namespaces/test/configmaps").andReply(new 
ResponseProvider<Object>() {
-
-            private Headers headers = new Headers.Builder().build();
-
-            @Override
-            public int getStatusCode(RecordedRequest request) {
-                if (refuseRequests) {
-                    return 500;
-                }
-
-                ConfigMap map = convert(request);
-                if (map == null || map.getMetadata() == null
-                        || 
!lockSimulator.getConfigMapName().equals(map.getMetadata().getName())) {
-                    throw new IllegalArgumentException("Illegal configMap 
received");
-                }
-
-                boolean done = lockSimulator.setConfigMap(map, true);
-                if (done) {
-                    return 201;
-                }
-                return 500;
-            }
-
-            @Override
-            public Object getBody(RecordedRequest recordedRequest) {
-                delayIfNecessary();
-
-                ConfigMap map = lockSimulator.getConfigMap();
-                if (map != null) {
-                    return map;
-                }
-
-                return "";
-            }
-
-            @Override
-            public Headers getHeaders() {
-                return headers;
-            }
-
-            @Override
-            public void setHeaders(Headers headers) {
-                this.headers = headers;
-            }
-        }).always();
-
-        expect().put().withPath("/api/v1/namespaces/test/configmaps/" + 
lockSimulator.getConfigMapName())
+        expect().put()
+                .withPath(lockSimulator.getAPIPath() + "/namespaces/test/" + 
lockSimulator.getResourcePath() + "/"
+                          + lockSimulator.getResourceName())
                 .andReply(new ResponseProvider<Object>() {
 
                     private Headers headers = new Headers.Builder().build();
@@ -151,9 +201,15 @@ public class LockTestServer extends KubernetesMockServer {
                             return 500;
                         }
 
-                        ConfigMap map = convert(request);
+                        T resource;
+                        try {
+                            resource = convert(request, 
lockSimulator.getResourceClass());
+                        } catch (Exception e) {
+                            LOG.error("Error during resource conversion", e);
+                            return 500;
+                        }
 
-                        boolean done = lockSimulator.setConfigMap(map, false);
+                        boolean done = lockSimulator.setResource(resource, 
false);
                         if (done) {
                             return 200;
                         }
@@ -163,9 +219,9 @@ public class LockTestServer extends KubernetesMockServer {
                     @Override
                     public Object getBody(RecordedRequest recordedRequest) {
                         delayIfNecessary();
-                        ConfigMap map = lockSimulator.getConfigMap();
-                        if (map != null) {
-                            return map;
+                        T resource = lockSimulator.getResource();
+                        if (resource != null) {
+                            return resource;
                         }
 
                         return "";
@@ -181,16 +237,6 @@ public class LockTestServer extends KubernetesMockServer {
                         this.headers = headers;
                     }
                 }).always();
-
-        // Other resources
-        expect().get().withPath("/api/v1/namespaces/test/pods")
-                .andReply(200,
-                        request -> new 
PodListBuilder().withNewMetadata().withResourceVersion("1").and()
-                                .withItems(getCurrentPods().stream()
-                                        .map(name -> new 
PodBuilder().withNewMetadata().withName(name).and().build())
-                                        .collect(Collectors.toList()))
-                                .build())
-                .always();
     }
 
     public boolean isRefuseRequests() {
@@ -231,13 +277,9 @@ public class LockTestServer extends KubernetesMockServer {
         }
     }
 
-    private ConfigMap convert(RecordedRequest request) {
-        try {
-            ObjectMapper mapper = new ObjectMapper();
-            return mapper.readValue(request.getBody().readByteArray(), 
ConfigMap.class);
-        } catch (IOException e) {
-            throw new IllegalArgumentException("Erroneous data", e);
-        }
+    private T convert(RecordedRequest request, Class<T> targetClass) throws 
IOException {
+        ObjectMapper mapper = new ObjectMapper().registerModule(new 
JavaTimeModule());
+        return mapper.readValue(request.getBody().readByteArray(), 
targetClass);
     }
 
 }
diff --git 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java
 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ResourceLockSimulator.java
similarity index 53%
copy from 
components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java
copy to 
components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ResourceLockSimulator.java
index f4e6e21..558f0f8 100644
--- 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java
+++ 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ResourceLockSimulator.java
@@ -16,45 +16,44 @@
  */
 package org.apache.camel.component.kubernetes.cluster.utils;
 
-import io.fabric8.kubernetes.api.model.ConfigMap;
-import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.api.model.HasMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Central lock for testing leader election.
  */
-public class ConfigMapLockSimulator {
+public abstract class ResourceLockSimulator<T extends HasMetadata> {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(ConfigMapLockSimulator.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(ResourceLockSimulator.class);
 
-    private String configMapName;
+    private String resourceName;
 
-    private ConfigMap currentMap;
+    private T currentResource;
 
     private long versionCounter = 1000000;
 
-    public ConfigMapLockSimulator(String configMapName) {
-        this.configMapName = configMapName;
+    public ResourceLockSimulator(String resourceName) {
+        this.resourceName = resourceName;
     }
 
-    public String getConfigMapName() {
-        return configMapName;
+    public String getResourceName() {
+        return resourceName;
     }
 
-    public synchronized boolean setConfigMap(ConfigMap map, boolean insert) {
+    public synchronized boolean setResource(T resource, boolean insert) {
         // Insert
-        if (insert && currentMap != null) {
-            LOG.error("Current map should have been null");
+        if (insert && currentResource != null) {
+            LOG.error("Current resource should have been null");
             return false;
         }
 
         // Update
-        if (!insert && currentMap == null) {
-            LOG.error("Current map should not have been null");
+        if (!insert && currentResource == null) {
+            LOG.error("Current resource should not have been null");
             return false;
         }
-        String version = map.getMetadata() != null ? 
map.getMetadata().getResourceVersion() : null;
+        String version = resource.getMetadata() != null ? 
resource.getMetadata().getResourceVersion() : null;
         if (version != null) {
             long versionLong = Long.parseLong(version);
             if (versionLong != versionCounter) {
@@ -64,17 +63,26 @@ public class ConfigMapLockSimulator {
             }
         }
 
-        this.currentMap = new 
ConfigMapBuilder(map).editOrNewMetadata().withResourceVersion(String.valueOf(++versionCounter))
-                .endMetadata().build();
+        this.currentResource = withNewResourceVersion(resource, 
String.valueOf(++versionCounter));
         return true;
     }
 
-    public synchronized ConfigMap getConfigMap() {
-        if (currentMap == null) {
+    public synchronized T getResource() {
+        if (currentResource == null) {
             return null;
         }
 
-        return new ConfigMapBuilder(currentMap).build();
+        return copyOf(currentResource);
     }
 
+    protected abstract T withNewResourceVersion(T resource, String 
newResourceVersion);
+
+    protected abstract T copyOf(T resource);
+
+    public abstract String getAPIPath();
+
+    public abstract String getResourcePath();
+
+    public abstract Class<T> getResourceClass();
+
 }
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/cluster/CamelPreemptiveClusterService.java
 
b/core/camel-api/src/main/java/org/apache/camel/cluster/CamelPreemptiveClusterService.java
new file mode 100644
index 0000000..652ba67
--- /dev/null
+++ 
b/core/camel-api/src/main/java/org/apache/camel/cluster/CamelPreemptiveClusterService.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.cluster;
+
+/**
+ * A {@link CamelPreemptiveClusterService} is a {@link CamelClusterService} 
that manages
+ * {@link CamelPreemptiveClusterView}s.
+ */
+public interface CamelPreemptiveClusterService extends CamelClusterService {
+
+    @Override
+    CamelPreemptiveClusterView getView(String namespace) throws Exception;
+
+}
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/cluster/CamelPreemptiveClusterView.java
 
b/core/camel-api/src/main/java/org/apache/camel/cluster/CamelPreemptiveClusterView.java
new file mode 100644
index 0000000..c8a2136
--- /dev/null
+++ 
b/core/camel-api/src/main/java/org/apache/camel/cluster/CamelPreemptiveClusterView.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.cluster;
+
+/**
+ * A {@link CamelPreemptiveClusterView} is a {@link CamelClusterView} that can 
be externally disabled by another
+ * controller.
+ */
+public interface CamelPreemptiveClusterView extends CamelClusterView {
+
+    /**
+     * Enable or disables a view.
+     */
+    void setDisabled(boolean disabled);
+
+    /**
+     * Check if the view is disabled.
+     */
+    boolean isDisabled();
+
+}
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/cluster/RebalancingCamelClusterService.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/cluster/RebalancingCamelClusterService.java
new file mode 100644
index 0000000..372734f
--- /dev/null
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/cluster/RebalancingCamelClusterService.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support.cluster;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.cluster.CamelClusterMember;
+import org.apache.camel.cluster.CamelClusterView;
+import org.apache.camel.cluster.CamelPreemptiveClusterService;
+import org.apache.camel.cluster.CamelPreemptiveClusterView;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link RebalancingCamelClusterService} adds rebalancing capabilities to 
an underlying
+ * {@link CamelPreemptiveClusterService}. Each view is treated as a partition 
by this cluster service and it makes sure
+ * that all services belonging to the cluster own a balanced number of 
partitions (same number or difference at most 1
+ * when not possible).
+ */
+public class RebalancingCamelClusterService implements 
CamelPreemptiveClusterService {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RebalancingCamelClusterService.class);
+
+    protected ScheduledExecutorService serializedExecutor;
+
+    protected CamelPreemptiveClusterService delegate;
+
+    protected CamelContext camelContext;
+
+    protected long periodMillis;
+
+    public RebalancingCamelClusterService(CamelPreemptiveClusterService 
delegate, long periodMillis) {
+        this.delegate = ObjectHelper.notNull(delegate, "delegate");
+        this.periodMillis = periodMillis;
+    }
+
+    public RebalancingCamelClusterService(CamelContext camelContext, 
CamelPreemptiveClusterService delegate,
+                                          long periodMillis) {
+        this.camelContext = ObjectHelper.notNull(camelContext, "camelContext");
+        this.delegate = ObjectHelper.notNull(delegate, "delegate");
+        this.periodMillis = periodMillis;
+    }
+
+    @Override
+    public void start() {
+        delegate.start();
+        if (serializedExecutor == null) {
+            serializedExecutor = 
getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
+                    "RebalancingClusterService");
+            serializedExecutor.execute(this::reconcile);
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (serializedExecutor != null) {
+            serializedExecutor.shutdownNow();
+        }
+        serializedExecutor = null;
+
+        delegate.stop();
+    }
+
+    public CamelPreemptiveClusterService getDelegate() {
+        return delegate;
+    }
+
+    public long getPeriodMillis() {
+        return periodMillis;
+    }
+
+    public void setDelegate(CamelPreemptiveClusterService delegate) {
+        this.delegate = delegate;
+    }
+
+    protected void reconcile() {
+        Integer n = members();
+        List<String> partitions = partitionList();
+        int k = partitions.size();
+
+        if (n == null || n == 0 || k == 0) {
+            rescheduleAfterDelay();
+            return;
+        }
+
+        int threshold = 0;
+        while (threshold <= k) {
+            threshold += n;
+        }
+        threshold -= n;
+
+        int quota = threshold / n;
+
+        List<String> main = new ArrayList<>();
+        List<String> remaining = new ArrayList<>();
+        for (int i = 0; i < threshold; i++) {
+            main.add(partitions.get(i));
+        }
+        for (int i = threshold; i < partitions.size(); i++) {
+            remaining.add(partitions.get(i));
+        }
+
+        rebalanceGroup(main, quota);
+        rebalanceGroup(remaining, 1);
+        rescheduleAfterDelay();
+    }
+
+    protected void rebalanceGroup(List<String> partitions, int quota) {
+        List<String> owned = owned(partitions);
+        if (owned == null) {
+            return;
+        }
+
+        if (owned.size() < quota) {
+            // Open all (to let the controller choose which ones)
+            for (String partition : partitions) {
+                setDisabled(partition, false);
+            }
+        } else if (owned.size() > quota) {
+            for (int i = 0; i < owned.size() - quota; i++) {
+                setDisabled(owned.get(i), true);
+            }
+        } else {
+            // We're fine, but we prevent this instance from stealing locks 
that are not needed
+            Set<String> ownedSet = new HashSet<>(owned);
+            for (String partition : partitions) {
+                if (!ownedSet.contains(partition)) {
+                    setDisabled(partition, true);
+                }
+            }
+        }
+    }
+
+    protected void setDisabled(String partition, boolean disabled) {
+        try {
+            LOG.debug("Setting partition {} to disabled={}...", partition, 
disabled);
+            CamelPreemptiveClusterView view = delegate.getView(partition);
+            if (view.isDisabled() != disabled) {
+                view.setDisabled(disabled);
+            }
+        } catch (Exception ex) {
+            LOG.warn("Could not get view " + partition, ex);
+        }
+    }
+
+    protected List<String> owned(List<String> partitions) {
+        List<String> owned = new ArrayList<>(partitions.size());
+        for (String partition : partitions) {
+            try {
+                CamelPreemptiveClusterView view = delegate.getView(partition);
+                if (!view.isDisabled() && view.getLocalMember().isLeader()) {
+                    owned.add(partition);
+                }
+            } catch (Exception ex) {
+                LOG.warn("Could not get view " + partition, ex);
+                return null;
+            }
+        }
+        return owned;
+    }
+
+    protected List<String> partitionList() {
+        ArrayList<String> partitions = new ArrayList<>(this.getNamespaces());
+        Collections.sort(partitions);
+        return partitions;
+    }
+
+    protected Integer members() {
+        Set<String> members = null;
+        for (String group : this.getNamespaces()) {
+            try {
+                CamelPreemptiveClusterView view = delegate.getView(group);
+                Set<String> viewMembers = 
view.getMembers().stream().map(CamelClusterMember::getId).collect(Collectors.toSet());
+                if (members != null && !members.equals(viewMembers)) {
+                    LOG.debug("View members don't match: {} vs {}", members, 
viewMembers);
+                    return null;
+                }
+                members = viewMembers;
+            } catch (Exception ex) {
+                LOG.warn("Could not get view " + group, ex);
+                return null;
+            }
+        }
+        return members != null ? members.size() : 0;
+    }
+
+    private void rescheduleAfterDelay() {
+        this.serializedExecutor.schedule(this::reconcile,
+                this.periodMillis,
+                TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public CamelPreemptiveClusterView getView(String namespace) throws 
Exception {
+        return delegate.getView(namespace);
+    }
+
+    @Override
+    public void releaseView(CamelClusterView view) throws Exception {
+        delegate.releaseView(view);
+    }
+
+    @Override
+    public Collection<String> getNamespaces() {
+        return delegate.getNamespaces();
+    }
+
+    @Override
+    public void startView(String namespace) throws Exception {
+        delegate.startView(namespace);
+    }
+
+    @Override
+    public void stopView(String namespace) throws Exception {
+        delegate.stopView(namespace);
+    }
+
+    @Override
+    public boolean isLeader(String namespace) {
+        return delegate.isLeader(namespace);
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+        delegate.setCamelContext(camelContext);
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return this.camelContext;
+    }
+
+    @Override
+    public void setId(String id) {
+        delegate.setId(id);
+    }
+
+    @Override
+    public String getId() {
+        return delegate.getId();
+    }
+}

Reply via email to