amrishlal commented on a change in pull request #7174: URL: https://github.com/apache/pinot/pull/7174#discussion_r687075235
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java ########## @@ -54,14 +54,22 @@ public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long _controllerMetrics = controllerMetrics; } + // Determine if this task can run on the specified table. Task can run on all tables for which the controller is lead + // if "tablename"property is not set. However, if "tablename" property is set (by calling the /periodictask/run Review comment: Fixed. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java ########## @@ -111,22 +126,49 @@ protected void setUpTask() { */ @Override public final void run() { - _running = true; + try { + // Don't allow a task to run more than once at a time. + _runLock.lock(); + _running = true; - if (_started) { - long startTime = System.currentTimeMillis(); - LOGGER.info("Start running task: {}", _taskName); - try { - runTask(); - } catch (Exception e) { - LOGGER.error("Caught exception while running task: {}", _taskName, e); + if (_started) { + long startTime = System.currentTimeMillis(); + LOGGER.info("Start running task: {}", _taskName); + try { + runTask(); + } catch (Exception e) { + LOGGER.error("Caught exception while running task: {}", _taskName, e); + } + LOGGER.info("Finish running task: {} in {}ms", _taskName, System.currentTimeMillis() - startTime); + } else { + LOGGER.warn("Task: {} is skipped because it is not started or already stopped", _taskName); } - LOGGER.info("Finish running task: {} in {}ms", _taskName, System.currentTimeMillis() - startTime); - } else { - LOGGER.warn("Task: {} is skipped because it is not started or already stopped", _taskName); + + _running = false; + } finally { + _runLock.unlock(); + _running = false; } + } - _running = false; + @Override + public void run(@Nullable java.util.Properties periodicTaskProperties) { + java.util.Properties savedPeriodicTaskProperties = _activePeriodicTaskProperties; Review comment: This is for initializing `savedPeriodicTaskProperties` which is used in the `finally` block. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java ########## @@ -108,4 +111,59 @@ public synchronized void stop() { _tasksWithValidInterval.parallelStream().forEach(PeriodicTask::stop); } } + + /** @return true if task with given name exists; otherwise, false. */ + public boolean hasTask(String periodicTaskName) { + for (PeriodicTask task : _tasksWithValidInterval) { + if (task.getTaskName().equals(periodicTaskName)) { + return true; + } + } + + return false; + } + + /** @return List of tasks name that will run periodically. */ + public List<String> getTaskNameList() { + List<String> taskNameList = new ArrayList<>(); + for (PeriodicTask task : _tasksWithValidInterval) { + taskNameList.add(task.getTaskName()); + } + + return taskNameList; + } + + private PeriodicTask getPeriodicTask(String periodicTaskName) { + for (PeriodicTask task : _tasksWithValidInterval) { + if (task.getTaskName().equals(periodicTaskName)) { + return task; + } + } + return null; + } + + /** Execute specified {@link PeriodicTask} immediately. */ Review comment: This is valid javadoc comment and follows coding convention. Writing short javadoc comments this way saves vertical space. ########## File path: pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java ########## @@ -104,4 +105,79 @@ protected void cleanUpTask() { assertEquals(numTimesRunCalled.get(), numTasks * 2); assertEquals(numTimesStopCalled.get(), numTasks); } + + + /** Test that {@link PeriodicTaskScheduler} does not run the same task more than once at any time. */ + @Test + public void testConcurrentExecutionOfSameTask() throws Exception { + // Count how many tasks were run. + final AtomicInteger counter = new AtomicInteger(); + + // Count how many attempts were made to run task + final AtomicInteger attempts = new AtomicInteger(); + + + // Create periodic task. + PeriodicTask task = new BasePeriodicTask("TestTask", 1L, 0L) { + private volatile boolean isRunning = false; + @Override + protected void runTask() { + try { + if (isRunning) { + Assert.fail("More than one thread attempting to execute task at the same time."); + } + isRunning = true; + counter.incrementAndGet(); + Thread.sleep(250); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } finally { + isRunning = false; + } + } + }; + + // Start scheduler with periodic task. + List<PeriodicTask> periodicTasks = new ArrayList<>(); + periodicTasks.add(task); + + PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler(); + taskScheduler.init(periodicTasks); + taskScheduler.start(); + + // Create multiple "execute" threads that try to run the same task that is already being run by scheduler + // on a periodic basis. + final int threadCount = 20; + Thread[] threads = new Thread[threadCount]; + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + attempts.incrementAndGet(); + taskScheduler.scheduleNow("TestTask", null); + }); + + threads[i].start(); + try { + threads[i].join(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + + // Run for 3 seconds to let as many "execute" threads finish as possible. + Thread.sleep(3000); Review comment: I played around with this a bit and 3 seconds appeared to be smallest reasonable value to set for this. It will slow down build time a bit, but will verify correctness. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org