praveenc7 commented on code in PR #16340: URL: https://github.com/apache/pinot/pull/16340#discussion_r2243480650
########## pinot-spi/src/main/java/org/apache/pinot/core/accounting/WorkloadBudgetManager.java: ########## @@ -18,137 +18,50 @@ */ package org.apache.pinot.core.accounting; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.spi.utils.CommonConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class WorkloadBudgetManager { - private static final Logger LOGGER = LoggerFactory.getLogger(WorkloadBudgetManager.class); - - private long _enforcementWindowMs; - private final ConcurrentHashMap<String, Budget> _workloadBudgets = new ConcurrentHashMap<>(); - private final ScheduledExecutorService _resetScheduler = Executors.newSingleThreadScheduledExecutor(); - private volatile boolean _isEnabled; - - public WorkloadBudgetManager(PinotConfiguration config) { - _isEnabled = config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENABLE_COST_COLLECTION, - CommonConstants.Accounting.DEFAULT_WORKLOAD_ENABLE_COST_COLLECTION); - // Return an object even if disabled. All functionalities of this class will be noops. - if (!_isEnabled) { - LOGGER.info("WorkloadBudgetManager is disabled. Creating a no-op instance."); - return; - } - _enforcementWindowMs = config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENFORCEMENT_WINDOW_MS, - CommonConstants.Accounting.DEFAULT_WORKLOAD_ENFORCEMENT_WINDOW_MS); - startBudgetResetTask(); - LOGGER.info("WorkloadBudgetManager initialized with enforcement window: {}ms", _enforcementWindowMs); - } - - public void shutdown() { - if (!_isEnabled) { - return; - } - _isEnabled = false; - _resetScheduler.shutdownNow(); - try { - if (!_resetScheduler.awaitTermination(5, TimeUnit.SECONDS)) { - LOGGER.warn("Reset scheduler did not terminate in time"); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - LOGGER.info("WorkloadBudgetManager has been shut down."); - } +public interface WorkloadBudgetManager { + /** Shut the manager down and release resources. */ + void shutdown(); /** * Adds or updates budget for a workload (Thread-Safe). + * + * @param workload Logical workload identifier + * @param cpuBudgetNs CPU budget for the enforcement window, in ns + * @param memoryBudgetBytes Memory budget for the window, in bytes */ - public void addOrUpdateWorkload(String workload, long cpuBudgetNs, long memoryBudgetBytes) { - if (!_isEnabled) { - LOGGER.info("WorkloadBudgetManager is disabled. Not adding/updating workload: {}", workload); - return; - } - - _workloadBudgets.compute(workload, (key, existingBudget) -> new Budget(cpuBudgetNs, memoryBudgetBytes)); - LOGGER.info("Updated budget for workload: {} -> CPU: {}ns, Memory: {} bytes", workload, cpuBudgetNs, - memoryBudgetBytes); - } + void addOrUpdateWorkload(String workload, long cpuBudgetNs, long memoryBudgetBytes); /** * Attempts to charge CPU and memory usage against the workload budget (Thread-Safe). - * Returns the remaining budget for CPU and memory after charge. + * @return the remaining budget for CPU and memory after charge. */ - public BudgetStats tryCharge(String workload, long cpuUsedNs, long memoryUsedBytes) { - if (!_isEnabled) { - return new BudgetStats(Long.MAX_VALUE, Long.MAX_VALUE); - } - - Budget budget = _workloadBudgets.get(workload); - if (budget == null) { - LOGGER.warn("No budget found for workload: {}", workload); - return new BudgetStats(Long.MAX_VALUE, Long.MAX_VALUE); - } - return budget.tryCharge(cpuUsedNs, memoryUsedBytes); - } + WorkloadBudgetManager.BudgetStats tryCharge(String workload, long cpuUsedNs, long memoryUsedBytes); /** - * Retrieves the remaining budget for a specific workload. + * @return the remaining budget for a specific workload. */ - public BudgetStats getRemainingBudgetForWorkload(String workload) { - if (!_isEnabled) { - return new BudgetStats(Long.MAX_VALUE, Long.MAX_VALUE); - } - - Budget budget = _workloadBudgets.get(workload); - return budget != null ? budget.getStats() : new BudgetStats(0, 0); - } + WorkloadBudgetManager.BudgetStats getRemainingBudgetForWorkload(String workload); /** - * Retrieves the total remaining budget across all workloads (Thread-Safe). + * @return the total remaining budget across all workloads (Thread-Safe). */ - public BudgetStats getRemainingBudgetAcrossAllWorkloads() { - if (!_isEnabled) { - return new BudgetStats(Long.MAX_VALUE, Long.MAX_VALUE); - } - - long totalCpuRemaining = - _workloadBudgets.values().stream().mapToLong(budget -> budget.getStats()._cpuRemaining).sum(); - long totalMemRemaining = - _workloadBudgets.values().stream().mapToLong(budget -> budget.getStats()._memoryRemaining).sum(); - return new BudgetStats(totalCpuRemaining, totalMemRemaining); - } + WorkloadBudgetManager.BudgetStats getRemainingBudgetAcrossAllWorkloads(); /** - * Periodically resets budgets at the end of each enforcement window (Thread-Safe). + * Collects workload stats for CPU and memory usage. + * + * @param workload Workload identifier + * @param cpuUsedNs CPU used by the workload, in ns + * @param memoryUsedBytes Memory used by the workload, in bytes */ - private void startBudgetResetTask() { - // TODO(Vivek): Reduce logging verbosity. Maybe make it debug logs. - LOGGER.info("Starting budget reset task with enforcement window: {}ms", _enforcementWindowMs); - _resetScheduler.scheduleAtFixedRate(() -> { - LOGGER.debug("Resetting all workload budgets."); - // Also print the budget used in the last enforcement window. - _workloadBudgets.forEach((workload, budget) -> { - BudgetStats stats = budget.getStats(); - LOGGER.debug("Workload: {} -> CPU: {}ns, Memory: {} bytes", workload, stats._cpuRemaining, - stats._memoryRemaining); - // Reset the budget. - budget.reset(); - }); - }, _enforcementWindowMs, _enforcementWindowMs, TimeUnit.MILLISECONDS); - } + void collectWorkloadStats(String workload, long cpuUsedNs, long memoryUsedBytes); /** * Represents remaining budget stats. */ - public static class BudgetStats { Review Comment: Modifier 'static' is redundant for inner classes of interfaces -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org