This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2bf80df1768 Allow manual scheudling of PeriodicTask even if they have
auto-scheduling turned off (#16934)
2bf80df1768 is described below
commit 2bf80df1768fbde1aad3a3e55518b43c984d40cf
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Oct 1 15:10:56 2025 -0700
Allow manual scheudling of PeriodicTask even if they have auto-scheduling
turned off (#16934)
---
.../controller/helix/RealtimeConsumerMonitor.java | 5 --
.../controller/helix/SegmentStatusChecker.java | 8 --
.../core/periodictask/PeriodicTaskScheduler.java | 89 +++++++++-------------
.../periodictask/PeriodicTaskSchedulerTest.java | 34 +++++++--
4 files changed, 60 insertions(+), 76 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
index 50e530bfc5b..b4246f05123 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
@@ -63,11 +63,6 @@ public class RealtimeConsumerMonitor extends
ControllerPeriodicTask<RealtimeCons
pinotHelixResourceManager));
}
- @Override
- protected void setUpTask() {
- LOGGER.info("Setting up RealtimeConsumerMonitor task");
- }
-
@Override
protected void processTable(String tableNameWithType) {
if
(!TableType.REALTIME.equals(TableNameBuilder.getTableTypeFromTableName(tableNameWithType)))
{
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index 9a09761b815..ab20f6fb745 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -99,10 +99,6 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusCh
_tableSizeReader = tableSizeReader;
}
- @Override
- protected void setUpTask() {
- }
-
@Override
protected Context preprocess(Properties periodicTaskProperties) {
Context context = new Context();
@@ -523,10 +519,6 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusCh
}
}
- @Override
- public void cleanUpTask() {
- }
-
public static final class Context {
private boolean _logDisabledTables;
private int _realTimeTableCount;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
index 977f72effa0..37b63dd6f41 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
@@ -18,8 +18,12 @@
*/
package org.apache.pinot.core.periodictask;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -36,33 +40,19 @@ public class PeriodicTaskScheduler {
private static final Logger LOGGER =
LoggerFactory.getLogger(PeriodicTaskScheduler.class);
private ScheduledExecutorService _executorService;
- private List<PeriodicTask> _tasksWithValidInterval;
- private volatile int _taskCount;
+ private Map<String, PeriodicTask> _periodicTasks;
/**
* Initializes the periodic task scheduler with a list of periodic tasks.
*/
public void init(List<PeriodicTask> periodicTasks) {
- _tasksWithValidInterval = new ArrayList<>();
+ _periodicTasks = Maps.newHashMapWithExpectedSize(periodicTasks.size());
for (PeriodicTask periodicTask : periodicTasks) {
- if (periodicTask.getIntervalInSeconds() > 0) {
- LOGGER.info("Adding periodic task: {}", periodicTask);
- _tasksWithValidInterval.add(periodicTask);
- } else {
- LOGGER.info("Skipping periodic task: {}", periodicTask);
- }
+ LOGGER.info("Adding periodic task: {}", periodicTask);
+ String periodicTaskName = periodicTask.getTaskName();
+ Preconditions.checkState(_periodicTasks.put(periodicTaskName,
periodicTask) == null,
+ "Duplicate periodic task name: %s", periodicTaskName);
}
-
- _taskCount = _tasksWithValidInterval.size();
- }
-
- /**
- * Get number of tasks scheduled. Method is thread safe since task list is
not modified after it is
- * initialized in {@link #init} method.
- * @return
- */
- public int getPeriodicTaskCount() {
- return _taskCount;
}
/**
@@ -73,17 +63,24 @@ public class PeriodicTaskScheduler {
LOGGER.warn("Periodic task scheduler already started");
}
- if (_tasksWithValidInterval.isEmpty()) {
+ if (_periodicTasks.isEmpty()) {
LOGGER.warn("No periodic task scheduled");
} else {
- LOGGER.info("Starting periodic task scheduler with tasks: {}",
_tasksWithValidInterval);
- _executorService =
Executors.newScheduledThreadPool(_tasksWithValidInterval.size());
- for (PeriodicTask periodicTask : _tasksWithValidInterval) {
+ Collection<PeriodicTask> periodicTasks = _periodicTasks.values();
+ LOGGER.info("Starting periodic task scheduler with tasks: {}",
periodicTasks);
+ _executorService =
Executors.newScheduledThreadPool(_periodicTasks.size());
+ for (PeriodicTask periodicTask : periodicTasks) {
periodicTask.start();
+ String periodicTaskTaskName = periodicTask.getTaskName();
+ long intervalInSeconds = periodicTask.getIntervalInSeconds();
+ if (intervalInSeconds <= 0) {
+ LOGGER.info("Skip scheduling periodic task: {} for periodic
execution (it can be manually triggered)",
+ periodicTaskTaskName);
+ continue;
+ }
_executorService.scheduleWithFixedDelay(() -> {
try {
- LOGGER.info("Starting {} with running frequency of {} seconds.",
periodicTask.getTaskName(),
- periodicTask.getIntervalInSeconds());
+ LOGGER.info("Starting {} with running frequency of {} seconds.",
periodicTaskTaskName, intervalInSeconds);
periodicTask.run();
} catch (Throwable e) {
// catch all errors to prevent subsequent executions from being
silently suppressed
@@ -91,9 +88,9 @@ public class PeriodicTaskScheduler {
// See <a
href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService
//
.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-">Ref</a>
// </pre>
- LOGGER.warn("Caught exception while running Task: {}",
periodicTask.getTaskName(), e);
+ LOGGER.warn("Caught exception while running Task: {}",
periodicTaskTaskName, e);
}
- }, periodicTask.getInitialDelayInSeconds(),
periodicTask.getIntervalInSeconds(), TimeUnit.SECONDS);
+ }, periodicTask.getInitialDelayInSeconds(), intervalInSeconds,
TimeUnit.SECONDS);
}
}
}
@@ -108,38 +105,20 @@ public class PeriodicTaskScheduler {
_executorService = null;
}
- if (_tasksWithValidInterval != null) {
- LOGGER.info("Stopping all periodic tasks: {}", _tasksWithValidInterval);
- _tasksWithValidInterval.parallelStream().forEach(PeriodicTask::stop);
+ if (_periodicTasks != null) {
+ LOGGER.info("Stopping all periodic tasks: {}", _periodicTasks);
+ _periodicTasks.values().parallelStream().forEach(PeriodicTask::stop);
}
}
- /** @return true if task with given name exists; otherwise, false. */
+ /// Returns true if the task exists (regardless of whether it is scheduled
to run periodically or not).
public boolean hasTask(String periodicTaskName) {
- for (PeriodicTask task : _tasksWithValidInterval) {
- if (task.getTaskName().equals(periodicTaskName)) {
- return true;
- }
- }
- return false;
+ return _periodicTasks.containsKey(periodicTaskName);
}
- /** @return List of tasks name that will run periodically. */
+ /// Returns the list of all registered task names.
public List<String> getTaskNames() {
- List<String> taskNameList = new ArrayList<>();
- for (PeriodicTask task : _tasksWithValidInterval) {
- taskNameList.add(task.getTaskName());
- }
- return taskNameList;
- }
-
- private PeriodicTask getPeriodicTask(String periodicTaskName) {
- for (PeriodicTask task : _tasksWithValidInterval) {
- if (task.getTaskName().equals(periodicTaskName)) {
- return task;
- }
- }
- return null;
+ return new ArrayList<>(_periodicTasks.keySet());
}
/** Execute {@link PeriodicTask} immediately on the specified table. */
@@ -147,9 +126,9 @@ public class PeriodicTaskScheduler {
// During controller deployment, each controller can have a slightly
different list of periodic tasks if we add,
// remove, or rename periodic task. To avoid this situation, we check
again (besides the check at controller API
// level) whether the periodic task exists.
- PeriodicTask periodicTask = getPeriodicTask(periodicTaskName);
+ PeriodicTask periodicTask = _periodicTasks.get(periodicTaskName);
if (periodicTask == null) {
- LOGGER.error("Unknown Periodic Task {}", periodicTaskName);
+ LOGGER.error("Unknown periodic task: {}", periodicTaskName);
return;
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
index 466a95c5698..d8486e8ee83 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
@@ -19,17 +19,17 @@
package org.apache.pinot.core.periodictask;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.testng.Assert;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
public class PeriodicTaskSchedulerTest {
@@ -41,7 +41,7 @@ public class PeriodicTaskSchedulerTest {
AtomicBoolean runCalled = new AtomicBoolean();
AtomicBoolean stopCalled = new AtomicBoolean();
- List<PeriodicTask> periodicTasks = Collections.singletonList(new
BasePeriodicTask("TestTask", 0L/*Invalid*/, 0L) {
+ List<PeriodicTask> periodicTasks = List.of(new
BasePeriodicTask("TestTask", 0L/*Invalid*/, 0L) {
@Override
protected void setUpTask() {
startCalled.set(true);
@@ -60,13 +60,31 @@ public class PeriodicTaskSchedulerTest {
PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
taskScheduler.init(periodicTasks);
+ assertTrue(taskScheduler.hasTask("TestTask"));
+ assertEquals(taskScheduler.getTaskNames(), List.of("TestTask"));
+
taskScheduler.start();
Thread.sleep(100L);
taskScheduler.stop();
- assertFalse(startCalled.get());
+ assertTrue(startCalled.get());
assertFalse(runCalled.get());
- assertFalse(stopCalled.get());
+ assertTrue(stopCalled.get());
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class,
+ expectedExceptionsMessageRegExp = "Duplicate periodic task name:
TestTask")
+ public void testTasksWithDuplicateName() {
+ List<PeriodicTask> periodicTasks = new ArrayList<>(2);
+ for (int i = 0; i < 2; i++) {
+ periodicTasks.add(new BasePeriodicTask("TestTask", 1L, 0L) {
+ @Override
+ protected void runTask(Properties periodicTaskProperties) {
+ }
+ });
+ }
+ PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+ taskScheduler.init(periodicTasks);
}
@Test
@@ -79,7 +97,7 @@ public class PeriodicTaskSchedulerTest {
List<PeriodicTask> periodicTasks = new ArrayList<>(numTasks);
for (int i = 0; i < numTasks; i++) {
- periodicTasks.add(new BasePeriodicTask("TestTask", 1L, 0L) {
+ periodicTasks.add(new BasePeriodicTask("TestTask" + i, 1L, 0L) {
@Override
protected void setUpTask() {
numTimesStartCalled.getAndIncrement();
@@ -138,7 +156,7 @@ public class PeriodicTaskSchedulerTest {
try {
if (_isRunning) {
// fail since task is already running in another thread.
- Assert.fail("More than one thread attempting to execute task at
the same time.");
+ fail("More than one thread attempting to execute task at the same
time.");
}
_isRunning = true;
Thread.sleep(200);
@@ -185,6 +203,6 @@ public class PeriodicTaskSchedulerTest {
taskScheduler.stop();
// Confirm that all threads requested execution, even though only half the
threads completed execution.
- Assert.assertEquals(attempts.get(), numThreads);
+ assertEquals(attempts.get(), numThreads);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]