Jackie-Jiang commented on a change in pull request #7174: URL: https://github.com/apache/pinot/pull/7174#discussion_r693266380
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java ########## @@ -55,14 +57,25 @@ public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long } @Override - protected final void runTask() { + protected final void runTask(Properties periodicTaskProperties) { _controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L); try { + // Check if we have a specific table against which this task needs to be run. + String propTableNameWithType = (String) periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME); + // Process the tables that are managed by this controller List<String> tablesToProcess = new ArrayList<>(); - for (String tableNameWithType : _pinotHelixResourceManager.getAllTables()) { - if (_leadControllerManager.isLeaderForTable(tableNameWithType)) { - tablesToProcess.add(tableNameWithType); + if (propTableNameWithType == null){ + // Table name is not available, so task should run on all tables for which this controller is the lead. + for (String tableNameWithType : _pinotHelixResourceManager.getAllTables()) { + if (_leadControllerManager.isLeaderForTable(tableNameWithType)) { + tablesToProcess.add(tableNameWithType); + } + } + } else { + // Table name is available, so task should run only on the specified table. + if (_leadControllerManager.isLeaderForTable(propTableNameWithType)) { + tablesToProcess.add(propTableNameWithType); } } processTables(tablesToProcess); Review comment: Only call `processTables` when `tablesToProcess` is not empty to avoid the overhead ########## File path: pinot-common/src/main/java/org/apache/pinot/common/messages/RunPeriodicTaskMessage.java ########## @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.messages; + +import java.util.UUID; +import javax.annotation.Nonnull; +import org.apache.helix.ZNRecord; +import org.apache.helix.model.Message; + + +/** + * Upon receiving this message, Controller will execute the specified PeriodicTask against the tables for which it is + * the lead controller. The message is sent whenever API call for executing a PeriodicTask is invoked. + */ +public class RunPeriodicTaskMessage extends Message { + public static final String RUN_PERIODIC_TASK_MSG_SUB_TYPE = "PERIODIC_TASK"; + private static final String PERIODIC_TASK_REQUEST_ID = "taskRequestId"; + private static final String PERIODIC_TASK_NAME_KEY = "periodicTaskName"; + private static final String TABLE_NAME_WITH_TYPE_KEY = "tableNameWithType"; Review comment: @mcvsubbu @amrishlal Please take a look at `SegmentRefreshMessage` and `RoutingTableRebuildMessage` and see whether we want to match them ########## File path: pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java ########## @@ -29,21 +32,33 @@ @ThreadSafe public abstract class BasePeriodicTask implements PeriodicTask { private static final Logger LOGGER = LoggerFactory.getLogger(BasePeriodicTask.class); + private static final String DEFAULT_REQUEST_ID = "auto"; // Wait for at most 30 seconds while calling stop() for task to terminate private static final long MAX_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L; protected final String _taskName; protected final long _intervalInSeconds; protected final long _initialDelayInSeconds; + protected final ReentrantLock _runLock; private volatile boolean _started; private volatile boolean _running; + // Default properties that tasks may use during execution. This variable is private and does not have any get or set + // methods to prevent subclasses from gaining direct access to this variable. See run(Properties) method to see how + // properties are passed and used during task execution. + private Properties _activePeriodicTaskProperties; Review comment: Make this a constant and rename it ```suggestion private static final Properties DEFAULT_PERIODIC_TASK_PROPERTIES; static { DEFAULT_PERIODIC_TASK_PROPERTIES = new Properties(); DEFAULT_PERIODIC_TASK_PROPERTIES.put(PeriodicTask.PROPERTY_KEY_REQUEST_ID, DEFAULT_REQUEST_ID); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org