amrishlal commented on a change in pull request #7174:
URL: https://github.com/apache/pinot/pull/7174#discussion_r687075235



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
##########
@@ -54,14 +54,22 @@ public ControllerPeriodicTask(String taskName, long 
runFrequencyInSeconds, long
     _controllerMetrics = controllerMetrics;
   }
 
+  // Determine if this task can run on the specified table. Task can run on 
all tables for which the controller is lead
+  // if "tablename"property is not set. However, if "tablename" property is 
set (by calling the /periodictask/run

Review comment:
       Fixed.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
##########
@@ -111,22 +126,49 @@ protected void setUpTask() {
    */
   @Override
   public final void run() {
-    _running = true;
+    try {
+      // Don't allow a task to run more than once at a time.
+      _runLock.lock();
+      _running = true;
 
-    if (_started) {
-      long startTime = System.currentTimeMillis();
-      LOGGER.info("Start running task: {}", _taskName);
-      try {
-        runTask();
-      } catch (Exception e) {
-        LOGGER.error("Caught exception while running task: {}", _taskName, e);
+      if (_started) {
+        long startTime = System.currentTimeMillis();
+        LOGGER.info("Start running task: {}", _taskName);
+        try {
+          runTask();
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while running task: {}", _taskName, 
e);
+        }
+        LOGGER.info("Finish running task: {} in {}ms", _taskName, 
System.currentTimeMillis() - startTime);
+      } else {
+        LOGGER.warn("Task: {} is skipped because it is not started or already 
stopped", _taskName);
       }
-      LOGGER.info("Finish running task: {} in {}ms", _taskName, 
System.currentTimeMillis() - startTime);
-    } else {
-      LOGGER.warn("Task: {} is skipped because it is not started or already 
stopped", _taskName);
+
+      _running = false;
+    } finally {
+       _runLock.unlock();
+       _running = false;
     }
+  }
 
-    _running = false;
+  @Override
+  public void run(@Nullable java.util.Properties periodicTaskProperties) {
+    java.util.Properties savedPeriodicTaskProperties = 
_activePeriodicTaskProperties;

Review comment:
       This is for initializing `savedPeriodicTaskProperties`, which is used in 
the `finally` block.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
##########
@@ -108,4 +111,59 @@ public synchronized void stop() {
       _tasksWithValidInterval.parallelStream().forEach(PeriodicTask::stop);
     }
   }
+
+  /** @return true if task with given name exists; otherwise, false. */
+  public boolean hasTask(String periodicTaskName) {
+    for (PeriodicTask task : _tasksWithValidInterval) {
+      if (task.getTaskName().equals(periodicTaskName)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /** @return List of tasks name that will run periodically. */
+  public List<String> getTaskNameList() {
+    List<String> taskNameList = new ArrayList<>();
+    for (PeriodicTask task : _tasksWithValidInterval) {
+      taskNameList.add(task.getTaskName());
+    }
+
+    return taskNameList;
+  }
+
+  private PeriodicTask getPeriodicTask(String periodicTaskName) {
+    for (PeriodicTask task : _tasksWithValidInterval) {
+      if (task.getTaskName().equals(periodicTaskName)) {
+        return task;
+      }
+    }
+    return null;
+  }
+
+  /** Execute specified {@link PeriodicTask} immediately. */

Review comment:
       This is a valid Javadoc comment and follows the coding convention. Writing 
short Javadoc comments this way saves vertical space.

##########
File path: 
pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
##########
@@ -104,4 +105,79 @@ protected void cleanUpTask() {
     assertEquals(numTimesRunCalled.get(), numTasks * 2);
     assertEquals(numTimesStopCalled.get(), numTasks);
   }
+
+
+  /** Test that {@link PeriodicTaskScheduler} does not run the same task more 
than once at any time. */
+  @Test
+  public void testConcurrentExecutionOfSameTask() throws Exception {
+    // Count how many tasks were run.
+    final AtomicInteger counter = new AtomicInteger();
+
+    // Count how many attempts were made to run task
+    final AtomicInteger attempts = new AtomicInteger();
+
+
+    // Create periodic task.
+    PeriodicTask task = new BasePeriodicTask("TestTask", 1L, 0L) {
+      private volatile boolean isRunning = false;
+      @Override
+      protected void runTask() {
+        try {
+          if (isRunning) {
+            Assert.fail("More than one thread attempting to execute task at 
the same time.");
+          }
+          isRunning = true;
+          counter.incrementAndGet();
+          Thread.sleep(250);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        } finally {
+          isRunning = false;
+        }
+      }
+    };
+
+    // Start scheduler with periodic task.
+    List<PeriodicTask> periodicTasks = new ArrayList<>();
+    periodicTasks.add(task);
+
+    PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+    taskScheduler.init(periodicTasks);
+    taskScheduler.start();
+
+    // Create multiple "execute" threads that try to run the same task that is 
already being run by scheduler
+    // on a periodic basis.
+    final int threadCount = 20;
+    Thread[] threads = new Thread[threadCount];
+    for (int i = 0; i < threads.length; i++) {
+      threads[i] = new Thread(() -> {
+          attempts.incrementAndGet();
+          taskScheduler.scheduleNow("TestTask", null);
+      });
+
+      threads[i].start();
+      try {
+        threads[i].join();
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    //  Run for 3 seconds to let as many "execute" threads finish as possible.
+    Thread.sleep(3000);

Review comment:
       I played around with this a bit, and 3 seconds appeared to be the smallest 
reasonable value to set for this. It will slow down build time a bit, but it will 
verify correctness.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to