This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a44f4a  Manually trigger PeriodicTask (#7174)
7a44f4a is described below

commit 7a44f4af2c5e2cb5dfb96c33d2010efb8e5f1e74
Author: Amrish Lal <amrish.k....@gmail.com>
AuthorDate: Mon Aug 23 12:06:14 2021 -0700

    Manually trigger PeriodicTask (#7174)
    
    * Don't allow a PeriodicTask to execute more than once at a time.
    
    * Revert pom.xml change.
    
    * Add controller message handler.
    
    * Cleanup imports.
    
    * Execute periodic task after controller receives message.
    
    * Add test case.
    
    * Additional log messages.
    
    * Fix test case.
    
    * Trigger task against specific table.
    
    * Cleanup.
    
    * Rebuild.
    
    * Code review changes.
    
    * Cleanup.
    
    * Controller api to get list of available task names.
    
    * Cleanup.
    
    * Cleanup.
    
    * Cleanup.
    
    * Codereview changes.
    
    * Cleanup.
    
    * Controller API to set only one table and table type for manual periodic 
task execution.
    
    * Codereview changes.
    
    * Cleanup.
    
    * Codereview changes.
    
    * Update test case to use countdown latch.
    
    * Cleanup.
    
    * Cleanup.
    
    * Codereview changes.
    
    * Add request id to link PeriodicTask execution request with its log 
entries.
    
    * Cleanup.
    
    * Rebuild.
    
    * Fix NPE.
    
    * Cleanup.
    
    * Codereview changes.
    
    * Cleanup.
    
    * Codereview changes.
    
    * Rebuild.
    
    * Fix test case.
    
    * Rebuild.
    
    * Fix test case.
    
    * Codereview changes.
    
    * Cleanup.
    
    * Cleanup /periodictask/run API
    
    * Cleanup.
    
    * Pass Properties object on the stack.
    
    * Codereview changes.
    
    * Fix checkstyle violations.
    
    * fix checkstyle.
---
 .../common/messages/RunPeriodicTaskMessage.java    |  66 ++++++++++
 .../pinot/controller/BaseControllerStarter.java    |   6 +
 ...ControllerUserDefinedMessageHandlerFactory.java | 134 +++++++++++++++++++++
 .../pinot/controller/api/resources/Constants.java  |   1 +
 ...PinotControllerPeriodicTaskRestletResource.java | 123 +++++++++++++++++++
 .../core/minion/MinionInstancesCleanupTask.java    |   3 +-
 .../helix/core/minion/TaskMetricsEmitter.java      |   3 +-
 .../core/periodictask/ControllerPeriodicTask.java  |  26 +++-
 .../minion/MinionInstancesCleanupTaskTest.java     |   9 +-
 .../pinot/core/periodictask/BasePeriodicTask.java  |  93 ++++++++------
 .../pinot/core/periodictask/PeriodicTask.java      |  11 ++
 .../core/periodictask/PeriodicTaskScheduler.java   |  62 +++++++++-
 .../periodictask/PeriodicTaskSchedulerTest.java    |  85 ++++++++++++-
 13 files changed, 574 insertions(+), 48 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/messages/RunPeriodicTaskMessage.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/messages/RunPeriodicTaskMessage.java
new file mode 100644
index 0000000..11ee6cf
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/messages/RunPeriodicTaskMessage.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import java.util.UUID;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.Message;
+
+
+/**
+ * Upon receiving this message, Controller will execute the specified 
PeriodicTask against the tables for which it is
+ * the lead controller. The message is sent whenever API call for executing a 
PeriodicTask is invoked.
+ */
+public class RunPeriodicTaskMessage extends Message {
+  public static final String RUN_PERIODIC_TASK_MSG_SUB_TYPE = 
"RUN_PERIODIC_TASK";
+  private static final String PERIODIC_TASK_REQUEST_ID = "requestId";
+  private static final String PERIODIC_TASK_NAME_KEY = "taskName";
+  private static final String TABLE_NAME_WITH_TYPE_KEY = "tableNameWithType";
+
+  /**
+   * @param taskRequestId Request Id that will be appended to log messages.
+   * @param periodicTaskName Name of the task that will be run.
+   * @param tableNameWithType Table (names with type suffix) on which task 
will run.
+   */
+  public RunPeriodicTaskMessage(String taskRequestId, String periodicTaskName, 
String tableNameWithType) {
+    super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+    setMsgSubType(RUN_PERIODIC_TASK_MSG_SUB_TYPE);
+    setExecutionTimeout(-1);
+    ZNRecord znRecord = getRecord();
+    znRecord.setSimpleField(PERIODIC_TASK_REQUEST_ID, taskRequestId);
+    znRecord.setSimpleField(PERIODIC_TASK_NAME_KEY, periodicTaskName);
+    znRecord.setSimpleField(TABLE_NAME_WITH_TYPE_KEY, tableNameWithType);
+  }
+
+  public RunPeriodicTaskMessage(Message message) {
+    super(message.getRecord());
+  }
+
+  public String getPeriodicTaskRequestId() {
+    return getRecord().getSimpleField(PERIODIC_TASK_REQUEST_ID);
+  }
+
+  public String getPeriodicTaskName() {
+    return getRecord().getSimpleField(PERIODIC_TASK_NAME_KEY);
+  }
+
+  public String getTableNameWithType() {
+    return getRecord().getSimpleField(TABLE_NAME_WITH_TYPE_KEY);
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index b6471b1..39b9b1e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -411,6 +411,11 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     _periodicTaskScheduler.init(controllerPeriodicTasks);
     _periodicTaskScheduler.start();
 
+    // Register message handler for incoming user-defined helix messages.
+    _helixParticipantManager.getMessagingService()
+        
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
+            new 
ControllerUserDefinedMessageHandlerFactory(_periodicTaskScheduler));
+
     String accessControlFactoryClass = _config.getAccessControlFactoryClass();
     LOGGER.info("Use class: {} as the AccessControlFactory", 
accessControlFactoryClass);
     final AccessControlFactory accessControlFactory;
@@ -443,6 +448,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
         bind(accessControlFactory).to(AccessControlFactory.class);
         
bind(metadataEventNotifierFactory).to(MetadataEventNotifierFactory.class);
         bind(_leadControllerManager).to(LeadControllerManager.class);
+        bind(_periodicTaskScheduler).to(PeriodicTaskScheduler.class);
       }
     });
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerUserDefinedMessageHandlerFactory.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerUserDefinedMessageHandlerFactory.java
new file mode 100644
index 0000000..fd9dfb8
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerUserDefinedMessageHandlerFactory.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller;
+
+import java.util.Properties;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
+import org.apache.pinot.core.periodictask.PeriodicTask;
+import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/** Factory class for creating message handlers for incoming helix messages. */
+public class ControllerUserDefinedMessageHandlerFactory implements 
MessageHandlerFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ControllerUserDefinedMessageHandlerFactory.class);
+  private static final String USER_DEFINED_MSG_STRING = 
Message.MessageType.USER_DEFINE_MSG.toString();
+
+  private final PeriodicTaskScheduler _periodicTaskScheduler;
+
+  public ControllerUserDefinedMessageHandlerFactory(PeriodicTaskScheduler 
periodicTaskScheduler) {
+    _periodicTaskScheduler = periodicTaskScheduler;
+  }
+
+  @Override
+  public MessageHandler createHandler(Message message, NotificationContext 
notificationContext) {
+    String messageType = message.getMsgSubType();
+    if 
(messageType.equals(RunPeriodicTaskMessage.RUN_PERIODIC_TASK_MSG_SUB_TYPE)) {
+      return new RunPeriodicTaskMessageHandler(new 
RunPeriodicTaskMessage(message), notificationContext,
+          _periodicTaskScheduler);
+    }
+
+    // Log a warning and return no-op message handler for unsupported message 
sub-types. This can happen when
+    // a new message sub-type is added, and the sender gets deployed first 
while receiver is still running the
+    // old version.
+    LOGGER.warn("Received message with unsupported sub-type: {}, using no-op 
message handler", messageType);
+    return new NoOpMessageHandler(message, notificationContext);
+  }
+
+  @Override
+  public String getMessageType() {
+    return USER_DEFINED_MSG_STRING;
+  }
+
+  @Override
+  public void reset() {
+  }
+
+  /** Message handler for {@link RunPeriodicTaskMessage} message. */
+  private static class RunPeriodicTaskMessageHandler extends MessageHandler {
+    private final String _periodicTaskRequestId;
+    private final String _periodicTaskName;
+    private final String _tableNameWithType;
+    private final PeriodicTaskScheduler _periodicTaskScheduler;
+
+    RunPeriodicTaskMessageHandler(RunPeriodicTaskMessage message, 
NotificationContext context,
+        PeriodicTaskScheduler periodicTaskScheduler) {
+      super(message, context);
+      _periodicTaskRequestId = message.getPeriodicTaskRequestId();
+      _periodicTaskName = message.getPeriodicTaskName();
+      _tableNameWithType = message.getTableNameWithType();
+      _periodicTaskScheduler = periodicTaskScheduler;
+    }
+
+    @Override
+    public HelixTaskResult handleMessage()
+        throws InterruptedException {
+      LOGGER.info("[TaskRequestId: {}] Handling RunPeriodicTaskMessage by 
executing task {}", _periodicTaskRequestId,
+          _periodicTaskName);
+      _periodicTaskScheduler
+          .scheduleNow(_periodicTaskName, 
createTaskProperties(_periodicTaskRequestId, _tableNameWithType));
+      HelixTaskResult helixTaskResult = new HelixTaskResult();
+      helixTaskResult.setSuccess(true);
+      return helixTaskResult;
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode errorCode, ErrorType errorType) 
{
+      LOGGER.error("[TaskRequestId: {}] Message handling error.", 
_periodicTaskRequestId, e);
+    }
+
+    private static Properties createTaskProperties(String 
periodicTaskRequestId, String tableNameWithType) {
+      Properties periodicTaskParameters = new Properties();
+      if (periodicTaskRequestId != null) {
+        
periodicTaskParameters.setProperty(PeriodicTask.PROPERTY_KEY_REQUEST_ID, 
periodicTaskRequestId);
+      }
+
+      if (tableNameWithType != null) {
+        
periodicTaskParameters.setProperty(PeriodicTask.PROPERTY_KEY_TABLE_NAME, 
tableNameWithType);
+      }
+
+      return periodicTaskParameters;
+    }
+  }
+
+  /** Message handler for unknown messages */
+  private static class NoOpMessageHandler extends MessageHandler {
+    NoOpMessageHandler(Message message, NotificationContext context) {
+      super(message, context);
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() {
+      HelixTaskResult result = new HelixTaskResult();
+      result.setSuccess(true);
+      return result;
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode code, ErrorType type) {
+      LOGGER.error("Got error for no-op message handling (error code: {}, 
error type: {})", code, type, e);
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
index 099651a..3e9b33c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
@@ -45,6 +45,7 @@ public class Constants {
   public static final String TABLE_NAME = "tableName";
   public static final String ZOOKEEPER = "Zookeeper";
   public static final String APP_CONFIGS = "AppConfigs";
+  public static final String PERIODIC_TASK_TAG = "PeriodicTask";
 
   public static TableType validateTableType(String tableTypeStr) {
     if (tableTypeStr == null || tableTypeStr.isEmpty()) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
new file mode 100644
index 0000000..99a597b
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.util.List;
+import java.util.UUID;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.PERIODIC_TASK_TAG)
+@Path("/periodictask")
+public class PinotControllerPeriodicTaskRestletResource {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotControllerPeriodicTaskRestletResource.class);
+  private static final String API_REQUEST_ID_PREFIX = "api-";
+
+  @Inject
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @Inject
+  PeriodicTaskScheduler _periodicTaskScheduler;
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/run")
+  @ApiOperation(value = "Run periodic task against table. If table name is 
missing, task will run against all tables.")
+  public String runPeriodicTask(
+      @ApiParam(value = "Periodic task name", required = true) 
@QueryParam("taskname") String periodicTaskName,
+      @ApiParam(value = "Name of the table") @QueryParam("tableName") String 
tableName,
+      @ApiParam(value = "OFFLINE | REALTIME") @QueryParam("type") String 
tableType) {
+
+    if (!_periodicTaskScheduler.hasTask(periodicTaskName)) {
+      throw new WebApplicationException("Periodic task '" + periodicTaskName + 
"' not found.",
+          Response.Status.NOT_FOUND);
+    }
+
+    if (tableName != null) {
+      tableName = tableName.trim();
+      List<String> matchingTableNamesWithType = ResourceUtils
+          .getExistingTableNamesWithType(_pinotHelixResourceManager, 
tableName, Constants.validateTableType(tableType),
+              LOGGER);
+
+      if (matchingTableNamesWithType.size() > 1) {
+        throw new WebApplicationException(
+            "More than one table matches Table '" + tableName + "'. Matching 
names: " + matchingTableNamesWithType
+                .toString());
+      }
+
+      tableName = matchingTableNamesWithType.get(0);
+    }
+
+    // Generate an id for this request by taking first eight characters of a 
randomly generated UUID. This request id
+    // is returned to the user and also appended to log messages so that user 
can locate all log messages associated
+    // with this PeriodicTask's execution.
+    String periodicTaskRequestId = API_REQUEST_ID_PREFIX + 
UUID.randomUUID().toString().substring(0, 8);
+
+    LOGGER.info(
+        "[TaskRequestId: {}] Sending periodic task message to all controllers 
for running task {} against {}.",
+        periodicTaskRequestId, periodicTaskName, tableName != null ? " table 
'" + tableName + "'" : "all tables");
+
+    // Create and send message to send to all controllers (including this one)
+    Criteria recipientCriteria = new Criteria();
+    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    recipientCriteria.setInstanceName("%");
+    recipientCriteria.setSessionSpecific(true);
+    
recipientCriteria.setResource(CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+    recipientCriteria.setSelfExcluded(false);
+    RunPeriodicTaskMessage runPeriodicTaskMessage =
+        new RunPeriodicTaskMessage(periodicTaskRequestId, periodicTaskName, 
tableName);
+
+    ClusterMessagingService clusterMessagingService =
+        _pinotHelixResourceManager.getHelixZkManager().getMessagingService();
+    int messageCount = clusterMessagingService.send(recipientCriteria, 
runPeriodicTaskMessage, null, -1);
+    LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to 
{} controllers.", periodicTaskRequestId,
+        messageCount);
+
+    return "{\"Log Request Id\": \"" + periodicTaskRequestId + 
"\",\"Controllers notified\":" + (messageCount > 0)
+        + "}";
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/names")
+  @ApiOperation(value = "Get comma-delimited list of all available periodic 
task names.")
+  public List<String> getPeriodicTaskNames() {
+    return _periodicTaskScheduler.getTaskNames();
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
index a4b6baf..0f14924 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.helix.core.minion;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
@@ -55,7 +56,7 @@ public class MinionInstancesCleanupTask extends 
BasePeriodicTask {
   }
 
   @Override
-  protected void runTask() {
+  protected void runTask(Properties periodicTaskProperties) {
     // Make it so that only one controller is responsible for cleaning up 
minion instances.
     if (!_leadControllerManager.isLeaderForTable(TASK_NAME)) {
       return;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
index 64e7303..d9dea38 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller.helix.core.minion;
 
 import java.util.List;
+import java.util.Properties;
 import java.util.Set;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -59,7 +60,7 @@ public class TaskMetricsEmitter extends BasePeriodicTask {
   }
 
   @Override
-  protected final void runTask() {
+  protected final void runTask(Properties periodicTaskProperties) {
     // Make it so that only one controller returns the metric for all the 
tasks.
     if (!_leadControllerManager.isLeaderForTable(TASK_NAME)) {
       return;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 5aff79c..9d7a676 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.helix.core.periodictask;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
@@ -27,6 +28,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.periodictask.BasePeriodicTask;
+import org.apache.pinot.core.periodictask.PeriodicTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,17 +57,31 @@ public abstract class ControllerPeriodicTask<C> extends 
BasePeriodicTask {
   }
 
   @Override
-  protected final void runTask() {
+  protected final void runTask(Properties periodicTaskProperties) {
     _controllerMetrics.addMeteredTableValue(_taskName, 
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L);
     try {
+      // Check if we have a specific table against which this task needs to be 
run.
+      String propTableNameWithType = (String) 
periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME);
+
       // Process the tables that are managed by this controller
       List<String> tablesToProcess = new ArrayList<>();
-      for (String tableNameWithType : 
_pinotHelixResourceManager.getAllTables()) {
-        if (_leadControllerManager.isLeaderForTable(tableNameWithType)) {
-          tablesToProcess.add(tableNameWithType);
+      if (propTableNameWithType == null) {
+        // Table name is not available, so task should run on all tables for 
which this controller is the lead.
+        for (String tableNameWithType : 
_pinotHelixResourceManager.getAllTables()) {
+          if (_leadControllerManager.isLeaderForTable(tableNameWithType)) {
+            tablesToProcess.add(tableNameWithType);
+          }
+        }
+      } else {
+        // Table name is available, so task should run only on the specified 
table.
+        if (_leadControllerManager.isLeaderForTable(propTableNameWithType)) {
+          tablesToProcess.add(propTableNameWithType);
         }
       }
-      processTables(tablesToProcess);
+
+      if (!tablesToProcess.isEmpty()) {
+        processTables(tablesToProcess);
+      }
     } catch (Exception e) {
       LOGGER.error("Caught exception while running task: {}", _taskName, e);
       _controllerMetrics.addMeteredTableValue(_taskName, 
ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java
index 363c795..d6587af 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.helix.core.minion;
 
+import java.util.Properties;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.testng.Assert;
@@ -38,22 +39,22 @@ public class MinionInstancesCleanupTaskTest extends 
ControllerTest {
   public void testMinionInstancesCleanupTask()
       throws Exception {
     MinionInstancesCleanupTask minionInstancesCleanupTask = 
_controllerStarter.getMinionInstancesCleanupTask();
-    minionInstancesCleanupTask.runTask();
+    minionInstancesCleanupTask.runTask(new Properties());
     Assert.assertEquals(
         
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
 0);
     addFakeMinionInstancesToAutoJoinHelixCluster(3);
     Assert.assertEquals(
         
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
 0);
     stopFakeInstance("Minion_localhost_0");
-    minionInstancesCleanupTask.runTask();
+    minionInstancesCleanupTask.runTask(new Properties());
     Assert.assertEquals(
         
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
 1);
     stopFakeInstance("Minion_localhost_1");
-    minionInstancesCleanupTask.runTask();
+    minionInstancesCleanupTask.runTask(new Properties());
     Assert.assertEquals(
         
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
 2);
     stopFakeInstance("Minion_localhost_2");
-    minionInstancesCleanupTask.runTask();
+    minionInstancesCleanupTask.runTask(new Properties());
     Assert.assertEquals(
         
_controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES),
 3);
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
index eb15aa9..f7a14de 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.core.periodictask;
 
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.concurrent.ThreadSafe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,6 +32,7 @@ import org.slf4j.LoggerFactory;
 @ThreadSafe
 public abstract class BasePeriodicTask implements PeriodicTask {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BasePeriodicTask.class);
+  private static final String DEFAULT_REQUEST_ID = "auto";
 
   // Wait for at most 30 seconds while calling stop() for task to terminate
   private static final long MAX_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L;
@@ -36,14 +40,26 @@ public abstract class BasePeriodicTask implements 
PeriodicTask {
   protected final String _taskName;
   protected final long _intervalInSeconds;
   protected final long _initialDelayInSeconds;
+  protected final ReentrantLock _runLock;
 
   private volatile boolean _started;
   private volatile boolean _running;
 
+  // Default properties that tasks may use during execution. This variable is 
private and does not have any get or set
+  // methods to prevent subclasses from gaining direct access to this 
variable. See run(Properties) method to see how
+  // properties are passed and used during task execution.
+  private static final Properties DEFAULT_PERIODIC_TASK_PROPERTIES;
+  static {
+    // Default properties for PeriodicTask execution.
+    DEFAULT_PERIODIC_TASK_PROPERTIES = new Properties();
+    DEFAULT_PERIODIC_TASK_PROPERTIES.put(PeriodicTask.PROPERTY_KEY_REQUEST_ID, 
DEFAULT_REQUEST_ID);
+  }
+
   public BasePeriodicTask(String taskName, long runFrequencyInSeconds, long 
initialDelayInSeconds) {
     _taskName = taskName;
     _intervalInSeconds = runFrequencyInSeconds;
     _initialDelayInSeconds = initialDelayInSeconds;
+    _runLock = new ReentrantLock();
   }
 
   @Override
@@ -87,13 +103,16 @@ public abstract class BasePeriodicTask implements 
PeriodicTask {
       LOGGER.warn("Task: {} is already started", _taskName);
       return;
     }
-    _started = true;
 
     try {
       setUpTask();
     } catch (Exception e) {
       LOGGER.error("Caught exception while setting up task: {}", _taskName, e);
     }
+
+    // mark _started as true only after state has completely initialized, so 
that run method doesn't end up seeing
+    // partially initialized state.
+    _started = true;
   }
 
   /**
@@ -111,29 +130,41 @@ public abstract class BasePeriodicTask implements 
PeriodicTask {
    */
   @Override
   public final void run() {
-    _running = true;
+    // Pass default properties object to the actual run method.
+    run(DEFAULT_PERIODIC_TASK_PROPERTIES);
+  }
 
-    if (_started) {
-      long startTime = System.currentTimeMillis();
-      LOGGER.info("Start running task: {}", _taskName);
-      try {
-        runTask();
-      } catch (Exception e) {
-        LOGGER.error("Caught exception while running task: {}", _taskName, e);
+  @Override
+  public final void run(Properties periodicTaskProperties) {
+    try {
+      // Don't allow a task to run more than once at a time.
+      _runLock.lock();
+      _running = true;
+
+      String periodicTaskRequestId = 
periodicTaskProperties.getProperty(PeriodicTask.PROPERTY_KEY_REQUEST_ID);
+      if (_started) {
+        long startTime = System.currentTimeMillis();
+        LOGGER.info("[TaskRequestId: {}] Start running task: {}", 
periodicTaskRequestId, _taskName);
+        try {
+          runTask(periodicTaskProperties);
+        } catch (Exception e) {
+          LOGGER.error("[TaskRequestId: {}] Caught exception while running 
task: {}", periodicTaskRequestId, _taskName, e);
+        }
+        LOGGER.info("[TaskRequestId: {}] Finish running task: {} in {}ms", 
periodicTaskRequestId, _taskName, System.currentTimeMillis() - startTime);
+      } else {
+        LOGGER.warn("[TaskRequestId: {}] Task: {} is skipped because it is not 
started or already stopped", periodicTaskRequestId, _taskName);
       }
-      LOGGER.info("Finish running task: {} in {}ms", _taskName, 
System.currentTimeMillis() - startTime);
-    } else {
-      LOGGER.warn("Task: {} is skipped because it is not started or already 
stopped", _taskName);
+    } finally {
+       _runLock.unlock();
+       _running = false;
     }
-
-    _running = false;
   }
 
   /**
    * Executes the task. This method should early terminate if {@code started} 
flag is set to false by {@link #stop()}
    * during execution.
    */
-  protected abstract void runTask();
+  protected abstract void runTask(Properties periodicTaskProperties);
 
   /**
    * {@inheritDoc}
@@ -147,28 +178,22 @@ public abstract class BasePeriodicTask implements 
PeriodicTask {
       LOGGER.warn("Task: {} is not started", _taskName);
       return;
     }
+    long startTimeMs = System.currentTimeMillis();
     _started = false;
 
-    if (_running) {
-      long startTimeMs = System.currentTimeMillis();
-      long remainingTimeMs = MAX_PERIODIC_TASK_STOP_TIME_MILLIS;
-      LOGGER.info("Task: {} is running, wait for at most {}ms for it to 
finish", _taskName, remainingTimeMs);
-      while (_running && remainingTimeMs > 0L) {
-        long sleepTimeMs = Long.min(remainingTimeMs, 1000L);
-        remainingTimeMs -= sleepTimeMs;
-        try {
-          Thread.sleep(sleepTimeMs);
-        } catch (InterruptedException e) {
-          LOGGER.error("Caught InterruptedException while waiting for task: {} 
to finish", _taskName);
-          Thread.currentThread().interrupt();
-          break;
-        }
-      }
-      long waitTimeMs = System.currentTimeMillis() - startTimeMs;
-      if (_running) {
-        LOGGER.warn("Task: {} is not finished in {}ms", waitTimeMs);
+    try {
+      // check if task is done running, or wait for the task to get done, by 
trying to acquire runLock.
+      if (!_runLock.tryLock(MAX_PERIODIC_TASK_STOP_TIME_MILLIS, 
TimeUnit.MILLISECONDS)) {
+        LOGGER.warn("Task {} could not be stopped within timeout of {}ms", 
_taskName, MAX_PERIODIC_TASK_STOP_TIME_MILLIS);
       } else {
-        LOGGER.info("Task: {} is finished in {}ms", waitTimeMs);
+        LOGGER.info("Task {} successfully stopped in {}ms", _taskName, 
System.currentTimeMillis() - startTimeMs);
+      }
+    } catch (InterruptedException ie) {
+      LOGGER.error("Caught InterruptedException while waiting for task: {} to 
finish", _taskName);
+      Thread.currentThread().interrupt();
+    } finally {
+      if (_runLock.isHeldByCurrentThread()) {
+        _runLock.unlock();
       }
     }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
index ee6c68b..4a5e120 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.periodictask;
 
+import java.util.Properties;
 import javax.annotation.concurrent.ThreadSafe;
 
 
@@ -28,6 +29,10 @@ import javax.annotation.concurrent.ThreadSafe;
 @ThreadSafe
 public interface PeriodicTask extends Runnable {
 
+  // PeriodicTask objects may take a {@link Properties} object. Define all the 
property keys here.
+  String PROPERTY_KEY_REQUEST_ID = "requestId";
+  String PROPERTY_KEY_TABLE_NAME = "tableNameWithType";
+
   /**
    * Returns the periodic task name.
    * @return task name.
@@ -60,6 +65,12 @@ public interface PeriodicTask extends Runnable {
   void run();
 
   /**
+   * Execute the task with specified {@link Properties}.
+   * @param periodicTaskProperties Properties used by {@link PeriodicTask} 
during execution.
+   */
+  void run(Properties periodicTaskProperties);
+
+  /**
    * Stops the periodic task and performs necessary cleanups. Should be called 
after removing the periodic task from the
    * scheduler. Should be called after {@link #start()} getting called.
    */
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
index ffbc34a..8e42adf 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.periodictask;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -28,7 +29,8 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Periodic task scheduler will schedule a list of tasks based on their 
initial delay time and interval time.
+ * Periodic task scheduler will schedule a list of tasks based on their 
initial delay time and interval time. Tasks
+ * can also be scheduled for immediate execution by calling the scheduleNow() 
method.
  */
 public class PeriodicTaskScheduler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PeriodicTaskScheduler.class);
@@ -109,4 +111,62 @@ public class PeriodicTaskScheduler {
       _tasksWithValidInterval.parallelStream().forEach(PeriodicTask::stop);
     }
   }
+
+  /** @return true if task with given name exists; otherwise, false. */
+  public boolean hasTask(String periodicTaskName) {
+    for (PeriodicTask task : _tasksWithValidInterval) {
+      if (task.getTaskName().equals(periodicTaskName)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** @return List of names of the tasks that will run periodically. */
+  public List<String> getTaskNames() {
+    List<String> taskNameList = new ArrayList<>();
+    for (PeriodicTask task : _tasksWithValidInterval) {
+      taskNameList.add(task.getTaskName());
+    }
+    return taskNameList;
+  }
+
+  private PeriodicTask getPeriodicTask(String periodicTaskName) {
+    for (PeriodicTask task : _tasksWithValidInterval) {
+      if (task.getTaskName().equals(periodicTaskName)) {
+        return task;
+      }
+    }
+    return null;
+  }
+
+  /** Execute {@link PeriodicTask} immediately on the specified table. */
+  public void scheduleNow(String periodicTaskName, Properties 
periodicTaskProperties) {
+    // During controller deployment, each controller can have a slightly 
different list of periodic tasks if we add,
+    // remove, or rename a periodic task. To guard against this, we check 
again (besides the check at controller API
+    // level) whether the periodic task exists.
+    PeriodicTask periodicTask = getPeriodicTask(periodicTaskName);
+    if (periodicTask == null) {
+      LOGGER.error("Unknown Periodic Task " + periodicTaskName);
+      return;
+    }
+
+    String taskRequestId = 
periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_REQUEST_ID).toString();
+    LOGGER.info(
+        "[TaskRequestId: {}] Schedule task '{}' to run immediately. If the 
task is already running, this run will wait until the current run finishes.",
+        taskRequestId, periodicTaskName);
+    _executorService.schedule(() -> {
+      try {
+        // Run the periodic task using the specified parameters. The call to 
run() method will block if another thread
+        // (the periodic execution thread or another thread calling this 
method) is already in the process of
+        // running the same task.
+        periodicTask.run(periodicTaskProperties);
+      } catch (Throwable t) {
+        // catch all errors to prevent subsequent executions from being 
silently suppressed
+        // Ref: 
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
+        LOGGER.error("[TaskRequestId: {}] Caught exception while attempting to 
execute named periodic task: {}",
+            taskRequestId, periodicTask.getTaskName(), t);
+      }
+    }, 0, TimeUnit.SECONDS);
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
index a4d581d..a033e06 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
@@ -21,8 +21,11 @@ package org.apache.pinot.core.periodictask;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -45,7 +48,7 @@ public class PeriodicTaskSchedulerTest {
       }
 
       @Override
-      protected void runTask() {
+      protected void runTask(Properties periodicTaskProperties) {
         runCalled.set(true);
       }
 
@@ -83,7 +86,7 @@ public class PeriodicTaskSchedulerTest {
         }
 
         @Override
-        protected void runTask() {
+        protected void runTask(Properties periodicTaskProperties) {
           numTimesRunCalled.getAndIncrement();
         }
 
@@ -104,4 +107,82 @@ public class PeriodicTaskSchedulerTest {
     assertEquals(numTimesRunCalled.get(), numTasks * 2);
     assertEquals(numTimesStopCalled.get(), numTasks);
   }
+
+  /**
+   * Test that {@link PeriodicTaskScheduler} is thread safe and does not run 
the same task more than once at any time.
+   * This is done by attempting to run the same task object in 20 different 
threads at the same time. While the test
+   * case launches 20 threads to keep {@link PeriodicTaskScheduler} busy, it 
waits for only around half of them to
+   * complete. The test case then checks whether the threads that did not 
complete execution were waiting to execute
+   * (i.e., they had requested execution, but had not executed yet). This 
"waiting" indicates that task execution was
+   * being properly synchronized (otherwise all the tasks would have just run 
immediately). 'isRunning' variable within
+   * the task is used to check that the task is not executing more than once 
at any given time.
+   */
+  @Test
+  public void testConcurrentExecutionOfSameTask() throws Exception {
+    // Number of threads to run
+    final int numThreads = 20;
+
+    // Count number of threads that requested execution.
+    final AtomicInteger attempts = new AtomicInteger();
+
+    // Countdown latch to ensure that this test case will wait only for around 
half the tasks to complete.
+    final CountDownLatch countDownLatch = new CountDownLatch(numThreads/2);
+
+    // Create periodic task.
+    PeriodicTask task = new BasePeriodicTask("TestTask", 1L, 0L) {
+      private volatile boolean isRunning = false;
+      @Override
+      protected void runTask(Properties periodicTaskProperties) {
+        try {
+          if (isRunning) {
+            // fail since task is already running in another thread.
+            Assert.fail("More than one thread attempting to execute task at 
the same time.");
+          }
+          isRunning = true;
+          Thread.sleep(200);
+          countDownLatch.countDown();
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        } finally {
+          isRunning = false;
+        }
+      }
+    };
+
+    // Start scheduler with periodic task.
+    List<PeriodicTask> periodicTasks = new ArrayList<>();
+    periodicTasks.add(task);
+
+    PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+    taskScheduler.init(periodicTasks);
+    taskScheduler.start();
+
+    // Create multiple "execute" threads that try to run the same task that is 
already being run by scheduler
+    // on a periodic basis.
+    Thread[] threads = new Thread[numThreads];
+    Properties taskProperties = new Properties();
+    taskProperties.put(PeriodicTask.PROPERTY_KEY_REQUEST_ID, 
getClass().getSimpleName());
+    for (int i = 0; i < threads.length; i++) {
+      threads[i] = new Thread(() -> {
+          attempts.incrementAndGet();
+          taskScheduler.scheduleNow("TestTask", taskProperties);
+      });
+
+      threads[i].start();
+      try {
+        threads[i].join();
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    // Wait for around half the threads to finish running.
+    countDownLatch.await();
+
+    // stop task scheduler.
+    taskScheduler.stop();
+
+    // Confirm that all threads requested execution, even though only half the 
threads completed execution.
+    Assert.assertEquals(attempts.get(), numThreads);
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to