This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bf80df1768 Allow manual scheduling of PeriodicTask even if they have 
auto-scheduling turned off (#16934)
2bf80df1768 is described below

commit 2bf80df1768fbde1aad3a3e55518b43c984d40cf
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Oct 1 15:10:56 2025 -0700

    Allow manual scheduling of PeriodicTask even if they have auto-scheduling 
turned off (#16934)
---
 .../controller/helix/RealtimeConsumerMonitor.java  |  5 --
 .../controller/helix/SegmentStatusChecker.java     |  8 --
 .../core/periodictask/PeriodicTaskScheduler.java   | 89 +++++++++-------------
 .../periodictask/PeriodicTaskSchedulerTest.java    | 34 +++++++--
 4 files changed, 60 insertions(+), 76 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
index 50e530bfc5b..b4246f05123 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
@@ -63,11 +63,6 @@ public class RealtimeConsumerMonitor extends 
ControllerPeriodicTask<RealtimeCons
             pinotHelixResourceManager));
   }
 
-  @Override
-  protected void setUpTask() {
-    LOGGER.info("Setting up RealtimeConsumerMonitor task");
-  }
-
   @Override
   protected void processTable(String tableNameWithType) {
     if 
(!TableType.REALTIME.equals(TableNameBuilder.getTableTypeFromTableName(tableNameWithType)))
 {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index 9a09761b815..ab20f6fb745 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -99,10 +99,6 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask<SegmentStatusCh
     _tableSizeReader = tableSizeReader;
   }
 
-  @Override
-  protected void setUpTask() {
-  }
-
   @Override
   protected Context preprocess(Properties periodicTaskProperties) {
     Context context = new Context();
@@ -523,10 +519,6 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask<SegmentStatusCh
     }
   }
 
-  @Override
-  public void cleanUpTask() {
-  }
-
   public static final class Context {
     private boolean _logDisabledTables;
     private int _realTimeTableCount;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
index 977f72effa0..37b63dd6f41 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
@@ -18,8 +18,12 @@
  */
 package org.apache.pinot.core.periodictask;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -36,33 +40,19 @@ public class PeriodicTaskScheduler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PeriodicTaskScheduler.class);
 
   private ScheduledExecutorService _executorService;
