This is an automated email from the ASF dual-hosted git repository.
somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4b8ec201e93 Add table level distributed locking for ensuring minion task generation is atomic (#16857)
4b8ec201e93 is described below
commit 4b8ec201e933ce1928848ed291f11fcf9a298a7f
Author: Sonam Mandal <[email protected]>
AuthorDate: Thu Oct 9 10:16:05 2025 -0700
Add table level distributed locking for ensuring minion task generation is atomic (#16857)
* Add table level distributed locking for ensuring minion task generation is atomic
* Remove -State ZNode to simplify distributed locking mechanism
* Address review comments
* Add tests for PinotTaskManager with distributed locking enabled
* Address review comments around force releasing the lock
* Add ability to retry lock release on failure and if it still exists
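For illustration, here is a minimal sketch of the locking pattern this change wraps around task generation
(the DistributedTaskLockManager API is taken from the patch below; the surrounding scaffolding and the
generateAndSubmitTasks() helper are hypothetical):

    DistributedTaskLockManager.TaskLock lock = _distributedTaskLockManager.acquireLock(tableNameWithType);
    if (lock == null) {
      // Another controller is likely generating tasks for this table; fail fast or skip the table
      throw new RuntimeException("Could not acquire task generation lock for table: " + tableNameWithType);
    }
    try {
      // Generate and submit minion tasks while the table-level lock is held
      generateAndSubmitTasks(tableNameWithType);
    } finally {
      // Always release the lock; the ephemeral ZNode also disappears if the controller session ends
      _distributedTaskLockManager.releaseLock(lock);
    }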
---
.../pinot/common/metadata/ZKMetadataProvider.java | 6 +
.../pinot/common/metrics/ControllerTimer.java | 1 +
.../apache/pinot/controller/ControllerConf.java | 9 +
.../api/resources/PinotTaskRestletResource.java | 21 +
.../core/minion/DistributedTaskLockManager.java | 353 ++++++
.../helix/core/minion/PinotTaskManager.java | 220 ++--
.../helix/core/minion/TaskSchedulingInfo.java | 6 +
.../minion/DistributedTaskLockManagerTest.java | 365 +++++++
.../PinotTaskManagerDistributedLockingTest.java | 1118 ++++++++++++++++++++
.../java/org/apache/pinot/core/auth/Actions.java | 1 +
10 files changed, 2039 insertions(+), 61 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 42be2fe5ae1..00db262b8d4 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -85,6 +85,7 @@ public class ZKMetadataProvider {
private static final String PROPERTYSTORE_SEGMENT_LINEAGE =
"/SEGMENT_LINEAGE";
private static final String PROPERTYSTORE_MINION_TASK_METADATA_PREFIX =
"/MINION_TASK_METADATA";
private static final String PROPERTYSTORE_QUERY_WORKLOAD_CONFIGS_PREFIX =
"/CONFIGS/QUERYWORKLOAD";
+ private static final String PROPERTYSTORE_TASK_LOCK_SUFFIX = "-Lock";
public static void setUserConfig(ZkHelixPropertyStore<ZNRecord>
propertyStore, String username, ZNRecord znRecord) {
propertyStore.set(constructPropertyStorePathForUserConfig(username),
znRecord, AccessOption.PERSISTENT);
@@ -308,6 +309,11 @@ public class ZKMetadataProvider {
return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX,
tableNameWithType);
}
+ public static String
constructPropertyStorePathForMinionTaskGenerationLock(String tableNameWithType)
{
+ return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX,
tableNameWithType
+ + PROPERTYSTORE_TASK_LOCK_SUFFIX);
+ }
+
public static String getPropertyStoreWorkloadConfigsPrefix() {
return PROPERTYSTORE_QUERY_WORKLOAD_CONFIGS_PREFIX;
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
index 82ddc6ba51a..d8f344e2ac5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
@@ -29,6 +29,7 @@ public enum ControllerTimer implements AbstractMetrics.Timer {
TABLE_REBALANCE_EXECUTION_TIME_MS("tableRebalanceExecutionTimeMs", false),
CRON_SCHEDULER_JOB_EXECUTION_TIME_MS("cronSchedulerJobExecutionTimeMs",
false),
IDEAL_STATE_UPDATE_TIME_MS("IdealStateUpdateTimeMs", false),
+
MINION_TASK_GENERATION_LOCK_HELD_ELAPSED_TIME_MS("minionTaskGenerationLockHeldElapsedTimeMs",
false),
// How long it took the server to start.
STARTUP_SUCCESS_DURATION_MS("startupSuccessDurationMs", true),
// Time taken to read the segment from deep store
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index a56ff5594c9..ee7c7c4dd35 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -172,6 +172,10 @@ public class ControllerConf extends PinotConfiguration {
// end of the task.
public static final String PINOT_TASK_EXPIRE_TIME_MS =
"controller.task.expire.time.ms";
+ // Distributed lock enablement for PinotTaskManager
+ public static final String ENABLE_DISTRIBUTED_LOCKING =
"controller.task.enableDistributedLocking";
+ public static final boolean DEFAULT_ENABLE_DISTRIBUTED_LOCKING = false;
+
@Deprecated
// RealtimeSegmentRelocator has been rebranded as SegmentRelocator
public static final String DEPRECATED_REALTIME_SEGMENT_RELOCATOR_FREQUENCY
=
@@ -1210,6 +1214,11 @@ public class ControllerConf extends PinotConfiguration {
return
getProperty(ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
false);
}
+ public boolean isPinotTaskManagerDistributedLockingEnabled() {
+ return getProperty(ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
+ ControllerPeriodicTasksConf.DEFAULT_ENABLE_DISTRIBUTED_LOCKING);
+ }
+
public long getPinotTaskExpireTimeInMs() {
return getProperty(ControllerPeriodicTasksConf.PINOT_TASK_EXPIRE_TIME_MS,
TimeUnit.HOURS.toMillis(24));
}
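For reference, distributed locking is disabled by default and is turned on by setting
controller.task.enableDistributedLocking=true in the controller configuration (property name taken from this
patch; the value shown is illustrative).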
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index 56c5aed10e8..6924296b9c2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -791,4 +791,25 @@ public class PinotTaskRestletResource {
_pinotHelixTaskResourceManager.deleteTask(taskName, forceDelete);
return new SuccessResponse("Successfully deleted task: " + taskName);
}
+
+ @DELETE
+ @Path("/tasks/lock/forceRelease")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.FORCE_RELEASE_TASK_GENERATION_LOCK,
+ paramName = "tableNameWithType")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Authenticate(AccessType.DELETE)
+ @ApiOperation("Force releases the task generation lock for a given table.
Call this API with caution")
+ public SuccessResponse forceReleaseTaskGenerationLock(
+ @ApiParam(value = "Table name (with type suffix).", required = true)
+ @QueryParam("tableNameWithType") String tableNameWithType) {
+ try {
+ _pinotTaskManager.forceReleaseLock(tableNameWithType);
+ return new SuccessResponse("Successfully released task generation lock
on table " + tableNameWithType
+ + " for all task types");
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, "Failed to release task
generation lock on table: "
+ + tableNameWithType + ", with exception: " +
ExceptionUtils.getStackTrace(e),
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
}
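For reference, a stuck lock can be force released (with caution, as the API description notes) by issuing an
HTTP DELETE to /tasks/lock/forceRelease?tableNameWithType=<tableNameWithType> on the controller; the path and
query parameter come from this patch.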
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/DistributedTaskLockManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/DistributedTaskLockManager.java
new file mode 100644
index 00000000000..e98f39a2adf
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/DistributedTaskLockManager.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.ControllerTimer;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Manages distributed locks for minion task generation using ZooKeeper
ephemeral nodes that automatically disappear
+ * when the controller session ends or when the lock is explicitly released.
This approach provides automatic cleanup
+ * and is suitable for long-running task generation.
+ * Locks are held at the table level to ensure that only one type of task can be generated per table at any
+ * given time, which prevents task types that should not run in parallel from being generated simultaneously.
+ * <p>
+ * ZK EPHEMERAL Lock Node:
+ * <ul>
+ * <li>Every lock is created at the table level with the name:
{tableName}-Lock, under the base path
+ * MINION_TASK_METADATA within the PROPERTYSTORE.
+ * <li>If the propertyStore::create() call returns true, that means the
lock node was successfully created and the
+ * lock belongs to the current controller, otherwise it was not. If the
lock node already exists, this will return
+ * false. No clean-up of the lock node is needed if the
propertyStore::create() call returns false.
+ * <li>The locks are EPHEMERAL in nature, meaning that once the session
with ZK is lost, the lock is automatically
+ * cleaned up. Scenarios when the ZK session can be lost: a) controller
shutdown, b) controller crash, c) ZK session
+ * expiry (e.g. long GC pauses can cause this). This property helps ensure
that the lock is released under
+ * controller failure.
+ * </ul>
+ * <p>
+ */
+public class DistributedTaskLockManager {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DistributedTaskLockManager.class);
+
+ // Lock paths are constructed using ZKMetadataProvider
+ private static final String LOCK_OWNER_KEY = "lockOwner";
+ private static final String LOCK_CREATION_TIME_MS = "lockCreationTimeMs";
+
+ // Retry constants
+ private static final int MAX_RETRIES = 3;
+ private static final int BASE_RETRY_DELAY_MS = 100;
+
+ private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+ private final String _controllerInstanceId;
+ private final ControllerMetrics _controllerMetrics;
+
+ public DistributedTaskLockManager(ZkHelixPropertyStore<ZNRecord>
propertyStore, String controllerInstanceId) {
+ _propertyStore = propertyStore;
+ _controllerInstanceId = controllerInstanceId;
+ _controllerMetrics = ControllerMetrics.get();
+ }
+
+ /**
+ * Attempts to acquire a distributed lock at the table level for task
generation using session-based locking.
+ * The lock is at the table level instead of the task level to ensure that only a single task can be generated
+ * for a given table at any time, since certain tasks depend on other tasks not being generated concurrently.
+ * The lock is held until explicitly released or the controller session ends.
+ *
+ * @param tableNameWithType the table name with type
+ * @return TaskLock object if successful, null if lock could not be acquired
+ */
+ @Nullable
+ public TaskLock acquireLock(String tableNameWithType) {
+ LOGGER.info("Attempting to acquire task generation lock for table: {} by
controller: {}", tableNameWithType,
+ _controllerInstanceId);
+
+ try {
+ // Check if task generation is already in progress
+ if (isTaskGenerationInProgress(tableNameWithType)) {
+ LOGGER.info("Task generation already in progress for: {} by this or
another controller", tableNameWithType);
+ return null;
+ }
+
+ // Try to acquire the lock using ephemeral node
+ TaskLock lock = tryAcquireSessionBasedLock(tableNameWithType);
+ if (lock != null) {
+ LOGGER.info("Successfully acquired task generation lock for table: {}
by controller: {}", tableNameWithType,
+ _controllerInstanceId);
+ return lock;
+ } else {
+ LOGGER.warn("Could not acquire lock for table: {} - another controller
must hold it", tableNameWithType);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error while trying to acquire task lock for table: {}",
tableNameWithType, e);
+ }
+ return null;
+ }
+
+ private String getLockPath(String tableNameForPath) {
+ return
ZKMetadataProvider.constructPropertyStorePathForMinionTaskGenerationLock(tableNameForPath);
+ }
+
+ /**
+ * Releases a previously acquired session-based lock and marks task
generation as completed.
+ *
+ * @param lock the lock to release
+ * @return true if successfully released, false otherwise
+ */
+ public boolean releaseLock(TaskLock lock) {
+ if (lock == null) {
+ return true;
+ }
+
+ String tableNameWithType = lock.getTableNameWithType();
+ String lockNode = lock.getLockZNodePath();
+
+ // Remove the ephemeral lock node
+ boolean status = true;
+ if (lockNode != null) {
+ try {
+ if (_propertyStore.exists(lockNode, AccessOption.EPHEMERAL)) {
+ status = removeWithRetries(lockNode, tableNameWithType);
+ } else {
+ LOGGER.warn("Ephemeral lock node: {} does not exist for table: {},
nothing to remove",
+ lockNode, tableNameWithType);
+ }
+ } catch (Exception e) {
+ status = false;
+ LOGGER.warn("Exception while trying to remove ephemeral lock node:
{}", lockNode, e);
+ }
+ } else {
+ LOGGER.warn("Lock node path seems to be null for task lock: {}, treating
release as a no-op", lock);
+ }
+
+ return status;
+ }
+
+ /**
+ * Attempts to remove a lock node with retries and exponential backoff.
+ * Only retries if the lock exists and the remove operation returns false.
+ */
+ private boolean removeWithRetries(String lockNode, String tableNameWithType)
{
+ for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
+ try {
+ // Check if lock still exists before attempting removal
+ if (!_propertyStore.exists(lockNode, AccessOption.EPHEMERAL)) {
+ LOGGER.info("Lock node: {} no longer exists for table: {},
considering removal successful", lockNode,
+ tableNameWithType);
+ return true;
+ }
+
+ boolean removed = _propertyStore.remove(lockNode,
AccessOption.EPHEMERAL);
+
+ if (removed) {
+ LOGGER.info("Successfully removed ephemeral lock node: {} for table:
{} by controller: {} on attempt: {}",
+ lockNode, tableNameWithType, _controllerInstanceId, attempt);
+ return true;
+ } else {
+ LOGGER.warn("Failed to remove ephemeral lock node: {} for table: {}
by controller: {} on attempt: {}/{}",
+ lockNode, tableNameWithType, _controllerInstanceId, attempt,
MAX_RETRIES);
+
+ // If this is not the last attempt, wait before retrying
+ if (attempt < MAX_RETRIES) {
+ // Exponential backoff: 100ms, 200ms, 400ms
+ long delayMs = BASE_RETRY_DELAY_MS * (1L << (attempt - 1));
+ LOGGER.info("Retrying lock removal for table: {} after {}ms
delay", tableNameWithType, delayMs);
+ Thread.sleep(delayMs);
+ }
+ }
+ } catch (InterruptedException e) {
+ LOGGER.warn("Interrupted while waiting to retry lock removal for
table: {}", tableNameWithType);
+ Thread.currentThread().interrupt();
+ return false;
+ } catch (Exception e) {
+ LOGGER.warn("Exception while trying to remove ephemeral lock node: {}
on attempt: {}/{}", lockNode, attempt,
+ MAX_RETRIES, e);
+
+ // If this is not the last attempt, wait before retrying
+ if (attempt < MAX_RETRIES) {
+ try {
+ // Exponential backoff: 100ms, 200ms, 400ms
+ long delayMs = BASE_RETRY_DELAY_MS * (1L << (attempt - 1));
+ LOGGER.info("Retrying lock removal for table: {} after {}ms delay
due to exception", tableNameWithType,
+ delayMs);
+ Thread.sleep(delayMs);
+ } catch (InterruptedException ie) {
+ LOGGER.warn("Interrupted while waiting to retry lock removal for
table: {}", tableNameWithType);
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+ }
+ }
+
+ LOGGER.error("Failed to remove ephemeral lock node: {} for table: {} after
{} attempts", lockNode,
+ tableNameWithType, MAX_RETRIES);
+ return false;
+ }
+
+ /**
+ * Force release the lock without checking if any tasks are in progress
+ */
+ public void forceReleaseLock(String tableNameWithType) {
+ LOGGER.info("Trying to force release the lock for table: {}",
tableNameWithType);
+ String lockPath = getLockPath(tableNameWithType);
+
+ if (!_propertyStore.exists(lockPath, AccessOption.EPHEMERAL)) {
+ String message = "No lock ZNode: " + lockPath + " found for table: " +
tableNameWithType
+ + ", nothing to force release";
+ LOGGER.warn(message);
+ throw new RuntimeException(message);
+ }
+
+ LOGGER.info("Lock for table: {} found at path: {}, trying to remove",
tableNameWithType, lockPath);
+ boolean result = _propertyStore.remove(lockPath, AccessOption.EPHEMERAL);
+ if (!result) {
+ String message = "Could not force release lock: " + lockPath + " for
table: " + tableNameWithType;
+ LOGGER.error(message);
+ throw new RuntimeException(message);
+ }
+ }
+
+ /**
+ * Checks if any task generation is currently in progress for the given
table.
+ *
+ * @param tableNameWithType the table name with type
+ * @return true if task generation is in progress, false otherwise
+ */
+ @VisibleForTesting
+ boolean isTaskGenerationInProgress(String tableNameWithType) {
+ String lockPath = getLockPath(tableNameWithType);
+
+ try {
+ long durationLockHeldMs = 0;
+ Stat stat = new Stat();
+ // Get the node instead of checking for existence to update the time
held metric in case the node exists
+ ZNRecord zNRecord = _propertyStore.get(lockPath, stat,
AccessOption.EPHEMERAL);
+ if (zNRecord != null) {
+ String creationTimeMsString =
zNRecord.getSimpleField(LOCK_CREATION_TIME_MS);
+ long creationTimeMs = stat.getCtime();
+ if (creationTimeMsString != null) {
+ try {
+ creationTimeMs = Long.parseLong(creationTimeMsString);
+ } catch (NumberFormatException e) {
+ LOGGER.warn("Could not parse creationTimeMs string: {} into long
from ZNode, using ZNode creation time",
+ creationTimeMsString);
+ creationTimeMs = stat.getCtime();
+ }
+ }
+ durationLockHeldMs = System.currentTimeMillis() - creationTimeMs;
+ }
+ _controllerMetrics.addTimedValue(tableNameWithType,
+ ControllerTimer.MINION_TASK_GENERATION_LOCK_HELD_ELAPSED_TIME_MS,
durationLockHeldMs, TimeUnit.MILLISECONDS);
+ return zNRecord != null;
+ } catch (Exception e) {
+ LOGGER.error("Error checking task generation status for: {} with lock
path: {}", tableNameWithType, lockPath, e);
+ return false;
+ }
+ }
+
+ /**
+ * Attempts to acquire a lock using ephemeral nodes.
+ */
+ @VisibleForTesting
+ TaskLock tryAcquireSessionBasedLock(String tableNameWithType) {
+ String lockPath = getLockPath(tableNameWithType);
+
+ try {
+ long currentTimeMs = System.currentTimeMillis();
+
+ // Create ephemeral node for this table, owned by this controller
+ ZNRecord lockRecord = new ZNRecord(_controllerInstanceId);
+ lockRecord.setSimpleField(LOCK_OWNER_KEY, _controllerInstanceId);
+ lockRecord.setSimpleField(LOCK_CREATION_TIME_MS,
String.valueOf(currentTimeMs));
+
+ boolean created = _propertyStore.create(lockPath, lockRecord,
AccessOption.EPHEMERAL);
+
+ if (created) {
+ LOGGER.info("Successfully created lock node at path: {}, for
controller: {}, for table: {}", lockPath,
+ _controllerInstanceId, tableNameWithType);
+ return new TaskLock(tableNameWithType, _controllerInstanceId,
currentTimeMs, lockPath);
+ }
+
+ // We could not create the lock node, returning null
+ LOGGER.warn("Could not create lock node at path: {} for controller: {},
for table: {}", lockPath,
+ _controllerInstanceId, tableNameWithType);
+ } catch (Exception e) {
+ LOGGER.error("Error creating lock under path: {}, for controller: {},
for table: {}", lockPath,
+ _controllerInstanceId, tableNameWithType, e);
+ }
+
+ return null;
+ }
+
+ /**
+ * Represents a session-based distributed lock for task generation.
+ * The lock is automatically released when the controller session ends.
+ */
+ public static class TaskLock {
+ private final String _tableNameWithType;
+ private final String _owner;
+ private final long _creationTimeMs;
+ private final String _lockZNodePath; // Path to the ephemeral lock node
+
+ public TaskLock(String tableNameWithType, String owner, long
creationTimeMs, String lockZNodePath) {
+ _tableNameWithType = tableNameWithType;
+ _owner = owner;
+ _creationTimeMs = creationTimeMs;
+ _lockZNodePath = lockZNodePath;
+ }
+
+ public String getTableNameWithType() {
+ return _tableNameWithType;
+ }
+
+ public String getOwner() {
+ return _owner;
+ }
+
+ public long getCreationTimeMs() {
+ return _creationTimeMs;
+ }
+
+ public String getLockZNodePath() {
+ return _lockZNodePath;
+ }
+
+ public long getAge() {
+ return System.currentTimeMillis() - _creationTimeMs;
+ }
+
+ @Override
+ public String toString() {
+ return "TaskLock{tableNameWithType='" + _tableNameWithType + "',
owner='" + _owner + "', creationTimeMs="
+ + _creationTimeMs + ", age=" + getAge() + ", lockZNodePath='" +
_lockZNodePath + "'}";
+ }
+ }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 8b9f33e9dbd..d7b7eaf8868 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -122,6 +122,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
private final TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo>
_taskManagerStatusCache;
+ protected final @Nullable DistributedTaskLockManager
_distributedTaskLockManager;
+
public PinotTaskManager(PinotHelixTaskResourceManager
helixTaskResourceManager,
PinotHelixResourceManager helixResourceManager, LeadControllerManager
leadControllerManager,
ControllerConf controllerConf, ControllerMetrics controllerMetrics,
@@ -149,6 +151,18 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
} else {
_scheduler = null;
}
+
+ // For distributed locking
+ boolean enableDistributedLocking =
controllerConf.isPinotTaskManagerDistributedLockingEnabled();
+ if (enableDistributedLocking) {
+ LOGGER.info("Distributed locking is enabled for PinotTaskManager");
+ // Initialize distributed task lock manager if distributed locking is
enabled
+ _distributedTaskLockManager = new
DistributedTaskLockManager(helixResourceManager.getPropertyStore(),
+ helixResourceManager.getHelixZkManager().getInstanceName());
+ } else {
+ LOGGER.info("Distributed locking is disabled for PinotTaskManager");
+ _distributedTaskLockManager = null;
+ }
}
public void init() {
@@ -172,7 +186,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
}
}
- public Map<String, String> createTask(String taskType, String tableName,
@Nullable String taskName,
+ public synchronized Map<String, String> createTask(String taskType, String
tableName, @Nullable String taskName,
Map<String, String> taskConfigs)
throws Exception {
if (taskName == null) {
@@ -210,6 +224,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
if (tableNameWithTypes.isEmpty()) {
throw new TableNotFoundException("'tableName' " + tableName + " is not
found");
}
+ LOGGER.info("Generating tasks for {} tables, list: {}",
tableNameWithTypes.size(), tableNameWithTypes);
PinotTaskGenerator taskGenerator =
_taskGeneratorRegistry.getTaskGenerator(taskType);
// Generate each type of tasks
@@ -239,33 +254,61 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
// Example usage in BaseTaskGenerator.getNumSubTasks()
taskConfigs.put(MinionConstants.TRIGGERED_BY,
CommonConstants.TaskTriggers.ADHOC_TRIGGER.name());
- List<PinotTaskConfig> pinotTaskConfigs =
taskGenerator.generateTasks(tableConfig, taskConfigs);
- if (pinotTaskConfigs.isEmpty()) {
- LOGGER.warn("No ad-hoc task generated for task type: {}", taskType);
- continue;
+ // Acquire distributed lock before proceeding with ad-hoc task generation
+ // Need locking to protect against:
+ // 1. Race conditions with periodic task generation
+ // 2. Multiple simultaneous ad-hoc requests
+ // 3. Leadership changes during task generation
+ DistributedTaskLockManager.TaskLock lock = null;
+ if (_distributedTaskLockManager != null) {
+ lock = _distributedTaskLockManager.acquireLock(tableNameWithType);
+ if (lock == null) {
+ String message = String.format("Could not acquire table level
distributed lock for ad-hoc task type: %s, "
+ + "table: %s. Another controller is likely generating tasks
for this table. Please try again later.",
+ taskType, tableNameWithType);
+ LOGGER.warn(message);
+ throw new RuntimeException(message);
+ }
+ LOGGER.info("Acquired table level distributed lock for ad-hoc task
type: {} on table: {}", taskType,
+ tableNameWithType);
}
- int maxNumberOfSubTasks = taskGenerator.getMaxAllowedSubTasksPerTask();
- if (pinotTaskConfigs.size() > maxNumberOfSubTasks) {
- String message = String.format(
- "Number of tasks generated for task type: %s for table: %s is %d,
which is greater than the "
- + "maximum number of tasks to schedule: %d. This is "
- + "controlled by the cluster config %s which is set based on
controller's performance.", taskType,
- tableName, pinotTaskConfigs.size(), maxNumberOfSubTasks,
MinionConstants.MAX_ALLOWED_SUB_TASKS_KEY);
- message += "Optimise the task config or reduce tableMaxNumTasks to
avoid the error";
- // We throw an exception to notify the user
- // This is to ensure that the user is aware of the task generation
limit
- throw new RuntimeException(message);
+
+ try {
+ List<PinotTaskConfig> pinotTaskConfigs =
taskGenerator.generateTasks(tableConfig, taskConfigs);
+ if (pinotTaskConfigs.isEmpty()) {
+ LOGGER.warn("No ad-hoc task generated for task type: {}, for table:
{}", taskType, tableNameWithType);
+ continue;
+ }
+ int maxNumberOfSubTasks = taskGenerator.getMaxAllowedSubTasksPerTask();
+ if (pinotTaskConfigs.size() > maxNumberOfSubTasks) {
+ String message = String.format(
+ "Number of tasks generated for task type: %s for table: %s is
%d, which is greater than the "
+ + "maximum number of tasks to schedule: %d. This is
controlled by the cluster config %s which is set "
+ + "based on controller's performance.", taskType,
tableNameWithType, pinotTaskConfigs.size(),
+ maxNumberOfSubTasks, MinionConstants.MAX_ALLOWED_SUB_TASKS_KEY);
+ message += "Optimise the task config or reduce tableMaxNumTasks to
avoid the error";
+ // We throw an exception to notify the user
+ // This is to ensure that the user is aware of the task generation
limit
+ throw new RuntimeException(message);
+ }
+ pinotTaskConfigs.forEach(pinotTaskConfig ->
pinotTaskConfig.getConfigs()
+ .computeIfAbsent(MinionConstants.TRIGGERED_BY, k ->
CommonConstants.TaskTriggers.ADHOC_TRIGGER.name()));
+ addDefaultsToTaskConfig(pinotTaskConfigs);
+ LOGGER.info("Submitting ad-hoc task for task type: {} with task
configs: {}", taskType, pinotTaskConfigs);
+ _controllerMetrics.addMeteredTableValue(taskType,
ControllerMeter.NUMBER_ADHOC_TASKS_SUBMITTED, 1);
+ responseMap.put(tableNameWithType,
+ _helixTaskResourceManager.submitTask(parentTaskName,
pinotTaskConfigs, minionInstanceTag,
+ taskGenerator.getTaskTimeoutMs(minionInstanceTag),
+
taskGenerator.getNumConcurrentTasksPerInstance(minionInstanceTag),
+ taskGenerator.getMaxAttemptsPerTask(minionInstanceTag)));
+ } finally {
+ if (!responseMap.containsKey(tableNameWithType)) {
+ LOGGER.warn("No task submitted for tableNameWithType: {}",
tableNameWithType);
+ }
+ if (lock != null) {
+ _distributedTaskLockManager.releaseLock(lock);
+ }
}
- pinotTaskConfigs.forEach(pinotTaskConfig -> pinotTaskConfig.getConfigs()
- .computeIfAbsent(MinionConstants.TRIGGERED_BY, k ->
CommonConstants.TaskTriggers.ADHOC_TRIGGER.name()));
- addDefaultsToTaskConfig(pinotTaskConfigs);
- LOGGER.info("Submitting ad-hoc task for task type: {} with task configs:
{}", taskType, pinotTaskConfigs);
- _controllerMetrics.addMeteredTableValue(taskType,
ControllerMeter.NUMBER_ADHOC_TASKS_SUBMITTED, 1);
- responseMap.put(tableNameWithType,
- _helixTaskResourceManager.submitTask(parentTaskName,
pinotTaskConfigs, minionInstanceTag,
- taskGenerator.getTaskTimeoutMs(minionInstanceTag),
-
taskGenerator.getNumConcurrentTasksPerInstance(minionInstanceTag),
- taskGenerator.getMaxAttemptsPerTask(minionInstanceTag)));
}
if (responseMap.isEmpty()) {
LOGGER.warn("No task submitted for tableName: {}", tableName);
@@ -273,6 +316,15 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
return responseMap;
}
+ public void forceReleaseLock(String tableNameWithType) {
+ if (_distributedTaskLockManager == null) {
+ String message = "Distributed task lock manager is disabled, no locks to
release";
+ LOGGER.warn(message);
+ throw new RuntimeException(message);
+ }
+ _distributedTaskLockManager.forceReleaseLock(tableNameWithType);
+ }
+
private class ZkTableConfigChangeListener implements IZkChildListener {
@Override
@@ -691,8 +743,45 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
if (taskGenerator != null) {
_helixTaskResourceManager.ensureTaskQueueExists(taskType);
addTaskTypeMetricsUpdaterIfNeeded(taskType);
- tasksScheduled.put(taskType, scheduleTask(taskGenerator,
enabledTableConfigs, context.isLeader(),
- context.getMinionInstanceTag(), context.getTriggeredBy()));
+
+ // Take the lock for all tables for which to schedule the tasks and
pass the list of tables for which getting
+ // the lock was successful
+ // Need locking to protect against:
+ // 1. Race conditions with periodic task generation
+ // 2. Multiple simultaneous ad-hoc requests
+ // 3. Leadership changes during task generation
+ List<String> enabledTables =
+
enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList());
+ Map<String, DistributedTaskLockManager.TaskLock> acquiredTaskLocks =
new HashMap<>();
+ for (String tableName : enabledTables) {
+ DistributedTaskLockManager.TaskLock lock;
+ if (_distributedTaskLockManager != null) {
+ lock = _distributedTaskLockManager.acquireLock(tableName);
+ if (lock == null) {
+ LOGGER.warn("Could not acquire table level distributed lock for
scheduled task type: {} on table: {}, "
+ + "skipping lock acquisition", taskType, tableName);
+ continue;
+ }
+ acquiredTaskLocks.put(tableName, lock);
+ LOGGER.info("Acquired table level distributed lock for scheduled
task type: {} on table: {}", taskType,
+ tableName);
+ }
+ }
+
+ try {
+ tasksScheduled.put(taskType, scheduleTask(taskGenerator,
enabledTableConfigs, context.isLeader(),
+ context.getMinionInstanceTag(), context.getTriggeredBy(),
acquiredTaskLocks));
+ } catch (RuntimeException e) {
+ LOGGER.error("Caught exception while trying to schedule task type:
{} for tables: {}, gathered responses: {}",
+ taskType, enabledTables, tasksScheduled, e);
+ throw e;
+ } finally {
+ // Release all the distributed table locks if any exist
+ assert acquiredTaskLocks.isEmpty() || _distributedTaskLockManager !=
null;
+ for (Map.Entry<String, DistributedTaskLockManager.TaskLock>
taskLockEntry : acquiredTaskLocks.entrySet()) {
+ _distributedTaskLockManager.releaseLock(taskLockEntry.getValue());
+ }
+ }
} else {
List<String> enabledTables =
enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList());
@@ -728,7 +817,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
* - list of task scheduling errors if any
*/
protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator,
List<TableConfig> enabledTableConfigs,
- boolean isLeader, @Nullable String minionInstanceTagForTask, String
triggeredBy) {
+ boolean isLeader, @Nullable String minionInstanceTagForTask, String
triggeredBy,
+ Map<String, DistributedTaskLockManager.TaskLock> acquiredTaskLocks) {
TaskSchedulingInfo response = new TaskSchedulingInfo();
String taskType = taskGenerator.getTaskType();
List<String> enabledTables =
@@ -749,7 +839,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
+ "table: %s. Disk utilization for one or more servers hosting
this table has exceeded the threshold. "
+ "Tasks won't be generated until the issue is mitigated.",
tableName);
LOGGER.warn(message);
- response.addSchedulingError(message);
+ response.addGenerationError(message);
_controllerMetrics.setOrUpdateTableGauge(tableName,
ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 1L);
continue;
}
@@ -767,39 +857,47 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
tableTaskConfig.getConfigsForTaskType(taskType).put(MinionConstants.TRIGGERED_BY,
triggeredBy);
}
- taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
- int maxNumberOfSubTasks = taskGenerator.getMaxAllowedSubTasksPerTask();
- if (presentTaskConfig.size() > maxNumberOfSubTasks) {
- String message = String.format(
- "Number of tasks generated for task type: %s for table: %s is
%d, which is greater than the "
- + "maximum number of tasks to schedule: %d. This is "
- + "controlled by the cluster config %s which is set based on
controller's performance.", taskType,
- tableName, presentTaskConfig.size(), maxNumberOfSubTasks,
MinionConstants.MAX_ALLOWED_SUB_TASKS_KEY);
- if (TaskSchedulingContext.isUserTriggeredTask(triggeredBy)) {
- message += "Optimise the task config or reduce tableMaxNumTasks to
avoid the error";
- presentTaskConfig.clear();
- // If the task is user-triggered, we throw an exception to notify
the user
- // This is to ensure that the user is aware of the task generation
limit
- throw new RuntimeException(message);
+ if (_distributedTaskLockManager == null ||
acquiredTaskLocks.containsKey(tableName)) {
+ taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
+ int maxNumberOfSubTasks =
taskGenerator.getMaxAllowedSubTasksPerTask();
+ if (presentTaskConfig.size() > maxNumberOfSubTasks) {
+ String message = String.format(
+ "Number of tasks generated for task type: %s for table: %s is
%d, which is greater than the "
+ + "maximum number of tasks to schedule: %d. This is "
+ + "controlled by the cluster config %s which is set based
on controller's performance.", taskType,
+ tableName, presentTaskConfig.size(), maxNumberOfSubTasks,
MinionConstants.MAX_ALLOWED_SUB_TASKS_KEY);
+ if (TaskSchedulingContext.isUserTriggeredTask(triggeredBy)) {
+ message += "Optimise the task config or reduce tableMaxNumTasks
to avoid the error";
+ presentTaskConfig.clear();
+ // If the task is user-triggered, we throw an exception to
notify the user
+ // This is to ensure that the user is aware of the task
generation limit
+ throw new RuntimeException(message);
+ }
+ // For scheduled tasks, we log a warning and limit the number of
tasks
+ LOGGER.warn(message + "Only the first {} tasks will be scheduled",
maxNumberOfSubTasks);
+ presentTaskConfig = new ArrayList<>(presentTaskConfig.subList(0,
maxNumberOfSubTasks));
+ // Provide user visibility to the maximum number of subtasks that
were used for the task
+ presentTaskConfig.forEach(pinotTaskConfig ->
pinotTaskConfig.getConfigs()
+ .put(MinionConstants.TABLE_MAX_NUM_TASKS_KEY,
String.valueOf(maxNumberOfSubTasks)));
}
- // For scheduled tasks, we log a warning and limit the number of
tasks
- LOGGER.warn(message + "Only the first {} tasks will be scheduled",
maxNumberOfSubTasks);
- presentTaskConfig = new ArrayList<>(presentTaskConfig.subList(0,
maxNumberOfSubTasks));
- // Provide user visibility to the maximum number of subtasks that
were used for the task
- presentTaskConfig.forEach(pinotTaskConfig ->
pinotTaskConfig.getConfigs()
- .put(MinionConstants.TABLE_MAX_NUM_TASKS_KEY,
String.valueOf(maxNumberOfSubTasks)));
+ minionInstanceTagToTaskConfigs.put(minionInstanceTag,
presentTaskConfig);
+ long successRunTimestamp = System.currentTimeMillis();
+ _taskManagerStatusCache.saveTaskGeneratorInfo(tableName, taskType,
+ taskGeneratorMostRecentRunInfo ->
taskGeneratorMostRecentRunInfo.addSuccessRunTs(successRunTimestamp));
+ // before the first task schedule, the following two gauge metrics will be empty
+ // TODO: find a better way to report task generation information
+ _controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
+
ControllerGauge.TIME_MS_SINCE_LAST_SUCCESSFUL_MINION_TASK_GENERATION,
+ () -> System.currentTimeMillis() - successRunTimestamp);
+ _controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
+ ControllerGauge.LAST_MINION_TASK_GENERATION_ENCOUNTERS_ERROR,
0L);
+ } else {
+ String message = String.format("Could not acquire table level
distributed lock for scheduled task type: "
+ + "%s, table: %s. Another controller is likely generating tasks
for this table. Please try again later.",
+ taskType, tableName);
+ LOGGER.warn(message);
+ response.addGenerationError(message);
}
- minionInstanceTagToTaskConfigs.put(minionInstanceTag,
presentTaskConfig);
- long successRunTimestamp = System.currentTimeMillis();
- _taskManagerStatusCache.saveTaskGeneratorInfo(tableName, taskType,
- taskGeneratorMostRecentRunInfo ->
taskGeneratorMostRecentRunInfo.addSuccessRunTs(successRunTimestamp));
- // before the first task schedule, the follow two gauge metrics will
be empty
- // TODO: find a better way to report task generation information
- _controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
-
ControllerGauge.TIME_MS_SINCE_LAST_SUCCESSFUL_MINION_TASK_GENERATION,
- () -> System.currentTimeMillis() - successRunTimestamp);
- _controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
- ControllerGauge.LAST_MINION_TASK_GENERATION_ENCOUNTERS_ERROR, 0L);
} catch (Exception e) {
StringWriter errors = new StringWriter();
try (PrintWriter pw = new PrintWriter(errors)) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingInfo.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingInfo.java
index 2ffa11676fa..76e1114a85b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingInfo.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingInfo.java
@@ -53,4 +53,10 @@ public class TaskSchedulingInfo {
public void addSchedulingError(String schedulingError) {
_schedulingErrors.add(schedulingError);
}
+
+ @Override
+ public String toString() {
+ return "TaskSchedulingInfo{scheduledTaskNames='" + _scheduledTaskNames +
"', generationErrors='" + _generationErrors
+ + "', schedulingErrors='" + _schedulingErrors + "'}";
+ }
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/DistributedTaskLockManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/DistributedTaskLockManagerTest.java
new file mode 100644
index 00000000000..642694c2a14
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/DistributedTaskLockManagerTest.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.AccessOption;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.ControllerTimer;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.contains;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class DistributedTaskLockManagerTest {
+
+ @Test
+ public void testEphemeralLockAcquisitionAndRelease() {
+ // Mock the property store and data accessor
+ ZkHelixPropertyStore<ZNRecord> mockPropertyStore =
mock(ZkHelixPropertyStore.class);
+
+ // Configure mocks for ephemeral node creation
+ when(mockPropertyStore.exists(anyString(),
eq(AccessOption.PERSISTENT))).thenReturn(true);
+ when(mockPropertyStore.create(anyString(), any(ZNRecord.class),
eq(AccessOption.EPHEMERAL))).thenReturn(true);
+ when(mockPropertyStore.remove(anyString(),
eq(AccessOption.EPHEMERAL))).thenReturn(true);
+
+ DistributedTaskLockManager lockManager = new
DistributedTaskLockManager(mockPropertyStore, "controller1");
+
+ // Test lock acquisition
+ DistributedTaskLockManager.TaskLock lock =
lockManager.acquireLock("testTable");
+ Assert.assertNotNull(lock, "Should successfully acquire lock");
+ assertEquals(lock.getOwner(), "controller1");
+ Assert.assertNotNull(lock.getLockZNodePath(), "Lock ZNode path should not
be null");
+ assertEquals(lock.getLockZNodePath(),
"/MINION_TASK_METADATA/testTable-Lock");
+ assertTrue(lock.getAge() >= 0, "Lock should have valid age");
+
+ // Test lock release
+ when(mockPropertyStore.exists(eq(lock.getLockZNodePath()),
eq(AccessOption.EPHEMERAL))).thenReturn(true);
+ boolean released = lockManager.releaseLock(lock);
+ Assert.assertTrue(released, "Should successfully release lock");
+
+ // Verify ephemeral node interactions
+ verify(mockPropertyStore, times(1)).create(eq(lock.getLockZNodePath()),
any(ZNRecord.class),
+ eq(AccessOption.EPHEMERAL));
+ verify(mockPropertyStore, times(1)).remove(eq(lock.getLockZNodePath()),
+ eq(AccessOption.EPHEMERAL));
+ }
+
+ @Test
+ public void testConcurrentEphemeralLockAcquisition() {
+ // Mock the property store and data accessor
+ ZkHelixPropertyStore<ZNRecord> mockPropertyStore =
mock(ZkHelixPropertyStore.class);
+
+ // Configure mocks to simulate another controller already has the lock
+ when(mockPropertyStore.exists(anyString(),
eq(AccessOption.PERSISTENT))).thenReturn(true);
+ when(mockPropertyStore.create(anyString(), any(ZNRecord.class),
eq(AccessOption.EPHEMERAL))).thenReturn(false);
+ when(mockPropertyStore.remove(anyString(),
eq(AccessOption.EPHEMERAL))).thenReturn(true);
+
+ DistributedTaskLockManager lockManager = new
DistributedTaskLockManager(mockPropertyStore, "controller1");
+
+ // Test lock acquisition should fail because create returned false
+ DistributedTaskLockManager.TaskLock lock =
lockManager.acquireLock("testTable");
+ Assert.assertNull(lock, "Should fail to acquire lock when create returns
false");
+
+ // Verify that create was called. No need to clean-up the lock so remove
should not have been called
+ verify(mockPropertyStore, times(1)).create(anyString(),
any(ZNRecord.class),
+ eq(AccessOption.EPHEMERAL));
+ verify(mockPropertyStore, times(0)).remove(anyString(),
eq(AccessOption.EPHEMERAL));
+ }
+
+ @Test
+ public void testTaskGenerationInProgressDetection() {
+ // Mock the property store and data accessor
+ ZkHelixPropertyStore<ZNRecord> mockPropertyStore =
mock(ZkHelixPropertyStore.class);
+
+ String controllerId = "controller1";
+
+ // Construct a record to return when propertyStore.get() is called on the
lock ZNode
+ ZNRecord record = new ZNRecord(controllerId);
+ record.setSimpleField("lockCreationTimeMs",
String.valueOf(System.currentTimeMillis()));
+ record.setSimpleField("lockOwner", controllerId);
+
+ // Simulate active ephemeral nodes indicating task generation in progress
+ when(mockPropertyStore.exists(anyString(),
eq(AccessOption.PERSISTENT))).thenReturn(true);
+ when(mockPropertyStore.get(contains("testTable"), any(Stat.class),
eq(AccessOption.EPHEMERAL))).thenReturn(record);
+
+ DistributedTaskLockManager lockManager = new
DistributedTaskLockManager(mockPropertyStore, controllerId);
+
+ // Test that we can detect task generation in progress
+ boolean inProgress = lockManager.isTaskGenerationInProgress("testTable");
+ Assert.assertTrue(inProgress, "Should detect task generation in progress
when ephemeral nodes exist");
+
+ // Test that we detect that the task generation is not in progress when we
don't have any task lock
+ when(mockPropertyStore.get(contains("testTable"), any(Stat.class),
eq(AccessOption.EPHEMERAL))).thenReturn(null);
+ inProgress = lockManager.isTaskGenerationInProgress("testTable");
+ Assert.assertFalse(inProgress, "Should detect task generation not in progress when ephemeral nodes don't exist");
+ }
+
+ @Test
+ public void testMinionTaskGenerationLockHeldMetric() {
+ // Mock the property store
+ ZkHelixPropertyStore<ZNRecord> mockPropertyStore =
mock(ZkHelixPropertyStore.class);
+
+ // Mock ControllerMetrics to verify metric calls
+ ControllerMetrics mockControllerMetrics = mock(ControllerMetrics.class);
+
+ String controllerId = "controller1";
+ String tableNameWithType = "testTable";
+
+ // Test case 1: Lock is held (record exists with creation time)
+ long lockCreationTime = System.currentTimeMillis() - 5000; // 5 seconds ago
+ ZNRecord lockRecord = new ZNRecord(controllerId);
+ lockRecord.setSimpleField("lockCreationTimeMs",
String.valueOf(lockCreationTime));
+ lockRecord.setSimpleField("lockOwner", controllerId);
+
+ when(mockPropertyStore.get(contains(tableNameWithType), any(Stat.class),
eq(AccessOption.EPHEMERAL)))
+ .thenReturn(lockRecord);
+
+ // Mock ControllerMetrics.get() to return our mock
+ try (var mockedStatic = mockStatic(ControllerMetrics.class)) {
+
mockedStatic.when(ControllerMetrics::get).thenReturn(mockControllerMetrics);
+
+ DistributedTaskLockManager lockManager = new
DistributedTaskLockManager(mockPropertyStore, controllerId);
+
+ // Call the method that should update the metric
+ boolean inProgress =
lockManager.isTaskGenerationInProgress(tableNameWithType);
+ Assert.assertTrue(inProgress, "Should detect task generation in progress
when lock record exists");
+
+ // Verify that addTimedValue was called with a non-zero duration
(approximately 5000ms)
+ verify(mockControllerMetrics, times(1)).addTimedValue(
+ eq(tableNameWithType),
+ eq(ControllerTimer.MINION_TASK_GENERATION_LOCK_HELD_ELAPSED_TIME_MS),
+ anyLong(), // We can't predict the exact value due to timing, but it
should be > 0
+ eq(TimeUnit.MILLISECONDS)
+ );
+ }
+
+ // Test case 2: No lock held (record is null)
+ when(mockPropertyStore.get(contains(tableNameWithType), any(Stat.class),
eq(AccessOption.EPHEMERAL)))
+ .thenReturn(null);
+
+ try (var mockedStatic = mockStatic(ControllerMetrics.class)) {
+
mockedStatic.when(ControllerMetrics::get).thenReturn(mockControllerMetrics);
+
+ DistributedTaskLockManager lockManager = new
DistributedTaskLockManager(mockPropertyStore, controllerId);
+
+ // Call the method that should update the metric
+ boolean inProgress =
lockManager.isTaskGenerationInProgress(tableNameWithType);
+ Assert.assertFalse(inProgress, "Should detect task generation not in
progress when no lock record exists");
+
+
+ // Verify that addTimedValue was called with 0 duration (no lock held)
+ verify(mockControllerMetrics, times(1)).addTimedValue(
+ eq(tableNameWithType),
+ eq(ControllerTimer.MINION_TASK_GENERATION_LOCK_HELD_ELAPSED_TIME_MS),
+ eq(0L), // Should be 0 when no lock is held
+ eq(TimeUnit.MILLISECONDS)
+ );
+ }
+ }
+
+ @Test
+ public void testLockReleaseRetriesOnFailure() throws InterruptedException {
+ // Mock the property store
+ ZkHelixPropertyStore<ZNRecord> mockPropertyStore =
mock(ZkHelixPropertyStore.class);
+ String controllerId = "controller1";
+ String tableNameWithType = "testTable";
+
+ // Configure mocks for lock acquisition
+ when(mockPropertyStore.exists(anyString(),
eq(AccessOption.PERSISTENT))).thenReturn(true);
+ when(mockPropertyStore.create(anyString(), any(ZNRecord.class),
eq(AccessOption.EPHEMERAL))).thenReturn(true);
+ // Mock isTaskGenerationInProgress to return null (no task in progress)
+ when(mockPropertyStore.get(anyString(), any(Stat.class),
eq(AccessOption.EPHEMERAL))).thenReturn(null);
+
+ DistributedTaskLockManager lockManager = new
DistributedTaskLockManager(mockPropertyStore, controllerId);
+
+ // Acquire a lock first
+ DistributedTaskLockManager.TaskLock lock =
lockManager.acquireLock(tableNameWithType);
+ Assert.assertNotNull(lock, "Should successfully acquire lock");
+
+ // Configure mock for lock release - simulate failure on first 2 attempts,
success on 3rd attempt
+ when(mockPropertyStore.exists(eq(lock.getLockZNodePath()),
eq(AccessOption.EPHEMERAL)))
+ .thenReturn(true); // Lock exists for all attempts
+
+ when(mockPropertyStore.remove(eq(lock.getLockZNodePath()),
eq(AccessOption.EPHEMERAL)))
+ .thenReturn(false) // First attempt fails
+ .thenReturn(false) // Second attempt fails
+ .thenReturn(true); // Third attempt succeeds
+
+ // Record start time to verify retry delays
+ long startTime = System.currentTimeMillis();
+
+ // Test lock release with retries
+ boolean released = lockManager.releaseLock(lock);
+
+ long elapsedTime = System.currentTimeMillis() - startTime;
+
+ // Verify that the lock was eventually released
+ Assert.assertTrue(released, "Should successfully release lock after
retries");
+
+ // Verify that remove was called 3 times (2 failures + 1 success)
+ verify(mockPropertyStore, times(3)).remove(eq(lock.getLockZNodePath()),
eq(AccessOption.EPHEMERAL));
+
+ // Verify that the total time includes retry delays
+ // Expected delays: 100ms + 200ms = 300ms minimum (plus some execution
overhead)
+ Assert.assertTrue(elapsedTime >= 250,
+ "Should have waited for retry delays. Elapsed time: " + elapsedTime +
"ms");
+
+ // Should not take too long (max 3 seconds for safety, accounting for test
environment variability)
+ Assert.assertTrue(elapsedTime < 3000,
+ "Should not take too long. Elapsed time: " + elapsedTime + "ms");
+ }
+
+ @Test
+ public void testLockReleaseFailsAfterMaxRetries() {
+ // Mock the property store
+ ZkHelixPropertyStore<ZNRecord> mockPropertyStore =
mock(ZkHelixPropertyStore.class);
+ String controllerId = "controller1";
+ String tableNameWithType = "testTable";
+
+ // Configure mocks for lock acquisition
+ when(mockPropertyStore.exists(anyString(),
eq(AccessOption.PERSISTENT))).thenReturn(true);
+ when(mockPropertyStore.create(anyString(), any(ZNRecord.class),
eq(AccessOption.EPHEMERAL))).thenReturn(true);
+ // Mock isTaskGenerationInProgress to return null (no task in progress)
+ when(mockPropertyStore.get(anyString(), any(Stat.class),
eq(AccessOption.EPHEMERAL))).thenReturn(null);
+
+ DistributedTaskLockManager lockManager = new
DistributedTaskLockManager(mockPropertyStore, controllerId);
+
+ // Acquire a lock first
+ DistributedTaskLockManager.TaskLock lock =
lockManager.acquireLock(tableNameWithType);
+ Assert.assertNotNull(lock, "Should successfully acquire lock");
+
+ // Configure mock for lock release - simulate failure on all attempts
+ when(mockPropertyStore.exists(eq(lock.getLockZNodePath()),
eq(AccessOption.EPHEMERAL)))
+ .thenReturn(true); // Lock exists for all attempts
+
+ when(mockPropertyStore.remove(eq(lock.getLockZNodePath()),
eq(AccessOption.EPHEMERAL)))
+ .thenReturn(false); // All attempts fail
+
+ // Test lock release with retries - should fail after max retries
+ boolean released = lockManager.releaseLock(lock);
+
+ // Verify that the lock release failed
+ Assert.assertFalse(released, "Should fail to release lock after max
retries");
+
+ // Verify that remove was called exactly MAX_RETRIES times (3 times)
+ verify(mockPropertyStore, times(3)).remove(eq(lock.getLockZNodePath()),
eq(AccessOption.EPHEMERAL));
+ }
+
+ @Test
+ public void testLockReleaseSucceedsWhenLockDisappearsBeforeRetry() {
+ // Mock the property store
+ ZkHelixPropertyStore<ZNRecord> mockPropertyStore =
mock(ZkHelixPropertyStore.class);
+ String controllerId = "controller1";
+ String tableNameWithType = "testTable";
+
+ // Configure mocks for lock acquisition
+ when(mockPropertyStore.exists(anyString(),
eq(AccessOption.PERSISTENT))).thenReturn(true);
+ when(mockPropertyStore.create(anyString(), any(ZNRecord.class),
eq(AccessOption.EPHEMERAL))).thenReturn(true);
+ // Mock isTaskGenerationInProgress to return null (no task in progress)
+ when(mockPropertyStore.get(anyString(), any(Stat.class),
eq(AccessOption.EPHEMERAL))).thenReturn(null);
+
+ DistributedTaskLockManager lockManager = new
DistributedTaskLockManager(mockPropertyStore, controllerId);
+
+ // Acquire a lock first
+ DistributedTaskLockManager.TaskLock lock =
lockManager.acquireLock(tableNameWithType);
+ Assert.assertNotNull(lock, "Should successfully acquire lock");
+
+ // Configure mock for lock release - simulate:
+ // 1. First exists() call in releaseLock(): lock exists, so it calls
removeWithRetries()
+ // 2. Second exists() call in removeWithRetries(): lock no longer exists
(disappeared between attempts)
+ when(mockPropertyStore.exists(eq(lock.getLockZNodePath()),
eq(AccessOption.EPHEMERAL)))
+ .thenReturn(true) // First check in releaseLock(): lock exists
+ .thenReturn(false); // Second check in removeWithRetries(): lock
disappeared
+
+ // Since the lock disappears, remove() should never be called
+
+ // Test lock release - should succeed when lock disappears
+ boolean released = lockManager.releaseLock(lock);
+
+ // Verify that the lock release succeeded
+ Assert.assertTrue(released, "Should succeed when lock disappears between
retry attempts");
+
+ // Verify that remove was never called because lock disappeared before
retry
+ verify(mockPropertyStore, times(0)).remove(eq(lock.getLockZNodePath()),
eq(AccessOption.EPHEMERAL));
+
+ // Verify that exists was called twice (releaseLock() +
removeWithRetries() first attempt)
+ verify(mockPropertyStore, times(2)).exists(eq(lock.getLockZNodePath()),
eq(AccessOption.EPHEMERAL));
+ }
+
+ @Test
+ public void testLockReleaseWithMultipleExistsChecks() {
+ // Mock the property store
+ ZkHelixPropertyStore<ZNRecord> mockPropertyStore =
mock(ZkHelixPropertyStore.class);
+ String controllerId = "controller1";
+ String tableNameWithType = "testTable";
+
+ // Configure mocks for lock acquisition
+ when(mockPropertyStore.exists(anyString(),
eq(AccessOption.PERSISTENT))).thenReturn(true);
+ when(mockPropertyStore.create(anyString(), any(ZNRecord.class),
eq(AccessOption.EPHEMERAL))).thenReturn(true);
+ // Mock isTaskGenerationInProgress to return null (no task in progress)
+ when(mockPropertyStore.get(anyString(), any(Stat.class),
eq(AccessOption.EPHEMERAL))).thenReturn(null);
+
+ DistributedTaskLockManager lockManager = new
DistributedTaskLockManager(mockPropertyStore, controllerId);
+
+ // Acquire a lock first
+ DistributedTaskLockManager.TaskLock lock =
lockManager.acquireLock(tableNameWithType);
+ Assert.assertNotNull(lock, "Should successfully acquire lock");
+
+ // Configure mock for lock release - simulate:
+ // 1. First exists() call in releaseLock(): lock exists
+ // 2. Second exists() call in removeWithRetries() attempt 1: lock exists,
remove fails
+ // 3. Third exists() call in removeWithRetries() attempt 2: lock exists,
remove succeeds
+ when(mockPropertyStore.exists(eq(lock.getLockZNodePath()),
eq(AccessOption.EPHEMERAL)))
+ .thenReturn(true) // First check in releaseLock(): lock exists
+ .thenReturn(true) // Second check in removeWithRetries() attempt 1:
lock exists
+ .thenReturn(true); // Third check in removeWithRetries() attempt 2:
lock exists
+
+ when(mockPropertyStore.remove(eq(lock.getLockZNodePath()),
eq(AccessOption.EPHEMERAL)))
+ .thenReturn(false) // First remove attempt fails
+ .thenReturn(true); // Second remove attempt succeeds
+
+ // Test lock release - should succeed after retry
+ boolean released = lockManager.releaseLock(lock);
+
+ // Verify that the lock release succeeded
+ Assert.assertTrue(released, "Should succeed after retry");
+
+ // Verify that remove was called twice (first fails, second succeeds)
+ verify(mockPropertyStore, times(2)).remove(eq(lock.getLockZNodePath()),
eq(AccessOption.EPHEMERAL));
+
+ // Verify that exists was called 3 times (releaseLock() +
removeWithRetries() attempt 1 + attempt 2)
+ verify(mockPropertyStore, times(3)).exists(eq(lock.getLockZNodePath()),
eq(AccessOption.EPHEMERAL));
+ }
+}
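
The two retry tests above pin down a specific release contract. The following is a minimal sketch of that contract, assuming a bounded retry loop and the same ZkHelixPropertyStore calls the mocks stub out (exists() and remove() with AccessOption.EPHEMERAL); the class name, retry budget and Helix import paths are illustrative assumptions, not the committed implementation:

    import org.apache.helix.AccessOption;
    import org.apache.helix.store.zk.ZkHelixPropertyStore;
    import org.apache.helix.zookeeper.datamodel.ZNRecord;

    final class LockReleaseRetrySketch {
      private static final int MAX_RELEASE_ATTEMPTS = 3; // assumed retry budget

      // Mirrors the behaviour asserted above: a lock that has already vanished counts as a
      // successful release, and remove() is only attempted while the ephemeral ZNode still exists.
      static boolean removeWithRetries(ZkHelixPropertyStore<ZNRecord> propertyStore, String lockPath) {
        for (int attempt = 0; attempt < MAX_RELEASE_ATTEMPTS; attempt++) {
          if (!propertyStore.exists(lockPath, AccessOption.EPHEMERAL)) {
            return true; // lock disappeared between attempts (first test above)
          }
          if (propertyStore.remove(lockPath, AccessOption.EPHEMERAL)) {
            return true; // delete succeeded, possibly after a failed attempt (second test above)
          }
        }
        return false;
      }
    }
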
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerDistributedLockingTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerDistributedLockingTest.java
new file mode 100644
index 00000000000..a8e8a144fac
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerDistributedLockingTest.java
@@ -0,0 +1,1118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.controller.BaseControllerStarter;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.ControllerTest;
+import
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Test class for PinotTaskManager with distributed locking enabled. Test scenarios run across multiple controllers.
+ */
+public class PinotTaskManagerDistributedLockingTest extends ControllerTest {
+
+ private static final String RAW_TABLE_NAME_1 = "testTable1";
+ private static final String RAW_TABLE_NAME_2 = "testTable2";
+ private static final String RAW_TABLE_NAME_3 = "testTable3";
+
+ private static final String TEST_TASK_TYPE = "TestDistributedLockTaskType";
+
+ @BeforeMethod
+ public void setUpMethod() throws Exception {
+ startZk();
+ }
+
+ @AfterMethod
+ public void tearDownMethod() {
+ try {
+ if (_controllerStarter != null) {
+ // Clean up any running tasks before stopping controller
+ try {
+ Map<String, TaskState> taskStates =
_controllerStarter.getHelixTaskResourceManager()
+ .getTaskStates(TEST_TASK_TYPE);
+ for (String taskName : taskStates.keySet()) {
+ try {
+
_controllerStarter.getHelixTaskResourceManager().deleteTask(taskName, true);
+ } catch (Exception e) {
+ // Ignore individual task deletion errors
+ }
+ }
+ Thread.sleep(500); // Give time for task cancellation
+ } catch (Exception e) {
+ // Ignore cleanup errors
+ }
+ stopController();
+ }
+ } catch (Exception e) {
+ // Ignore
+ }
+ try {
+ stopFakeInstances();
+ } catch (Exception e) {
+ // Ignore
+ }
+ stopZk();
+ }
+
+ /**
+ * Test scenario: Tests the schedule task happy path on a single controller
to ensure task generation goes through
+ * for all tables
+ */
+ @Test
+ public void testScheduleTaskForSpecificTables() throws Exception {
+ // Setup first controller with distributed locking enabled
+ Map<String, Object> properties1 = getDefaultControllerConfiguration();
+
properties1.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
true);
+
properties1.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
true);
+ properties1.put(ControllerConf.CONTROLLER_PORT, 21002);
+
+ // Setup second controller with distributed locking enabled (different
port)
+ Map<String, Object> properties2 = getDefaultControllerConfiguration();
+ // Disable scheduler to avoid Quartz conflicts
+
properties2.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
false);
+
properties2.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
true);
+ properties2.put(ControllerConf.CONTROLLER_PORT, 21003);
+
+ // Start both controllers
+ startController(properties1);
+ BaseControllerStarter controller1 = _controllerStarter;
+ PinotTaskManager taskManager1 = controller1.getTaskManager();
+
+ // Start second controller instance
+ BaseControllerStarter controller2 =
startControllerOnDifferentPort(properties2);
+ PinotTaskManager taskManager2 = controller2.getTaskManager();
+
+ try {
+ // Setup test environment (using first controller)
+ addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeMinionInstancesToAutoJoinHelixCluster(1);
+
+ // Create and register task generators on both controllers
+ ControllableTaskGenerator generator1 = new
ControllableTaskGenerator(TEST_TASK_TYPE);
+ generator1.setGenerationDelay(3000); // 3 second delay for createTask
+ ClusterInfoAccessor clusterInfoAccessor1 =
Mockito.mock(ClusterInfoAccessor.class);
+ generator1.init(clusterInfoAccessor1);
+ taskManager1.registerTaskGenerator(generator1);
+
+ ControllableTaskGenerator generator2 = new
ControllableTaskGenerator(TEST_TASK_TYPE);
+ generator2.setGenerationDelay(500); // Shorter delay for scheduleTasks
+ ClusterInfoAccessor clusterInfoAccessor2 =
Mockito.mock(ClusterInfoAccessor.class);
+ generator2.init(clusterInfoAccessor2);
+ taskManager2.registerTaskGenerator(generator2);
+
+ // Ensure task queues exist on both controllers
+
controller1.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+
controller2.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+
+ // Create schemas and all three tables ONLY on controller1 to avoid
Quartz job conflicts
+ // The tables will be visible to both controllers via ZooKeeper
+ Schema schema1 = new
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_1)
+ .addSingleValueDimension("testCol",
FieldSpec.DataType.STRING).build();
+ addSchema(schema1);
+ createSingleTestTable(RAW_TABLE_NAME_1);
+
+ Schema schema2 = new
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_2)
+ .addSingleValueDimension("testCol",
FieldSpec.DataType.STRING).build();
+ addSchema(schema2);
+ createSingleTestTable(RAW_TABLE_NAME_2);
+
+ Schema schema3 = new
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_3)
+ .addSingleValueDimension("testCol",
FieldSpec.DataType.STRING).build();
+ addSchema(schema3);
+ createSingleTestTable(RAW_TABLE_NAME_3);
+
+ // Wait for controllers to be fully initialized
+ Thread.sleep(2000);
+
+ ExecutorService executor = Executors.newFixedThreadPool(1);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch completionLatch = new CountDownLatch(1);
+
+ AtomicInteger scheduleTasksCompleted = new AtomicInteger(0);
+ Map<String, TaskSchedulingInfo> scheduleTasksResult = new HashMap<>();
+
+ // Controller 2: Run scheduleTasks for all tables
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+          Thread.sleep(500); // Small delay before invoking scheduleTasks
+
+ TaskSchedulingContext context = new TaskSchedulingContext();
+ Set<String> tablesToSchedule = new HashSet<>();
+          tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_1)); // This should succeed
+          tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_2)); // This should succeed
+          tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_3)); // This should succeed
+ context.setTablesToSchedule(tablesToSchedule);
+ context.setLeader(true);
+
+ Map<String, TaskSchedulingInfo> result =
taskManager2.scheduleTasks(context);
+ scheduleTasksResult.putAll(result);
+ scheduleTasksCompleted.incrementAndGet();
+ } catch (Exception ignored) {
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+
+ // Start both operations
+ startLatch.countDown();
+
+ // Wait for completion
+ assertTrue(completionLatch.await(45, TimeUnit.SECONDS), "Tasks should
complete within 45 seconds");
+
+ executor.shutdownNow();
+ assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS), "Executor
should terminate");
+
+      // No table lock is contended in this scenario, so scheduleTasks should complete
+      // normally for all three tables
+      assertEquals(scheduleTasksCompleted.get(), 1, "scheduleTasks should complete");
+
+      // Verify that tasks were generated once per table (only controller2 ran scheduleTasks)
+      int totalGenerations = generator1.getTaskGenerationCount() + generator2.getTaskGenerationCount();
+      assertEquals(totalGenerations, 3, "scheduleTasks should have generated tasks for all three tables");
+
+ assertNotNull(scheduleTasksResult);
+ assertEquals(scheduleTasksResult.size(), 1);
+
+ TaskSchedulingInfo taskSchedulingInfo =
scheduleTasksResult.get(TEST_TASK_TYPE);
+ assertNotNull(taskSchedulingInfo);
+ assertNotNull(taskSchedulingInfo.getScheduledTaskNames());
+ assertNotNull(taskSchedulingInfo.getSchedulingErrors());
+ assertNotNull(taskSchedulingInfo.getGenerationErrors());
+      // Should see one task scheduled, covering all three tables
+ assertEquals(taskSchedulingInfo.getScheduledTaskNames().size(), 1);
+ // Should see 0 errors
+ assertEquals(taskSchedulingInfo.getGenerationErrors().size(), 0);
+ assertEquals(taskSchedulingInfo.getSchedulingErrors().size(), 0);
+ } finally {
+ // Cleanup
+ try {
+ // Cancel all running tasks before dropping tables
+ Map<String, TaskState> taskStates =
_controllerStarter.getHelixTaskResourceManager()
+ .getTaskStates(TEST_TASK_TYPE);
+ for (String taskName : taskStates.keySet()) {
+ try {
+
_controllerStarter.getHelixTaskResourceManager().deleteTask(taskName, true);
+ } catch (Exception e) {
+ // Ignore individual task deletion errors
+ }
+ }
+ Thread.sleep(1000); // Give time for task cancellation
+
+ dropOfflineTable(RAW_TABLE_NAME_1);
+ dropOfflineTable(RAW_TABLE_NAME_2);
+ dropOfflineTable(RAW_TABLE_NAME_3);
+ } catch (Exception ignored) {
+ }
+
+ // Stop second controller first
+ controller2.stop();
+ }
+ }
+
+ /**
+ * Test scenario: Two actual controllers trying to submit createTask for the
same table simultaneously.
+ * This test starts multiple controller instances to test true
multi-controller distributed locking.
+ */
+ @Test
+ public void testConcurrentCreateTaskFromMultipleControllers() throws
Exception {
+ // Setup first controller with distributed locking enabled
+ Map<String, Object> properties1 = getDefaultControllerConfiguration();
+
properties1.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
true);
+
properties1.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
true);
+ properties1.put(ControllerConf.CONTROLLER_PORT, 21000);
+
+ // Setup second controller with distributed locking enabled (different
port)
+ Map<String, Object> properties2 = getDefaultControllerConfiguration();
+
properties2.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
true);
+
properties2.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
true);
+ properties2.put(ControllerConf.CONTROLLER_PORT, 21001);
+
+ // Start both controllers
+ startController(properties1);
+ BaseControllerStarter controller1 = _controllerStarter;
+ PinotTaskManager taskManager1 = controller1.getTaskManager();
+
+ // Start second controller instance
+ BaseControllerStarter controller2 =
startControllerOnDifferentPort(properties2);
+ PinotTaskManager taskManager2 = controller2.getTaskManager();
+
+ try {
+ // Setup test environment (using first controller)
+ addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeMinionInstancesToAutoJoinHelixCluster(1);
+
+ // Create and register task generators on both controllers FIRST
+ ControllableTaskGenerator generator1 = new
ControllableTaskGenerator(TEST_TASK_TYPE);
+ generator1.setGenerationDelay(2000); // 2 second delay
+ ClusterInfoAccessor clusterInfoAccessor1 =
Mockito.mock(ClusterInfoAccessor.class);
+ generator1.init(clusterInfoAccessor1);
+ taskManager1.registerTaskGenerator(generator1);
+
+ ControllableTaskGenerator generator2 = new
ControllableTaskGenerator(TEST_TASK_TYPE);
+ generator2.setGenerationDelay(2000); // 2 second delay
+ ClusterInfoAccessor clusterInfoAccessor2 =
Mockito.mock(ClusterInfoAccessor.class);
+ generator2.init(clusterInfoAccessor2);
+ taskManager2.registerTaskGenerator(generator2);
+
+ // Ensure task queues exist on both controllers
+
controller1.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+
controller2.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+
+ // Create schema and table AFTER registering task generators
+ Schema schema = new
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_1)
+ .addSingleValueDimension("testCol",
FieldSpec.DataType.STRING).build();
+ addSchema(schema);
+ createSingleTestTable(RAW_TABLE_NAME_1);
+
+ // Wait a bit for controllers to be fully initialized
+ Thread.sleep(2000);
+
+ // Now test concurrent createTask from both controllers
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch completionLatch = new CountDownLatch(2);
+
+ AtomicInteger successfulCreations = new AtomicInteger(0);
+ AtomicInteger failedCreations = new AtomicInteger(0);
+
+ // Controller 1 attempts to create task
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ Map<String, String> result = taskManager1.createTask(TEST_TASK_TYPE,
RAW_TABLE_NAME_1, null, new HashMap<>());
+ if (result != null && !result.isEmpty()) {
+ successfulCreations.incrementAndGet();
+ }
+ } catch (Exception e) {
+ failedCreations.incrementAndGet();
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+
+ // Controller 2 attempts to create task for the same table
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ Thread.sleep(100); // Small delay to ensure some overlap
+ Map<String, String> result = taskManager2.createTask(TEST_TASK_TYPE,
RAW_TABLE_NAME_1, null, new HashMap<>());
+ if (result != null && !result.isEmpty()) {
+ successfulCreations.incrementAndGet();
+ }
+ } catch (Exception e) {
+ failedCreations.incrementAndGet();
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+
+ // Start both controllers simultaneously
+ startLatch.countDown();
+
+ // Wait for completion
+ assertTrue(completionLatch.await(30, TimeUnit.SECONDS), "Tasks should
complete within 30 seconds");
+
+ executor.shutdownNow();
+ assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS), "Executor
should terminate");
+
+ // With distributed locking between actual controllers, exactly one
should succeed
+ assertEquals(successfulCreations.get(), 1, "Exactly one createTask
should succeed due to distributed locking");
+ assertEquals(failedCreations.get(), 1, "Exactly one createTask should
fail due to distributed locking");
+
+ int totalGenerations = generator1.getTaskGenerationCount() +
generator2.getTaskGenerationCount();
+      assertEquals(totalGenerations, 1, "Exactly one task generation should have occurred");
+ } finally {
+ // Cleanup
+ try {
+ // Cancel all running tasks before dropping tables
+ Map<String, TaskState> taskStates =
_controllerStarter.getHelixTaskResourceManager()
+ .getTaskStates(TEST_TASK_TYPE);
+ for (String taskName : taskStates.keySet()) {
+ try {
+
_controllerStarter.getHelixTaskResourceManager().deleteTask(taskName, true);
+ } catch (Exception e) {
+ // Ignore individual task deletion errors
+ }
+ }
+ Thread.sleep(1000); // Give time for task cancellation
+
+ dropOfflineTable(RAW_TABLE_NAME_1);
+ } catch (Exception ignored) {
+ }
+
+ // Stop second controller first
+ controller2.stop();
+ }
+ }
+
+ /**
+ * Test scenario: One controller runs createTask for a specific table while
another controller
+ * runs scheduleTasks for multiple tables including the locked table.
+ * The scheduleTasks should succeed for other tables but fail for the locked
table.
+ */
+ @Test
+ public void testCreateTaskBlocksScheduleTaskForSpecificTable() throws
Exception {
+ // Setup first controller with distributed locking enabled
+ Map<String, Object> properties1 = getDefaultControllerConfiguration();
+
properties1.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
true);
+
properties1.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
true);
+ properties1.put(ControllerConf.CONTROLLER_PORT, 21002);
+
+ // Setup second controller with distributed locking enabled (different
port)
+ Map<String, Object> properties2 = getDefaultControllerConfiguration();
+ // Disable scheduler to avoid Quartz conflicts
+
properties2.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
false);
+
properties2.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
true);
+ properties2.put(ControllerConf.CONTROLLER_PORT, 21003);
+
+ // Start both controllers
+ startController(properties1);
+ BaseControllerStarter controller1 = _controllerStarter;
+ PinotTaskManager taskManager1 = controller1.getTaskManager();
+
+ // Start second controller instance
+ BaseControllerStarter controller2 =
startControllerOnDifferentPort(properties2);
+ PinotTaskManager taskManager2 = controller2.getTaskManager();
+
+ try {
+ // Setup test environment (using first controller)
+ addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeMinionInstancesToAutoJoinHelixCluster(1);
+
+ // Create and register task generators on both controllers
+ ControllableTaskGenerator generator1 = new
ControllableTaskGenerator(TEST_TASK_TYPE);
+ generator1.setGenerationDelay(3000); // 3 second delay for createTask
+ ClusterInfoAccessor clusterInfoAccessor1 =
Mockito.mock(ClusterInfoAccessor.class);
+ generator1.init(clusterInfoAccessor1);
+ taskManager1.registerTaskGenerator(generator1);
+
+ ControllableTaskGenerator generator2 = new
ControllableTaskGenerator(TEST_TASK_TYPE);
+ generator2.setGenerationDelay(500); // Shorter delay for scheduleTasks
+ ClusterInfoAccessor clusterInfoAccessor2 =
Mockito.mock(ClusterInfoAccessor.class);
+ generator2.init(clusterInfoAccessor2);
+ taskManager2.registerTaskGenerator(generator2);
+
+ // Ensure task queues exist on both controllers
+
controller1.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+
controller2.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+
+ // Create schemas and all three tables ONLY on controller1 to avoid
Quartz job conflicts
+ // The tables will be visible to both controllers via ZooKeeper
+ Schema schema1 = new
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_1)
+ .addSingleValueDimension("testCol",
FieldSpec.DataType.STRING).build();
+ addSchema(schema1);
+ createSingleTestTable(RAW_TABLE_NAME_1);
+
+ Schema schema2 = new
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_2)
+ .addSingleValueDimension("testCol",
FieldSpec.DataType.STRING).build();
+ addSchema(schema2);
+ createSingleTestTable(RAW_TABLE_NAME_2);
+
+ Schema schema3 = new
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_3)
+ .addSingleValueDimension("testCol",
FieldSpec.DataType.STRING).build();
+ addSchema(schema3);
+ createSingleTestTable(RAW_TABLE_NAME_3);
+
+ // Wait for controllers to be fully initialized
+ Thread.sleep(2000);
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch completionLatch = new CountDownLatch(2);
+
+ AtomicInteger createTaskSuccessful = new AtomicInteger(0);
+ AtomicInteger scheduleTasksCompleted = new AtomicInteger(0);
+ Map<String, TaskSchedulingInfo> scheduleTasksResult = new HashMap<>();
+
+ // Controller 1: Run createTask for table1 (this will hold the lock for
table1)
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ Map<String, String> result = taskManager1.createTask(TEST_TASK_TYPE,
RAW_TABLE_NAME_1, null, new HashMap<>());
+ if (result != null && !result.isEmpty()) {
+ createTaskSuccessful.incrementAndGet();
+ }
+ } catch (Exception ignored) {
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+
+ // Controller 2: Run scheduleTasks for all tables (should be blocked for
table1 but succeed for others)
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ Thread.sleep(500); // Start after createTask begins
+
+ TaskSchedulingContext context = new TaskSchedulingContext();
+ Set<String> tablesToSchedule = new HashSet<>();
+
tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_1));
// This should be blocked
+
tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_2));
// This should succeed
+
tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_3));
// This should succeed
+ context.setTablesToSchedule(tablesToSchedule);
+ context.setLeader(true);
+
+ Map<String, TaskSchedulingInfo> result =
taskManager2.scheduleTasks(context);
+ scheduleTasksResult.putAll(result);
+ scheduleTasksCompleted.incrementAndGet();
+ } catch (Exception ignored) {
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+
+ // Start both operations
+ startLatch.countDown();
+
+ // Wait for completion
+ assertTrue(completionLatch.await(45, TimeUnit.SECONDS), "Tasks should
complete within 45 seconds");
+
+ executor.shutdownNow();
+ assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS), "Executor
should terminate");
+
+ // Verify results
+ assertEquals(createTaskSuccessful.get(), 1, "createTask should succeed");
+
+      // scheduleTasks should still complete: the locked table is reported as a generation
+      // error while results are returned for the other tables
+      assertEquals(scheduleTasksCompleted.get(), 1, "scheduleTasks should complete");
+
+ // Verify that both controllers generated tasks
+ int totalGenerations = generator1.getTaskGenerationCount() +
generator2.getTaskGenerationCount();
+      assertTrue(totalGenerations >= 3, "createTask on controller1 and scheduleTasks on controller2 should both have generated tasks");
+
+ assertNotNull(scheduleTasksResult);
+ assertEquals(scheduleTasksResult.size(), 1);
+
+ TaskSchedulingInfo taskSchedulingInfo =
scheduleTasksResult.get(TEST_TASK_TYPE);
+ assertNotNull(taskSchedulingInfo);
+ assertNotNull(taskSchedulingInfo.getScheduledTaskNames());
+ assertNotNull(taskSchedulingInfo.getSchedulingErrors());
+ assertNotNull(taskSchedulingInfo.getGenerationErrors());
+      // Should see one task scheduled, covering the two tables that were not locked
+ assertEquals(taskSchedulingInfo.getScheduledTaskNames().size(), 1);
+ // Should see one generation error, indicating that testTable1's lock
couldn't be taken
+ assertEquals(taskSchedulingInfo.getGenerationErrors().size(), 1);
+ assertEquals(taskSchedulingInfo.getGenerationErrors().get(0), "Could not
acquire table level distributed lock "
+ + "for scheduled task type: TestDistributedLockTaskType, table:
testTable1_OFFLINE. Another controller is "
+ + "likely generating tasks for this table. Please try again later.");
+ assertEquals(taskSchedulingInfo.getSchedulingErrors().size(), 0);
+ } finally {
+ // Cleanup
+ try {
+ // Cancel all running tasks before dropping tables
+ Map<String, TaskState> taskStates =
_controllerStarter.getHelixTaskResourceManager()
+ .getTaskStates(TEST_TASK_TYPE);
+ for (String taskName : taskStates.keySet()) {
+ try {
+
_controllerStarter.getHelixTaskResourceManager().deleteTask(taskName, true);
+ } catch (Exception e) {
+ // Ignore individual task deletion errors
+ }
+ }
+ Thread.sleep(1000); // Give time for task cancellation
+
+ dropOfflineTable(RAW_TABLE_NAME_1);
+ dropOfflineTable(RAW_TABLE_NAME_2);
+ dropOfflineTable(RAW_TABLE_NAME_3);
+ } catch (Exception ignored) {
+ }
+
+ // Stop second controller first
+ controller2.stop();
+ }
+ }
+
+ /**
+ * Test scenario: Both controllers run scheduleTasks for the same set of tables.
+ * scheduleTasks should succeed on only one of the controllers.
+ */
+ @Test
+ public void testMultipleScheduleTaskForSpecificTableOnBothControllers()
throws Exception {
+ // Setup first controller with distributed locking enabled
+ Map<String, Object> properties1 = getDefaultControllerConfiguration();
+
properties1.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
true);
+
properties1.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
true);
+ properties1.put(ControllerConf.CONTROLLER_PORT, 21002);
+
+ // Setup second controller with distributed locking enabled (different
port)
+ Map<String, Object> properties2 = getDefaultControllerConfiguration();
+ // Disable scheduler to avoid Quartz conflicts
+
properties2.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
false);
+
properties2.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
true);
+ properties2.put(ControllerConf.CONTROLLER_PORT, 21003);
+
+ // Start both controllers
+ startController(properties1);
+ BaseControllerStarter controller1 = _controllerStarter;
+ PinotTaskManager taskManager1 = controller1.getTaskManager();
+
+ // Start second controller instance
+ BaseControllerStarter controller2 =
startControllerOnDifferentPort(properties2);
+ PinotTaskManager taskManager2 = controller2.getTaskManager();
+
+ try {
+ // Setup test environment (using first controller)
+ addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeMinionInstancesToAutoJoinHelixCluster(1);
+
+ // Create and register task generators on both controllers
+ ControllableTaskGenerator generator1 = new
ControllableTaskGenerator(TEST_TASK_TYPE);
+ generator1.setGenerationDelay(3000); // 3 second delay for createTask
+ ClusterInfoAccessor clusterInfoAccessor1 =
Mockito.mock(ClusterInfoAccessor.class);
+ generator1.init(clusterInfoAccessor1);
+ taskManager1.registerTaskGenerator(generator1);
+
+ ControllableTaskGenerator generator2 = new
ControllableTaskGenerator(TEST_TASK_TYPE);
+ generator2.setGenerationDelay(500); // Shorter delay for scheduleTasks
+ ClusterInfoAccessor clusterInfoAccessor2 =
Mockito.mock(ClusterInfoAccessor.class);
+ generator2.init(clusterInfoAccessor2);
+ taskManager2.registerTaskGenerator(generator2);
+
+ // Ensure task queues exist on both controllers
+
controller1.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+
controller2.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+
+ // Create schemas and all three tables ONLY on controller1 to avoid
Quartz job conflicts
+ // The tables will be visible to both controllers via ZooKeeper
+ Schema schema1 = new
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_1)
+ .addSingleValueDimension("testCol",
FieldSpec.DataType.STRING).build();
+ addSchema(schema1);
+ createSingleTestTable(RAW_TABLE_NAME_1);
+
+ Schema schema2 = new
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_2)
+ .addSingleValueDimension("testCol",
FieldSpec.DataType.STRING).build();
+ addSchema(schema2);
+ createSingleTestTable(RAW_TABLE_NAME_2);
+
+ Schema schema3 = new
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_3)
+ .addSingleValueDimension("testCol",
FieldSpec.DataType.STRING).build();
+ addSchema(schema3);
+ createSingleTestTable(RAW_TABLE_NAME_3);
+
+ // Wait for controllers to be fully initialized
+ Thread.sleep(2000);
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch completionLatch = new CountDownLatch(2);
+
+ AtomicInteger scheduleTasksCompleted1 = new AtomicInteger(0);
+ AtomicInteger scheduleTasksCompleted2 = new AtomicInteger(0);
+ Map<String, TaskSchedulingInfo> scheduleTasksResult1 = new HashMap<>();
+ Map<String, TaskSchedulingInfo> scheduleTasksResult2 = new HashMap<>();
+
+ AtomicInteger failedCreations = new AtomicInteger(0);
+
+ // Controller 1: Run scheduleTasks for all tables (this will hold the
lock for all tables)
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+
+ TaskSchedulingContext context = new TaskSchedulingContext();
+ Set<String> tablesToSchedule = new HashSet<>();
+
tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_1));
+
tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_2));
+
tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_3));
+ context.setTablesToSchedule(tablesToSchedule);
+ context.setLeader(true);
+
+ Map<String, TaskSchedulingInfo> result =
taskManager1.scheduleTasks(context);
+ scheduleTasksResult1.putAll(result);
+ scheduleTasksCompleted1.incrementAndGet();
+ } catch (Exception ignored) {
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+
+ // Controller 2: Run scheduleTasks for all tables (should fail)
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ Thread.sleep(500); // Start after controller1 begins
+
+ TaskSchedulingContext context = new TaskSchedulingContext();
+ Set<String> tablesToSchedule = new HashSet<>();
+
tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_1));
+
tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_2));
+
tablesToSchedule.add(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_3));
+ context.setTablesToSchedule(tablesToSchedule);
+ context.setLeader(true);
+
+ Map<String, TaskSchedulingInfo> result =
taskManager2.scheduleTasks(context);
+ scheduleTasksResult2.putAll(result);
+ scheduleTasksCompleted2.incrementAndGet();
+ } catch (Exception e) {
+ failedCreations.incrementAndGet();
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+
+ // Start both operations
+ startLatch.countDown();
+
+ // Wait for completion
+ assertTrue(completionLatch.await(45, TimeUnit.SECONDS), "Tasks should
complete within 45 seconds");
+
+ executor.shutdownNow();
+ assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS), "Executor
should terminate");
+
+ // Verify results
+ assertEquals(scheduleTasksCompleted1.get(), 1, "scheduleTasks on
controller1 should succeed");
+
+      // Controller2 cannot acquire the lock for any table, so scheduleTasks throws instead
+      // of returning results and never completes normally
+      assertEquals(scheduleTasksCompleted2.get(), 0, "scheduleTasks on controller2 shouldn't complete");
+
+ // Validate controller 2's scheduleTasks failed due to being unable to
get any locks
+ assertEquals(failedCreations.get(), 1, "Controller 2's scheduleTasks
should've failed");
+
+ // Verify that only 1 controller generated tasks
+ int totalGenerations = generator1.getTaskGenerationCount() +
generator2.getTaskGenerationCount();
+      assertEquals(totalGenerations, 3, "Only one scheduleTasks call should have generated tasks, covering all three tables");
+      assertEquals(generator2.getTaskGenerationCount(), 0, "Controller 2 should not have generated any tasks");
+
+ assertNotNull(scheduleTasksResult1);
+ assertEquals(scheduleTasksResult1.size(), 1);
+
+ assertNotNull(scheduleTasksResult2);
+ assertEquals(scheduleTasksResult2.size(), 0);
+
+ TaskSchedulingInfo taskSchedulingInfo =
scheduleTasksResult1.get(TEST_TASK_TYPE);
+ assertNotNull(taskSchedulingInfo);
+ assertNotNull(taskSchedulingInfo.getScheduledTaskNames());
+ assertNotNull(taskSchedulingInfo.getSchedulingErrors());
+ assertNotNull(taskSchedulingInfo.getGenerationErrors());
+      // Should see one task scheduled, covering all three tables
+ assertEquals(taskSchedulingInfo.getScheduledTaskNames().size(), 1);
+ // Should see 0 errors
+ assertEquals(taskSchedulingInfo.getGenerationErrors().size(), 0);
+ assertEquals(taskSchedulingInfo.getSchedulingErrors().size(), 0);
+ } finally {
+ // Cleanup
+ try {
+ // Cancel all running tasks before dropping tables
+ Map<String, TaskState> taskStates =
_controllerStarter.getHelixTaskResourceManager()
+ .getTaskStates(TEST_TASK_TYPE);
+ for (String taskName : taskStates.keySet()) {
+ try {
+
_controllerStarter.getHelixTaskResourceManager().deleteTask(taskName, true);
+ } catch (Exception e) {
+ // Ignore individual task deletion errors
+ }
+ }
+ Thread.sleep(1000); // Give time for task cancellation
+
+ dropOfflineTable(RAW_TABLE_NAME_1);
+ dropOfflineTable(RAW_TABLE_NAME_2);
+ dropOfflineTable(RAW_TABLE_NAME_3);
+ } catch (Exception ignored) {
+ }
+
+ // Stop second controller first
+ controller2.stop();
+ }
+ }
+
+ /**
+ * Helper method to start a second controller instance on a different port
+ */
+ private BaseControllerStarter startControllerOnDifferentPort(Map<String,
Object> properties) throws Exception {
+ BaseControllerStarter controllerStarter = createControllerStarter();
+ controllerStarter.init(new
org.apache.pinot.spi.env.PinotConfiguration(properties));
+ controllerStarter.start();
+ return controllerStarter;
+ }
+
+ /**
+ * Helper to create a single test table
+ */
+ private void createSingleTestTable(String rawTableName) throws Exception {
+ Map<String, Map<String, String>> taskTypeConfigsMap = new HashMap<>();
+ taskTypeConfigsMap.put(TEST_TASK_TYPE, ImmutableMap.of("schedule", "0 */10
* ? * * *"));
+
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+ .setTableName(rawTableName)
+ .setTaskConfig(new TableTaskConfig(taskTypeConfigsMap))
+ .build();
+
+ addTableConfig(tableConfig);
+ }
+
+ /**
+ * Task generator with controllable behavior
+ */
+ private static class ControllableTaskGenerator implements PinotTaskGenerator
{
+ private final String _taskType;
+ private int _taskGenerationCount = 0;
+ private long _generationDelay = 0;
+
+ public ControllableTaskGenerator(String taskType) {
+ _taskType = taskType;
+ }
+
+ @Override
+ public String getTaskType() {
+ return _taskType;
+ }
+
+ @Override
+ public void init(ClusterInfoAccessor clusterInfoAccessor) {
+ // No-op
+ }
+
+ public void setGenerationDelay(long delayMs) {
+ _generationDelay = delayMs;
+ }
+
+ public int getTaskGenerationCount() {
+ return _taskGenerationCount;
+ }
+
+ @Override
+ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs)
{
+ if (_generationDelay > 0) {
+ try {
+ Thread.sleep(_generationDelay);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ _taskGenerationCount++;
+ List<PinotTaskConfig> tasks = new ArrayList<>();
+ for (TableConfig tableConfig : tableConfigs) {
+ Map<String, String> configs = new HashMap<>();
+ configs.put("tableName", tableConfig.getTableName());
+ tasks.add(new PinotTaskConfig(_taskType, configs));
+ }
+ return tasks;
+ }
+
+ @Override
+ public List<PinotTaskConfig> generateTasks(TableConfig tableConfig,
Map<String, String> taskConfigs) {
+ if (_generationDelay > 0) {
+ try {
+ Thread.sleep(_generationDelay);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ _taskGenerationCount++;
+ Map<String, String> configs = new HashMap<>();
+ configs.put("tableName", tableConfig.getTableName());
+ return Collections.singletonList(new PinotTaskConfig(_taskType,
configs));
+ }
+
+ @Override
+ public void generateTasks(List<TableConfig> tableConfigs,
List<PinotTaskConfig> pinotTaskConfigs) {
+ if (_generationDelay > 0) {
+ try {
+ Thread.sleep(_generationDelay);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ _taskGenerationCount++;
+ Map<String, String> configs = new HashMap<>();
+ configs.put("tableName", tableConfigs.get(0).getTableName());
+ pinotTaskConfigs.add(new PinotTaskConfig(_taskType, configs));
+ }
+ }
+
+ /**
+ * Test scenario: Tests the forceReleaseLock API functionality.
+ * Verifies that when forceReleaseLock is called during task execution,
+ * the lock is released and another createTask can be started for the same
table.
+ */
+ @Test
+ public void testForceReleaseLockDuringTaskExecution() throws Exception {
+ // Setup controller with distributed locking enabled
+ Map<String, Object> properties = getDefaultControllerConfiguration();
+
properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
true);
+
properties.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
true);
+
+ startController(properties);
+ PinotTaskManager taskManager = _controllerStarter.getTaskManager();
+
+ try {
+ // Setup test environment
+ addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeMinionInstancesToAutoJoinHelixCluster(1);
+
+ // Create and register a slow task generator that will hold the lock for
a long time
+ ControllableTaskGenerator slowGenerator = new
ControllableTaskGenerator(TEST_TASK_TYPE);
+ slowGenerator.setGenerationDelay(5000); // 5 second delay to simulate
long-running task
+ ClusterInfoAccessor clusterInfoAccessor =
Mockito.mock(ClusterInfoAccessor.class);
+ slowGenerator.init(clusterInfoAccessor);
+ taskManager.registerTaskGenerator(slowGenerator);
+
+ // Ensure task queue exists
+
_controllerStarter.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+
+ // Create schema and table
+ Schema schema = new
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_1)
+ .addSingleValueDimension("testCol",
FieldSpec.DataType.STRING).build();
+ addSchema(schema);
+ createSingleTestTable(RAW_TABLE_NAME_1);
+
+ String tableNameWithType =
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_1);
+
+ // Wait for controller to be fully initialized
+ Thread.sleep(1000);
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch firstTaskStarted = new CountDownLatch(1);
+ CountDownLatch completionLatch = new CountDownLatch(2);
+
+ AtomicInteger firstTaskCompleted = new AtomicInteger(0);
+ AtomicInteger secondTaskCompleted = new AtomicInteger(0);
+ AtomicInteger forceReleaseResult = new AtomicInteger(0);
+ AtomicInteger forceReleaseResultException = new AtomicInteger(0);
+
+ // First createTask - this will acquire the lock and run for 5 seconds
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ firstTaskStarted.countDown(); // Signal that first task has started
+ Map<String, String> result = taskManager.createTask(TEST_TASK_TYPE,
RAW_TABLE_NAME_1, null, new HashMap<>());
+ if (result != null && !result.isEmpty()) {
+ firstTaskCompleted.incrementAndGet();
+ }
+ } catch (Exception ignored) {
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+
+ // Second thread: Wait for first task to start, then force release lock
and try another createTask
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ firstTaskStarted.await(); // Wait for first task to start
+ Thread.sleep(1000); // Let first task run for a bit to ensure it has
the lock
+
+ // Force release the lock while first task is still running
+ try {
+ taskManager.forceReleaseLock(tableNameWithType);
+ forceReleaseResult.incrementAndGet();
+ } catch (Exception e) {
+ forceReleaseResultException.incrementAndGet();
+ }
+
+ // Now try to create another task for the same table - this should
succeed since lock was released
+ Thread.sleep(500); // Small delay to ensure lock release is processed
+ Map<String, String> result = taskManager.createTask(TEST_TASK_TYPE,
RAW_TABLE_NAME_1, null, new HashMap<>());
+ if (result != null && !result.isEmpty()) {
+ secondTaskCompleted.incrementAndGet();
+ }
+ } catch (Exception ignored) {
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+
+ // Start both operations
+ startLatch.countDown();
+
+ // Wait for completion
+ assertTrue(completionLatch.await(15, TimeUnit.SECONDS), "Tasks should
complete within 15 seconds");
+
+ executor.shutdownNow();
+ assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS), "Executor
should terminate");
+
+ // Verify results
+ assertEquals(forceReleaseResult.get(), 1, "forceReleaseLock should
succeed");
+ assertEquals(forceReleaseResultException.get(), 0, "forceReleaseLock
shouldn't throw exception");
+ assertEquals(firstTaskCompleted.get(), 1, "First createTask should
complete successfully");
+ assertEquals(secondTaskCompleted.get(), 1, "Second createTask should
succeed after lock was force released");
+
+ // Verify that both tasks were generated (first task continues even
after lock release)
+ assertEquals(slowGenerator.getTaskGenerationCount(), 2, "Both createTask
calls should have generated tasks");
+ } finally {
+ // Cleanup
+ try {
+ // Cancel all running tasks before dropping tables
+ Map<String, TaskState> taskStates =
_controllerStarter.getHelixTaskResourceManager()
+ .getTaskStates(TEST_TASK_TYPE);
+ for (String taskName : taskStates.keySet()) {
+ try {
+
_controllerStarter.getHelixTaskResourceManager().deleteTask(taskName, true);
+ } catch (Exception e) {
+ // Ignore individual task deletion errors
+ }
+ }
+ Thread.sleep(1000); // Give time for task cancellation
+
+ dropOfflineTable(RAW_TABLE_NAME_1);
+ } catch (Exception ignored) {
+ }
+ }
+ }
+
+ /**
+ * Test scenario: Tests the forceReleaseLock API to ensure it throws an exception when no lock is present.
+ */
+ @Test
+ public void testForceReleaseLockWhenNoLockExists() throws Exception {
+ // Setup controller with distributed locking enabled
+ Map<String, Object> properties = getDefaultControllerConfiguration();
+
properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
true);
+
properties.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DISTRIBUTED_LOCKING,
true);
+
+ startController(properties);
+ PinotTaskManager taskManager = _controllerStarter.getTaskManager();
+
+ try {
+ // Setup test environment
+ addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeMinionInstancesToAutoJoinHelixCluster(1);
+
+ // Create and register a slow task generator that will hold the lock for
a long time
+ ControllableTaskGenerator slowGenerator = new
ControllableTaskGenerator(TEST_TASK_TYPE);
+ slowGenerator.setGenerationDelay(1000); // 1 second delay to simulate
long-running task
+ ClusterInfoAccessor clusterInfoAccessor =
Mockito.mock(ClusterInfoAccessor.class);
+ slowGenerator.init(clusterInfoAccessor);
+ taskManager.registerTaskGenerator(slowGenerator);
+
+ // Ensure task queue exists
+
_controllerStarter.getHelixTaskResourceManager().ensureTaskQueueExists(TEST_TASK_TYPE);
+
+ // Create schema and table
+ Schema schema = new
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME_1)
+ .addSingleValueDimension("testCol",
FieldSpec.DataType.STRING).build();
+ addSchema(schema);
+ createSingleTestTable(RAW_TABLE_NAME_1);
+
+ String tableNameWithType =
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_1);
+
+ // Wait for controller to be fully initialized
+ Thread.sleep(1000);
+
+ ExecutorService executor = Executors.newFixedThreadPool(1);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch completionLatch = new CountDownLatch(1);
+
+ AtomicInteger firstTaskCompleted = new AtomicInteger(0);
+ AtomicInteger forceReleaseResult = new AtomicInteger(0);
+ AtomicInteger forceReleaseResultException = new AtomicInteger(0);
+
+ // Force release lock and try a createTask
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+
+          // Force release the lock; no task is running, so no lock is held and this should throw
+ try {
+ taskManager.forceReleaseLock(tableNameWithType);
+ forceReleaseResult.incrementAndGet();
+ } catch (Exception e) {
+ forceReleaseResultException.incrementAndGet();
+ }
+
+          // Now create a task for the table - this should succeed since no lock is held
+ Thread.sleep(500); // Small delay to ensure lock release is processed
+ Map<String, String> result = taskManager.createTask(TEST_TASK_TYPE,
RAW_TABLE_NAME_1, null, new HashMap<>());
+ if (result != null && !result.isEmpty()) {
+ firstTaskCompleted.incrementAndGet();
+ }
+ } catch (Exception ignored) {
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+
+ // Start operation
+ startLatch.countDown();
+
+ // Wait for completion
+ assertTrue(completionLatch.await(15, TimeUnit.SECONDS), "Tasks should
complete within 15 seconds");
+
+ executor.shutdownNow();
+ assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS), "Executor
should terminate");
+
+ // Verify results
+ assertEquals(forceReleaseResult.get(), 0,
+ "forceReleaseLock should have thrown exception as no lock should be
held");
+ assertEquals(forceReleaseResultException.get(), 1, "forceReleaseLock
should throw an exception");
+ assertEquals(firstTaskCompleted.get(), 1, "First createTask should
complete successfully");
+
+      // Verify that the single createTask call generated a task
+      assertEquals(slowGenerator.getTaskGenerationCount(), 1, "The single createTask call should have generated a task");
+ } finally {
+ // Cleanup
+ try {
+ // Cancel all running tasks before dropping tables
+ Map<String, TaskState> taskStates =
_controllerStarter.getHelixTaskResourceManager()
+ .getTaskStates(TEST_TASK_TYPE);
+ for (String taskName : taskStates.keySet()) {
+ try {
+
_controllerStarter.getHelixTaskResourceManager().deleteTask(taskName, true);
+ } catch (Exception e) {
+ // Ignore individual task deletion errors
+ }
+ }
+ Thread.sleep(1000); // Give time for task cancellation
+
+ dropOfflineTable(RAW_TABLE_NAME_1);
+ } catch (Exception ignored) {
+ }
+ }
+ }
+}
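
Taken together, these scenarios exercise a simple per-table acquire/release discipline around task generation. Below is a minimal sketch of that flow, assuming acquireLock() returns null when another controller holds the table's lock and that releaseLock() belongs in a finally block; the wrapper class, the Runnable callback and the import path (inferred from this test's package) are illustrative assumptions rather than the committed PinotTaskManager code:

    import org.apache.pinot.controller.helix.core.minion.DistributedTaskLockManager;

    final class LockedTaskGenerationSketch {
      // Generate tasks for one table only while holding its distributed lock.
      static boolean generateWithTableLock(DistributedTaskLockManager lockManager, String tableNameWithType,
          Runnable generateTasksForTable) {
        DistributedTaskLockManager.TaskLock lock = lockManager.acquireLock(tableNameWithType);
        if (lock == null) {
          // Another controller is generating tasks for this table; report a generation error
          // instead of blocking, which is what the scheduleTasks tests above assert.
          return false;
        }
        try {
          generateTasksForTable.run();
          return true;
        } finally {
          // Always release so the ephemeral lock ZNode is cleaned up even if generation throws.
          lockManager.releaseLock(lock);
        }
      }
    }
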
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index 000aab90c80..b8fc93363df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -166,5 +166,6 @@ public class Actions {
public static final String UPLOAD_SEGMENT = "UploadSegment";
public static final String VALIDATE_SCHEMA = "ValidateSchema";
public static final String VALIDATE_TABLE_CONFIGS = "ValidateTableConfigs";
+ public static final String FORCE_RELEASE_TASK_GENERATION_LOCK =
"ForceReleaseTaskGenerationLock";
}
}
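
The new action string is consumed by Pinot's fine-grained authorization annotations. The fragment below is only a hedged illustration of where it would typically plug in; the endpoint class, the path, and the nested class holding the constant (Table vs Cluster) are assumptions, since none of them are visible in this hunk:

    import javax.ws.rs.POST;
    import javax.ws.rs.Path;
    import javax.ws.rs.PathParam;
    import org.apache.pinot.core.auth.Actions;
    import org.apache.pinot.core.auth.Authorize;
    import org.apache.pinot.core.auth.TargetType;

    public class ForceReleaseLockEndpointSketch {
      @POST
      @Path("/tasks/{tableNameWithType}/forceReleaseLock") // hypothetical path
      @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType",
          action = Actions.Table.FORCE_RELEASE_TASK_GENERATION_LOCK) // nested class assumed
      public void forceReleaseLock(@PathParam("tableNameWithType") String tableNameWithType) {
        // Would delegate to PinotTaskManager#forceReleaseLock(tableNameWithType), which the
        // integration tests above call directly.
      }
    }
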
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]