This is an automated email from the ASF dual-hosted git repository. mcvsubbu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7a44f4a Manually trigger PeriodicTask (#7174) 7a44f4a is described below commit 7a44f4af2c5e2cb5dfb96c33d2010efb8e5f1e74 Author: Amrish Lal <amrish.k....@gmail.com> AuthorDate: Mon Aug 23 12:06:14 2021 -0700 Manually trigger PeriodicTask (#7174) * Don't allow a PeriodicTask to execute more than once at a time. * Revert pom.xml change. * Add controller message handler. * Cleanup imports. * Execute periodic task after controller receives message. * Add test case. * Additional log messages. * Fix test case. * Trigger task against specific table. * Cleanup. * Rebuild. * Code review changes. * Cleanup. * Controller api to get list of available task names. * Cleanup. * Cleanup. * Cleanup. * Codereview changes. * Cleanup. * Controller API to set only one table and table type for manual periodic task execution. * Codereview changes. * Cleanup. * Codereview changes. * Update test case to use countdown latch. * Cleanup. * Cleanup. * Codereview changes. * Add request id to link PeriodicTask execution request with its log entries. * Cleanup. * Rebuild. * Fix NPE. * Cleanup. * Codereview changes. * Cleanup. * Codereview changes. * Rebuild. * Fix test case. * Rebuild. * Fix test case. * Codereview changes. * Cleanup. * Cleanup /periodictask/run API * Cleanup. * Pass Properties object on the stack. * Codereview changes. * Fix checkstyle violations. * fix checkstyle. 
--- .../common/messages/RunPeriodicTaskMessage.java | 66 ++++++++++ .../pinot/controller/BaseControllerStarter.java | 6 + ...ControllerUserDefinedMessageHandlerFactory.java | 134 +++++++++++++++++++++ .../pinot/controller/api/resources/Constants.java | 1 + ...PinotControllerPeriodicTaskRestletResource.java | 123 +++++++++++++++++++ .../core/minion/MinionInstancesCleanupTask.java | 3 +- .../helix/core/minion/TaskMetricsEmitter.java | 3 +- .../core/periodictask/ControllerPeriodicTask.java | 26 +++- .../minion/MinionInstancesCleanupTaskTest.java | 9 +- .../pinot/core/periodictask/BasePeriodicTask.java | 93 ++++++++------ .../pinot/core/periodictask/PeriodicTask.java | 11 ++ .../core/periodictask/PeriodicTaskScheduler.java | 62 +++++++++- .../periodictask/PeriodicTaskSchedulerTest.java | 85 ++++++++++++- 13 files changed, 574 insertions(+), 48 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/RunPeriodicTaskMessage.java b/pinot-common/src/main/java/org/apache/pinot/common/messages/RunPeriodicTaskMessage.java new file mode 100644 index 0000000..11ee6cf --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/messages/RunPeriodicTaskMessage.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.messages; + +import java.util.UUID; +import org.apache.helix.ZNRecord; +import org.apache.helix.model.Message; + + +/** + * Upon receiving this message, Controller will execute the specified PeriodicTask against the tables for which it is + * the lead controller. The message is sent whenever API call for executing a PeriodicTask is invoked. + */ +public class RunPeriodicTaskMessage extends Message { + public static final String RUN_PERIODIC_TASK_MSG_SUB_TYPE = "RUN_PERIODIC_TASK"; + private static final String PERIODIC_TASK_REQUEST_ID = "requestId"; + private static final String PERIODIC_TASK_NAME_KEY = "taskName"; + private static final String TABLE_NAME_WITH_TYPE_KEY = "tableNameWithType"; + + /** + * @param taskRequestId Request Id that will be appended to log messages. + * @param periodicTaskName Name of the task that will be run. + * @param tableNameWithType Table (names with type suffix) on which task will run. 
+ */ + public RunPeriodicTaskMessage(String taskRequestId, String periodicTaskName, String tableNameWithType) { + super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString()); + setMsgSubType(RUN_PERIODIC_TASK_MSG_SUB_TYPE); + setExecutionTimeout(-1); + ZNRecord znRecord = getRecord(); + znRecord.setSimpleField(PERIODIC_TASK_REQUEST_ID, taskRequestId); + znRecord.setSimpleField(PERIODIC_TASK_NAME_KEY, periodicTaskName); + znRecord.setSimpleField(TABLE_NAME_WITH_TYPE_KEY, tableNameWithType); + } + + public RunPeriodicTaskMessage(Message message) { + super(message.getRecord()); + } + + public String getPeriodicTaskRequestId() { + return getRecord().getSimpleField(PERIODIC_TASK_REQUEST_ID); + } + + public String getPeriodicTaskName() { + return getRecord().getSimpleField(PERIODIC_TASK_NAME_KEY); + } + + public String getTableNameWithType() { + return getRecord().getSimpleField(TABLE_NAME_WITH_TYPE_KEY); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index b6471b1..39b9b1e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -411,6 +411,11 @@ public abstract class BaseControllerStarter implements ServiceStartable { _periodicTaskScheduler.init(controllerPeriodicTasks); _periodicTaskScheduler.start(); + // Register message handler for incoming user-defined helix messages. 
+ _helixParticipantManager.getMessagingService() + .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), + new ControllerUserDefinedMessageHandlerFactory(_periodicTaskScheduler)); + String accessControlFactoryClass = _config.getAccessControlFactoryClass(); LOGGER.info("Use class: {} as the AccessControlFactory", accessControlFactoryClass); final AccessControlFactory accessControlFactory; @@ -443,6 +448,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { bind(accessControlFactory).to(AccessControlFactory.class); bind(metadataEventNotifierFactory).to(MetadataEventNotifierFactory.class); bind(_leadControllerManager).to(LeadControllerManager.class); + bind(_periodicTaskScheduler).to(PeriodicTaskScheduler.class); } }); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerUserDefinedMessageHandlerFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerUserDefinedMessageHandlerFactory.java new file mode 100644 index 0000000..fd9dfb8 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerUserDefinedMessageHandlerFactory.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller; + +import java.util.Properties; +import org.apache.helix.NotificationContext; +import org.apache.helix.messaging.handling.HelixTaskResult; +import org.apache.helix.messaging.handling.MessageHandler; +import org.apache.helix.messaging.handling.MessageHandlerFactory; +import org.apache.helix.model.Message; +import org.apache.pinot.common.messages.RunPeriodicTaskMessage; +import org.apache.pinot.core.periodictask.PeriodicTask; +import org.apache.pinot.core.periodictask.PeriodicTaskScheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** Factory class for creating message handlers for incoming helix messages. */ +public class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(ControllerUserDefinedMessageHandlerFactory.class); + private static final String USER_DEFINED_MSG_STRING = Message.MessageType.USER_DEFINE_MSG.toString(); + + private final PeriodicTaskScheduler _periodicTaskScheduler; + + public ControllerUserDefinedMessageHandlerFactory(PeriodicTaskScheduler periodicTaskScheduler) { + _periodicTaskScheduler = periodicTaskScheduler; + } + + @Override + public MessageHandler createHandler(Message message, NotificationContext notificationContext) { + String messageType = message.getMsgSubType(); + if (messageType.equals(RunPeriodicTaskMessage.RUN_PERIODIC_TASK_MSG_SUB_TYPE)) { + return new RunPeriodicTaskMessageHandler(new RunPeriodicTaskMessage(message), notificationContext, + _periodicTaskScheduler); + } + + // Log a warning and return no-op message handler for unsupported message sub-types. This can happen when + // a new message sub-type is added, and the sender gets deployed first while receiver is still running the + // old version. 
+ LOGGER.warn("Received message with unsupported sub-type: {}, using no-op message handler", messageType); + return new NoOpMessageHandler(message, notificationContext); + } + + @Override + public String getMessageType() { + return USER_DEFINED_MSG_STRING; + } + + @Override + public void reset() { + } + + /** Message handler for {@link RunPeriodicTaskMessage} message. */ + private static class RunPeriodicTaskMessageHandler extends MessageHandler { + private final String _periodicTaskRequestId; + private final String _periodicTaskName; + private final String _tableNameWithType; + private final PeriodicTaskScheduler _periodicTaskScheduler; + + RunPeriodicTaskMessageHandler(RunPeriodicTaskMessage message, NotificationContext context, + PeriodicTaskScheduler periodicTaskScheduler) { + super(message, context); + _periodicTaskRequestId = message.getPeriodicTaskRequestId(); + _periodicTaskName = message.getPeriodicTaskName(); + _tableNameWithType = message.getTableNameWithType(); + _periodicTaskScheduler = periodicTaskScheduler; + } + + @Override + public HelixTaskResult handleMessage() + throws InterruptedException { + LOGGER.info("[TaskRequestId: {}] Handling RunPeriodicTaskMessage by executing task {}", _periodicTaskRequestId, + _periodicTaskName); + _periodicTaskScheduler + .scheduleNow(_periodicTaskName, createTaskProperties(_periodicTaskRequestId, _tableNameWithType)); + HelixTaskResult helixTaskResult = new HelixTaskResult(); + helixTaskResult.setSuccess(true); + return helixTaskResult; + } + + @Override + public void onError(Exception e, ErrorCode errorCode, ErrorType errorType) { + LOGGER.error("[TaskRequestId: {}] Message handling error.", _periodicTaskRequestId, e); + } + + private static Properties createTaskProperties(String periodicTaskRequestId, String tableNameWithType) { + Properties periodicTaskParameters = new Properties(); + if (periodicTaskRequestId != null) { + periodicTaskParameters.setProperty(PeriodicTask.PROPERTY_KEY_REQUEST_ID, 
periodicTaskRequestId); + } + + if (tableNameWithType != null) { + periodicTaskParameters.setProperty(PeriodicTask.PROPERTY_KEY_TABLE_NAME, tableNameWithType); + } + + return periodicTaskParameters; + } + } + + /** Message handler for unknown messages */ + private static class NoOpMessageHandler extends MessageHandler { + NoOpMessageHandler(Message message, NotificationContext context) { + super(message, context); + } + + @Override + public HelixTaskResult handleMessage() { + HelixTaskResult result = new HelixTaskResult(); + result.setSuccess(true); + return result; + } + + @Override + public void onError(Exception e, ErrorCode code, ErrorType type) { + LOGGER.error("Got error for no-op message handling (error code: {}, error type: {})", code, type, e); + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java index 099651a..3e9b33c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java @@ -45,6 +45,7 @@ public class Constants { public static final String TABLE_NAME = "tableName"; public static final String ZOOKEEPER = "Zookeeper"; public static final String APP_CONFIGS = "AppConfigs"; + public static final String PERIODIC_TASK_TAG = "PeriodicTask"; public static TableType validateTableType(String tableTypeStr) { if (tableTypeStr == null || tableTypeStr.isEmpty()) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java new file mode 100644 index 0000000..99a597b --- /dev/null +++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import java.util.List; +import java.util.UUID; +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.helix.ClusterMessagingService; +import org.apache.helix.Criteria; +import org.apache.helix.InstanceType; +import org.apache.pinot.common.messages.RunPeriodicTaskMessage; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.core.periodictask.PeriodicTaskScheduler; +import org.apache.pinot.spi.utils.CommonConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +@Api(tags = Constants.PERIODIC_TASK_TAG) +@Path("/periodictask") +public class 
PinotControllerPeriodicTaskRestletResource { + + private static final Logger LOGGER = LoggerFactory.getLogger(PinotControllerPeriodicTaskRestletResource.class); + private static final String API_REQUEST_ID_PREFIX = "api-"; + + @Inject + PinotHelixResourceManager _pinotHelixResourceManager; + + @Inject + PeriodicTaskScheduler _periodicTaskScheduler; + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/run") + @ApiOperation(value = "Run periodic task against table. If table name is missing, task will run against all tables.") + public String runPeriodicTask( + @ApiParam(value = "Periodic task name", required = true) @QueryParam("taskname") String periodicTaskName, + @ApiParam(value = "Name of the table") @QueryParam("tableName") String tableName, + @ApiParam(value = "OFFLINE | REALTIME") @QueryParam("type") String tableType) { + + if (!_periodicTaskScheduler.hasTask(periodicTaskName)) { + throw new WebApplicationException("Periodic task '" + periodicTaskName + "' not found.", + Response.Status.NOT_FOUND); + } + + if (tableName != null) { + tableName = tableName.trim(); + List<String> matchingTableNamesWithType = ResourceUtils + .getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, Constants.validateTableType(tableType), + LOGGER); + + if (matchingTableNamesWithType.size() > 1) { + throw new WebApplicationException( + "More than one table matches Table '" + tableName + "'. Matching names: " + matchingTableNamesWithType + .toString()); + } + + tableName = matchingTableNamesWithType.get(0); + } + + // Generate an id for this request by taking first eight characters of a randomly generated UUID. This request id + // is returned to the user and also appended to log messages so that user can locate all log messages associated + // with this PeriodicTask's execution. 
+ String periodicTaskRequestId = API_REQUEST_ID_PREFIX + UUID.randomUUID().toString().substring(0, 8); + + LOGGER.info( + "[TaskRequestId: {}] Sending periodic task message to all controllers for running task {} against {}.", + periodicTaskRequestId, periodicTaskName, tableName != null ? " table '" + tableName + "'" : "all tables"); + + // Create and send message to send to all controllers (including this one) + Criteria recipientCriteria = new Criteria(); + recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); + recipientCriteria.setInstanceName("%"); + recipientCriteria.setSessionSpecific(true); + recipientCriteria.setResource(CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME); + recipientCriteria.setSelfExcluded(false); + RunPeriodicTaskMessage runPeriodicTaskMessage = + new RunPeriodicTaskMessage(periodicTaskRequestId, periodicTaskName, tableName); + + ClusterMessagingService clusterMessagingService = + _pinotHelixResourceManager.getHelixZkManager().getMessagingService(); + int messageCount = clusterMessagingService.send(recipientCriteria, runPeriodicTaskMessage, null, -1); + LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to {} controllers.", periodicTaskRequestId, + messageCount); + + return "{\"Log Request Id\": \"" + periodicTaskRequestId + "\",\"Controllers notified\":" + (messageCount > 0) + + "}"; + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/names") + @ApiOperation(value = "Get comma-delimited list of all available periodic task names.") + public List<String> getPeriodicTaskNames() { + return _periodicTaskScheduler.getTaskNames(); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java index a4b6baf..0f14924 100644 --- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTask.java @@ -20,6 +20,7 @@ package org.apache.pinot.controller.helix.core.minion; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.controller.ControllerConf; @@ -55,7 +56,7 @@ public class MinionInstancesCleanupTask extends BasePeriodicTask { } @Override - protected void runTask() { + protected void runTask(Properties periodicTaskProperties) { // Make it so that only one controller is responsible for cleaning up minion instances. if (!_leadControllerManager.isLeaderForTable(TASK_NAME)) { return; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java index 64e7303..d9dea38 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java @@ -19,6 +19,7 @@ package org.apache.pinot.controller.helix.core.minion; import java.util.List; +import java.util.Properties; import java.util.Set; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; @@ -59,7 +60,7 @@ public class TaskMetricsEmitter extends BasePeriodicTask { } @Override - protected final void runTask() { + protected final void runTask(Properties periodicTaskProperties) { // Make it so that only one controller returns the metric for all the tasks. 
if (!_leadControllerManager.isLeaderForTable(TASK_NAME)) { return; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java index 5aff79c..9d7a676 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java @@ -20,6 +20,7 @@ package org.apache.pinot.controller.helix.core.periodictask; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMeter; @@ -27,6 +28,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.core.periodictask.BasePeriodicTask; +import org.apache.pinot.core.periodictask.PeriodicTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,17 +57,31 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask { } @Override - protected final void runTask() { + protected final void runTask(Properties periodicTaskProperties) { _controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L); try { + // Check if we have a specific table against which this task needs to be run. 
+ String propTableNameWithType = (String) periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME); + // Process the tables that are managed by this controller List<String> tablesToProcess = new ArrayList<>(); - for (String tableNameWithType : _pinotHelixResourceManager.getAllTables()) { - if (_leadControllerManager.isLeaderForTable(tableNameWithType)) { - tablesToProcess.add(tableNameWithType); + if (propTableNameWithType == null) { + // Table name is not available, so task should run on all tables for which this controller is the lead. + for (String tableNameWithType : _pinotHelixResourceManager.getAllTables()) { + if (_leadControllerManager.isLeaderForTable(tableNameWithType)) { + tablesToProcess.add(tableNameWithType); + } + } + } else { + // Table name is available, so task should run only on the specified table. + if (_leadControllerManager.isLeaderForTable(propTableNameWithType)) { + tablesToProcess.add(propTableNameWithType); } } - processTables(tablesToProcess); + + if (!tablesToProcess.isEmpty()) { + processTables(tablesToProcess); + } } catch (Exception e) { LOGGER.error("Caught exception while running task: {}", _taskName, e); _controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java index 363c795..d6587af 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/MinionInstancesCleanupTaskTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.controller.helix.core.minion; +import java.util.Properties; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.controller.helix.ControllerTest; 
import org.testng.Assert; @@ -38,22 +39,22 @@ public class MinionInstancesCleanupTaskTest extends ControllerTest { public void testMinionInstancesCleanupTask() throws Exception { MinionInstancesCleanupTask minionInstancesCleanupTask = _controllerStarter.getMinionInstancesCleanupTask(); - minionInstancesCleanupTask.runTask(); + minionInstancesCleanupTask.runTask(new Properties()); Assert.assertEquals( _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 0); addFakeMinionInstancesToAutoJoinHelixCluster(3); Assert.assertEquals( _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 0); stopFakeInstance("Minion_localhost_0"); - minionInstancesCleanupTask.runTask(); + minionInstancesCleanupTask.runTask(new Properties()); Assert.assertEquals( _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 1); stopFakeInstance("Minion_localhost_1"); - minionInstancesCleanupTask.runTask(); + minionInstancesCleanupTask.runTask(new Properties()); Assert.assertEquals( _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 2); stopFakeInstance("Minion_localhost_2"); - minionInstancesCleanupTask.runTask(); + minionInstancesCleanupTask.runTask(new Properties()); Assert.assertEquals( _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 3); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java index eb15aa9..f7a14de 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java @@ -18,6 +18,9 @@ */ package org.apache.pinot.core.periodictask; +import java.util.Properties; +import 
java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import javax.annotation.concurrent.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +32,7 @@ import org.slf4j.LoggerFactory; @ThreadSafe public abstract class BasePeriodicTask implements PeriodicTask { private static final Logger LOGGER = LoggerFactory.getLogger(BasePeriodicTask.class); + private static final String DEFAULT_REQUEST_ID = "auto"; // Wait for at most 30 seconds while calling stop() for task to terminate private static final long MAX_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L; @@ -36,14 +40,26 @@ public abstract class BasePeriodicTask implements PeriodicTask { protected final String _taskName; protected final long _intervalInSeconds; protected final long _initialDelayInSeconds; + protected final ReentrantLock _runLock; private volatile boolean _started; private volatile boolean _running; + // Default properties that tasks may use during execution. This variable is private and does not have any get or set + // methods to prevent subclasses from gaining direct access to this variable. See run(Properties) method to see how + // properties are passed and used during task execution. + private static final Properties DEFAULT_PERIODIC_TASK_PROPERTIES; + static { + // Default properties for PeriodicTask execution. 
+ DEFAULT_PERIODIC_TASK_PROPERTIES = new Properties(); + DEFAULT_PERIODIC_TASK_PROPERTIES.put(PeriodicTask.PROPERTY_KEY_REQUEST_ID, DEFAULT_REQUEST_ID); + } + public BasePeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds) { _taskName = taskName; _intervalInSeconds = runFrequencyInSeconds; _initialDelayInSeconds = initialDelayInSeconds; + _runLock = new ReentrantLock(); } @Override @@ -87,13 +103,16 @@ public abstract class BasePeriodicTask implements PeriodicTask { LOGGER.warn("Task: {} is already started", _taskName); return; } - _started = true; try { setUpTask(); } catch (Exception e) { LOGGER.error("Caught exception while setting up task: {}", _taskName, e); } + + // mark _started as true only after state has completely initialized, so that run method doesn't end up seeing + // partially initialized state. + _started = true; } /** @@ -111,29 +130,41 @@ public abstract class BasePeriodicTask implements PeriodicTask { */ @Override public final void run() { - _running = true; + // Pass default properties object to the actual run method. + run(DEFAULT_PERIODIC_TASK_PROPERTIES); + } - if (_started) { - long startTime = System.currentTimeMillis(); - LOGGER.info("Start running task: {}", _taskName); - try { - runTask(); - } catch (Exception e) { - LOGGER.error("Caught exception while running task: {}", _taskName, e); + @Override + public final void run(Properties periodicTaskProperties) { + try { + // Don't allow a task to run more than once at a time. 
+ _runLock.lock(); + _running = true; + + String periodicTaskRequestId = periodicTaskProperties.getProperty(PeriodicTask.PROPERTY_KEY_REQUEST_ID); + if (_started) { + long startTime = System.currentTimeMillis(); + LOGGER.info("[TaskRequestId: {}] Start running task: {}", periodicTaskRequestId, _taskName); + try { + runTask(periodicTaskProperties); + } catch (Exception e) { + LOGGER.error("[TaskRequestId: {}] Caught exception while running task: {}", periodicTaskRequestId, _taskName, e); + } + LOGGER.info("[TaskRequestId: {}] Finish running task: {} in {}ms", periodicTaskRequestId, _taskName, System.currentTimeMillis() - startTime); + } else { + LOGGER.warn("[TaskRequestId: {}] Task: {} is skipped because it is not started or already stopped", periodicTaskRequestId, _taskName); } - LOGGER.info("Finish running task: {} in {}ms", _taskName, System.currentTimeMillis() - startTime); - } else { - LOGGER.warn("Task: {} is skipped because it is not started or already stopped", _taskName); + } finally { + _runLock.unlock(); + _running = false; } - - _running = false; } /** * Executes the task. This method should early terminate if {@code started} flag is set to false by {@link #stop()} * during execution. 
*/ - protected abstract void runTask(); + protected abstract void runTask(Properties periodicTaskProperties); /** * {@inheritDoc} @@ -147,28 +178,22 @@ public abstract class BasePeriodicTask implements PeriodicTask { LOGGER.warn("Task: {} is not started", _taskName); return; } + long startTimeMs = System.currentTimeMillis(); _started = false; - if (_running) { - long startTimeMs = System.currentTimeMillis(); - long remainingTimeMs = MAX_PERIODIC_TASK_STOP_TIME_MILLIS; - LOGGER.info("Task: {} is running, wait for at most {}ms for it to finish", _taskName, remainingTimeMs); - while (_running && remainingTimeMs > 0L) { - long sleepTimeMs = Long.min(remainingTimeMs, 1000L); - remainingTimeMs -= sleepTimeMs; - try { - Thread.sleep(sleepTimeMs); - } catch (InterruptedException e) { - LOGGER.error("Caught InterruptedException while waiting for task: {} to finish", _taskName); - Thread.currentThread().interrupt(); - break; - } - } - long waitTimeMs = System.currentTimeMillis() - startTimeMs; - if (_running) { - LOGGER.warn("Task: {} is not finished in {}ms", waitTimeMs); + try { + // check if task is done running, or wait for the task to get done, by trying to acquire runLock. 
+ if (!_runLock.tryLock(MAX_PERIODIC_TASK_STOP_TIME_MILLIS, TimeUnit.MILLISECONDS)) { + LOGGER.warn("Task {} could not be stopped within timeout of {}ms", _taskName, MAX_PERIODIC_TASK_STOP_TIME_MILLIS); } else { - LOGGER.info("Task: {} is finished in {}ms", waitTimeMs); + LOGGER.info("Task {} successfully stopped in {}ms", _taskName, System.currentTimeMillis() - startTimeMs); + } + } catch (InterruptedException ie) { + LOGGER.error("Caught InterruptedException while waiting for task: {} to finish", _taskName); + Thread.currentThread().interrupt(); + } finally { + if (_runLock.isHeldByCurrentThread()) { + _runLock.unlock(); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java index ee6c68b..4a5e120 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.periodictask; +import java.util.Properties; import javax.annotation.concurrent.ThreadSafe; @@ -28,6 +29,10 @@ import javax.annotation.concurrent.ThreadSafe; @ThreadSafe public interface PeriodicTask extends Runnable { + // PeriodicTask objects may take a {@link Properties} object. Define all the property keys here. + String PROPERTY_KEY_REQUEST_ID = "requestId"; + String PROPERTY_KEY_TABLE_NAME = "tableNameWithType"; + /** * Returns the periodic task name. * @return task name. @@ -60,6 +65,12 @@ public interface PeriodicTask extends Runnable { void run(); /** + * Execute the task with specified {@link Properties}. + * @param periodicTaskProperties Properties used by {@link PeriodicTask} during execution. + */ + void run(Properties periodicTaskProperties); + + /** * Stops the periodic task and performs necessary cleanups. Should be called after removing the periodic task from the * scheduler.
Should be called after {@link #start()} getting called. */ diff --git a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java index ffbc34a..8e42adf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java @@ -20,6 +20,7 @@ package org.apache.pinot.core.periodictask; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -28,7 +29,8 @@ import org.slf4j.LoggerFactory; /** - * Periodic task scheduler will schedule a list of tasks based on their initial delay time and interval time. + * Periodic task scheduler will schedule a list of tasks based on their initial delay time and interval time. Tasks + * can also be scheduled for immediate execution by calling the scheduleNow() method. */ public class PeriodicTaskScheduler { private static final Logger LOGGER = LoggerFactory.getLogger(PeriodicTaskScheduler.class); @@ -109,4 +111,62 @@ public class PeriodicTaskScheduler { _tasksWithValidInterval.parallelStream().forEach(PeriodicTask::stop); } } + + /** @return true if task with given name exists; otherwise, false. */ + public boolean hasTask(String periodicTaskName) { + for (PeriodicTask task : _tasksWithValidInterval) { + if (task.getTaskName().equals(periodicTaskName)) { + return true; + } + } + return false; + } + + /** @return List of task names that will run periodically.
*/ + public List<String> getTaskNames() { + List<String> taskNameList = new ArrayList<>(); + for (PeriodicTask task : _tasksWithValidInterval) { + taskNameList.add(task.getTaskName()); + } + return taskNameList; + } + + private PeriodicTask getPeriodicTask(String periodicTaskName) { + for (PeriodicTask task : _tasksWithValidInterval) { + if (task.getTaskName().equals(periodicTaskName)) { + return task; + } + } + return null; + } + + /** Execute {@link PeriodicTask} immediately on the specified table. */ + public void scheduleNow(String periodicTaskName, Properties periodicTaskProperties) { + // During controller deployment, each controller can have a slightly different list of periodic tasks if we add, + // remove, or rename periodic task. To avoid this situation, we check again (besides the check at controller API + // level) whether the periodic task exists. + PeriodicTask periodicTask = getPeriodicTask(periodicTaskName); + if (periodicTask == null) { + LOGGER.error("Unknown Periodic Task " + periodicTaskName); + return; + } + + String taskRequestId = periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_REQUEST_ID).toString(); + LOGGER.info( + "[TaskRequestId: {}] Schedule task '{}' to run immediately. If the task is already running, this run will wait until the current run finishes.", + taskRequestId, periodicTaskName); + _executorService.schedule(() -> { + try { + // Run the periodic task using the specified parameters. The call to run() method will block if another thread + // (the periodic execution thread or another thread calling this method) is already in the process of + // running the same task. 
+ periodicTask.run(periodicTaskProperties); + } catch (Throwable t) { + // catch all errors to prevent subsequent executions from being silently suppressed + // Ref: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit- + LOGGER.error("[TaskRequestId: {}] Caught exception while attempting to execute named periodic task: {}", + taskRequestId, periodicTask.getTaskName(), t); + } + }, 0, TimeUnit.SECONDS); + } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java index a4d581d..a033e06 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java @@ -21,8 +21,11 @@ package org.apache.pinot.core.periodictask; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.testng.Assert; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; @@ -45,7 +48,7 @@ public class PeriodicTaskSchedulerTest { } @Override - protected void runTask() { + protected void runTask(Properties periodicTaskProperties) { runCalled.set(true); } @@ -83,7 +86,7 @@ public class PeriodicTaskSchedulerTest { } @Override - protected void runTask() { + protected void runTask(Properties periodicTaskProperties) { numTimesRunCalled.getAndIncrement(); } @@ -104,4 +107,82 @@ public class PeriodicTaskSchedulerTest { assertEquals(numTimesRunCalled.get(), numTasks * 2); assertEquals(numTimesStopCalled.get(), numTasks); } + + /** + * Test that {@link PeriodicTaskScheduler} is thread 
safe and does not run the same task more than once at any time. + * This is done by attempting to run the same task object in 20 different threads at the same time. While the test + * case launches 20 threads to keep {@link PeriodicTaskScheduler} busy, it waits for only around half of them to + * complete. The test case then checks whether the threads that did not complete execution were waiting to execute + * (i.e they had requested execution, but had not executed yet). This "waiting" indicates that task execution was + * being properly synchronized (otherwise all the tasks would have just run immediately). 'isRunning' variable within + * the task is used to check that the task is not executing more than once at any given time. + */ + @Test + public void testConcurrentExecutionOfSameTask() throws Exception { + // Number of threads to run + final int numThreads = 20; + + // Count number of threads that requested execution. + final AtomicInteger attempts = new AtomicInteger(); + + // Countdown latch to ensure that this test case will wait only for around half the tasks to complete. + final CountDownLatch countDownLatch = new CountDownLatch(numThreads/2); + + // Create periodic task. + PeriodicTask task = new BasePeriodicTask("TestTask", 1L, 0L) { + private volatile boolean isRunning = false; + @Override + protected void runTask(Properties periodicTaskProperties) { + try { + if (isRunning) { + // fail since task is already running in another thread. + Assert.fail("More than one thread attempting to execute task at the same time."); + } + isRunning = true; + Thread.sleep(200); + countDownLatch.countDown(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } finally { + isRunning = false; + } + } + }; + + // Start scheduler with periodic task. 
+ List<PeriodicTask> periodicTasks = new ArrayList<>(); + periodicTasks.add(task); + + PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler(); + taskScheduler.init(periodicTasks); + taskScheduler.start(); + + // Create multiple "execute" threads that try to run the same task that is already being run by scheduler + // on a periodic basis. + Thread[] threads = new Thread[numThreads]; + Properties taskProperties = new Properties(); + taskProperties.put(PeriodicTask.PROPERTY_KEY_REQUEST_ID, getClass().getSimpleName()); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + attempts.incrementAndGet(); + taskScheduler.scheduleNow("TestTask", taskProperties); + }); + + threads[i].start(); + try { + threads[i].join(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + + // Wait for around half the threads to finish running. + countDownLatch.await(); + + // stop task scheduler. + taskScheduler.stop(); + + // Confirm that all threads requested execution, even though only half the threads completed execution. + Assert.assertEquals(attempts.get(), numThreads); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org