This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b8ec201e93 Add table level distributed locking for ensuring minion task generation is atomic (#16857)
4b8ec201e93 is described below

commit 4b8ec201e933ce1928848ed291f11fcf9a298a7f
Author: Sonam Mandal <[email protected]>
AuthorDate: Thu Oct 9 10:16:05 2025 -0700

    Add table level distributed locking for ensuring minion task generation is atomic (#16857)
    
    * Add table level distributed locking for ensuring minion task generation is atomic
    
    * Remove -State ZNode to simplify distributed locking mechanism
    
    * Address review comments
    
    * Add tests for PinotTaskManager with distributed locking enabled
    
    * Address review comments around force releasing the lock
    
    * Add ability to retry lock release on failure and if it still exists
---
 .../pinot/common/metadata/ZKMetadataProvider.java  |    6 +
 .../pinot/common/metrics/ControllerTimer.java      |    1 +
 .../apache/pinot/controller/ControllerConf.java    |    9 +
 .../api/resources/PinotTaskRestletResource.java    |   21 +
 .../core/minion/DistributedTaskLockManager.java    |  353 ++++++
 .../helix/core/minion/PinotTaskManager.java        |  220 ++--
 .../helix/core/minion/TaskSchedulingInfo.java      |    6 +
 .../minion/DistributedTaskLockManagerTest.java     |  365 +++++++
 .../PinotTaskManagerDistributedLockingTest.java    | 1118 ++++++++++++++++++++
 .../java/org/apache/pinot/core/auth/Actions.java   |    1 +
 10 files changed, 2039 insertions(+), 61 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 42be2fe5ae1..00db262b8d4 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -85,6 +85,7 @@ public class ZKMetadataProvider {
   private static final String PROPERTYSTORE_SEGMENT_LINEAGE = 
"/SEGMENT_LINEAGE";
   private static final String PROPERTYSTORE_MINION_TASK_METADATA_PREFIX = 
"/MINION_TASK_METADATA";
   private static final String PROPERTYSTORE_QUERY_WORKLOAD_CONFIGS_PREFIX = 
"/CONFIGS/QUERYWORKLOAD";
+  private static final String PROPERTYSTORE_TASK_LOCK_SUFFIX = "-Lock";
 
   public static void setUserConfig(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String username, ZNRecord znRecord) {
     propertyStore.set(constructPropertyStorePathForUserConfig(username), 
znRecord, AccessOption.PERSISTENT);
@@ -308,6 +309,11 @@ public class ZKMetadataProvider {
     return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX, 
tableNameWithType);
   }
 
+  public static String constructPropertyStorePathForMinionTaskGenerationLock(String tableNameWithType) {
+    return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX, tableNameWithType
+        + PROPERTYSTORE_TASK_LOCK_SUFFIX);
+  }
+
   public static String getPropertyStoreWorkloadConfigsPrefix() {
     return PROPERTYSTORE_QUERY_WORKLOAD_CONFIGS_PREFIX;
   }
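
For reference, a minimal sketch (not part of this commit) of the path the new helper builds; the table name "myTable_OFFLINE" is a hypothetical example:

    import org.apache.pinot.common.metadata.ZKMetadataProvider;

    public class LockPathExample {
      public static void main(String[] args) {
        // "myTable_OFFLINE" is a hypothetical table name used only for illustration.
        String lockPath =
            ZKMetadataProvider.constructPropertyStorePathForMinionTaskGenerationLock("myTable_OFFLINE");
        // With the "/MINION_TASK_METADATA" prefix and "-Lock" suffix defined above, this prints:
        // /MINION_TASK_METADATA/myTable_OFFLINE-Lock
        System.out.println(lockPath);
      }
    }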
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
index 82ddc6ba51a..d8f344e2ac5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
@@ -29,6 +29,7 @@ public enum ControllerTimer implements AbstractMetrics.Timer {
   TABLE_REBALANCE_EXECUTION_TIME_MS("tableRebalanceExecutionTimeMs", false),
   CRON_SCHEDULER_JOB_EXECUTION_TIME_MS("cronSchedulerJobExecutionTimeMs", 
false),
   IDEAL_STATE_UPDATE_TIME_MS("IdealStateUpdateTimeMs", false),
+  MINION_TASK_GENERATION_LOCK_HELD_ELAPSED_TIME_MS("minionTaskGenerationLockHeldElapsedTimeMs", false),
   // How long it took the server to start.
   STARTUP_SUCCESS_DURATION_MS("startupSuccessDurationMs", true),
   // Time taken to read the segment from deep store
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index a56ff5594c9..ee7c7c4dd35 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -172,6 +172,10 @@ public class ControllerConf extends PinotConfiguration {
     // end of the task.
     public static final String PINOT_TASK_EXPIRE_TIME_MS = 
"controller.task.expire.time.ms";
 
+    // Distributed lock enablement for PinotTaskManager
+    public static final String ENABLE_DISTRIBUTED_LOCKING = "controller.task.enableDistributedLocking";
+    public static final boolean DEFAULT_ENABLE_DISTRIBUTED_LOCKING = false;
+
     @Deprecated
     // RealtimeSegmentRelocator has been rebranded as SegmentRelocator
     public static final String DEPRECATED_REALTIME_SEGMENT_RELOCATOR_FREQUENCY 
=
@@ -1210,6 +1214,11 @@ public class ControllerConf extends PinotConfiguration {
     return 
getProperty(ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED, 
false);
   }
 
+  public boolean isPinotTaskManagerDistributedLockingEnabled() {
+    return getProperty(ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
+        ControllerPeriodicTasksConf.DEFAULT_ENABLE_DISTRIBUTED_LOCKING);
+  }
+
   public long getPinotTaskExpireTimeInMs() {
     return getProperty(ControllerPeriodicTasksConf.PINOT_TASK_EXPIRE_TIME_MS, 
TimeUnit.HOURS.toMillis(24));
   }
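
A minimal sketch (not part of this commit) of toggling the new flag; it assumes ControllerConf's no-arg constructor and the setProperty(String, Object) inherited from PinotConfiguration behave as in current Pinot:

    import org.apache.pinot.controller.ControllerConf;

    public class DistributedLockingConfigExample {
      public static void main(String[] args) {
        ControllerConf conf = new ControllerConf();
        // Defaults to false (DEFAULT_ENABLE_DISTRIBUTED_LOCKING) when the key is not set.
        System.out.println(conf.isPinotTaskManagerDistributedLockingEnabled());
        // Enable table level distributed locking for minion task generation.
        conf.setProperty("controller.task.enableDistributedLocking", true);
        System.out.println(conf.isPinotTaskManagerDistributedLockingEnabled());
      }
    }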
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index 56c5aed10e8..6924296b9c2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -791,4 +791,25 @@ public class PinotTaskRestletResource {
     _pinotHelixTaskResourceManager.deleteTask(taskName, forceDelete);
     return new SuccessResponse("Successfully deleted task: " + taskName);
   }
+
+  @DELETE
+  @Path("/tasks/lock/forceRelease")
+  @Authorize(targetType = TargetType.TABLE, action = Actions.Table.FORCE_RELEASE_TASK_GENERATION_LOCK,
+      paramName = "tableNameWithType")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authenticate(AccessType.DELETE)
+  @ApiOperation("Force releases the task generation lock for a given table. Call this API with caution")
+  public SuccessResponse forceReleaseTaskGenerationLock(
+      @ApiParam(value = "Table name (with type suffix).", required = true)
+      @QueryParam("tableNameWithType") String tableNameWithType) {
+    try {
+      _pinotTaskManager.forceReleaseLock(tableNameWithType);
+      return new SuccessResponse("Successfully released task generation lock on table " + tableNameWithType
+          + " for all task types");
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, "Failed to release task generation lock on table: "
+          + tableNameWithType + ", with exception: " + ExceptionUtils.getStackTrace(e),
+          Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
 }
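
A hedged usage sketch (not part of this commit) of calling the new force-release endpoint with Java's built-in HTTP client; the controller host/port, the table name, and the absence of auth headers are assumptions:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class ForceReleaseLockCall {
      public static void main(String[] args) throws Exception {
        // Placeholder controller address and table name; secured clusters also need auth headers.
        URI uri = URI.create(
            "http://localhost:9000/tasks/lock/forceRelease?tableNameWithType=myTable_OFFLINE");
        HttpRequest request = HttpRequest.newBuilder(uri).DELETE().build();
        HttpResponse<String> response =
            HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
      }
    }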
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/DistributedTaskLockManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/DistributedTaskLockManager.java
new file mode 100644
index 00000000000..e98f39a2adf
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/DistributedTaskLockManager.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.ControllerTimer;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Manages distributed locks for minion task generation using ZooKeeper ephemeral nodes that automatically disappear
+ * when the controller session ends or when the lock is explicitly released. This approach provides automatic cleanup
+ * and is suitable for long-running task generation.
+ * Locks are at the table level, to ensure that only one type of task can be generated per table at any given time.
+ * This is to prevent task types which shouldn't run in parallel from being generated at the same time.
+ * <p>
+ * ZK EPHEMERAL Lock Node:
+ *   <ul>
+ *     <li>Every lock is created at the table level with the name: {tableName}-Lock, under the base path
+ *     MINION_TASK_METADATA within the PROPERTYSTORE.
+ *     <li>If the propertyStore::create() call returns true, that means the lock node was successfully created and the
+ *     lock belongs to the current controller, otherwise it was not. If the lock node already exists, this will return
+ *     false. No clean-up of the lock node is needed if the propertyStore::create() call returns false.
+ *     <li>The locks are EPHEMERAL in nature, meaning that once the session with ZK is lost, the lock is automatically
+ *     cleaned up. Scenarios when the ZK session can be lost: a) controller shutdown, b) controller crash, c) ZK session
+ *     expiry (e.g. long GC pauses can cause this). This property helps ensure that the lock is released under
+ *     controller failure.
+ *   </ul>
+ * <p>
+ */
+public class DistributedTaskLockManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DistributedTaskLockManager.class);
+
+  // Lock paths are constructed using ZKMetadataProvider
+  private static final String LOCK_OWNER_KEY = "lockOwner";
+  private static final String LOCK_CREATION_TIME_MS = "lockCreationTimeMs";
+
+  // Retry constants
+  private static final int MAX_RETRIES = 3;
+  private static final int BASE_RETRY_DELAY_MS = 100;
+
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private final String _controllerInstanceId;
+  private final ControllerMetrics _controllerMetrics;
+
+  public DistributedTaskLockManager(ZkHelixPropertyStore<ZNRecord> propertyStore, String controllerInstanceId) {
+    _propertyStore = propertyStore;
+    _controllerInstanceId = controllerInstanceId;
+    _controllerMetrics = ControllerMetrics.get();
+  }
+
+  /**
+   * Attempts to acquire a distributed lock at the table level for task 
generation using session-based locking.
+   * The lock is at the table level instead of the task level to ensure that 
only a single task can be generated for
+   * a given table at any time. Certain tasks depend on other tasks not being 
generated at the same time
+   * The lock is held until explicitly released or the controller session ends.
+   *
+   * @param tableNameWithType the table name with type
+   * @return TaskLock object if successful, null if lock could not be acquired
+   */
+  @Nullable
+  public TaskLock acquireLock(String tableNameWithType) {
+    LOGGER.info("Attempting to acquire task generation lock for table: {} by 
controller: {}", tableNameWithType,
+        _controllerInstanceId);
+
+    try {
+      // Check if task generation is already in progress
+      if (isTaskGenerationInProgress(tableNameWithType)) {
+        LOGGER.info("Task generation already in progress for: {} by this or 
another controller", tableNameWithType);
+        return null;
+      }
+
+      // Try to acquire the lock using ephemeral node
+      TaskLock lock = tryAcquireSessionBasedLock(tableNameWithType);
+      if (lock != null) {
+        LOGGER.info("Successfully acquired task generation lock for table: {} 
by controller: {}", tableNameWithType,
+            _controllerInstanceId);
+        return lock;
+      } else {
+        LOGGER.warn("Could not acquire lock for table: {} - another controller 
must hold it", tableNameWithType);
+      }
+    } catch (Exception e) {
+      LOGGER.error("Error while trying to acquire task lock for table: {}", 
tableNameWithType, e);
+    }
+    return null;
+  }
+
+  private String getLockPath(String tableNameForPath) {
+    return ZKMetadataProvider.constructPropertyStorePathForMinionTaskGenerationLock(tableNameForPath);
+  }
+
+  /**
+   * Releases a previously acquired session-based lock and marks task 
generation as completed.
+   *
+   * @param lock the lock to release
+   * @return true if successfully released, false otherwise
+   */
+  public boolean releaseLock(TaskLock lock) {
+    if (lock == null) {
+      return true;
+    }
+
+    String tableNameWithType = lock.getTableNameWithType();
+    String lockNode = lock.getLockZNodePath();
+
+    // Remove the ephemeral lock node
+    boolean status = true;
+    if (lockNode != null) {
+      try {
+        if (_propertyStore.exists(lockNode, AccessOption.EPHEMERAL)) {
+          status = removeWithRetries(lockNode, tableNameWithType);
+        } else {
+          LOGGER.warn("Ephemeral lock node: {} does not exist for table: {}, 
nothing to remove",
+              lockNode, tableNameWithType);
+        }
+      } catch (Exception e) {
+        status = false;
+        LOGGER.warn("Exception while trying to remove ephemeral lock node: 
{}", lockNode, e);
+      }
+    } else {
+      LOGGER.warn("Lock node path seems to be null for task lock: {}, treating 
release as a no-op", lock);
+    }
+
+    return status;
+  }
+
+  /**
+   * Attempts to remove a lock node with retries and exponential backoff.
+   * Only retries if the lock exists and the remove operation returns false.
+   */
+  private boolean removeWithRetries(String lockNode, String tableNameWithType) 
{
+    for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
+      try {
+        // Check if lock still exists before attempting removal
+        if (!_propertyStore.exists(lockNode, AccessOption.EPHEMERAL)) {
+          LOGGER.info("Lock node: {} no longer exists for table: {}, 
considering removal successful", lockNode,
+              tableNameWithType);
+          return true;
+        }
+
+        boolean removed = _propertyStore.remove(lockNode, 
AccessOption.EPHEMERAL);
+
+        if (removed) {
+          LOGGER.info("Successfully removed ephemeral lock node: {} for table: 
{} by controller: {} on attempt: {}",
+              lockNode, tableNameWithType, _controllerInstanceId, attempt);
+          return true;
+        } else {
+          LOGGER.warn("Failed to remove ephemeral lock node: {} for table: {} 
by controller: {} on attempt: {}/{}",
+              lockNode, tableNameWithType, _controllerInstanceId, attempt, 
MAX_RETRIES);
+
+          // If this is not the last attempt, wait before retrying
+          if (attempt < MAX_RETRIES) {
+            // Exponential backoff: 100ms, 200ms, 400ms
+            long delayMs = BASE_RETRY_DELAY_MS * (1L << (attempt - 1));
+            LOGGER.info("Retrying lock removal for table: {} after {}ms 
delay", tableNameWithType, delayMs);
+            Thread.sleep(delayMs);
+          }
+        }
+      } catch (InterruptedException e) {
+        LOGGER.warn("Interrupted while waiting to retry lock removal for 
table: {}", tableNameWithType);
+        Thread.currentThread().interrupt();
+        return false;
+      } catch (Exception e) {
+        LOGGER.warn("Exception while trying to remove ephemeral lock node: {} 
on attempt: {}/{}", lockNode, attempt,
+            MAX_RETRIES, e);
+
+        // If this is not the last attempt, wait before retrying
+        if (attempt < MAX_RETRIES) {
+          try {
+            // Exponential backoff: 100ms, 200ms, 400ms
+            long delayMs = BASE_RETRY_DELAY_MS * (1L << (attempt - 1));
+            LOGGER.info("Retrying lock removal for table: {} after {}ms delay 
due to exception", tableNameWithType,
+                delayMs);
+            Thread.sleep(delayMs);
+          } catch (InterruptedException ie) {
+            LOGGER.warn("Interrupted while waiting to retry lock removal for 
table: {}", tableNameWithType);
+            Thread.currentThread().interrupt();
+            return false;
+          }
+        }
+      }
+    }
+
+    LOGGER.error("Failed to remove ephemeral lock node: {} for table: {} after 
{} attempts", lockNode,
+        tableNameWithType, MAX_RETRIES);
+    return false;
+  }
+
+  /**
+   * Force release the lock without checking if any tasks are in progress
+   */
+  public void forceReleaseLock(String tableNameWithType) {
+    LOGGER.info("Trying to force release the lock for table: {}", 
tableNameWithType);
+    String lockPath = getLockPath(tableNameWithType);
+
+    if (!_propertyStore.exists(lockPath, AccessOption.EPHEMERAL)) {
+      String message = "No lock ZNode: " + lockPath + " found for table: " + 
tableNameWithType
+          + ", nothing to force release";
+      LOGGER.warn(message);
+      throw new RuntimeException(message);
+    }
+
+    LOGGER.info("Lock for table: {} found at path: {}, trying to remove", 
tableNameWithType, lockPath);
+    boolean result = _propertyStore.remove(lockPath, AccessOption.EPHEMERAL);
+    if (!result) {
+      String message = "Could not force release lock: " + lockPath + " for 
table: " + tableNameWithType;
+      LOGGER.error(message);
+      throw new RuntimeException(message);
+    }
+  }
+
+  /**
+   * Checks if any task generation is currently in progress for the given 
table.
+   *
+   * @param tableNameWithType the table name with type
+   * @return true if task generation is in progress, false otherwise
+   */
+  @VisibleForTesting
+  boolean isTaskGenerationInProgress(String tableNameWithType) {
+    String lockPath = getLockPath(tableNameWithType);
+
+    try {
+      long durationLockHeldMs = 0;
+      Stat stat = new Stat();
+      // Get the node instead of checking for existence to update the time 
held metric in case the node exists
+      ZNRecord zNRecord = _propertyStore.get(lockPath, stat, 
AccessOption.EPHEMERAL);
+      if (zNRecord != null) {
+        String creationTimeMsString = 
zNRecord.getSimpleField(LOCK_CREATION_TIME_MS);
+        long creationTimeMs = stat.getCtime();
+        if (creationTimeMsString != null) {
+          try {
+            creationTimeMs = Long.parseLong(creationTimeMsString);
+          } catch (NumberFormatException e) {
+            LOGGER.warn("Could not parse creationTimeMs string: {} into long 
from ZNode, using ZNode creation time",
+                creationTimeMsString);
+            creationTimeMs = stat.getCtime();
+          }
+        }
+        durationLockHeldMs = System.currentTimeMillis() - creationTimeMs;
+      }
+      _controllerMetrics.addTimedValue(tableNameWithType,
+          ControllerTimer.MINION_TASK_GENERATION_LOCK_HELD_ELAPSED_TIME_MS, 
durationLockHeldMs, TimeUnit.MILLISECONDS);
+      return zNRecord != null;
+    } catch (Exception e) {
+      LOGGER.error("Error checking task generation status for: {} with lock 
path: {}", tableNameWithType, lockPath, e);
+      return false;
+    }
+  }
+
+  /**
+   * Attempts to acquire a lock using ephemeral nodes.
+   */
+  @VisibleForTesting
+  TaskLock tryAcquireSessionBasedLock(String tableNameWithType) {
+    String lockPath = getLockPath(tableNameWithType);
+
+    try {
+      long currentTimeMs = System.currentTimeMillis();
+
+      // Create ephemeral node for this table, owned by this controller
+      ZNRecord lockRecord = new ZNRecord(_controllerInstanceId);
+      lockRecord.setSimpleField(LOCK_OWNER_KEY, _controllerInstanceId);
+      lockRecord.setSimpleField(LOCK_CREATION_TIME_MS, 
String.valueOf(currentTimeMs));
+
+      boolean created = _propertyStore.create(lockPath, lockRecord, 
AccessOption.EPHEMERAL);
+
+      if (created) {
+        LOGGER.info("Successfully created lock node at path: {}, for 
controller: {}, for table: {}", lockPath,
+            _controllerInstanceId, tableNameWithType);
+        return new TaskLock(tableNameWithType, _controllerInstanceId, 
currentTimeMs, lockPath);
+      }
+
+      // We could not create the lock node, returning null
+      LOGGER.warn("Could not create lock node at path: {} for controller: {}, 
for table: {}", lockPath,
+          _controllerInstanceId, tableNameWithType);
+    } catch (Exception e) {
+      LOGGER.error("Error creating lock under path: {}, for controller: {}, 
for table: {}", lockPath,
+          _controllerInstanceId, tableNameWithType, e);
+    }
+
+    return null;
+  }
+
+  /**
+   * Represents a session-based distributed lock for task generation.
+   * The lock is automatically released when the controller session ends.
+   * The state node is periodically cleaned up
+   */
+  public static class TaskLock {
+    private final String _tableNameWithType;
+    private final String _owner;
+    private final long _creationTimeMs;
+    private final String _lockZNodePath; // Path to the ephemeral lock node
+
+    public TaskLock(String tableNameWithType, String owner, long 
creationTimeMs, String lockZNodePath) {
+      _tableNameWithType = tableNameWithType;
+      _owner = owner;
+      _creationTimeMs = creationTimeMs;
+      _lockZNodePath = lockZNodePath;
+    }
+
+    public String getTableNameWithType() {
+      return _tableNameWithType;
+    }
+
+    public String getOwner() {
+      return _owner;
+    }
+
+    public long getCreationTimeMs() {
+      return _creationTimeMs;
+    }
+
+    public String getLockZNodePath() {
+      return _lockZNodePath;
+    }
+
+    public long getAge() {
+      return System.currentTimeMillis() - _creationTimeMs;
+    }
+
+    @Override
+    public String toString() {
+      return "TaskLock{tableNameWithType='" + _tableNameWithType + "', 
owner='" + _owner + "', creationTimeMs="
+          + _creationTimeMs + ", age=" + getAge() + ", lockZNodePath='" + 
_lockZNodePath + "'}";
+    }
+  }
+}
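
A minimal sketch (not part of this commit) of the acquire/try/finally/release pattern that the PinotTaskManager changes below follow; the helper method and the Runnable standing in for task generation are hypothetical:

    import org.apache.pinot.controller.helix.core.minion.DistributedTaskLockManager;

    public class LockUsageSketch {
      static boolean runWithTableLock(DistributedTaskLockManager lockManager, String tableNameWithType,
          Runnable taskGeneration) {
        DistributedTaskLockManager.TaskLock lock = lockManager.acquireLock(tableNameWithType);
        if (lock == null) {
          // Another controller likely holds the lock; skip generation for this table.
          return false;
        }
        try {
          taskGeneration.run();
          return true;
        } finally {
          // Best-effort release; the ephemeral node also disappears if the controller's ZK session ends.
          lockManager.releaseLock(lock);
        }
      }
    }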
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 8b9f33e9dbd..d7b7eaf8868 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -122,6 +122,8 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
 
   private final TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> 
_taskManagerStatusCache;
 
+  protected final @Nullable DistributedTaskLockManager _distributedTaskLockManager;
+
   public PinotTaskManager(PinotHelixTaskResourceManager 
helixTaskResourceManager,
       PinotHelixResourceManager helixResourceManager, LeadControllerManager 
leadControllerManager,
       ControllerConf controllerConf, ControllerMetrics controllerMetrics,
@@ -149,6 +151,18 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
     } else {
       _scheduler = null;
     }
+
+    // For distributed locking
+    boolean enableDistributedLocking = controllerConf.isPinotTaskManagerDistributedLockingEnabled();
+    if (enableDistributedLocking) {
+      LOGGER.info("Distributed locking is enabled for PinotTaskManager");
+      // Initialize distributed task lock manager if distributed locking is enabled
+      _distributedTaskLockManager = new DistributedTaskLockManager(helixResourceManager.getPropertyStore(),
+          helixResourceManager.getHelixZkManager().getInstanceName());
+    } else {
+      LOGGER.info("Distributed locking is disabled for PinotTaskManager");
+      _distributedTaskLockManager = null;
+    }
   }
 
   public void init() {
@@ -172,7 +186,7 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
     }
   }
 
-  public Map<String, String> createTask(String taskType, String tableName, @Nullable String taskName,
+  public synchronized Map<String, String> createTask(String taskType, String tableName, @Nullable String taskName,
       Map<String, String> taskConfigs)
       throws Exception {
     if (taskName == null) {
@@ -210,6 +224,7 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
     if (tableNameWithTypes.isEmpty()) {
       throw new TableNotFoundException("'tableName' " + tableName + " is not 
found");
     }
+    LOGGER.info("Generating tasks for {} tables, list: {}", 
tableNameWithTypes.size(), tableNameWithTypes);
 
     PinotTaskGenerator taskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
     // Generate each type of tasks
@@ -239,33 +254,61 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
       // Example usage in BaseTaskGenerator.getNumSubTasks()
       taskConfigs.put(MinionConstants.TRIGGERED_BY, 
CommonConstants.TaskTriggers.ADHOC_TRIGGER.name());
 
-      List<PinotTaskConfig> pinotTaskConfigs = 
taskGenerator.generateTasks(tableConfig, taskConfigs);
-      if (pinotTaskConfigs.isEmpty()) {
-        LOGGER.warn("No ad-hoc task generated for task type: {}", taskType);
-        continue;
+      // Acquire distributed lock before proceeding with ad-hoc task generation
+      // Need locking to protect against:
+      // 1. Race conditions with periodic task generation
+      // 2. Multiple simultaneous ad-hoc requests
+      // 3. Leadership changes during task generation
+      DistributedTaskLockManager.TaskLock lock = null;
+      if (_distributedTaskLockManager != null) {
+        lock = _distributedTaskLockManager.acquireLock(tableNameWithType);
+        if (lock == null) {
+          String message = String.format("Could not acquire table level 
distributed lock for ad-hoc task type: %s, "
+                  + "table: %s. Another controller is likely generating tasks 
for this table. Please try again later.",
+              taskType, tableNameWithType);
+          LOGGER.warn(message);
+          throw new RuntimeException(message);
+        }
+        LOGGER.info("Acquired table level distributed lock for ad-hoc task 
type: {} on table: {}", taskType,
+            tableNameWithType);
       }
-      int maxNumberOfSubTasks = taskGenerator.getMaxAllowedSubTasksPerTask();
-      if (pinotTaskConfigs.size() > maxNumberOfSubTasks) {
-        String message = String.format(
-            "Number of tasks generated for task type: %s for table: %s is %d, 
which is greater than the "
-                + "maximum number of tasks to schedule: %d. This is "
-                + "controlled by the cluster config %s which is set based on 
controller's performance.", taskType,
-            tableName, pinotTaskConfigs.size(), maxNumberOfSubTasks, 
MinionConstants.MAX_ALLOWED_SUB_TASKS_KEY);
-        message += "Optimise the task config or reduce tableMaxNumTasks to 
avoid the error";
-        // We throw an exception to notify the user
-        // This is to ensure that the user is aware of the task generation 
limit
-        throw new RuntimeException(message);
+
+      try {
+        List<PinotTaskConfig> pinotTaskConfigs = 
taskGenerator.generateTasks(tableConfig, taskConfigs);
+        if (pinotTaskConfigs.isEmpty()) {
+          LOGGER.warn("No ad-hoc task generated for task type: {}, for table: 
{}", taskType, tableNameWithType);
+          continue;
+        }
+        int maxNumberOfSubTasks = taskGenerator.getMaxAllowedSubTasksPerTask();
+        if (pinotTaskConfigs.size() > maxNumberOfSubTasks) {
+          String message = String.format(
+              "Number of tasks generated for task type: %s for table: %s is 
%d, which is greater than the "
+                  + "maximum number of tasks to schedule: %d. This is 
controlled by the cluster config %s which is set "
+                  + "based on controller's performance.", taskType, 
tableNameWithType, pinotTaskConfigs.size(),
+              maxNumberOfSubTasks, MinionConstants.MAX_ALLOWED_SUB_TASKS_KEY);
+          message += "Optimise the task config or reduce tableMaxNumTasks to 
avoid the error";
+          // We throw an exception to notify the user
+          // This is to ensure that the user is aware of the task generation 
limit
+          throw new RuntimeException(message);
+        }
+        pinotTaskConfigs.forEach(pinotTaskConfig -> 
pinotTaskConfig.getConfigs()
+            .computeIfAbsent(MinionConstants.TRIGGERED_BY, k -> 
CommonConstants.TaskTriggers.ADHOC_TRIGGER.name()));
+        addDefaultsToTaskConfig(pinotTaskConfigs);
+        LOGGER.info("Submitting ad-hoc task for task type: {} with task 
configs: {}", taskType, pinotTaskConfigs);
+        _controllerMetrics.addMeteredTableValue(taskType, 
ControllerMeter.NUMBER_ADHOC_TASKS_SUBMITTED, 1);
+        responseMap.put(tableNameWithType,
+            _helixTaskResourceManager.submitTask(parentTaskName, 
pinotTaskConfigs, minionInstanceTag,
+                taskGenerator.getTaskTimeoutMs(minionInstanceTag),
+                
taskGenerator.getNumConcurrentTasksPerInstance(minionInstanceTag),
+                taskGenerator.getMaxAttemptsPerTask(minionInstanceTag)));
+      } finally {
+        if (!responseMap.containsKey(tableNameWithType)) {
+          LOGGER.warn("No task submitted for tableNameWithType: {}", 
tableNameWithType);
+        }
+        if (lock != null) {
+          _distributedTaskLockManager.releaseLock(lock);
+        }
       }
-      pinotTaskConfigs.forEach(pinotTaskConfig -> pinotTaskConfig.getConfigs()
-          .computeIfAbsent(MinionConstants.TRIGGERED_BY, k -> 
CommonConstants.TaskTriggers.ADHOC_TRIGGER.name()));
-      addDefaultsToTaskConfig(pinotTaskConfigs);
-      LOGGER.info("Submitting ad-hoc task for task type: {} with task configs: 
{}", taskType, pinotTaskConfigs);
-      _controllerMetrics.addMeteredTableValue(taskType, 
ControllerMeter.NUMBER_ADHOC_TASKS_SUBMITTED, 1);
-      responseMap.put(tableNameWithType,
-          _helixTaskResourceManager.submitTask(parentTaskName, 
pinotTaskConfigs, minionInstanceTag,
-              taskGenerator.getTaskTimeoutMs(minionInstanceTag),
-              
taskGenerator.getNumConcurrentTasksPerInstance(minionInstanceTag),
-              taskGenerator.getMaxAttemptsPerTask(minionInstanceTag)));
     }
     if (responseMap.isEmpty()) {
       LOGGER.warn("No task submitted for tableName: {}", tableName);
@@ -273,6 +316,15 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
     return responseMap;
   }
 
+  public void forceReleaseLock(String tableNameWithType) {
+    if (_distributedTaskLockManager == null) {
+      String message = "Distributed task lock manager is disabled, no locks to 
release";
+      LOGGER.warn(message);
+      throw new RuntimeException(message);
+    }
+    _distributedTaskLockManager.forceReleaseLock(tableNameWithType);
+  }
+
   private class ZkTableConfigChangeListener implements IZkChildListener {
 
     @Override
@@ -691,8 +743,45 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
       if (taskGenerator != null) {
         _helixTaskResourceManager.ensureTaskQueueExists(taskType);
         addTaskTypeMetricsUpdaterIfNeeded(taskType);
-        tasksScheduled.put(taskType, scheduleTask(taskGenerator, 
enabledTableConfigs, context.isLeader(),
-            context.getMinionInstanceTag(), context.getTriggeredBy()));
+
+        // Take the lock for all tables for which to schedule the tasks and 
pass the list of tables for which getting
+        // the lock was successful
+        // Need locking to protect against:
+        // 1. Race conditions with periodic task generation
+        // 2. Multiple simultaneous ad-hoc requests
+        // 3. Leadership changes during task generation
+        List<String> enabledTables =
+            
enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList());
+        Map<String, DistributedTaskLockManager.TaskLock> acquiredTaskLocks = 
new HashMap<>();
+        for (String tableName : enabledTables) {
+          DistributedTaskLockManager.TaskLock lock;
+          if (_distributedTaskLockManager != null) {
+            lock = _distributedTaskLockManager.acquireLock(tableName);
+            if (lock == null) {
+              LOGGER.warn("Could not acquire table level distributed lock for 
scheduled task type: {} on table: {}, "
+                  + "skipping lock acquisition", taskType, tableName);
+              continue;
+            }
+            acquiredTaskLocks.put(tableName, lock);
+            LOGGER.info("Acquired table level distributed lock for scheduled 
task type: {} on table: {}", taskType,
+                tableName);
+          }
+        }
+
+        try {
+          tasksScheduled.put(taskType, scheduleTask(taskGenerator, 
enabledTableConfigs, context.isLeader(),
+              context.getMinionInstanceTag(), context.getTriggeredBy(), 
acquiredTaskLocks));
+        } catch (RuntimeException e) {
+          LOGGER.error("Caught exception while trying to schedule task type: 
{} for tables: {}, gathered responses: {}",
+              taskType, enabledTables, tasksScheduled, e);
+          throw e;
+        } finally {
+          // Release all the distributed table locks if any exist
+          assert acquiredTaskLocks.isEmpty() || _distributedTaskLockManager != 
null;
+          for (Map.Entry<String, DistributedTaskLockManager.TaskLock> 
taskLockEntry : acquiredTaskLocks.entrySet()) {
+            _distributedTaskLockManager.releaseLock(taskLockEntry.getValue());
+          }
+        }
       } else {
         List<String> enabledTables =
             
enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList());
@@ -728,7 +817,8 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
    *  - list of task scheduling errors if any
    */
   protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, 
List<TableConfig> enabledTableConfigs,
-      boolean isLeader, @Nullable String minionInstanceTagForTask, String 
triggeredBy) {
+      boolean isLeader, @Nullable String minionInstanceTagForTask, String 
triggeredBy,
+      Map<String, DistributedTaskLockManager.TaskLock> acquiredTaskLocks) {
     TaskSchedulingInfo response = new TaskSchedulingInfo();
     String taskType = taskGenerator.getTaskType();
     List<String> enabledTables =
@@ -749,7 +839,7 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
               + "table: %s. Disk utilization for one or more servers hosting 
this table has exceeded the threshold. "
               + "Tasks won't be generated until the issue is mitigated.", 
tableName);
           LOGGER.warn(message);
-          response.addSchedulingError(message);
+          response.addGenerationError(message);
           _controllerMetrics.setOrUpdateTableGauge(tableName, 
ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 1L);
           continue;
         }
@@ -767,39 +857,47 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
           
tableTaskConfig.getConfigsForTaskType(taskType).put(MinionConstants.TRIGGERED_BY,
 triggeredBy);
         }
 
-        taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
-        int maxNumberOfSubTasks = taskGenerator.getMaxAllowedSubTasksPerTask();
-        if (presentTaskConfig.size() > maxNumberOfSubTasks) {
-          String message = String.format(
-              "Number of tasks generated for task type: %s for table: %s is 
%d, which is greater than the "
-                  + "maximum number of tasks to schedule: %d. This is "
-                  + "controlled by the cluster config %s which is set based on 
controller's performance.", taskType,
-              tableName, presentTaskConfig.size(), maxNumberOfSubTasks, 
MinionConstants.MAX_ALLOWED_SUB_TASKS_KEY);
-          if (TaskSchedulingContext.isUserTriggeredTask(triggeredBy)) {
-            message += "Optimise the task config or reduce tableMaxNumTasks to 
avoid the error";
-            presentTaskConfig.clear();
-            // If the task is user-triggered, we throw an exception to notify 
the user
-            // This is to ensure that the user is aware of the task generation 
limit
-            throw new RuntimeException(message);
+        if (_distributedTaskLockManager == null || 
acquiredTaskLocks.containsKey(tableName)) {
+          taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
+          int maxNumberOfSubTasks = 
taskGenerator.getMaxAllowedSubTasksPerTask();
+          if (presentTaskConfig.size() > maxNumberOfSubTasks) {
+            String message = String.format(
+                "Number of tasks generated for task type: %s for table: %s is 
%d, which is greater than the "
+                    + "maximum number of tasks to schedule: %d. This is "
+                    + "controlled by the cluster config %s which is set based 
on controller's performance.", taskType,
+                tableName, presentTaskConfig.size(), maxNumberOfSubTasks, 
MinionConstants.MAX_ALLOWED_SUB_TASKS_KEY);
+            if (TaskSchedulingContext.isUserTriggeredTask(triggeredBy)) {
+              message += "Optimise the task config or reduce tableMaxNumTasks 
to avoid the error";
+              presentTaskConfig.clear();
+              // If the task is user-triggered, we throw an exception to 
notify the user
+              // This is to ensure that the user is aware of the task 
generation limit
+              throw new RuntimeException(message);
+            }
+            // For scheduled tasks, we log a warning and limit the number of 
tasks
+            LOGGER.warn(message + "Only the first {} tasks will be scheduled", 
maxNumberOfSubTasks);
+            presentTaskConfig = new ArrayList<>(presentTaskConfig.subList(0, 
maxNumberOfSubTasks));
+            // Provide user visibility to the maximum number of subtasks that 
were used for the task
+            presentTaskConfig.forEach(pinotTaskConfig -> 
pinotTaskConfig.getConfigs()
+                .put(MinionConstants.TABLE_MAX_NUM_TASKS_KEY, 
String.valueOf(maxNumberOfSubTasks)));
           }
-          // For scheduled tasks, we log a warning and limit the number of 
tasks
-          LOGGER.warn(message + "Only the first {} tasks will be scheduled", 
maxNumberOfSubTasks);
-          presentTaskConfig = new ArrayList<>(presentTaskConfig.subList(0, 
maxNumberOfSubTasks));
-          // Provide user visibility to the maximum number of subtasks that 
were used for the task
-          presentTaskConfig.forEach(pinotTaskConfig -> 
pinotTaskConfig.getConfigs()
-              .put(MinionConstants.TABLE_MAX_NUM_TASKS_KEY, 
String.valueOf(maxNumberOfSubTasks)));
+          minionInstanceTagToTaskConfigs.put(minionInstanceTag, 
presentTaskConfig);
+          long successRunTimestamp = System.currentTimeMillis();
+          _taskManagerStatusCache.saveTaskGeneratorInfo(tableName, taskType,
+              taskGeneratorMostRecentRunInfo -> 
taskGeneratorMostRecentRunInfo.addSuccessRunTs(successRunTimestamp));
+          // before the first task schedule, the follow two gauge metrics will 
be empty
+          // TODO: find a better way to report task generation information
+          _controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
+              
ControllerGauge.TIME_MS_SINCE_LAST_SUCCESSFUL_MINION_TASK_GENERATION,
+              () -> System.currentTimeMillis() - successRunTimestamp);
+          _controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
+              ControllerGauge.LAST_MINION_TASK_GENERATION_ENCOUNTERS_ERROR, 
0L);
+        } else {
+          String message = String.format("Could not acquire table level 
distributed lock for scheduled task type: "
+              + "%s, table: %s. Another controller is likely generating tasks 
for this table. Please try again later.",
+              taskType, tableName);
+          LOGGER.warn(message);
+          response.addGenerationError(message);
         }
-        minionInstanceTagToTaskConfigs.put(minionInstanceTag, 
presentTaskConfig);
-        long successRunTimestamp = System.currentTimeMillis();
-        _taskManagerStatusCache.saveTaskGeneratorInfo(tableName, taskType,
-            taskGeneratorMostRecentRunInfo -> 
taskGeneratorMostRecentRunInfo.addSuccessRunTs(successRunTimestamp));
-        // before the first task schedule, the follow two gauge metrics will 
be empty
-        // TODO: find a better way to report task generation information
-        _controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
-            
ControllerGauge.TIME_MS_SINCE_LAST_SUCCESSFUL_MINION_TASK_GENERATION,
-            () -> System.currentTimeMillis() - successRunTimestamp);
-        _controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
-            ControllerGauge.LAST_MINION_TASK_GENERATION_ENCOUNTERS_ERROR, 0L);
       } catch (Exception e) {
         StringWriter errors = new StringWriter();
         try (PrintWriter pw = new PrintWriter(errors)) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingInfo.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingInfo.java
index 2ffa11676fa..76e1114a85b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingInfo.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingInfo.java
@@ -53,4 +53,10 @@ public class TaskSchedulingInfo {
   public void addSchedulingError(String schedulingError) {
     _schedulingErrors.add(schedulingError);
   }
+
+  @Override
+  public String toString() {
+    return "TaskSchedulingInfo{scheduledTaskNames='" + _scheduledTaskNames + 
"', generationErrors='" + _generationErrors
+        + "', schedulingErrors='" + _schedulingErrors + "'}";
+  }
 }
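
A small sketch (not part of this commit) exercising the new toString(); it assumes TaskSchedulingInfo's no-arg constructor and an addGenerationError(String) method matching the call sites in PinotTaskManager above, and the error strings are hypothetical:

    package org.apache.pinot.controller.helix.core.minion;

    public class TaskSchedulingInfoExample {
      public static void main(String[] args) {
        TaskSchedulingInfo info = new TaskSchedulingInfo();
        // Hypothetical error strings, mirroring the messages added in PinotTaskManager.
        info.addGenerationError("Could not acquire table level distributed lock for table: myTable_OFFLINE");
        info.addSchedulingError("No task scheduled for table: myTable_OFFLINE");
        // Prints scheduled task names, generation errors, and scheduling errors via the new toString().
        System.out.println(info);
      }
    }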
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/DistributedTaskLockManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/DistributedTaskLockManagerTest.java
new file mode 100644
index 00000000000..642694c2a14
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/DistributedTaskLockManagerTest.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.AccessOption;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.ControllerTimer;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.contains;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class DistributedTaskLockManagerTest {
+
+  @Test
+  public void testEphemeralLockAcquisitionAndRelease() {
+    // Mock the property store and data accessor
+    ZkHelixPropertyStore<ZNRecord> mockPropertyStore = 
mock(ZkHelixPropertyStore.class);
+
+    // Configure mocks for ephemeral node creation
+    when(mockPropertyStore.exists(anyString(), 
eq(AccessOption.PERSISTENT))).thenReturn(true);
+    when(mockPropertyStore.create(anyString(), any(ZNRecord.class), 
eq(AccessOption.EPHEMERAL))).thenReturn(true);
+    when(mockPropertyStore.remove(anyString(), 
eq(AccessOption.EPHEMERAL))).thenReturn(true);
+
+    DistributedTaskLockManager lockManager = new 
DistributedTaskLockManager(mockPropertyStore, "controller1");
+
+    // Test lock acquisition
+    DistributedTaskLockManager.TaskLock lock = 
lockManager.acquireLock("testTable");
+    Assert.assertNotNull(lock, "Should successfully acquire lock");
+    assertEquals(lock.getOwner(), "controller1");
+    Assert.assertNotNull(lock.getLockZNodePath(), "Lock ZNode path should not 
be null");
+    assertEquals(lock.getLockZNodePath(), 
"/MINION_TASK_METADATA/testTable-Lock");
+    assertTrue(lock.getAge() >= 0, "Lock should have valid age");
+
+    // Test lock release
+    when(mockPropertyStore.exists(eq(lock.getLockZNodePath()), 
eq(AccessOption.EPHEMERAL))).thenReturn(true);
+    boolean released = lockManager.releaseLock(lock);
+    Assert.assertTrue(released, "Should successfully release lock");
+
+    // Verify ephemeral node interactions
+    verify(mockPropertyStore, times(1)).create(eq(lock.getLockZNodePath()), 
any(ZNRecord.class),
+        eq(AccessOption.EPHEMERAL));
+    verify(mockPropertyStore, times(1)).remove(eq(lock.getLockZNodePath()),
+        eq(AccessOption.EPHEMERAL));
+  }
+
+  @Test
+  public void testConcurrentEphemeralLockAcquisition() {
+    // Mock the property store and data accessor
+    ZkHelixPropertyStore<ZNRecord> mockPropertyStore = 
mock(ZkHelixPropertyStore.class);
+
+    // Configure mocks to simulate another controller already has the lock
+    when(mockPropertyStore.exists(anyString(), 
eq(AccessOption.PERSISTENT))).thenReturn(true);
+    when(mockPropertyStore.create(anyString(), any(ZNRecord.class), 
eq(AccessOption.EPHEMERAL))).thenReturn(false);
+    when(mockPropertyStore.remove(anyString(), 
eq(AccessOption.EPHEMERAL))).thenReturn(true);
+
+    DistributedTaskLockManager lockManager = new 
DistributedTaskLockManager(mockPropertyStore, "controller1");
+
+    // Test lock acquisition should fail because create returned false
+    DistributedTaskLockManager.TaskLock lock = 
lockManager.acquireLock("testTable");
+    Assert.assertNull(lock, "Should fail to acquire lock when create returns 
false");
+
+    // Verify that create was called. No need to clean-up the lock so remove 
should not have been called
+    verify(mockPropertyStore, times(1)).create(anyString(), 
any(ZNRecord.class),
+        eq(AccessOption.EPHEMERAL));
+    verify(mockPropertyStore, times(0)).remove(anyString(), 
eq(AccessOption.EPHEMERAL));
+  }
+
+  @Test
+  public void testTaskGenerationInProgressDetection() {
+    // Mock the property store and data accessor
+    ZkHelixPropertyStore<ZNRecord> mockPropertyStore = 
mock(ZkHelixPropertyStore.class);
+
+    String controllerId = "controller1";
+
+    // Construct a record to return when propertyStore.get() is called on the 
lock ZNode
+    ZNRecord record = new ZNRecord(controllerId);
+    record.setSimpleField("lockCreationTimeMs", 
String.valueOf(System.currentTimeMillis()));
+    record.setSimpleField("lockOwner", controllerId);
+
+    // Simulate active ephemeral nodes indicating task generation in progress
+    when(mockPropertyStore.exists(anyString(), 
eq(AccessOption.PERSISTENT))).thenReturn(true);
+    when(mockPropertyStore.get(contains("testTable"), any(Stat.class), 
eq(AccessOption.EPHEMERAL))).thenReturn(record);
+
+    DistributedTaskLockManager lockManager = new 
DistributedTaskLockManager(mockPropertyStore, controllerId);
+
+    // Test that we can detect task generation in progress
+    boolean inProgress = lockManager.isTaskGenerationInProgress("testTable");
+    Assert.assertTrue(inProgress, "Should detect task generation in progress 
when ephemeral nodes exist");
+
+    // Test that we detect that the task generation is not in progress when we 
don't have any task lock
+    when(mockPropertyStore.get(contains("testTable"), any(Stat.class), 
eq(AccessOption.EPHEMERAL))).thenReturn(null);
+    inProgress = lockManager.isTaskGenerationInProgress("testTable");
+    Assert.assertFalse(inProgress, "Should detect task generation in not 
progress when ephemeral nodes don't exist");
+  }
+
+  @Test
+  public void testMinionTaskGenerationLockHeldMetric() {
+    // Mock the property store
+    ZkHelixPropertyStore<ZNRecord> mockPropertyStore = 
mock(ZkHelixPropertyStore.class);
+
+    // Mock ControllerMetrics to verify metric calls
+    ControllerMetrics mockControllerMetrics = mock(ControllerMetrics.class);
+
+    String controllerId = "controller1";
+    String tableNameWithType = "testTable";
+
+    // Test case 1: Lock is held (record exists with creation time)
+    long lockCreationTime = System.currentTimeMillis() - 5000; // 5 seconds ago
+    ZNRecord lockRecord = new ZNRecord(controllerId);
+    lockRecord.setSimpleField("lockCreationTimeMs", 
String.valueOf(lockCreationTime));
+    lockRecord.setSimpleField("lockOwner", controllerId);
+
+    when(mockPropertyStore.get(contains(tableNameWithType), any(Stat.class), 
eq(AccessOption.EPHEMERAL)))
+        .thenReturn(lockRecord);
+
+    // Mock ControllerMetrics.get() to return our mock
+    try (var mockedStatic = mockStatic(ControllerMetrics.class)) {
+      
mockedStatic.when(ControllerMetrics::get).thenReturn(mockControllerMetrics);
+
+      DistributedTaskLockManager lockManager = new 
DistributedTaskLockManager(mockPropertyStore, controllerId);
+
+      // Call the method that should update the metric
+      boolean inProgress = 
lockManager.isTaskGenerationInProgress(tableNameWithType);
+      Assert.assertTrue(inProgress, "Should detect task generation in progress 
when lock record exists");
+
+      // Verify that addTimedValue was called with a non-zero duration 
(approximately 5000ms)
+      verify(mockControllerMetrics, times(1)).addTimedValue(
+          eq(tableNameWithType),
+          eq(ControllerTimer.MINION_TASK_GENERATION_LOCK_HELD_ELAPSED_TIME_MS),
+          anyLong(), // We can't predict the exact value due to timing, but it 
should be > 0
+          eq(TimeUnit.MILLISECONDS)
+      );
+    }
+
+    // Test case 2: No lock held (record is null)
+    when(mockPropertyStore.get(contains(tableNameWithType), any(Stat.class), 
eq(AccessOption.EPHEMERAL)))
+        .thenReturn(null);
+
+    try (var mockedStatic = mockStatic(ControllerMetrics.class)) {
+      
mockedStatic.when(ControllerMetrics::get).thenReturn(mockControllerMetrics);
+
+      DistributedTaskLockManager lockManager = new 
DistributedTaskLockManager(mockPropertyStore, controllerId);
+
+      // Call the method that should update the metric
+      boolean inProgress = 
lockManager.isTaskGenerationInProgress(tableNameWithType);
+      Assert.assertFalse(inProgress, "Should detect task generation not in 
progress when no lock record exists");
+
+
+      // Verify that addTimedValue was called with 0 duration (no lock held)
+      verify(mockControllerMetrics, times(1)).addTimedValue(
+          eq(tableNameWithType),
+          eq(ControllerTimer.MINION_TASK_GENERATION_LOCK_HELD_ELAPSED_TIME_MS),
+          eq(0L), // Should be 0 when no lock is held
+          eq(TimeUnit.MILLISECONDS)
+      );
+    }
+  }
+
+  @Test
+  public void testLockReleaseRetriesOnFailure() throws InterruptedException {
+    // Mock the property store
+    ZkHelixPropertyStore<ZNRecord> mockPropertyStore = 
mock(ZkHelixPropertyStore.class);
+    String controllerId = "controller1";
+    String tableNameWithType = "testTable";
+
+    // Configure mocks for lock acquisition
+    when(mockPropertyStore.exists(anyString(), 
eq(AccessOption.PERSISTENT))).thenReturn(true);
+    when(mockPropertyStore.create(anyString(), any(ZNRecord.class), 
eq(AccessOption.EPHEMERAL))).thenReturn(true);
+    // Mock isTaskGenerationInProgress to return null (no task in progress)
+    when(mockPropertyStore.get(anyString(), any(Stat.class), 
eq(AccessOption.EPHEMERAL))).thenReturn(null);
+
+    DistributedTaskLockManager lockManager = new 
DistributedTaskLockManager(mockPropertyStore, controllerId);
+
+    // Acquire a lock first
+    DistributedTaskLockManager.TaskLock lock = 
lockManager.acquireLock(tableNameWithType);
+    Assert.assertNotNull(lock, "Should successfully acquire lock");
+
+    // Configure mock for lock release - simulate failure on first 2 attempts, 
success on 3rd attempt
+    when(mockPropertyStore.exists(eq(lock.getLockZNodePath()), 
eq(AccessOption.EPHEMERAL)))
+        .thenReturn(true); // Lock exists for all attempts
+
+    when(mockPropertyStore.remove(eq(lock.getLockZNodePath()), 
eq(AccessOption.EPHEMERAL)))
+        .thenReturn(false) // First attempt fails
+        .thenReturn(false) // Second attempt fails
+        .thenReturn(true); // Third attempt succeeds
+
+    // Record start time to verify retry delays
+    long startTime = System.currentTimeMillis();
+
+    // Test lock release with retries
+    boolean released = lockManager.releaseLock(lock);
+
+    long elapsedTime = System.currentTimeMillis() - startTime;
+
+    // Verify that the lock was eventually released
+    Assert.assertTrue(released, "Should successfully release lock after 
retries");
+
+    // Verify that remove was called 3 times (2 failures + 1 success)
+    verify(mockPropertyStore, times(3)).remove(eq(lock.getLockZNodePath()), 
eq(AccessOption.EPHEMERAL));
+
+    // Verify that the total time includes retry delays
+    // Expected delays: 100ms + 200ms = 300ms minimum (plus some execution 
overhead)
+    Assert.assertTrue(elapsedTime >= 250,
+        "Should have waited for retry delays. Elapsed time: " + elapsedTime + 
"ms");
+
+    // Should not take too long (max 3 seconds for safety, accounting for test 
environment variability)
+    Assert.assertTrue(elapsedTime < 3000,
+        "Should not take too long. Elapsed time: " + elapsedTime + "ms");
+  }
+
+  @Test
+  public void testLockReleaseFailsAfterMaxRetries() {
+    // Mock the property store
+    ZkHelixPropertyStore<ZNRecord> mockPropertyStore = 
mock(ZkHelixPropertyStore.class);
+    String controllerId = "controller1";
+    String tableNameWithType = "testTable";
+
+    // Configure mocks for lock acquisition
+    when(mockPropertyStore.exists(anyString(), 
eq(AccessOption.PERSISTENT))).thenReturn(true);
+    when(mockPropertyStore.create(anyString(), any(ZNRecord.class), 
eq(AccessOption.EPHEMERAL))).thenReturn(true);
+    // Mock isTaskGenerationInProgress to return null (no task in progress)
+    when(mockPropertyStore.get(anyString(), any(Stat.class), 
eq(AccessOption.EPHEMERAL))).thenReturn(null);
+
+    DistributedTaskLockManager lockManager = new 
DistributedTaskLockManager(mockPropertyStore, controllerId);
+
+    // Acquire a lock first
+    DistributedTaskLockManager.TaskLock lock = 
lockManager.acquireLock(tableNameWithType);
+    Assert.assertNotNull(lock, "Should successfully acquire lock");
+
+    // Configure mock for lock release - simulate failure on all attempts
+    when(mockPropertyStore.exists(eq(lock.getLockZNodePath()), 
eq(AccessOption.EPHEMERAL)))
+        .thenReturn(true); // Lock exists for all attempts
+
+    when(mockPropertyStore.remove(eq(lock.getLockZNodePath()), 
eq(AccessOption.EPHEMERAL)))
+        .thenReturn(false); // All attempts fail
+
+    // Test lock release with retries - should fail after max retries
+    boolean released = lockManager.releaseLock(lock);
+
+    // Verify that the lock release failed
+    Assert.assertFalse(released, "Should fail to release lock after max 
retries");
+
+    // Verify that remove was called exactly MAX_RETRIES times (3 times)
+    verify(mockPropertyStore, times(3)).remove(eq(lock.getLockZNodePath()), 
eq(AccessOption.EPHEMERAL));
+  }
+
+  @Test
+  public void testLockReleaseSucceedsWhenLockDisappearsBeforeRetry() {
+    // Mock the property store
+    ZkHelixPropertyStore<ZNRecord> mockPropertyStore = 
mock(ZkHelixPropertyStore.class);
+    String controllerId = "controller1";
+    String tableNameWithType = "testTable";
+
+    // Configure mocks for lock acquisition
+    when(mockPropertyStore.exists(anyString(), 
eq(AccessOption.PERSISTENT))).thenReturn(true);
+    when(mockPropertyStore.create(anyString(), any(ZNRecord.class), 
eq(AccessOption.EPHEMERAL))).thenReturn(true);
+    // Mock isTaskGenerationInProgress to return null (no task in progress)
+    when(mockPropertyStore.get(anyString(), any(Stat.class), 
eq(AccessOption.EPHEMERAL))).thenReturn(null);
+
+    DistributedTaskLockManager lockManager = new 
DistributedTaskLockManager(mockPropertyStore, controllerId);
+
+    // Acquire a lock first
+    DistributedTaskLockManager.TaskLock lock = 
lockManager.acquireLock(tableNameWithType);
+    Assert.assertNotNull(lock, "Should successfully acquire lock");
+
+    // Configure mock for lock release - simulate:
+    // 1. First exists() call in releaseLock(): lock exists, so it calls 
removeWithRetries()
+    // 2. Second exists() call in removeWithRetries(): lock no longer exists 
(disappeared between attempts)
+    when(mockPropertyStore.exists(eq(lock.getLockZNodePath()), 
eq(AccessOption.EPHEMERAL)))
+        .thenReturn(true)  // First check in releaseLock(): lock exists
+        .thenReturn(false); // Second check in removeWithRetries(): lock 
disappeared
+
+    // Since the lock disappears, remove() should never be called
+
+    // Test lock release - should succeed when lock disappears
+    boolean released = lockManager.releaseLock(lock);
+
+    // Verify that the lock release succeeded
+    Assert.assertTrue(released, "Should succeed when lock disappears between 
retry attempts");
+
+    // Verify that remove was never called because lock disappeared before 
retry
+    verify(mockPropertyStore, times(0)).remove(eq(lock.getLockZNodePath()), 
eq(AccessOption.EPHEMERAL));
+
+    // Verify that exists was called twice (releaseLock() + 
removeWithRetries() first attempt)
+    verify(mockPropertyStore, times(2)).exists(eq(lock.getLockZNodePath()), 
eq(AccessOption.EPHEMERAL));
+  }
+
+  @Test
+  public void testLockReleaseWithMultipleExistsChecks() {
+    // Mock the property store
+    ZkHelixPropertyStore<ZNRecord> mockPropertyStore = 
mock(ZkHelixPropertyStore.class);
+    String controllerId = "controller1";
+    String tableNameWithType = "testTable";
+
+    // Configure mocks for lock acquisition
+    when(mockPropertyStore.exists(anyString(), 
eq(AccessOption.PERSISTENT))).thenReturn(true);
+    when(mockPropertyStore.create(anyString(), any(ZNRecord.class), 
eq(AccessOption.EPHEMERAL))).thenReturn(true);
+    // Mock isTaskGenerationInProgress to return null (no task in progress)
+    when(mockPropertyStore.get(anyString(), any(Stat.class), 
eq(AccessOption.EPHEMERAL))).thenReturn(null);
+
+    DistributedTaskLockManager lockManager = new 
DistributedTaskLockManager(mockPropertyStore, controllerId);
+
+    // Acquire a lock first
+    DistributedTaskLockManager.TaskLock lock = 
lockManager.acquireLock(tableNameWithType);
+    Assert.assertNotNull(lock, "Should successfully acquire lock");
+
+    // Configure mock for lock release - simulate:
+    // 1. First exists() call in releaseLock(): lock exists
+    // 2. Second exists() call in removeWithRetries() attempt 1: lock exists, 
remove fails
+    // 3. Third exists() call in removeWithRetries() attempt 2: lock exists, 
remove succeeds
+    when(mockPropertyStore.exists(eq(lock.getLockZNodePath()), 
eq(AccessOption.EPHEMERAL)))
+        .thenReturn(true)  // First check in releaseLock(): lock exists
+        .thenReturn(true)  // Second check in removeWithRetries() attempt 1: 
lock exists
+        .thenReturn(true); // Third check in removeWithRetries() attempt 2: 
lock exists
+
+    when(mockPropertyStore.remove(eq(lock.getLockZNodePath()), 
eq(AccessOption.EPHEMERAL)))
+        .thenReturn(false) // First remove attempt fails
+        .thenReturn(true); // Second remove attempt succeeds
+
+    // Test lock release - should succeed after retry
+    boolean released = lockManager.releaseLock(lock);
+
+    // Verify that the lock release succeeded
+    Assert.assertTrue(released, "Should succeed after retry");
+
+    // Verify that remove was called twice (first fails, second succeeds)
+    verify(mockPropertyStore, times(2)).remove(eq(lock.getLockZNodePath()), 
eq(AccessOption.EPHEMERAL));
+
+    // Verify that exists was called 3 times (releaseLock() + 
removeWithRetries() attempt 1 + attempt 2)
+    verify(mockPropertyStore, times(3)).exists(eq(lock.getLockZNodePath()), 
eq(AccessOption.EPHEMERAL));
+  }
+}
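
The retry tests above pin down the release semantics: releaseLock() first checks whether the lock ZNode still exists, retries the remove with a short backoff (roughly 100ms, then 200ms), treats a lock that disappears between attempts as a successful release, and gives up after three attempts. The following is a minimal sketch of that behavior; the helper names (removeWithRetries), the constants, and the exact backoff are assumptions inferred from the tests, not the committed DistributedTaskLockManager implementation.

// A minimal sketch, not the committed implementation: release a table-level lock held as an
// ephemeral ZNode, retrying the remove with a short backoff and treating a lock that disappears
// between attempts as already released. Names and constants are inferred from the tests above;
// Helix package names may differ slightly by version.
import org.apache.helix.AccessOption;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;

class LockReleaseSketch {
  private static final int MAX_RETRIES = 3;
  private static final long INITIAL_RETRY_DELAY_MS = 100L;

  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;

  LockReleaseSketch(ZkHelixPropertyStore<ZNRecord> propertyStore) {
    _propertyStore = propertyStore;
  }

  /** Returns true once the lock ZNode is gone, whether we removed it or it was already gone. */
  boolean releaseLock(String lockZNodePath) {
    if (!_propertyStore.exists(lockZNodePath, AccessOption.EPHEMERAL)) {
      return true; // Nothing to release
    }
    return removeWithRetries(lockZNodePath);
  }

  private boolean removeWithRetries(String lockZNodePath) {
    long delayMs = INITIAL_RETRY_DELAY_MS;
    for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
      // If the lock disappeared between attempts (e.g. session expiry), treat the release as done
      if (!_propertyStore.exists(lockZNodePath, AccessOption.EPHEMERAL)) {
        return true;
      }
      if (_propertyStore.remove(lockZNodePath, AccessOption.EPHEMERAL)) {
        return true;
      }
      if (attempt < MAX_RETRIES) {
        try {
          Thread.sleep(delayMs); // Back off before retrying: 100ms, then 200ms
          delayMs *= 2;
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return false;
        }
      }
    }
    return false; // Exhausted retries without removing the lock ZNode
  }
}
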
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerDistributedLockingTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerDistributedLockingTest.java
new file mode 100644
index 00000000000..a8e8a144fac
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerDistributedLockingTest.java
@@ -0,0 +1,1118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.controller.BaseControllerStarter;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.ControllerTest;
+import 
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Test class for PinotTaskManager with distributed locking enabled. Test scenarios run across multiple controllers.
+ */
+public class PinotTaskManagerDistributedLockingTest extends ControllerTest {
+
+  private static final String RAW_TABLE_NAME_1 = "testTable1";
+  private static final String RAW_TABLE_NAME_2 = "testTable2";
+  private static final String RAW_TABLE_NAME_3 = "testTable3";
+
+  private static final String TEST_TASK_TYPE = "TestDistributedLockTaskType";
+
+  @BeforeMethod
+  public void setUpMethod() throws Exception {
+    startZk();
+  }
+
+  @AfterMethod
+  public void tearDownMethod() {
+    try {
+      if (_controllerStarter != null) {
+        // Clean up any running tasks before stopping controller
+        try {
+          Map<String, TaskState> taskStates = 
_controllerStarter.getHelixTaskResourceManager()
+              .getTaskStates(TEST_TASK_TYPE);
+          for (String taskName : taskStates.keySet()) {
+            try {
+              
_controllerStarter.getHelixTaskResourceManager().deleteTask(taskName, true);
+            } catch (Exception e) {
+              // Ignore individual task deletion errors
+            }
+          }
+          Thread.sleep(500); // Give time for task cancellation
+        } catch (Exception e) {
+          // Ignore cleanup errors
+        }
+        stopController();
+      }
+    } catch (Exception e) {
+      // Ignore
+    }
+    try {
+      stopFakeInstances();
+    } catch (Exception e) {
+      // Ignore
+    }
+    stopZk();
+  }
+
+  /**
+   * Test scenario: Tests the scheduleTasks happy path, where only one controller schedules tasks, to ensure
+   * task generation goes through for all tables
+   */
+  @Test
+  public void testScheduleTaskForSpecificTables() throws Exception {
+    // Setup first controller with distributed locking enabled
+    Map<String, Object> properties1 = getDefaultControllerConfiguration();
+    
properties1.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
 true);
+    
properties1.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
 true);
+    properties1.put(ControllerConf.CONTROLLER_PORT, 21002);
+
+    // Setup second controller with distributed locking enabled (different 
port)
+    Map<String, Object> properties2 = getDefaultControllerConfiguration();
+    // Disable scheduler to avoid Quartz conflicts
+    
properties2.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
 false);
+    
properties2.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
 true);
+    properties2.put(ControllerConf.CONTROLLER_PORT, 21003);
+
+    // Start both controllers
+    startController(properties1);
+    BaseControllerStarter controller1 = _controllerStarter;
+    PinotTaskManager taskManager1 = controller1.getTaskManager();
+
+    // Start second controller instance
+    BaseControllerStarter controller2 = 
startControllerOnDifferentPort(properties2);
+    PinotTaskManager taskManager2 = controller2.getTaskManager();
+
+    try {
+      // Setup test environment (using first controller)
+      addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+      addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+      addFakeMinionInstancesToAutoJoinHelixCluster(1);
+
+      // Create and register task generators on both controllers
+      ControllableTaskGenerator generator1 = new 
ControllableTaskGenerator(TEST_TASK_TYPE);
+      generator1.setGenerationDelay(3000); // 3 second delay
+      ClusterInfoAccessor clusterInfoAccessor1 = 
Mockito.mock(ClusterInfoAccessor.class);
+      generator1.init(clusterInfoAccessor1);
+      taskManager1.registerTaskGenerator(generator1);
+
+      ControllableTaskGenerator generator2 = new 
ControllableTaskGenerator(TEST_TASK_TYPE);
+      generator2.setGenerationDelay(500); // Shorter delay for scheduleTasks
+      ClusterInfoAccessor clusterInfoAccessor2 = 
Mockito.mock(ClusterInfoAccessor.class);
+      generator2.init(clusterInfoAccessor2);
+      taskManager2.registerTaskGenerator(generator2);
+
+      // Ensure task queues exist on both controllers
+      
controller1.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+      
controller2.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+
+      // Create schemas and all three tables ONLY on controller1 to avoid 
Quartz job conflicts
+      // The tables will be visible to both controllers via ZooKeeper
+      Schema schema1 = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_1)
+          .addSingleValueDimension("testCol", 
FieldSpec.DataType.STRING).build();
+      addSchema(schema1);
+      createSingleTestTable(RAW_TABLE_NAME_1);
+
+      Schema schema2 = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_2)
+          .addSingleValueDimension("testCol", 
FieldSpec.DataType.STRING).build();
+      addSchema(schema2);
+      createSingleTestTable(RAW_TABLE_NAME_2);
+
+      Schema schema3 = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_3)
+          .addSingleValueDimension("testCol", 
FieldSpec.DataType.STRING).build();
+      addSchema(schema3);
+      createSingleTestTable(RAW_TABLE_NAME_3);
+
+      // Wait for controllers to be fully initialized
+      Thread.sleep(2000);
+
+      ExecutorService executor = Executors.newFixedThreadPool(1);
+      CountDownLatch startLatch = new CountDownLatch(1);
+      CountDownLatch completionLatch = new CountDownLatch(1);
+
+      AtomicInteger scheduleTasksCompleted = new AtomicInteger(0);
+      Map<String, TaskSchedulingInfo> scheduleTasksResult = new HashMap<>();
+
+      // Controller 2: Run scheduleTasks for all tables
+      executor.submit(() -> {
+        try {
+          startLatch.await();
+          Thread.sleep(500); // Brief delay before scheduling
+
+          TaskSchedulingContext context = new TaskSchedulingContext();
+          Set<String> tablesToSchedule = new HashSet<>();
+          // All three tables should be scheduled successfully since no other controller holds a lock
+          tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_1));
+          tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_2));
+          tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_3));
+          context.setTablesToSchedule(tablesToSchedule);
+          context.setLeader(true);
+
+          Map<String, TaskSchedulingInfo> result = 
taskManager2.scheduleTasks(context);
+          scheduleTasksResult.putAll(result);
+          scheduleTasksCompleted.incrementAndGet();
+        } catch (Exception ignored) {
+        } finally {
+          completionLatch.countDown();
+        }
+      });
+
+      // Start the operation
+      startLatch.countDown();
+
+      // Wait for completion
+      assertTrue(completionLatch.await(45, TimeUnit.SECONDS), "Tasks should 
complete within 45 seconds");
+
+      executor.shutdownNow();
+      assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS), "Executor 
should terminate");
+
+      assertEquals(scheduleTasksCompleted.get(), 1, "scheduleTasks should complete");
+
+      // Verify that the single scheduleTasks run generated tasks for all three tables
+      int totalGenerations = generator1.getTaskGenerationCount() + generator2.getTaskGenerationCount();
+      assertEquals(totalGenerations, 3, "scheduleTasks should have generated tasks for all three tables");
+
+      assertNotNull(scheduleTasksResult);
+      assertEquals(scheduleTasksResult.size(), 1);
+
+      TaskSchedulingInfo taskSchedulingInfo = 
scheduleTasksResult.get(TEST_TASK_TYPE);
+      assertNotNull(taskSchedulingInfo);
+      assertNotNull(taskSchedulingInfo.getScheduledTaskNames());
+      assertNotNull(taskSchedulingInfo.getSchedulingErrors());
+      assertNotNull(taskSchedulingInfo.getGenerationErrors());
+      // Should see one scheduled task covering all three tables
+      assertEquals(taskSchedulingInfo.getScheduledTaskNames().size(), 1);
+      // Should see 0 errors
+      assertEquals(taskSchedulingInfo.getGenerationErrors().size(), 0);
+      assertEquals(taskSchedulingInfo.getSchedulingErrors().size(), 0);
+    } finally {
+      // Cleanup
+      try {
+        // Cancel all running tasks before dropping tables
+        Map<String, TaskState> taskStates = 
_controllerStarter.getHelixTaskResourceManager()
+            .getTaskStates(TEST_TASK_TYPE);
+        for (String taskName : taskStates.keySet()) {
+          try {
+            
_controllerStarter.getHelixTaskResourceManager().deleteTask(taskName, true);
+          } catch (Exception e) {
+            // Ignore individual task deletion errors
+          }
+        }
+        Thread.sleep(1000); // Give time for task cancellation
+
+        dropOfflineTable(RAW_TABLE_NAME_1);
+        dropOfflineTable(RAW_TABLE_NAME_2);
+        dropOfflineTable(RAW_TABLE_NAME_3);
+      } catch (Exception ignored) {
+      }
+
+      // Stop second controller first
+      controller2.stop();
+    }
+  }
+
+  /**
+   * Test scenario: Two actual controllers trying to submit createTask for the 
same table simultaneously.
+   * This test starts multiple controller instances to test true 
multi-controller distributed locking.
+   */
+  @Test
+  public void testConcurrentCreateTaskFromMultipleControllers() throws 
Exception {
+    // Setup first controller with distributed locking enabled
+    Map<String, Object> properties1 = getDefaultControllerConfiguration();
+    
properties1.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
 true);
+    
properties1.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
 true);
+    properties1.put(ControllerConf.CONTROLLER_PORT, 21000);
+
+    // Setup second controller with distributed locking enabled (different 
port)
+    Map<String, Object> properties2 = getDefaultControllerConfiguration();
+    
properties2.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
 true);
+    
properties2.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
 true);
+    properties2.put(ControllerConf.CONTROLLER_PORT, 21001);
+
+    // Start both controllers
+    startController(properties1);
+    BaseControllerStarter controller1 = _controllerStarter;
+    PinotTaskManager taskManager1 = controller1.getTaskManager();
+
+    // Start second controller instance
+    BaseControllerStarter controller2 = 
startControllerOnDifferentPort(properties2);
+    PinotTaskManager taskManager2 = controller2.getTaskManager();
+
+    try {
+      // Setup test environment (using first controller)
+      addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+      addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+      addFakeMinionInstancesToAutoJoinHelixCluster(1);
+
+      // Create and register task generators on both controllers FIRST
+      ControllableTaskGenerator generator1 = new 
ControllableTaskGenerator(TEST_TASK_TYPE);
+      generator1.setGenerationDelay(2000); // 2 second delay
+      ClusterInfoAccessor clusterInfoAccessor1 = 
Mockito.mock(ClusterInfoAccessor.class);
+      generator1.init(clusterInfoAccessor1);
+      taskManager1.registerTaskGenerator(generator1);
+
+      ControllableTaskGenerator generator2 = new 
ControllableTaskGenerator(TEST_TASK_TYPE);
+      generator2.setGenerationDelay(2000); // 2 second delay
+      ClusterInfoAccessor clusterInfoAccessor2 = 
Mockito.mock(ClusterInfoAccessor.class);
+      generator2.init(clusterInfoAccessor2);
+      taskManager2.registerTaskGenerator(generator2);
+
+      // Ensure task queues exist on both controllers
+      
controller1.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+      
controller2.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+
+      // Create schema and table AFTER registering task generators
+      Schema schema = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_1)
+          .addSingleValueDimension("testCol", 
FieldSpec.DataType.STRING).build();
+      addSchema(schema);
+      createSingleTestTable(RAW_TABLE_NAME_1);
+
+      // Wait a bit for controllers to be fully initialized
+      Thread.sleep(2000);
+
+      // Now test concurrent createTask from both controllers
+      ExecutorService executor = Executors.newFixedThreadPool(2);
+      CountDownLatch startLatch = new CountDownLatch(1);
+      CountDownLatch completionLatch = new CountDownLatch(2);
+
+      AtomicInteger successfulCreations = new AtomicInteger(0);
+      AtomicInteger failedCreations = new AtomicInteger(0);
+
+      // Controller 1 attempts to create task
+      executor.submit(() -> {
+        try {
+          startLatch.await();
+          Map<String, String> result = taskManager1.createTask(TEST_TASK_TYPE, 
RAW_TABLE_NAME_1, null, new HashMap<>());
+          if (result != null && !result.isEmpty()) {
+            successfulCreations.incrementAndGet();
+          }
+        } catch (Exception e) {
+          failedCreations.incrementAndGet();
+        } finally {
+          completionLatch.countDown();
+        }
+      });
+
+      // Controller 2 attempts to create task for the same table
+      executor.submit(() -> {
+        try {
+          startLatch.await();
+          Thread.sleep(100); // Small delay to ensure some overlap
+          Map<String, String> result = taskManager2.createTask(TEST_TASK_TYPE, 
RAW_TABLE_NAME_1, null, new HashMap<>());
+          if (result != null && !result.isEmpty()) {
+            successfulCreations.incrementAndGet();
+          }
+        } catch (Exception e) {
+          failedCreations.incrementAndGet();
+        } finally {
+          completionLatch.countDown();
+        }
+      });
+
+      // Start both controllers simultaneously
+      startLatch.countDown();
+
+      // Wait for completion
+      assertTrue(completionLatch.await(30, TimeUnit.SECONDS), "Tasks should 
complete within 30 seconds");
+
+      executor.shutdownNow();
+      assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS), "Executor 
should terminate");
+
+      // With distributed locking between actual controllers, exactly one 
should succeed
+      assertEquals(successfulCreations.get(), 1, "Exactly one createTask 
should succeed due to distributed locking");
+      assertEquals(failedCreations.get(), 1, "Exactly one createTask should 
fail due to distributed locking");
+
+      int totalGenerations = generator1.getTaskGenerationCount() + generator2.getTaskGenerationCount();
+      assertEquals(totalGenerations, 1, "Exactly one task generation should have occurred");
+    } finally {
+      // Cleanup
+      try {
+        // Cancel all running tasks before dropping tables
+        Map<String, TaskState> taskStates = 
_controllerStarter.getHelixTaskResourceManager()
+            .getTaskStates(TEST_TASK_TYPE);
+        for (String taskName : taskStates.keySet()) {
+          try {
+            
_controllerStarter.getHelixTaskResourceManager().deleteTask(taskName, true);
+          } catch (Exception e) {
+            // Ignore individual task deletion errors
+          }
+        }
+        Thread.sleep(1000); // Give time for task cancellation
+
+        dropOfflineTable(RAW_TABLE_NAME_1);
+      } catch (Exception ignored) {
+      }
+
+      // Stop second controller first
+      controller2.stop();
+    }
+  }
+
+  /**
+   * Test scenario: One controller runs createTask for a specific table while 
another controller
+   * runs scheduleTasks for multiple tables including the locked table.
+   * The scheduleTasks should succeed for other tables but fail for the locked 
table.
+   */
+  @Test
+  public void testCreateTaskBlocksScheduleTaskForSpecificTable() throws 
Exception {
+    // Setup first controller with distributed locking enabled
+    Map<String, Object> properties1 = getDefaultControllerConfiguration();
+    
properties1.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
 true);
+    
properties1.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
 true);
+    properties1.put(ControllerConf.CONTROLLER_PORT, 21002);
+
+    // Setup second controller with distributed locking enabled (different 
port)
+    Map<String, Object> properties2 = getDefaultControllerConfiguration();
+    // Disable scheduler to avoid Quartz conflicts
+    
properties2.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
 false);
+    
properties2.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
 true);
+    properties2.put(ControllerConf.CONTROLLER_PORT, 21003);
+
+    // Start both controllers
+    startController(properties1);
+    BaseControllerStarter controller1 = _controllerStarter;
+    PinotTaskManager taskManager1 = controller1.getTaskManager();
+
+    // Start second controller instance
+    BaseControllerStarter controller2 = 
startControllerOnDifferentPort(properties2);
+    PinotTaskManager taskManager2 = controller2.getTaskManager();
+
+    try {
+      // Setup test environment (using first controller)
+      addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+      addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+      addFakeMinionInstancesToAutoJoinHelixCluster(1);
+
+      // Create and register task generators on both controllers
+      ControllableTaskGenerator generator1 = new 
ControllableTaskGenerator(TEST_TASK_TYPE);
+      generator1.setGenerationDelay(3000); // 3 second delay for createTask
+      ClusterInfoAccessor clusterInfoAccessor1 = 
Mockito.mock(ClusterInfoAccessor.class);
+      generator1.init(clusterInfoAccessor1);
+      taskManager1.registerTaskGenerator(generator1);
+
+      ControllableTaskGenerator generator2 = new 
ControllableTaskGenerator(TEST_TASK_TYPE);
+      generator2.setGenerationDelay(500); // Shorter delay for scheduleTasks
+      ClusterInfoAccessor clusterInfoAccessor2 = 
Mockito.mock(ClusterInfoAccessor.class);
+      generator2.init(clusterInfoAccessor2);
+      taskManager2.registerTaskGenerator(generator2);
+
+      // Ensure task queues exist on both controllers
+      
controller1.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+      
controller2.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+
+      // Create schemas and all three tables ONLY on controller1 to avoid 
Quartz job conflicts
+      // The tables will be visible to both controllers via ZooKeeper
+      Schema schema1 = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_1)
+          .addSingleValueDimension("testCol", 
FieldSpec.DataType.STRING).build();
+      addSchema(schema1);
+      createSingleTestTable(RAW_TABLE_NAME_1);
+
+      Schema schema2 = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_2)
+          .addSingleValueDimension("testCol", 
FieldSpec.DataType.STRING).build();
+      addSchema(schema2);
+      createSingleTestTable(RAW_TABLE_NAME_2);
+
+      Schema schema3 = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_3)
+          .addSingleValueDimension("testCol", 
FieldSpec.DataType.STRING).build();
+      addSchema(schema3);
+      createSingleTestTable(RAW_TABLE_NAME_3);
+
+      // Wait for controllers to be fully initialized
+      Thread.sleep(2000);
+
+      ExecutorService executor = Executors.newFixedThreadPool(2);
+      CountDownLatch startLatch = new CountDownLatch(1);
+      CountDownLatch completionLatch = new CountDownLatch(2);
+
+      AtomicInteger createTaskSuccessful = new AtomicInteger(0);
+      AtomicInteger scheduleTasksCompleted = new AtomicInteger(0);
+      Map<String, TaskSchedulingInfo> scheduleTasksResult = new HashMap<>();
+
+      // Controller 1: Run createTask for table1 (this will hold the lock for 
table1)
+      executor.submit(() -> {
+        try {
+          startLatch.await();
+          Map<String, String> result = taskManager1.createTask(TEST_TASK_TYPE, 
RAW_TABLE_NAME_1, null, new HashMap<>());
+          if (result != null && !result.isEmpty()) {
+            createTaskSuccessful.incrementAndGet();
+          }
+        } catch (Exception ignored) {
+        } finally {
+          completionLatch.countDown();
+        }
+      });
+
+      // Controller 2: Run scheduleTasks for all tables (should be blocked for 
table1 but succeed for others)
+      executor.submit(() -> {
+        try {
+          startLatch.await();
+          Thread.sleep(500); // Start after createTask begins
+
+          TaskSchedulingContext context = new TaskSchedulingContext();
+          Set<String> tablesToSchedule = new HashSet<>();
+          
tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_1));
 // This should be blocked
+          
tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_2));
 // This should succeed
+          
tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_3));
 // This should succeed
+          context.setTablesToSchedule(tablesToSchedule);
+          context.setLeader(true);
+
+          Map<String, TaskSchedulingInfo> result = 
taskManager2.scheduleTasks(context);
+          scheduleTasksResult.putAll(result);
+          scheduleTasksCompleted.incrementAndGet();
+        } catch (Exception ignored) {
+        } finally {
+          completionLatch.countDown();
+        }
+      });
+
+      // Start both operations
+      startLatch.countDown();
+
+      // Wait for completion
+      assertTrue(completionLatch.await(45, TimeUnit.SECONDS), "Tasks should 
complete within 45 seconds");
+
+      executor.shutdownNow();
+      assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS), "Executor 
should terminate");
+
+      // Verify results
+      assertEquals(createTaskSuccessful.get(), 1, "createTask should succeed");
+
+      // scheduleTasks should complete and report the lock conflict for table1 as a generation error
+      assertEquals(scheduleTasksCompleted.get(), 1, "scheduleTasks should complete");
+
+      // Verify that both controllers generated tasks: createTask on controller1 for table1, and
+      // scheduleTasks on controller2 for table2 and table3
+      int totalGenerations = generator1.getTaskGenerationCount() + generator2.getTaskGenerationCount();
+      assertTrue(totalGenerations >= 3, "createTask and scheduleTasks should have generated tasks on both controllers");
+
+      assertNotNull(scheduleTasksResult);
+      assertEquals(scheduleTasksResult.size(), 1);
+
+      TaskSchedulingInfo taskSchedulingInfo = 
scheduleTasksResult.get(TEST_TASK_TYPE);
+      assertNotNull(taskSchedulingInfo);
+      assertNotNull(taskSchedulingInfo.getScheduledTaskNames());
+      assertNotNull(taskSchedulingInfo.getSchedulingErrors());
+      assertNotNull(taskSchedulingInfo.getGenerationErrors());
+      // Should see one scheduled task covering the two unblocked tables (table2 and table3)
+      assertEquals(taskSchedulingInfo.getScheduledTaskNames().size(), 1);
+      // Should see one generation error, indicating that testTable1's lock 
couldn't be taken
+      assertEquals(taskSchedulingInfo.getGenerationErrors().size(), 1);
+      assertEquals(taskSchedulingInfo.getGenerationErrors().get(0), "Could not 
acquire table level distributed lock "
+          + "for scheduled task type: TestDistributedLockTaskType, table: 
testTable1_OFFLINE. Another controller is "
+          + "likely generating tasks for this table. Please try again later.");
+      assertEquals(taskSchedulingInfo.getSchedulingErrors().size(), 0);
+    } finally {
+      // Cleanup
+      try {
+        // Cancel all running tasks before dropping tables
+        Map<String, TaskState> taskStates = 
_controllerStarter.getHelixTaskResourceManager()
+            .getTaskStates(TEST_TASK_TYPE);
+        for (String taskName : taskStates.keySet()) {
+          try {
+            
_controllerStarter.getHelixTaskResourceManager().deleteTask(taskName, true);
+          } catch (Exception e) {
+            // Ignore individual task deletion errors
+          }
+        }
+        Thread.sleep(1000); // Give time for task cancellation
+
+        dropOfflineTable(RAW_TABLE_NAME_1);
+        dropOfflineTable(RAW_TABLE_NAME_2);
+        dropOfflineTable(RAW_TABLE_NAME_3);
+      } catch (Exception ignored) {
+      }
+
+      // Stop second controller first
+      controller2.stop();
+    }
+  }
+
+  /**
+   * Test scenario: Both controllers run scheduleTasks for the same set of tables.
+   * scheduleTasks should succeed on only one of the controllers.
+   */
+  @Test
+  public void testMultipleScheduleTaskForSpecificTableOnBothControllers() 
throws Exception {
+    // Setup first controller with distributed locking enabled
+    Map<String, Object> properties1 = getDefaultControllerConfiguration();
+    
properties1.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
 true);
+    
properties1.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
 true);
+    properties1.put(ControllerConf.CONTROLLER_PORT, 21002);
+
+    // Setup second controller with distributed locking enabled (different 
port)
+    Map<String, Object> properties2 = getDefaultControllerConfiguration();
+    // Disable scheduler to avoid Quartz conflicts
+    
properties2.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
 false);
+    
properties2.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
 true);
+    properties2.put(ControllerConf.CONTROLLER_PORT, 21003);
+
+    // Start both controllers
+    startController(properties1);
+    BaseControllerStarter controller1 = _controllerStarter;
+    PinotTaskManager taskManager1 = controller1.getTaskManager();
+
+    // Start second controller instance
+    BaseControllerStarter controller2 = 
startControllerOnDifferentPort(properties2);
+    PinotTaskManager taskManager2 = controller2.getTaskManager();
+
+    try {
+      // Setup test environment (using first controller)
+      addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+      addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+      addFakeMinionInstancesToAutoJoinHelixCluster(1);
+
+      // Create and register task generators on both controllers
+      ControllableTaskGenerator generator1 = new 
ControllableTaskGenerator(TEST_TASK_TYPE);
+      generator1.setGenerationDelay(3000); // 3 second delay for createTask
+      ClusterInfoAccessor clusterInfoAccessor1 = 
Mockito.mock(ClusterInfoAccessor.class);
+      generator1.init(clusterInfoAccessor1);
+      taskManager1.registerTaskGenerator(generator1);
+
+      ControllableTaskGenerator generator2 = new 
ControllableTaskGenerator(TEST_TASK_TYPE);
+      generator2.setGenerationDelay(500); // Shorter delay for scheduleTasks
+      ClusterInfoAccessor clusterInfoAccessor2 = 
Mockito.mock(ClusterInfoAccessor.class);
+      generator2.init(clusterInfoAccessor2);
+      taskManager2.registerTaskGenerator(generator2);
+
+      // Ensure task queues exist on both controllers
+      
controller1.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+      
controller2.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+
+      // Create schemas and all three tables ONLY on controller1 to avoid 
Quartz job conflicts
+      // The tables will be visible to both controllers via ZooKeeper
+      Schema schema1 = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_1)
+          .addSingleValueDimension("testCol", 
FieldSpec.DataType.STRING).build();
+      addSchema(schema1);
+      createSingleTestTable(RAW_TABLE_NAME_1);
+
+      Schema schema2 = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_2)
+          .addSingleValueDimension("testCol", 
FieldSpec.DataType.STRING).build();
+      addSchema(schema2);
+      createSingleTestTable(RAW_TABLE_NAME_2);
+
+      Schema schema3 = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_3)
+          .addSingleValueDimension("testCol", 
FieldSpec.DataType.STRING).build();
+      addSchema(schema3);
+      createSingleTestTable(RAW_TABLE_NAME_3);
+
+      // Wait for controllers to be fully initialized
+      Thread.sleep(2000);
+
+      ExecutorService executor = Executors.newFixedThreadPool(2);
+      CountDownLatch startLatch = new CountDownLatch(1);
+      CountDownLatch completionLatch = new CountDownLatch(2);
+
+      AtomicInteger scheduleTasksCompleted1 = new AtomicInteger(0);
+      AtomicInteger scheduleTasksCompleted2 = new AtomicInteger(0);
+      Map<String, TaskSchedulingInfo> scheduleTasksResult1 = new HashMap<>();
+      Map<String, TaskSchedulingInfo> scheduleTasksResult2 = new HashMap<>();
+
+      AtomicInteger failedCreations = new AtomicInteger(0);
+
+      // Controller 1: Run scheduleTasks for all tables (this will hold the 
lock for all tables)
+      executor.submit(() -> {
+        try {
+          startLatch.await();
+
+          TaskSchedulingContext context = new TaskSchedulingContext();
+          Set<String> tablesToSchedule = new HashSet<>();
+          
tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_1));
+          
tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_2));
+          
tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_3));
+          context.setTablesToSchedule(tablesToSchedule);
+          context.setLeader(true);
+
+          Map<String, TaskSchedulingInfo> result = 
taskManager1.scheduleTasks(context);
+          scheduleTasksResult1.putAll(result);
+          scheduleTasksCompleted1.incrementAndGet();
+        } catch (Exception ignored) {
+        } finally {
+          completionLatch.countDown();
+        }
+      });
+
+      // Controller 2: Run scheduleTasks for all tables (should fail)
+      executor.submit(() -> {
+        try {
+          startLatch.await();
+          Thread.sleep(500); // Start after controller1 begins
+
+          TaskSchedulingContext context = new TaskSchedulingContext();
+          Set<String> tablesToSchedule = new HashSet<>();
+          
tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_1));
+          
tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_2));
+          
tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_3));
+          context.setTablesToSchedule(tablesToSchedule);
+          context.setLeader(true);
+
+          Map<String, TaskSchedulingInfo> result = 
taskManager2.scheduleTasks(context);
+          scheduleTasksResult2.putAll(result);
+          scheduleTasksCompleted2.incrementAndGet();
+        } catch (Exception e) {
+          failedCreations.incrementAndGet();
+        } finally {
+          completionLatch.countDown();
+        }
+      });
+
+      // Start both operations
+      startLatch.countDown();
+
+      // Wait for completion
+      assertTrue(completionLatch.await(45, TimeUnit.SECONDS), "Tasks should 
complete within 45 seconds");
+
+      executor.shutdownNow();
+      assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS), "Executor 
should terminate");
+
+      // Verify results
+      assertEquals(scheduleTasksCompleted1.get(), 1, "scheduleTasks on 
controller1 should succeed");
+
+      // Controller2 cannot acquire any table lock, so its scheduleTasks throws instead of returning
+      // partial results
+      assertEquals(scheduleTasksCompleted2.get(), 0, "scheduleTasks on controller2 shouldn't complete");
+
+      // Validate controller 2's scheduleTasks failed due to being unable to 
get any locks
+      assertEquals(failedCreations.get(), 1, "Controller 2's scheduleTasks 
should've failed");
+
+      // Verify that only 1 controller generated tasks
+      int totalGenerations = generator1.getTaskGenerationCount() + 
generator2.getTaskGenerationCount();
+      assertEquals(totalGenerations, 3, "Only 1 scheduleTasks succeeded for 
all three tables");
+      assertEquals(generator2.getTaskGenerationCount(), 0, "Controller 2 should not have generated any tasks");
+
+      assertNotNull(scheduleTasksResult1);
+      assertEquals(scheduleTasksResult1.size(), 1);
+
+      assertNotNull(scheduleTasksResult2);
+      assertEquals(scheduleTasksResult2.size(), 0);
+
+      TaskSchedulingInfo taskSchedulingInfo = 
scheduleTasksResult1.get(TEST_TASK_TYPE);
+      assertNotNull(taskSchedulingInfo);
+      assertNotNull(taskSchedulingInfo.getScheduledTaskNames());
+      assertNotNull(taskSchedulingInfo.getSchedulingErrors());
+      assertNotNull(taskSchedulingInfo.getGenerationErrors());
+      // Should see one scheduled task covering all three tables
+      assertEquals(taskSchedulingInfo.getScheduledTaskNames().size(), 1);
+      // Should see 0 errors
+      assertEquals(taskSchedulingInfo.getGenerationErrors().size(), 0);
+      assertEquals(taskSchedulingInfo.getSchedulingErrors().size(), 0);
+    } finally {
+      // Cleanup
+      try {
+        // Cancel all running tasks before dropping tables
+        Map<String, TaskState> taskStates = 
_controllerStarter.getHelixTaskResourceManager()
+            .getTaskStates(TEST_TASK_TYPE);
+        for (String taskName : taskStates.keySet()) {
+          try {
+            
_controllerStarter.getHelixTaskResourceManager().deleteTask(taskName, true);
+          } catch (Exception e) {
+            // Ignore individual task deletion errors
+          }
+        }
+        Thread.sleep(1000); // Give time for task cancellation
+
+        dropOfflineTable(RAW_TABLE_NAME_1);
+        dropOfflineTable(RAW_TABLE_NAME_2);
+        dropOfflineTable(RAW_TABLE_NAME_3);
+      } catch (Exception ignored) {
+      }
+
+      // Stop second controller first
+      controller2.stop();
+    }
+  }
+
+  /**
+   * Helper method to start a second controller instance on a different port
+   */
+  private BaseControllerStarter startControllerOnDifferentPort(Map<String, 
Object> properties) throws Exception {
+    BaseControllerStarter controllerStarter = createControllerStarter();
+    controllerStarter.init(new 
org.apache.pinot.spi.env.PinotConfiguration(properties));
+    controllerStarter.start();
+    return controllerStarter;
+  }
+
+  /**
+   * Helper to create a single test table
+   */
+  private void createSingleTestTable(String rawTableName) throws Exception {
+    Map<String, Map<String, String>> taskTypeConfigsMap = new HashMap<>();
+    taskTypeConfigsMap.put(TEST_TASK_TYPE, ImmutableMap.of("schedule", "0 */10 
* ? * * *"));
+
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(rawTableName)
+        .setTaskConfig(new TableTaskConfig(taskTypeConfigsMap))
+        .build();
+
+    addTableConfig(tableConfig);
+  }
+
+  /**
+   * Task generator with controllable behavior
+   */
+  private static class ControllableTaskGenerator implements PinotTaskGenerator 
{
+    private final String _taskType;
+    private int _taskGenerationCount = 0;
+    private long _generationDelay = 0;
+
+    public ControllableTaskGenerator(String taskType) {
+      _taskType = taskType;
+    }
+
+    @Override
+    public String getTaskType() {
+      return _taskType;
+    }
+
+    @Override
+    public void init(ClusterInfoAccessor clusterInfoAccessor) {
+      // No-op
+    }
+
+    public void setGenerationDelay(long delayMs) {
+      _generationDelay = delayMs;
+    }
+
+    public int getTaskGenerationCount() {
+      return _taskGenerationCount;
+    }
+
+    @Override
+    public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) 
{
+      if (_generationDelay > 0) {
+        try {
+          Thread.sleep(_generationDelay);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      _taskGenerationCount++;
+      List<PinotTaskConfig> tasks = new ArrayList<>();
+      for (TableConfig tableConfig : tableConfigs) {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("tableName", tableConfig.getTableName());
+        tasks.add(new PinotTaskConfig(_taskType, configs));
+      }
+      return tasks;
+    }
+
+    @Override
+    public List<PinotTaskConfig> generateTasks(TableConfig tableConfig, 
Map<String, String> taskConfigs) {
+      if (_generationDelay > 0) {
+        try {
+          Thread.sleep(_generationDelay);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      _taskGenerationCount++;
+      Map<String, String> configs = new HashMap<>();
+      configs.put("tableName", tableConfig.getTableName());
+      return Collections.singletonList(new PinotTaskConfig(_taskType, 
configs));
+    }
+
+    @Override
+    public void generateTasks(List<TableConfig> tableConfigs, 
List<PinotTaskConfig> pinotTaskConfigs) {
+      if (_generationDelay > 0) {
+        try {
+          Thread.sleep(_generationDelay);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      _taskGenerationCount++;
+      Map<String, String> configs = new HashMap<>();
+      configs.put("tableName", tableConfigs.get(0).getTableName());
+      pinotTaskConfigs.add(new PinotTaskConfig(_taskType, configs));
+    }
+  }
+
+  /**
+   * Test scenario: Tests the forceReleaseLock API functionality.
+   * Verifies that when forceReleaseLock is called during task execution,
+   * the lock is released and another createTask can be started for the same 
table.
+   */
+  @Test
+  public void testForceReleaseLockDuringTaskExecution() throws Exception {
+    // Setup controller with distributed locking enabled
+    Map<String, Object> properties = getDefaultControllerConfiguration();
+    
properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
 true);
+    
properties.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
 true);
+
+    startController(properties);
+    PinotTaskManager taskManager = _controllerStarter.getTaskManager();
+
+    try {
+      // Setup test environment
+      addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+      addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+      addFakeMinionInstancesToAutoJoinHelixCluster(1);
+
+      // Create and register a slow task generator that will hold the lock for 
a long time
+      ControllableTaskGenerator slowGenerator = new 
ControllableTaskGenerator(TEST_TASK_TYPE);
+      slowGenerator.setGenerationDelay(5000); // 5 second delay to simulate 
long-running task
+      ClusterInfoAccessor clusterInfoAccessor = 
Mockito.mock(ClusterInfoAccessor.class);
+      slowGenerator.init(clusterInfoAccessor);
+      taskManager.registerTaskGenerator(slowGenerator);
+
+      // Ensure task queue exists
+      
_controllerStarter.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+
+      // Create schema and table
+      Schema schema = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_1)
+          .addSingleValueDimension("testCol", 
FieldSpec.DataType.STRING).build();
+      addSchema(schema);
+      createSingleTestTable(RAW_TABLE_NAME_1);
+
+      String tableNameWithType = 
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_1);
+
+      // Wait for controller to be fully initialized
+      Thread.sleep(1000);
+
+      ExecutorService executor = Executors.newFixedThreadPool(2);
+      CountDownLatch startLatch = new CountDownLatch(1);
+      CountDownLatch firstTaskStarted = new CountDownLatch(1);
+      CountDownLatch completionLatch = new CountDownLatch(2);
+
+      AtomicInteger firstTaskCompleted = new AtomicInteger(0);
+      AtomicInteger secondTaskCompleted = new AtomicInteger(0);
+      AtomicInteger forceReleaseResult = new AtomicInteger(0);
+      AtomicInteger forceReleaseResultException = new AtomicInteger(0);
+
+      // First createTask - this will acquire the lock and run for 5 seconds
+      executor.submit(() -> {
+        try {
+          startLatch.await();
+          firstTaskStarted.countDown(); // Signal that first task has started
+          Map<String, String> result = taskManager.createTask(TEST_TASK_TYPE, 
RAW_TABLE_NAME_1, null, new HashMap<>());
+          if (result != null && !result.isEmpty()) {
+            firstTaskCompleted.incrementAndGet();
+          }
+        } catch (Exception ignored) {
+        } finally {
+          completionLatch.countDown();
+        }
+      });
+
+      // Second thread: Wait for first task to start, then force release lock 
and try another createTask
+      executor.submit(() -> {
+        try {
+          startLatch.await();
+          firstTaskStarted.await(); // Wait for first task to start
+          Thread.sleep(1000); // Let first task run for a bit to ensure it has 
the lock
+
+          // Force release the lock while first task is still running
+          try {
+            taskManager.forceReleaseLock(tableNameWithType);
+            forceReleaseResult.incrementAndGet();
+          } catch (Exception e) {
+            forceReleaseResultException.incrementAndGet();
+          }
+
+          // Now try to create another task for the same table - this should 
succeed since lock was released
+          Thread.sleep(500); // Small delay to ensure lock release is processed
+          Map<String, String> result = taskManager.createTask(TEST_TASK_TYPE, 
RAW_TABLE_NAME_1, null, new HashMap<>());
+          if (result != null && !result.isEmpty()) {
+            secondTaskCompleted.incrementAndGet();
+          }
+        } catch (Exception ignored) {
+        } finally {
+          completionLatch.countDown();
+        }
+      });
+
+      // Start both operations
+      startLatch.countDown();
+
+      // Wait for completion
+      assertTrue(completionLatch.await(15, TimeUnit.SECONDS), "Tasks should 
complete within 15 seconds");
+
+      executor.shutdownNow();
+      assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS), "Executor 
should terminate");
+
+      // Verify results
+      assertEquals(forceReleaseResult.get(), 1, "forceReleaseLock should 
succeed");
+      assertEquals(forceReleaseResultException.get(), 0, "forceReleaseLock 
shouldn't throw exception");
+      assertEquals(firstTaskCompleted.get(), 1, "First createTask should 
complete successfully");
+      assertEquals(secondTaskCompleted.get(), 1, "Second createTask should 
succeed after lock was force released");
+
+      // Verify that both tasks were generated (first task continues even 
after lock release)
+      assertEquals(slowGenerator.getTaskGenerationCount(), 2, "Both createTask 
calls should have generated tasks");
+    } finally {
+      // Cleanup
+      try {
+        // Cancel all running tasks before dropping tables
+        Map<String, TaskState> taskStates = 
_controllerStarter.getHelixTaskResourceManager()
+            .getTaskStates(TEST_TASK_TYPE);
+        for (String taskName : taskStates.keySet()) {
+          try {
+            
_controllerStarter.getHelixTaskResourceManager().deleteTask(taskName, true);
+          } catch (Exception e) {
+            // Ignore individual task deletion errors
+          }
+        }
+        Thread.sleep(1000); // Give time for task cancellation
+
+        dropOfflineTable(RAW_TABLE_NAME_1);
+      } catch (Exception ignored) {
+      }
+    }
+  }
+
+  /**
+   * Test scenario: Tests the forceReleaseLock API to ensure it throws an exception when no lock is held
+   */
+  @Test
+  public void testForceReleaseLockWhenNoLockExists() throws Exception {
+    // Setup controller with distributed locking enabled
+    Map<String, Object> properties = getDefaultControllerConfiguration();
+    
properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
 true);
+    
properties.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
 true);
+
+    startController(properties);
+    PinotTaskManager taskManager = _controllerStarter.getTaskManager();
+
+    try {
+      // Setup test environment
+      addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+      addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+      addFakeMinionInstancesToAutoJoinHelixCluster(1);
+
+      // Create and register a task generator with a short generation delay
+      ControllableTaskGenerator slowGenerator = new ControllableTaskGenerator(TEST_TASK_TYPE);
+      slowGenerator.setGenerationDelay(1000); // 1 second delay
+      ClusterInfoAccessor clusterInfoAccessor = 
Mockito.mock(ClusterInfoAccessor.class);
+      slowGenerator.init(clusterInfoAccessor);
+      taskManager.registerTaskGenerator(slowGenerator);
+
+      // Ensure task queue exists
+      
_controllerStarter.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+
+      // Create schema and table
+      Schema schema = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_1)
+          .addSingleValueDimension("testCol", 
FieldSpec.DataType.STRING).build();
+      addSchema(schema);
+      createSingleTestTable(RAW_TABLE_NAME_1);
+
+      String tableNameWithType = 
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_1);
+
+      // Wait for controller to be fully initialized
+      Thread.sleep(1000);
+
+      ExecutorService executor = Executors.newFixedThreadPool(1);
+      CountDownLatch startLatch = new CountDownLatch(1);
+      CountDownLatch completionLatch = new CountDownLatch(1);
+
+      AtomicInteger firstTaskCompleted = new AtomicInteger(0);
+      AtomicInteger forceReleaseResult = new AtomicInteger(0);
+      AtomicInteger forceReleaseResultException = new AtomicInteger(0);
+
+      // Force release lock and try a createTask
+      executor.submit(() -> {
+        try {
+          startLatch.await();
+
+          // Force release the lock even though no lock is held - this should throw
+          try {
+            taskManager.forceReleaseLock(tableNameWithType);
+            forceReleaseResult.incrementAndGet();
+          } catch (Exception e) {
+            forceReleaseResultException.incrementAndGet();
+          }
+
+          // Now try to create a task for the same table - this should succeed since no lock is held
+          Thread.sleep(500); // Small delay before attempting task creation
+          Map<String, String> result = taskManager.createTask(TEST_TASK_TYPE, 
RAW_TABLE_NAME_1, null, new HashMap<>());
+          if (result != null && !result.isEmpty()) {
+            firstTaskCompleted.incrementAndGet();
+          }
+        } catch (Exception ignored) {
+        } finally {
+          completionLatch.countDown();
+        }
+      });
+
+      // Start operation
+      startLatch.countDown();
+
+      // Wait for completion
+      assertTrue(completionLatch.await(15, TimeUnit.SECONDS), "Tasks should 
complete within 15 seconds");
+
+      executor.shutdownNow();
+      assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS), "Executor 
should terminate");
+
+      // Verify results
+      assertEquals(forceReleaseResult.get(), 0,
+          "forceReleaseLock should have thrown exception as no lock should be 
held");
+      assertEquals(forceReleaseResultException.get(), 1, "forceReleaseLock 
should throw an exception");
+      assertEquals(firstTaskCompleted.get(), 1, "First createTask should 
complete successfully");
+
+      // Verify that the single createTask call generated a task
+      assertEquals(slowGenerator.getTaskGenerationCount(), 1, "The createTask call should have generated a task");
+    } finally {
+      // Cleanup
+      try {
+        // Cancel all running tasks before dropping tables
+        Map<String, TaskState> taskStates = 
_controllerStarter.getHelixTaskResourceManager()
+            .getTaskStates(TEST_TASK_TYPE);
+        for (String taskName : taskStates.keySet()) {
+          try {
+            
_controllerStarter.getHelixTaskResourceManager().deleteTask(taskName, true);
+          } catch (Exception e) {
+            // Ignore individual task deletion errors
+          }
+        }
+        Thread.sleep(1000); // Give time for task cancellation
+
+        dropOfflineTable(RAW_TABLE_NAME_1);
+      } catch (Exception ignored) {
+      }
+    }
+  }
+}
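
The multi-controller tests above rely on task generation being wrapped in a per-table lock: acquire before generating, report a generation error and skip the table if another controller holds the lock, and release in a finally block. Here is a minimal sketch of that acquire/generate/release pattern; DistributedTaskLockManager.acquireLock/releaseLock mirror the class under test (same package assumed), while the generateForTable callback and the error string are illustrative only.

// Minimal sketch of the per-table acquire/generate/release pattern the tests above exercise.
// Assumes the same package as DistributedTaskLockManager; not the committed PinotTaskManager code.
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class LockedTaskGenerationSketch {
  private final DistributedTaskLockManager _lockManager;

  LockedTaskGenerationSketch(DistributedTaskLockManager lockManager) {
    _lockManager = lockManager;
  }

  /**
   * Tries to generate tasks for each table while holding that table's distributed lock.
   * Tables whose lock cannot be acquired are reported as generation errors instead of failing the run.
   */
  List<String> generateWithLocks(List<String> tablesWithType, Consumer<String> generateForTable) {
    List<String> generationErrors = new ArrayList<>();
    for (String tableNameWithType : tablesWithType) {
      DistributedTaskLockManager.TaskLock lock = _lockManager.acquireLock(tableNameWithType);
      if (lock == null) {
        generationErrors.add("Could not acquire table level distributed lock for table: " + tableNameWithType
            + ". Another controller is likely generating tasks for this table.");
        continue;
      }
      try {
        generateForTable.accept(tableNameWithType);
      } finally {
        _lockManager.releaseLock(lock); // Always release, even if generation throws
      }
    }
    return generationErrors;
  }
}
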
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java 
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index 000aab90c80..b8fc93363df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -166,5 +166,6 @@ public class Actions {
     public static final String UPLOAD_SEGMENT = "UploadSegment";
     public static final String VALIDATE_SCHEMA = "ValidateSchema";
     public static final String VALIDATE_TABLE_CONFIGS = "ValidateTableConfigs";
+    public static final String FORCE_RELEASE_TASK_GENERATION_LOCK = 
"ForceReleaseTaskGenerationLock";
   }
 }
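
For context on how the new action string is meant to be consumed, below is a rough sketch of an authorization check keyed on it. The AccessControlCheck interface is hypothetical and only illustrates the idea; real enforcement goes through Pinot's existing access-control plumbing on the force-release endpoint.

// Hypothetical sketch only: an access check keyed on the new "ForceReleaseTaskGenerationLock"
// action string added to Actions above. AccessControlCheck is not a Pinot interface.
public class ForceReleaseAuthSketch {
  public interface AccessControlCheck {
    boolean isAllowed(String principal, String action, String resource);
  }

  public static void checkForceRelease(AccessControlCheck acl, String principal, String tableNameWithType) {
    if (!acl.isAllowed(principal, "ForceReleaseTaskGenerationLock", tableNameWithType)) {
      throw new SecurityException(
          "Principal " + principal + " is not allowed to force release the task generation lock for "
              + tableNameWithType);
    }
  }
}
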


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