-  private List<PeriodicTask> _tasksWithValidInterval;
-  private volatile int _taskCount;
+  private Map<String, PeriodicTask> _periodicTasks;
 
   /**
    * Initializes the periodic task scheduler with a list of periodic tasks.
    */
   public void init(List<PeriodicTask> periodicTasks) {
-    _tasksWithValidInterval = new ArrayList<>();
+    _periodicTasks = Maps.newHashMapWithExpectedSize(periodicTasks.size());
     for (PeriodicTask periodicTask : periodicTasks) {
-      if (periodicTask.getIntervalInSeconds() > 0) {
-        LOGGER.info("Adding periodic task: {}", periodicTask);
-        _tasksWithValidInterval.add(periodicTask);
-      } else {
-        LOGGER.info("Skipping periodic task: {}", periodicTask);
-      }
+      LOGGER.info("Adding periodic task: {}", periodicTask);
+      String periodicTaskName = periodicTask.getTaskName();
+      Preconditions.checkState(_periodicTasks.put(periodicTaskName, 
periodicTask) == null,
+          "Duplicate periodic task name: %s", periodicTaskName);
     }
-
-    _taskCount = _tasksWithValidInterval.size();
-  }
-
-  /**
-   * Get number of tasks scheduled. Method is thread safe since task list is 
not modified after it is
-   * initialized in {@link #init} method.
-   * @return
-   */
-  public int getPeriodicTaskCount() {
-    return _taskCount;
   }
 
   /**
@@ -73,17 +63,24 @@ public class PeriodicTaskScheduler {
       LOGGER.warn("Periodic task scheduler already started");
     }
 
-    if (_tasksWithValidInterval.isEmpty()) {
+    if (_periodicTasks.isEmpty()) {
       LOGGER.warn("No periodic task scheduled");
     } else {
-      LOGGER.info("Starting periodic task scheduler with tasks: {}", 
_tasksWithValidInterval);
-      _executorService = 
Executors.newScheduledThreadPool(_tasksWithValidInterval.size());
-      for (PeriodicTask periodicTask : _tasksWithValidInterval) {
+      Collection<PeriodicTask> periodicTasks = _periodicTasks.values();
+      LOGGER.info("Starting periodic task scheduler with tasks: {}", 
periodicTasks);
+      _executorService = 
Executors.newScheduledThreadPool(_periodicTasks.size());
+      for (PeriodicTask periodicTask : periodicTasks) {
         periodicTask.start();
+        String periodicTaskTaskName = periodicTask.getTaskName();
+        long intervalInSeconds = periodicTask.getIntervalInSeconds();
+        if (intervalInSeconds <= 0) {
+          LOGGER.info("Skip scheduling periodic task: {} for periodic 
execution (it can be manually triggered)",
+              periodicTaskTaskName);
+          continue;
+        }
         _executorService.scheduleWithFixedDelay(() -> {
           try {
-            LOGGER.info("Starting {} with running frequency of {} seconds.", 
periodicTask.getTaskName(),
-                periodicTask.getIntervalInSeconds());
+            LOGGER.info("Starting {} with running frequency of {} seconds.", 
periodicTaskTaskName, intervalInSeconds);
             periodicTask.run();
           } catch (Throwable e) {
             // catch all errors to prevent subsequent executions from being 
silently suppressed
@@ -91,9 +88,9 @@ public class PeriodicTaskScheduler {
             // See <a 
href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService
             // 
.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-">Ref</a>
             // </pre>
-            LOGGER.warn("Caught exception while running Task: {}", 
periodicTask.getTaskName(), e);
+            LOGGER.warn("Caught exception while running Task: {}", 
periodicTaskTaskName, e);
           }
-        }, periodicTask.getInitialDelayInSeconds(), 
periodicTask.getIntervalInSeconds(), TimeUnit.SECONDS);
+        }, periodicTask.getInitialDelayInSeconds(), intervalInSeconds, 
TimeUnit.SECONDS);
       }
     }
   }
@@ -108,38 +105,20 @@ public class PeriodicTaskScheduler {
       _executorService = null;
     }
 
-    if (_tasksWithValidInterval != null) {
-      LOGGER.info("Stopping all periodic tasks: {}", _tasksWithValidInterval);
-      _tasksWithValidInterval.parallelStream().forEach(PeriodicTask::stop);
+    if (_periodicTasks != null) {
+      LOGGER.info("Stopping all periodic tasks: {}", _periodicTasks);
+      _periodicTasks.values().parallelStream().forEach(PeriodicTask::stop);
     }
   }
 
-  /** @return true if task with given name exists; otherwise, false. */
+  /// Returns true if the task exists (regardless of whether it is scheduled 
to run periodically or not).
   public boolean hasTask(String periodicTaskName) {
-    for (PeriodicTask task : _tasksWithValidInterval) {
-      if (task.getTaskName().equals(periodicTaskName)) {
-        return true;
-      }
-    }
-    return false;
+    return _periodicTasks.containsKey(periodicTaskName);
   }
 
-  /** @return List of tasks name that will run periodically. */
+  /// Returns the list of all registered task names.
   public List<String> getTaskNames() {
-    List<String> taskNameList = new ArrayList<>();
-    for (PeriodicTask task : _tasksWithValidInterval) {
-      taskNameList.add(task.getTaskName());
-    }
-    return taskNameList;
-  }
-
-  private PeriodicTask getPeriodicTask(String periodicTaskName) {
-    for (PeriodicTask task : _tasksWithValidInterval) {
-      if (task.getTaskName().equals(periodicTaskName)) {
-        return task;
-      }
-    }
-    return null;
+    return new ArrayList<>(_periodicTasks.keySet());
   }
 
   /** Execute {@link PeriodicTask} immediately on the specified table. */
