amrishlal commented on a change in pull request #7174: URL: https://github.com/apache/pinot/pull/7174#discussion_r693645689
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java ########## @@ -55,14 +57,25 @@ public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long } @Override - protected final void runTask() { + protected final void runTask(Properties periodicTaskProperties) { _controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L); try { + // Check if we have a specific table against which this task needs to be run. + String propTableNameWithType = (String) periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME); + // Process the tables that are managed by this controller List<String> tablesToProcess = new ArrayList<>(); - for (String tableNameWithType : _pinotHelixResourceManager.getAllTables()) { - if (_leadControllerManager.isLeaderForTable(tableNameWithType)) { - tablesToProcess.add(tableNameWithType); + if (propTableNameWithType == null){ + // Table name is not available, so task should run on all tables for which this controller is the lead. + for (String tableNameWithType : _pinotHelixResourceManager.getAllTables()) { + if (_leadControllerManager.isLeaderForTable(tableNameWithType)) { + tablesToProcess.add(tableNameWithType); + } + } + } else { + // Table name is available, so task should run only on the specified table. + if (_leadControllerManager.isLeaderForTable(propTableNameWithType)) { + tablesToProcess.add(propTableNameWithType); } } processTables(tablesToProcess); Review comment: Fixed. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java ########## @@ -29,21 +32,33 @@ @ThreadSafe public abstract class BasePeriodicTask implements PeriodicTask { private static final Logger LOGGER = LoggerFactory.getLogger(BasePeriodicTask.class); + private static final String DEFAULT_REQUEST_ID = "auto"; // Wait for at most 30 seconds while calling stop() for task to terminate private static final long MAX_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L; protected final String _taskName; protected final long _intervalInSeconds; protected final long _initialDelayInSeconds; + protected final ReentrantLock _runLock; private volatile boolean _started; private volatile boolean _running; + // Default properties that tasks may use during execution. This variable is private and does not have any get or set + // methods to prevent subclasses from gaining direct access to this variable. See run(Properties) method to see how + // properties are passed and used during task execution. + private Properties _activePeriodicTaskProperties; Review comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org