amrishlal commented on a change in pull request #7174: URL: https://github.com/apache/pinot/pull/7174#discussion_r691550086
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/ControllerMessageHandlerFactory.java ########## @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller; + +import java.util.Properties; +import org.apache.helix.NotificationContext; +import org.apache.helix.messaging.handling.HelixTaskResult; +import org.apache.helix.messaging.handling.MessageHandler; +import org.apache.helix.messaging.handling.MessageHandlerFactory; +import org.apache.helix.model.Message; +import org.apache.pinot.common.messages.RunPeriodicTaskMessage; +import org.apache.pinot.core.periodictask.PeriodicTask; +import org.apache.pinot.core.periodictask.PeriodicTaskScheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** Factory class for creating message handlers for incoming helix messages. */ +public class ControllerMessageHandlerFactory implements MessageHandlerFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(ControllerMessageHandlerFactory.class); + private static final String USER_DEFINED_MSG_STRING = Message.MessageType.USER_DEFINE_MSG.toString(); + + private final PeriodicTaskScheduler _periodicTaskScheduler; + + public ControllerMessageHandlerFactory(PeriodicTaskScheduler periodicTaskScheduler) { + _periodicTaskScheduler = periodicTaskScheduler; + } + + @Override + public MessageHandler createHandler(Message message, NotificationContext notificationContext) { + String messageType = message.getMsgSubType(); + if (messageType.equals(RunPeriodicTaskMessage.RUN_PERIODIC_TASK_MSG_SUB_TYPE)) { + return new RunPeriodicTaskMessageHandler(new RunPeriodicTaskMessage(message), notificationContext, + _periodicTaskScheduler); + } + + LOGGER.warn("Unknown message type {} received by controller. ", messageType); + return null; Review comment: Fixed. ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/ControllerMessageHandlerFactory.java ########## @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller; + +import java.util.Properties; +import org.apache.helix.NotificationContext; +import org.apache.helix.messaging.handling.HelixTaskResult; +import org.apache.helix.messaging.handling.MessageHandler; +import org.apache.helix.messaging.handling.MessageHandlerFactory; +import org.apache.helix.model.Message; +import org.apache.pinot.common.messages.RunPeriodicTaskMessage; +import org.apache.pinot.core.periodictask.PeriodicTask; +import org.apache.pinot.core.periodictask.PeriodicTaskScheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** Factory class for creating message handlers for incoming helix messages. */ +public class ControllerMessageHandlerFactory implements MessageHandlerFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(ControllerMessageHandlerFactory.class); + private static final String USER_DEFINED_MSG_STRING = Message.MessageType.USER_DEFINE_MSG.toString(); + + private final PeriodicTaskScheduler _periodicTaskScheduler; + + public ControllerMessageHandlerFactory(PeriodicTaskScheduler periodicTaskScheduler) { + _periodicTaskScheduler = periodicTaskScheduler; + } + + @Override + public MessageHandler createHandler(Message message, NotificationContext notificationContext) { + String messageType = message.getMsgSubType(); + if (messageType.equals(RunPeriodicTaskMessage.RUN_PERIODIC_TASK_MSG_SUB_TYPE)) { + return new RunPeriodicTaskMessageHandler(new RunPeriodicTaskMessage(message), notificationContext, + _periodicTaskScheduler); + } + + LOGGER.warn("Unknown message type {} received by controller. ", messageType); + return null; + } + + @Override + public String getMessageType() { + return USER_DEFINED_MSG_STRING; + } + + @Override + public void reset() { + } + + /** Message handler for {@link RunPeriodicTaskMessage} message. */ + private static class RunPeriodicTaskMessageHandler extends MessageHandler { + private final String _periodicTaskReqeustId; + private final String _periodicTaskName; + private final String _tableNameWithType; + private final PeriodicTaskScheduler _periodicTaskScheduler; + + RunPeriodicTaskMessageHandler(RunPeriodicTaskMessage message, NotificationContext context, + PeriodicTaskScheduler periodicTaskScheduler) { + super(message, context); + _periodicTaskReqeustId = message.getPeriodicTaskRequestId(); + _periodicTaskName = message.getPeriodicTaskName(); + _tableNameWithType = message.getTableNameWithType(); + _periodicTaskScheduler = periodicTaskScheduler; + } + + @Override + public HelixTaskResult handleMessage() + throws InterruptedException { + LOGGER.info("[TaskRequestId: {}] Handling RunPeriodicTaskMessage by executing task {}", _periodicTaskReqeustId, + _periodicTaskName); + _periodicTaskScheduler + .scheduleNow(_periodicTaskName, createTaskProperties(_periodicTaskReqeustId, _tableNameWithType)); + HelixTaskResult helixTaskResult = new HelixTaskResult(); + helixTaskResult.setSuccess(true); + return helixTaskResult; + } + + @Override + public void onError(Exception e, ErrorCode errorCode, ErrorType errorType) { + LOGGER.error("[TaskRequestId: {}] Message handling error.", _periodicTaskReqeustId, e); + } + + private static Properties createTaskProperties(String periodicTaskReqeustId, String tableNameWithType) { Review comment: Fixed. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java ########## @@ -59,6 +64,12 @@ @Override void run(); + /** + * Execute the task once. This method will calls the {@link #run} method. Review comment: Fixed. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java ########## @@ -149,27 +190,17 @@ public final synchronized void stop() { } _started = false; - if (_running) { - long startTimeMs = System.currentTimeMillis(); - long remainingTimeMs = MAX_PERIODIC_TASK_STOP_TIME_MILLIS; - LOGGER.info("Task: {} is running, wait for at most {}ms for it to finish", _taskName, remainingTimeMs); - while (_running && remainingTimeMs > 0L) { - long sleepTimeMs = Long.min(remainingTimeMs, 1000L); - remainingTimeMs -= sleepTimeMs; - try { - Thread.sleep(sleepTimeMs); - } catch (InterruptedException e) { - LOGGER.error("Caught InterruptedException while waiting for task: {} to finish", _taskName); - Thread.currentThread().interrupt(); - break; - } - } - long waitTimeMs = System.currentTimeMillis() - startTimeMs; - if (_running) { - LOGGER.warn("Task: {} is not finished in {}ms", waitTimeMs); + try { + // check if task is done running, or wait for the task to get done, by trying to acquire runLock. + if (!_runLock.tryLock(MAX_PERIODIC_TASK_STOP_TIME_MILLIS, TimeUnit.MILLISECONDS)) { + LOGGER.warn("Task: {} did not finish within timeout of {}ms", MAX_PERIODIC_TASK_STOP_TIME_MILLIS); } else { - LOGGER.info("Task: {} is finished in {}ms", waitTimeMs); + LOGGER.info("Task: {} finished within timeout of {}ms", MAX_PERIODIC_TASK_STOP_TIME_MILLIS); } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } finally { + _runLock.unlock(); Review comment: Good catch. Fixed. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java ########## @@ -149,27 +190,17 @@ public final synchronized void stop() { } _started = false; - if (_running) { - long startTimeMs = System.currentTimeMillis(); - long remainingTimeMs = MAX_PERIODIC_TASK_STOP_TIME_MILLIS; - LOGGER.info("Task: {} is running, wait for at most {}ms for it to finish", _taskName, remainingTimeMs); - while (_running && remainingTimeMs > 0L) { - long sleepTimeMs = Long.min(remainingTimeMs, 1000L); - remainingTimeMs -= sleepTimeMs; - try { - Thread.sleep(sleepTimeMs); - } catch (InterruptedException e) { - LOGGER.error("Caught InterruptedException while waiting for task: {} to finish", _taskName); - Thread.currentThread().interrupt(); - break; - } - } - long waitTimeMs = System.currentTimeMillis() - startTimeMs; - if (_running) { - LOGGER.warn("Task: {} is not finished in {}ms", waitTimeMs); + try { + // check if task is done running, or wait for the task to get done, by trying to acquire runLock. + if (!_runLock.tryLock(MAX_PERIODIC_TASK_STOP_TIME_MILLIS, TimeUnit.MILLISECONDS)) { + LOGGER.warn("Task: {} did not finish within timeout of {}ms", MAX_PERIODIC_TASK_STOP_TIME_MILLIS); Review comment: Fixed. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java ########## @@ -149,27 +190,17 @@ public final synchronized void stop() { } _started = false; - if (_running) { - long startTimeMs = System.currentTimeMillis(); - long remainingTimeMs = MAX_PERIODIC_TASK_STOP_TIME_MILLIS; - LOGGER.info("Task: {} is running, wait for at most {}ms for it to finish", _taskName, remainingTimeMs); - while (_running && remainingTimeMs > 0L) { - long sleepTimeMs = Long.min(remainingTimeMs, 1000L); - remainingTimeMs -= sleepTimeMs; - try { - Thread.sleep(sleepTimeMs); - } catch (InterruptedException e) { - LOGGER.error("Caught InterruptedException while waiting for task: {} to finish", _taskName); - Thread.currentThread().interrupt(); - break; - } - } - long waitTimeMs = System.currentTimeMillis() - startTimeMs; - if (_running) { - LOGGER.warn("Task: {} is not finished in {}ms", waitTimeMs); + try { + // check if task is done running, or wait for the task to get done, by trying to acquire runLock. + if (!_runLock.tryLock(MAX_PERIODIC_TASK_STOP_TIME_MILLIS, TimeUnit.MILLISECONDS)) { + LOGGER.warn("Task: {} did not finish within timeout of {}ms", MAX_PERIODIC_TASK_STOP_TIME_MILLIS); } else { - LOGGER.info("Task: {} is finished in {}ms", waitTimeMs); + LOGGER.info("Task: {} finished within timeout of {}ms", MAX_PERIODIC_TASK_STOP_TIME_MILLIS); } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); Review comment: Fixed. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java ########## @@ -29,21 +33,31 @@ @ThreadSafe public abstract class BasePeriodicTask implements PeriodicTask { private static final Logger LOGGER = LoggerFactory.getLogger(BasePeriodicTask.class); + private static final String DEFAULT_REQUEST_ID = "auto"; // Wait for at most 30 seconds while calling stop() for task to terminate private static final long MAX_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L; protected final String _taskName; protected final long _intervalInSeconds; protected final long _initialDelayInSeconds; + protected final ReentrantLock _runLock; private volatile boolean _started; private volatile boolean _running; + // Properties that task may use during execution. null by default. + protected Properties _activePeriodicTaskProperties; Review comment: Pending... ########## File path: pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java ########## @@ -149,27 +190,17 @@ public final synchronized void stop() { } _started = false; - if (_running) { - long startTimeMs = System.currentTimeMillis(); - long remainingTimeMs = MAX_PERIODIC_TASK_STOP_TIME_MILLIS; - LOGGER.info("Task: {} is running, wait for at most {}ms for it to finish", _taskName, remainingTimeMs); - while (_running && remainingTimeMs > 0L) { - long sleepTimeMs = Long.min(remainingTimeMs, 1000L); - remainingTimeMs -= sleepTimeMs; - try { - Thread.sleep(sleepTimeMs); - } catch (InterruptedException e) { - LOGGER.error("Caught InterruptedException while waiting for task: {} to finish", _taskName); - Thread.currentThread().interrupt(); - break; - } - } - long waitTimeMs = System.currentTimeMillis() - startTimeMs; - if (_running) { - LOGGER.warn("Task: {} is not finished in {}ms", waitTimeMs); + try { + // check if task is done running, or wait for the task to get done, by trying to acquire runLock. + if (!_runLock.tryLock(MAX_PERIODIC_TASK_STOP_TIME_MILLIS, TimeUnit.MILLISECONDS)) { + LOGGER.warn("Task: {} did not finish within timeout of {}ms", MAX_PERIODIC_TASK_STOP_TIME_MILLIS); } else { - LOGGER.info("Task: {} is finished in {}ms", waitTimeMs); + LOGGER.info("Task: {} finished within timeout of {}ms", MAX_PERIODIC_TASK_STOP_TIME_MILLIS); Review comment: Fixed. ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java ########## @@ -54,14 +55,27 @@ public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long _controllerMetrics = controllerMetrics; } + // Determine if this task can run on the specified table. Task can run on all tables for which the controller is lead + // if "tablename" property is not set. However, if "tablename" property is set (by calling the /periodictask/run + // controller API), then the task will only run on the specified by the "tablename" property key. + private boolean shouldRunTaskForTable(String tableNameWithType) { + if (_leadControllerManager.isLeaderForTable(tableNameWithType)) { + String propTableNameWithType = (String) _activePeriodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME); + return propTableNameWithType == null || propTableNameWithType.equals(tableNameWithType); + } + return false; + } + @Override protected final void runTask() { _controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L); try { // Process the tables that are managed by this controller + // TODO: creating tablesToProcess list below is redundant since processTables will unroll the list anyway. This Review comment: Pending... ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java ########## @@ -54,14 +55,27 @@ public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long _controllerMetrics = controllerMetrics; } + // Determine if this task can run on the specified table. Task can run on all tables for which the controller is lead + // if "tablename" property is not set. However, if "tablename" property is set (by calling the /periodictask/run + // controller API), then the task will only run on the specified by the "tablename" property key. + private boolean shouldRunTaskForTable(String tableNameWithType) { + if (_leadControllerManager.isLeaderForTable(tableNameWithType)) { + String propTableNameWithType = (String) _activePeriodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME); + return propTableNameWithType == null || propTableNameWithType.equals(tableNameWithType); + } + return false; + } + @Override protected final void runTask() { _controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L); try { // Process the tables that are managed by this controller + // TODO: creating tablesToProcess list below is redundant since processTables will unroll the list anyway. This + // needs to be cleaned up sometime. List<String> tablesToProcess = new ArrayList<>(); for (String tableNameWithType : _pinotHelixResourceManager.getAllTables()) { - if (_leadControllerManager.isLeaderForTable(tableNameWithType)) { + if (shouldRunTaskForTable(tableNameWithType)) { Review comment: Pending... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org