@@ -147,9 +126,9 @@ public class PeriodicTaskScheduler {
     // During controller deployment, each controller can have a slightly 
different list of periodic tasks if we add,
     // remove, or rename periodic task. To avoid this situation, we check 
again (besides the check at controller API
     // level) whether the periodic task exists.
-    PeriodicTask periodicTask = getPeriodicTask(periodicTaskName);
+    PeriodicTask periodicTask = _periodicTasks.get(periodicTaskName);
     if (periodicTask == null) {
-      LOGGER.error("Unknown Periodic Task {}", periodicTaskName);
+      LOGGER.error("Unknown periodic task: {}", periodicTaskName);
       return;
     }
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
index 466a95c5698..d8486e8ee83 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
@@ -19,17 +19,17 @@
 package org.apache.pinot.core.periodictask;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 
 public class PeriodicTaskSchedulerTest {
@@ -41,7 +41,7 @@ public class PeriodicTaskSchedulerTest {
     AtomicBoolean runCalled = new AtomicBoolean();
     AtomicBoolean stopCalled = new AtomicBoolean();
 
-    List<PeriodicTask> periodicTasks = Collections.singletonList(new 
BasePeriodicTask("TestTask", 0L/*Invalid*/, 0L) {
+    List<PeriodicTask> periodicTasks = List.of(new 
BasePeriodicTask("TestTask", 0L/*Invalid*/, 0L) {
       @Override
       protected void setUpTask() {
         startCalled.set(true);
@@ -60,13 +60,31 @@ public class PeriodicTaskSchedulerTest {
 
     PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
     taskScheduler.init(periodicTasks);
+    assertTrue(taskScheduler.hasTask("TestTask"));
+    assertEquals(taskScheduler.getTaskNames(), List.of("TestTask"));
+
     taskScheduler.start();
     Thread.sleep(100L);
     taskScheduler.stop();
 
-    assertFalse(startCalled.get());
+    assertTrue(startCalled.get());
     assertFalse(runCalled.get());
-    assertFalse(stopCalled.get());
+    assertTrue(stopCalled.get());
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class,
+      expectedExceptionsMessageRegExp = "Duplicate periodic task name: 
TestTask")
+  public void testTasksWithDuplicateName() {
+    List<PeriodicTask> periodicTasks = new ArrayList<>(2);
+    for (int i = 0; i < 2; i++) {
+      periodicTasks.add(new BasePeriodicTask("TestTask", 1L, 0L) {
+        @Override
+        protected void runTask(Properties periodicTaskProperties) {
+        }
+      });
+    }
+    PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+    taskScheduler.init(periodicTasks);
   }
 
   @Test
@@ -79,7 +97,7 @@ public class PeriodicTaskSchedulerTest {
 
     List<PeriodicTask> periodicTasks = new ArrayList<>(numTasks);
     for (int i = 0; i < numTasks; i++) {
-      periodicTasks.add(new BasePeriodicTask("TestTask", 1L, 0L) {
+      periodicTasks.add(new BasePeriodicTask("TestTask" + i, 1L, 0L) {
         @Override
         protected void setUpTask() {
           numTimesStartCalled.getAndIncrement();
@@ -138,7 +156,7 @@ public class PeriodicTaskSchedulerTest {
         try {
           if (_isRunning) {
             // fail since task is already running in another thread.
-            Assert.fail("More than one thread attempting to execute task at 
the same time.");
+            fail("More than one thread attempting to execute task at the same 
time.");
           }
           _isRunning = true;
           Thread.sleep(200);
@@ -185,6 +203,6 @@ public class PeriodicTaskSchedulerTest {
     taskScheduler.stop();
 
     // Confirm that all threads requested execution, even though only half the 
threads completed execution.
-    Assert.assertEquals(attempts.get(), numThreads);
+    assertEquals(attempts.get(), numThreads);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to