Jackie-Jiang commented on a change in pull request #7174:
URL: https://github.com/apache/pinot/pull/7174#discussion_r689779004



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerMessageHandlerFactory.java
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller;
+
+import java.util.Properties;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
+import org.apache.pinot.core.periodictask.PeriodicTask;
+import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/** Factory class for creating message handlers for incoming helix messages. */
+public class ControllerMessageHandlerFactory implements MessageHandlerFactory {

Review comment:
       Suggest renaming it to `ControllerUserDefinedMessageHandlerFactory` to 
be more specific

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
##########
@@ -149,27 +190,17 @@ public final synchronized void stop() {
     }
     _started = false;
 
-    if (_running) {
-      long startTimeMs = System.currentTimeMillis();
-      long remainingTimeMs = MAX_PERIODIC_TASK_STOP_TIME_MILLIS;
-      LOGGER.info("Task: {} is running, wait for at most {}ms for it to 
finish", _taskName, remainingTimeMs);
-      while (_running && remainingTimeMs > 0L) {
-        long sleepTimeMs = Long.min(remainingTimeMs, 1000L);
-        remainingTimeMs -= sleepTimeMs;
-        try {
-          Thread.sleep(sleepTimeMs);
-        } catch (InterruptedException e) {
-          LOGGER.error("Caught InterruptedException while waiting for task: {} 
to finish", _taskName);
-          Thread.currentThread().interrupt();
-          break;
-        }
-      }
-      long waitTimeMs = System.currentTimeMillis() - startTimeMs;
-      if (_running) {
-        LOGGER.warn("Task: {} is not finished in {}ms", waitTimeMs);
+    try {
+      // check if task is done running, or wait for the task to get done, by 
trying to acquire runLock.
+      if (!_runLock.tryLock(MAX_PERIODIC_TASK_STOP_TIME_MILLIS, 
TimeUnit.MILLISECONDS)) {
+        LOGGER.warn("Task: {} did not finish within timeout of {}ms", 
MAX_PERIODIC_TASK_STOP_TIME_MILLIS);
       } else {
-        LOGGER.info("Task: {} is finished in {}ms", waitTimeMs);
+        LOGGER.info("Task: {} finished within timeout of {}ms", 
MAX_PERIODIC_TASK_STOP_TIME_MILLIS);
       }
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    } finally {
+      _runLock.unlock();

Review comment:
       (Critical) The `_runLock.unlock();` should be called only when the 
`tryLock()` succeeds, or it will throw exception

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
##########
@@ -108,4 +111,62 @@ public synchronized void stop() {
       _tasksWithValidInterval.parallelStream().forEach(PeriodicTask::stop);
     }
   }
+
+  /** @return true if task with given name exists; otherwise, false. */
+  public boolean hasTask(String periodicTaskName) {
+    for (PeriodicTask task : _tasksWithValidInterval) {
+      if (task.getTaskName().equals(periodicTaskName)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** @return List of tasks name that will run periodically. */
+  public List<String> getTaskNameList() {
+    List<String> taskNameList = new ArrayList<>();
+    for (PeriodicTask task : _tasksWithValidInterval) {
+      taskNameList.add(task.getTaskName());
+    }
+    return taskNameList;
+  }
+
+  private PeriodicTask getPeriodicTask(String periodicTaskName) {
+    for (PeriodicTask task : _tasksWithValidInterval) {
+      if (task.getTaskName().equals(periodicTaskName)) {
+        return task;
+      }
+    }
+    return null;
+  }
+
+  /** Execute {@link PeriodicTask} immediately on the specified table. */
+  public void scheduleNow(String periodicTaskName, @Nullable Properties 
periodicTaskProperties) {

Review comment:
       `properties` cannot be `null` here, or it will throw NPE

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
##########
@@ -108,4 +111,62 @@ public synchronized void stop() {
       _tasksWithValidInterval.parallelStream().forEach(PeriodicTask::stop);
     }
   }
+
+  /** @return true if task with given name exists; otherwise, false. */
+  public boolean hasTask(String periodicTaskName) {
+    for (PeriodicTask task : _tasksWithValidInterval) {
+      if (task.getTaskName().equals(periodicTaskName)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** @return List of tasks name that will run periodically. */
+  public List<String> getTaskNameList() {
+    List<String> taskNameList = new ArrayList<>();
+    for (PeriodicTask task : _tasksWithValidInterval) {
+      taskNameList.add(task.getTaskName());
+    }
+    return taskNameList;
+  }
+
+  private PeriodicTask getPeriodicTask(String periodicTaskName) {
+    for (PeriodicTask task : _tasksWithValidInterval) {
+      if (task.getTaskName().equals(periodicTaskName)) {
+        return task;
+      }
+    }
+    return null;
+  }
+
+  /** Execute {@link PeriodicTask} immediately on the specified table. */
+  public void scheduleNow(String periodicTaskName, @Nullable Properties 
periodicTaskProperties) {
+    // During controller deployment, each controller can have a slightly 
different list of periodic tasks if we add,
+    // remove, or rename periodic task. To avoid this situation, we check 
again (besides the check at controller API
+    // level) whether the periodic task exists.
+    PeriodicTask periodicTask = getPeriodicTask(periodicTaskName);
+    if (periodicTask == null) {
+      throw new IllegalArgumentException("Unknown Periodic Task " + 
periodicTaskName);
+    }
+
+    String taskRequestId = 
periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_REQUEST_ID).toString();
+
+    LOGGER.info(
+        "[TaskRequestId: {}] Schedule task '{}' to run immediately. If the 
task is already running, this run will wait until the current run finishes.",
+        taskRequestId, periodicTaskName);
+    _executorService.schedule(() -> {
+      try {
+        // Run the periodic task using the specified parameters. The call to 
run() method will block if another thread
+        // (the periodic execution thread or another thread calling this 
method) is already in the process of
+        // running the same task.
+        periodicTask.run(periodicTaskProperties);
+      } catch (Throwable t) {
+        // catch all errors to prevent subsequent executions from being 
silently suppressed
+        // Ref: 
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
+        LOGGER.warn("[TaskRequestId: {}] Caught exception while attempting to 
execute named periodic task: {}",

Review comment:
       I feel we should log `error` instead

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
##########
@@ -28,6 +29,10 @@
 @ThreadSafe
 public interface PeriodicTask extends Runnable {
 
+  // PeriodicTask objects may take a {@link Properties} object. Define all the 
keys property keys here.
+  String PROPERTY_KEY_REQUEST_ID = "requestid";

Review comment:
       Can we match these 2 keys with the ones we used in the message?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
##########
@@ -149,27 +190,17 @@ public final synchronized void stop() {
     }
     _started = false;
 
-    if (_running) {
-      long startTimeMs = System.currentTimeMillis();
-      long remainingTimeMs = MAX_PERIODIC_TASK_STOP_TIME_MILLIS;
-      LOGGER.info("Task: {} is running, wait for at most {}ms for it to 
finish", _taskName, remainingTimeMs);
-      while (_running && remainingTimeMs > 0L) {
-        long sleepTimeMs = Long.min(remainingTimeMs, 1000L);
-        remainingTimeMs -= sleepTimeMs;
-        try {
-          Thread.sleep(sleepTimeMs);
-        } catch (InterruptedException e) {
-          LOGGER.error("Caught InterruptedException while waiting for task: {} 
to finish", _taskName);
-          Thread.currentThread().interrupt();
-          break;
-        }
-      }
-      long waitTimeMs = System.currentTimeMillis() - startTimeMs;
-      if (_running) {
-        LOGGER.warn("Task: {} is not finished in {}ms", waitTimeMs);
+    try {
+      // check if task is done running, or wait for the task to get done, by 
trying to acquire runLock.
+      if (!_runLock.tryLock(MAX_PERIODIC_TASK_STOP_TIME_MILLIS, 
TimeUnit.MILLISECONDS)) {
+        LOGGER.warn("Task: {} did not finish within timeout of {}ms", 
MAX_PERIODIC_TASK_STOP_TIME_MILLIS);

Review comment:
       Missing `_taskName`

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
##########
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.util.List;
+import java.util.UUID;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.PERIODIC_TASK_TAG)
+@Path("/periodictask")
+public class PinotControllerPeriodicTaskRestletResource {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotControllerPeriodicTaskRestletResource.class);
+  private static final String API_REQUEST_ID_PREFIX = "api-";
+
+  @Inject
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @Inject
+  PeriodicTaskScheduler _periodicTaskScheduler;
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/run")
+  @ApiOperation(value = "Run a periodic task against specified table. If no 
table name is specified, task will run against all tables.")
+  public String runPeriodicTask(
+      @ApiParam(value = "Periodic task name", required = true) 
@QueryParam("taskname") String periodicTaskName,
+      @ApiParam(value = "Table name", required = false) 
@QueryParam("tablename") String tableName,
+      @ApiParam(value = "Table type suffix", required = false, example = 
"OFFLINE | REALTIME", defaultValue = "OFFLINE") @QueryParam("tabletype") String 
tableType) {
+    if (!_periodicTaskScheduler.hasTask(periodicTaskName)) {
+      throw new WebApplicationException("Periodic task '" + periodicTaskName + 
"' not found.",
+          Response.Status.NOT_FOUND);
+    }
+
+    if (tableName != null) {
+      tableName = tableName.trim();
+      if (tableName.length() > 0 && 
!_pinotHelixResourceManager.getAllRawTables().contains(tableName)) {
+        throw new WebApplicationException("Table '" + tableName + "' not 
found.", Response.Status.NOT_FOUND);
+      }
+    }
+
+    // Generate an id for this request by taking first eight characters of a 
randomly generated UUID. This request id
+    // is returned to the user and also appended to log messages so that user 
can locate all log messages associated
+    // with this PeriodicTask's execution.
+    String periodicTaskRequestId = API_REQUEST_ID_PREFIX + 
UUID.randomUUID().toString().substring(0,8);
+
+    LOGGER.info("[TaskRequestId: {}] Sending periodic task execution message 
to all controllers for running task {} against {}.",
+        periodicTaskRequestId, periodicTaskName, tableName != null ? " table 
'" + tableName + "'" : "all tables");
+
+    // Create and send message to send to all controllers (including this one)
+    Criteria recipientCriteria = new Criteria();
+    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    recipientCriteria.setInstanceName("%");
+    recipientCriteria.setSessionSpecific(true);
+    
recipientCriteria.setResource(CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+    recipientCriteria.setSelfExcluded(false);
+    RunPeriodicTaskMessage runPeriodicTaskMessage = new 
RunPeriodicTaskMessage(periodicTaskRequestId, periodicTaskName,
+        (tableName != null && tableName.length() > 0) ? tableName + "_" + 
tableType : null);
+
+    ClusterMessagingService clusterMessagingService =
+        _pinotHelixResourceManager.getHelixZkManager().getMessagingService();
+    int messageCount = clusterMessagingService.send(recipientCriteria, 
runPeriodicTaskMessage, null, -1);
+    LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to 
{} controllers.", periodicTaskRequestId, messageCount);
+
+    return "Log Request Id: " + periodicTaskRequestId + ", Controllers 
notified: " + (messageCount > 0) + ".";

Review comment:
       Let's return a valid json as the response

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerMessageHandlerFactory.java
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller;
+
+import java.util.Properties;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
+import org.apache.pinot.core.periodictask.PeriodicTask;
+import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/** Factory class for creating message handlers for incoming helix messages. */
+public class ControllerMessageHandlerFactory implements MessageHandlerFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ControllerMessageHandlerFactory.class);
+  private static final String USER_DEFINED_MSG_STRING = 
Message.MessageType.USER_DEFINE_MSG.toString();
+
+  private final PeriodicTaskScheduler _periodicTaskScheduler;
+
+  public ControllerMessageHandlerFactory(PeriodicTaskScheduler 
periodicTaskScheduler) {
+    _periodicTaskScheduler = periodicTaskScheduler;
+  }
+
+  @Override
+  public MessageHandler createHandler(Message message, NotificationContext 
notificationContext) {
+    String messageType = message.getMsgSubType();
+    if 
(messageType.equals(RunPeriodicTaskMessage.RUN_PERIODIC_TASK_MSG_SUB_TYPE)) {
+      return new RunPeriodicTaskMessageHandler(new 
RunPeriodicTaskMessage(message), notificationContext,
+          _periodicTaskScheduler);
+    }
+
+    LOGGER.warn("Unknown message type {} received by controller. ", 
messageType);
+    return null;

Review comment:
       I think you may need a no-op message handler to avoid NPE (see 
`BrokerUserDefinedMessageHandlerFactory` as an example)

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
##########
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.util.List;
+import java.util.UUID;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.PERIODIC_TASK_TAG)
+@Path("/periodictask")
+public class PinotControllerPeriodicTaskRestletResource {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotControllerPeriodicTaskRestletResource.class);
+  private static final String API_REQUEST_ID_PREFIX = "api-";
+
+  @Inject
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @Inject
+  PeriodicTaskScheduler _periodicTaskScheduler;
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/run")
+  @ApiOperation(value = "Run a periodic task against specified table. If no 
table name is specified, task will run against all tables.")
+  public String runPeriodicTask(
+      @ApiParam(value = "Periodic task name", required = true) 
@QueryParam("taskname") String periodicTaskName,
+      @ApiParam(value = "Table name", required = false) 
@QueryParam("tablename") String tableName,
+      @ApiParam(value = "Table type suffix", required = false, example = 
"OFFLINE | REALTIME", defaultValue = "OFFLINE") @QueryParam("tabletype") String 
tableType) {
+    if (!_periodicTaskScheduler.hasTask(periodicTaskName)) {
+      throw new WebApplicationException("Periodic task '" + periodicTaskName + 
"' not found.",
+          Response.Status.NOT_FOUND);
+    }
+
+    if (tableName != null) {
+      tableName = tableName.trim();
+      if (tableName.length() > 0 && 
!_pinotHelixResourceManager.getAllRawTables().contains(tableName)) {
+        throw new WebApplicationException("Table '" + tableName + "' not 
found.", Response.Status.NOT_FOUND);
+      }
+    }
+
+    // Generate an id for this request by taking first eight characters of a 
randomly generated UUID. This request id
+    // is returned to the user and also appended to log messages so that user 
can locate all log messages associated
+    // with this PeriodicTask's execution.
+    String periodicTaskRequestId = API_REQUEST_ID_PREFIX + 
UUID.randomUUID().toString().substring(0,8);

Review comment:
       (code format) reformat

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
##########
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.util.List;
+import java.util.UUID;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.PERIODIC_TASK_TAG)
+@Path("/periodictask")
+public class PinotControllerPeriodicTaskRestletResource {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotControllerPeriodicTaskRestletResource.class);
+  private static final String API_REQUEST_ID_PREFIX = "api-";
+
+  @Inject
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @Inject
+  PeriodicTaskScheduler _periodicTaskScheduler;
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/run")
+  @ApiOperation(value = "Run a periodic task against specified table. If no 
table name is specified, task will run against all tables.")
+  public String runPeriodicTask(
+      @ApiParam(value = "Periodic task name", required = true) 
@QueryParam("taskname") String periodicTaskName,
+      @ApiParam(value = "Table name", required = false) 
@QueryParam("tablename") String tableName,
+      @ApiParam(value = "Table type suffix", required = false, example = 
"OFFLINE | REALTIME", defaultValue = "OFFLINE") @QueryParam("tabletype") String 
tableType) {

Review comment:
       Let's match the convention for other APIs (Check 
`PinotSegmentRestletResource.java` for some examples
   ```suggestion
         @ApiParam(value = "Name of the table") @PathParam("tableName") String 
tableName,
         @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr) {
   ```
   
   Don't give default value to table type because that will prevent the api to 
match the realtime table

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
##########
@@ -108,4 +111,62 @@ public synchronized void stop() {
       _tasksWithValidInterval.parallelStream().forEach(PeriodicTask::stop);
     }
   }
+
+  /** @return true if task with given name exists; otherwise, false. */
+  public boolean hasTask(String periodicTaskName) {
+    for (PeriodicTask task : _tasksWithValidInterval) {
+      if (task.getTaskName().equals(periodicTaskName)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** @return List of tasks name that will run periodically. */
+  public List<String> getTaskNameList() {

Review comment:
       (nit) `getTaskNames()`

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
##########
@@ -29,21 +33,31 @@
 @ThreadSafe
 public abstract class BasePeriodicTask implements PeriodicTask {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BasePeriodicTask.class);
+  private static final String DEFAULT_REQUEST_ID = "auto";
 
   // Wait for at most 30 seconds while calling stop() for task to terminate
   private static final long MAX_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L;
 
   protected final String _taskName;
   protected final long _intervalInSeconds;
   protected final long _initialDelayInSeconds;
+  protected final ReentrantLock _runLock;
 
   private volatile boolean _started;
   private volatile boolean _running;
 
+  // Properties that task may use during execution. null by default.
+  protected Properties _activePeriodicTaskProperties;

Review comment:
       (Major)
   Keep swapping the `_activePeriodicTaskProperties` and expect the child class 
to use it to identify the table name is hard to manage.
   We should add an abstract method `runTask(Properties 
periodicTaskProperties)` and the child class can implement accordingly

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
##########
@@ -108,4 +111,62 @@ public synchronized void stop() {
       _tasksWithValidInterval.parallelStream().forEach(PeriodicTask::stop);
     }
   }
+
+  /** @return true if task with given name exists; otherwise, false. */
+  public boolean hasTask(String periodicTaskName) {
+    for (PeriodicTask task : _tasksWithValidInterval) {
+      if (task.getTaskName().equals(periodicTaskName)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** @return List of tasks name that will run periodically. */
+  public List<String> getTaskNameList() {
+    List<String> taskNameList = new ArrayList<>();
+    for (PeriodicTask task : _tasksWithValidInterval) {
+      taskNameList.add(task.getTaskName());
+    }
+    return taskNameList;
+  }
+
+  private PeriodicTask getPeriodicTask(String periodicTaskName) {
+    for (PeriodicTask task : _tasksWithValidInterval) {
+      if (task.getTaskName().equals(periodicTaskName)) {
+        return task;
+      }
+    }
+    return null;
+  }
+
+  /** Execute {@link PeriodicTask} immediately on the specified table. */
+  public void scheduleNow(String periodicTaskName, @Nullable Properties 
periodicTaskProperties) {
+    // During controller deployment, each controller can have a slightly 
different list of periodic tasks if we add,
+    // remove, or rename periodic task. To avoid this situation, we check 
again (besides the check at controller API
+    // level) whether the periodic task exists.
+    PeriodicTask periodicTask = getPeriodicTask(periodicTaskName);
+    if (periodicTask == null) {
+      throw new IllegalArgumentException("Unknown Periodic Task " + 
periodicTaskName);

Review comment:
       Log a warning and return instead of throwing exception as this can 
happen in normal case?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
##########
@@ -54,14 +55,27 @@ public ControllerPeriodicTask(String taskName, long 
runFrequencyInSeconds, long
     _controllerMetrics = controllerMetrics;
   }
 
+  // Determine if this task can run on the specified table. Task can run on 
all tables for which the controller is lead
+  // if "tablename" property is not set. However, if "tablename" property is 
set (by calling the /periodictask/run
+  // controller API), then the task will only run on the specified by the 
"tablename" property key.
+  private boolean shouldRunTaskForTable(String tableNameWithType) {
+    if (_leadControllerManager.isLeaderForTable(tableNameWithType)) {
+      String propTableNameWithType = (String) 
_activePeriodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME);
+      return propTableNameWithType == null || 
propTableNameWithType.equals(tableNameWithType);
+    }
+    return false;
+  }
+
   @Override
   protected final void runTask() {
     _controllerMetrics.addMeteredTableValue(_taskName, 
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L);
     try {
       // Process the tables that are managed by this controller
+      // TODO: creating tablesToProcess list below is redundant since 
processTables will unroll the list anyway. This
+      // needs to be cleaned up sometime.
       List<String> tablesToProcess = new ArrayList<>();
       for (String tableNameWithType : 
_pinotHelixResourceManager.getAllTables()) {
-        if (_leadControllerManager.isLeaderForTable(tableNameWithType)) {
+        if (shouldRunTaskForTable(tableNameWithType)) {

Review comment:
       This will be very inefficient when table name is already provided. We 
should only check for the provided table name instead.
   If we have a separate method `runTask(Properties periodicTaskProperties)` it 
will be much cleaner

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
##########
@@ -149,27 +190,17 @@ public final synchronized void stop() {
     }
     _started = false;
 
-    if (_running) {
-      long startTimeMs = System.currentTimeMillis();
-      long remainingTimeMs = MAX_PERIODIC_TASK_STOP_TIME_MILLIS;
-      LOGGER.info("Task: {} is running, wait for at most {}ms for it to 
finish", _taskName, remainingTimeMs);
-      while (_running && remainingTimeMs > 0L) {
-        long sleepTimeMs = Long.min(remainingTimeMs, 1000L);
-        remainingTimeMs -= sleepTimeMs;
-        try {
-          Thread.sleep(sleepTimeMs);
-        } catch (InterruptedException e) {
-          LOGGER.error("Caught InterruptedException while waiting for task: {} 
to finish", _taskName);
-          Thread.currentThread().interrupt();
-          break;
-        }
-      }
-      long waitTimeMs = System.currentTimeMillis() - startTimeMs;
-      if (_running) {
-        LOGGER.warn("Task: {} is not finished in {}ms", waitTimeMs);
+    try {
+      // check if task is done running, or wait for the task to get done, by 
trying to acquire runLock.
+      if (!_runLock.tryLock(MAX_PERIODIC_TASK_STOP_TIME_MILLIS, 
TimeUnit.MILLISECONDS)) {
+        LOGGER.warn("Task: {} did not finish within timeout of {}ms", 
MAX_PERIODIC_TASK_STOP_TIME_MILLIS);
       } else {
-        LOGGER.info("Task: {} is finished in {}ms", waitTimeMs);
+        LOGGER.info("Task: {} finished within timeout of {}ms", 
MAX_PERIODIC_TASK_STOP_TIME_MILLIS);
       }
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();

Review comment:
       Please keep the error message

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
##########
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.util.List;
+import java.util.UUID;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.PERIODIC_TASK_TAG)
+@Path("/periodictask")
+public class PinotControllerPeriodicTaskRestletResource {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotControllerPeriodicTaskRestletResource.class);
+  private static final String API_REQUEST_ID_PREFIX = "api-";
+
+  @Inject
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @Inject
+  PeriodicTaskScheduler _periodicTaskScheduler;
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/run")
+  @ApiOperation(value = "Run a periodic task against specified table. If no 
table name is specified, task will run against all tables.")
+  public String runPeriodicTask(
+      @ApiParam(value = "Periodic task name", required = true) 
@QueryParam("taskname") String periodicTaskName,
+      @ApiParam(value = "Table name", required = false) 
@QueryParam("tablename") String tableName,
+      @ApiParam(value = "Table type suffix", required = false, example = 
"OFFLINE | REALTIME", defaultValue = "OFFLINE") @QueryParam("tabletype") String 
tableType) {
+    if (!_periodicTaskScheduler.hasTask(periodicTaskName)) {
+      throw new WebApplicationException("Periodic task '" + periodicTaskName + 
"' not found.",
+          Response.Status.NOT_FOUND);
+    }
+
+    if (tableName != null) {

Review comment:
       Use 
`ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, 
tableName, Constants.validateTableType(tableTypeStr), LOGGER)`. We want to 
handle table name with type suffix as well. If we only allow triggering 
periodic task for one single table, we can check if there is only one table 
matching.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
##########
@@ -59,6 +64,12 @@
   @Override
   void run();
 
+  /**
+   * Execute the task once. This method will calls the {@link #run} method.

Review comment:
       Don't put implementation detail as the interface javadoc. It is 
essentially the same as `run()` but just takes some extra properties.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerMessageHandlerFactory.java
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller;
+
+import java.util.Properties;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
+import org.apache.pinot.core.periodictask.PeriodicTask;
+import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/** Factory class for creating message handlers for incoming helix messages. */
+public class ControllerMessageHandlerFactory implements MessageHandlerFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ControllerMessageHandlerFactory.class);
+  private static final String USER_DEFINED_MSG_STRING = 
Message.MessageType.USER_DEFINE_MSG.toString();
+
+  private final PeriodicTaskScheduler _periodicTaskScheduler;
+
+  public ControllerMessageHandlerFactory(PeriodicTaskScheduler 
periodicTaskScheduler) {
+    _periodicTaskScheduler = periodicTaskScheduler;
+  }
+
+  @Override
+  public MessageHandler createHandler(Message message, NotificationContext 
notificationContext) {
+    String messageType = message.getMsgSubType();
+    if 
(messageType.equals(RunPeriodicTaskMessage.RUN_PERIODIC_TASK_MSG_SUB_TYPE)) {
+      return new RunPeriodicTaskMessageHandler(new 
RunPeriodicTaskMessage(message), notificationContext,
+          _periodicTaskScheduler);
+    }
+
+    LOGGER.warn("Unknown message type {} received by controller. ", 
messageType);
+    return null;
+  }
+
+  @Override
+  public String getMessageType() {
+    return USER_DEFINED_MSG_STRING;
+  }
+
+  @Override
+  public void reset() {
+  }
+
+  /** Message handler for {@link RunPeriodicTaskMessage} message. */
+  private static class RunPeriodicTaskMessageHandler extends MessageHandler {
+    private final String _periodicTaskReqeustId;
+    private final String _periodicTaskName;
+    private final String _tableNameWithType;
+    private final PeriodicTaskScheduler _periodicTaskScheduler;
+
+    RunPeriodicTaskMessageHandler(RunPeriodicTaskMessage message, 
NotificationContext context,
+        PeriodicTaskScheduler periodicTaskScheduler) {
+      super(message, context);
+      _periodicTaskReqeustId = message.getPeriodicTaskRequestId();
+      _periodicTaskName = message.getPeriodicTaskName();
+      _tableNameWithType = message.getTableNameWithType();
+      _periodicTaskScheduler = periodicTaskScheduler;
+    }
+
+    @Override
+    public HelixTaskResult handleMessage()
+        throws InterruptedException {
+      LOGGER.info("[TaskRequestId: {}] Handling RunPeriodicTaskMessage by 
executing task {}", _periodicTaskReqeustId,
+          _periodicTaskName);
+      _periodicTaskScheduler
+          .scheduleNow(_periodicTaskName, 
createTaskProperties(_periodicTaskReqeustId, _tableNameWithType));
+      HelixTaskResult helixTaskResult = new HelixTaskResult();
+      helixTaskResult.setSuccess(true);
+      return helixTaskResult;
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode errorCode, ErrorType errorType) 
{
+      LOGGER.error("[TaskRequestId: {}] Message handling error.", 
_periodicTaskReqeustId, e);
+    }
+
+    private static Properties createTaskProperties(String 
periodicTaskReqeustId, String tableNameWithType) {

Review comment:
       typo `Request`

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/messages/RunPeriodicTaskMessage.java
##########
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import java.util.UUID;
+import javax.annotation.Nonnull;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.Message;
+
+
+/**
+ * Upon receiving this message, Controller will execute the specified 
PeriodicTask against the tables for which it is
+ * the lead controller. The message is sent whenever API call for executing a 
PeriodicTask is invoked.
+ */
+public class RunPeriodicTaskMessage extends Message {
+  public static final String RUN_PERIODIC_TASK_MSG_SUB_TYPE = "PERIODIC_TASK";
+  private static final String PERIODIC_TASK_REQUEST_ID = "taskRequestId";
+  private static final String PERIODIC_TASK_NAME_KEY = "periodicTaskName";
+  private static final String TABLE_NAME_WITH_TYPE_KEY = "tableNameWithType";

Review comment:
       All the user-defined messages always have table name with type (resource 
name) but use key `tableName`. I kind of prefer matching them, but either way 
is okay

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
##########
@@ -149,27 +190,17 @@ public final synchronized void stop() {
     }
     _started = false;
 
-    if (_running) {
-      long startTimeMs = System.currentTimeMillis();
-      long remainingTimeMs = MAX_PERIODIC_TASK_STOP_TIME_MILLIS;
-      LOGGER.info("Task: {} is running, wait for at most {}ms for it to 
finish", _taskName, remainingTimeMs);
-      while (_running && remainingTimeMs > 0L) {
-        long sleepTimeMs = Long.min(remainingTimeMs, 1000L);
-        remainingTimeMs -= sleepTimeMs;
-        try {
-          Thread.sleep(sleepTimeMs);
-        } catch (InterruptedException e) {
-          LOGGER.error("Caught InterruptedException while waiting for task: {} 
to finish", _taskName);
-          Thread.currentThread().interrupt();
-          break;
-        }
-      }
-      long waitTimeMs = System.currentTimeMillis() - startTimeMs;
-      if (_running) {
-        LOGGER.warn("Task: {} is not finished in {}ms", waitTimeMs);
+    try {
+      // check if task is done running, or wait for the task to get done, by 
trying to acquire runLock.
+      if (!_runLock.tryLock(MAX_PERIODIC_TASK_STOP_TIME_MILLIS, 
TimeUnit.MILLISECONDS)) {
+        LOGGER.warn("Task: {} did not finish within timeout of {}ms", 
MAX_PERIODIC_TASK_STOP_TIME_MILLIS);
       } else {
-        LOGGER.info("Task: {} is finished in {}ms", waitTimeMs);
+        LOGGER.info("Task: {} finished within timeout of {}ms", 
MAX_PERIODIC_TASK_STOP_TIME_MILLIS);

Review comment:
       Missing `_taskName`. Also would suggest put the actual waiting time 
instead of the timeout

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
##########
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.util.List;
+import java.util.UUID;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.PERIODIC_TASK_TAG)
+@Path("/periodictask")
+public class PinotControllerPeriodicTaskRestletResource {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotControllerPeriodicTaskRestletResource.class);
+  private static final String API_REQUEST_ID_PREFIX = "api-";
+
+  @Inject
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @Inject
+  PeriodicTaskScheduler _periodicTaskScheduler;
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/run")
+  @ApiOperation(value = "Run a periodic task against specified table. If no 
table name is specified, task will run against all tables.")
+  public String runPeriodicTask(
+      @ApiParam(value = "Periodic task name", required = true) 
@QueryParam("taskname") String periodicTaskName,
+      @ApiParam(value = "Table name", required = false) 
@QueryParam("tablename") String tableName,
+      @ApiParam(value = "Table type suffix", required = false, example = 
"OFFLINE | REALTIME", defaultValue = "OFFLINE") @QueryParam("tabletype") String 
tableType) {
+    if (!_periodicTaskScheduler.hasTask(periodicTaskName)) {
+      throw new WebApplicationException("Periodic task '" + periodicTaskName + 
"' not found.",
+          Response.Status.NOT_FOUND);
+    }
+
+    if (tableName != null) {
+      tableName = tableName.trim();
+      if (tableName.length() > 0 && 
!_pinotHelixResourceManager.getAllRawTables().contains(tableName)) {
+        throw new WebApplicationException("Table '" + tableName + "' not 
found.", Response.Status.NOT_FOUND);
+      }
+    }
+
+    // Generate an id for this request by taking first eight characters of a 
randomly generated UUID. This request id
+    // is returned to the user and also appended to log messages so that user 
can locate all log messages associated
+    // with this PeriodicTask's execution.
+    String periodicTaskRequestId = API_REQUEST_ID_PREFIX + 
UUID.randomUUID().toString().substring(0,8);
+
+    LOGGER.info("[TaskRequestId: {}] Sending periodic task execution message 
to all controllers for running task {} against {}.",
+        periodicTaskRequestId, periodicTaskName, tableName != null ? " table 
'" + tableName + "'" : "all tables");
+
+    // Create and send message to send to all controllers (including this one)
+    Criteria recipientCriteria = new Criteria();
+    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    recipientCriteria.setInstanceName("%");
+    recipientCriteria.setSessionSpecific(true);
+    
recipientCriteria.setResource(CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+    recipientCriteria.setSelfExcluded(false);
+    RunPeriodicTaskMessage runPeriodicTaskMessage = new 
RunPeriodicTaskMessage(periodicTaskRequestId, periodicTaskName,

Review comment:
       Don't manually construct the table name, use the one returned from the 
`getExistingTableNamesWithType()`

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
##########
@@ -54,14 +55,27 @@ public ControllerPeriodicTask(String taskName, long 
runFrequencyInSeconds, long
     _controllerMetrics = controllerMetrics;
   }
 
+  // Determine if this task can run on the specified table. Task can run on 
all tables for which the controller is lead
+  // if "tablename" property is not set. However, if "tablename" property is 
set (by calling the /periodictask/run
+  // controller API), then the task will only run on the specified by the 
"tablename" property key.
+  private boolean shouldRunTaskForTable(String tableNameWithType) {
+    if (_leadControllerManager.isLeaderForTable(tableNameWithType)) {
+      String propTableNameWithType = (String) 
_activePeriodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME);
+      return propTableNameWithType == null || 
propTableNameWithType.equals(tableNameWithType);
+    }
+    return false;
+  }
+
   @Override
   protected final void runTask() {
     _controllerMetrics.addMeteredTableValue(_taskName, 
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L);
     try {
       // Process the tables that are managed by this controller
+      // TODO: creating tablesToProcess list below is redundant since 
processTables will unroll the list anyway. This

Review comment:
       I don't quite follow this TODO. `tablesToProcess` gathers all the tables 
that are managed by this controller (this controller is the leader for these 
tables), and `processTables()` does not perform leader check




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to