praveenc7 commented on code in PR #16340:
URL: https://github.com/apache/pinot/pull/16340#discussion_r2243480650


##########
pinot-spi/src/main/java/org/apache/pinot/core/accounting/WorkloadBudgetManager.java:
##########
@@ -18,137 +18,50 @@
  */
 package org.apache.pinot.core.accounting;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class WorkloadBudgetManager {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(WorkloadBudgetManager.class);
-
-  private long _enforcementWindowMs;
-  private final ConcurrentHashMap<String, Budget> _workloadBudgets = new 
ConcurrentHashMap<>();
-  private final ScheduledExecutorService _resetScheduler = 
Executors.newSingleThreadScheduledExecutor();
-  private volatile boolean _isEnabled;
-
-  public WorkloadBudgetManager(PinotConfiguration config) {
-    _isEnabled = 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENABLE_COST_COLLECTION,
-        CommonConstants.Accounting.DEFAULT_WORKLOAD_ENABLE_COST_COLLECTION);
-    // Return an object even if disabled. All functionalities of this class 
will be noops.
-    if (!_isEnabled) {
-      LOGGER.info("WorkloadBudgetManager is disabled. Creating a no-op 
instance.");
-      return;
-    }
-    _enforcementWindowMs = 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENFORCEMENT_WINDOW_MS,
-        CommonConstants.Accounting.DEFAULT_WORKLOAD_ENFORCEMENT_WINDOW_MS);
-    startBudgetResetTask();
-    LOGGER.info("WorkloadBudgetManager initialized with enforcement window: 
{}ms", _enforcementWindowMs);
-  }
-
-  public void shutdown() {
-    if (!_isEnabled) {
-      return;
-    }
-    _isEnabled = false;
-    _resetScheduler.shutdownNow();
-    try {
-      if (!_resetScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
-        LOGGER.warn("Reset scheduler did not terminate in time");
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-    LOGGER.info("WorkloadBudgetManager has been shut down.");
-  }
 
+public interface WorkloadBudgetManager {
+  /** Shut the manager down and release resources. */
+  void shutdown();
 
   /**
    * Adds or updates budget for a workload (Thread-Safe).
+   *
+   * @param workload           Logical workload identifier
+   * @param cpuBudgetNs        CPU budget for the enforcement window, in ns
+   * @param memoryBudgetBytes  Memory budget for the window, in bytes
    */
-  public void addOrUpdateWorkload(String workload, long cpuBudgetNs, long 
memoryBudgetBytes) {
-    if (!_isEnabled) {
-      LOGGER.info("WorkloadBudgetManager is disabled. Not adding/updating 
workload: {}", workload);
-      return;
-    }
-
-    _workloadBudgets.compute(workload, (key, existingBudget) -> new 
Budget(cpuBudgetNs, memoryBudgetBytes));
-    LOGGER.info("Updated budget for workload: {} -> CPU: {}ns, Memory: {} 
bytes", workload, cpuBudgetNs,
-        memoryBudgetBytes);
-  }
+  void addOrUpdateWorkload(String workload, long cpuBudgetNs, long memoryBudgetBytes);
 
   /**
    * Attempts to charge CPU and memory usage against the workload budget (Thread-Safe).
-   * Returns the remaining budget for CPU and memory after charge.
+   * @return the remaining budget for CPU and memory after charge.
    */
-  public BudgetStats tryCharge(String workload, long cpuUsedNs, long 
memoryUsedBytes) {
-    if (!_isEnabled) {
-      return new BudgetStats(Long.MAX_VALUE, Long.MAX_VALUE);
-    }
-
-    Budget budget = _workloadBudgets.get(workload);
-    if (budget == null) {
-      LOGGER.warn("No budget found for workload: {}", workload);
-      return new BudgetStats(Long.MAX_VALUE, Long.MAX_VALUE);
-    }
-    return budget.tryCharge(cpuUsedNs, memoryUsedBytes);
-  }
+  WorkloadBudgetManager.BudgetStats tryCharge(String workload, long cpuUsedNs, long memoryUsedBytes);
 
   /**
-   * Retrieves the remaining budget for a specific workload.
+   * @return the remaining budget for a specific workload.
    */
-  public BudgetStats getRemainingBudgetForWorkload(String workload) {
-    if (!_isEnabled) {
-      return new BudgetStats(Long.MAX_VALUE, Long.MAX_VALUE);
-    }
-
-    Budget budget = _workloadBudgets.get(workload);
-    return budget != null ? budget.getStats() : new BudgetStats(0, 0);
-  }
+  WorkloadBudgetManager.BudgetStats getRemainingBudgetForWorkload(String workload);
 
   /**
-   * Retrieves the total remaining budget across all workloads (Thread-Safe).
+   * @return the total remaining budget across all workloads (Thread-Safe).
    */
-  public BudgetStats getRemainingBudgetAcrossAllWorkloads() {
-    if (!_isEnabled) {
-      return new BudgetStats(Long.MAX_VALUE, Long.MAX_VALUE);
-    }
-
-    long totalCpuRemaining =
-        _workloadBudgets.values().stream().mapToLong(budget -> 
budget.getStats()._cpuRemaining).sum();
-    long totalMemRemaining =
-        _workloadBudgets.values().stream().mapToLong(budget -> 
budget.getStats()._memoryRemaining).sum();
-    return new BudgetStats(totalCpuRemaining, totalMemRemaining);
-  }
+  WorkloadBudgetManager.BudgetStats getRemainingBudgetAcrossAllWorkloads();
 
   /**
-   * Periodically resets budgets at the end of each enforcement window 
(Thread-Safe).
+   * Collects workload stats for CPU and memory usage.
+   *
+   * @param workload        Workload identifier
+   * @param cpuUsedNs       CPU used by the workload, in ns
+   * @param memoryUsedBytes Memory used by the workload, in bytes
    */
-  private void startBudgetResetTask() {
-    // TODO(Vivek): Reduce logging verbosity. Maybe make it debug logs.
-    LOGGER.info("Starting budget reset task with enforcement window: {}ms", 
_enforcementWindowMs);
-    _resetScheduler.scheduleAtFixedRate(() -> {
-      LOGGER.debug("Resetting all workload budgets.");
-      // Also print the budget used in the last enforcement window.
-      _workloadBudgets.forEach((workload, budget) -> {
-        BudgetStats stats = budget.getStats();
-        LOGGER.debug("Workload: {} -> CPU: {}ns, Memory: {} bytes", workload, 
stats._cpuRemaining,
-            stats._memoryRemaining);
-        // Reset the budget.
-        budget.reset();
-      });
-    }, _enforcementWindowMs, _enforcementWindowMs, TimeUnit.MILLISECONDS);
-  }
+  void collectWorkloadStats(String workload, long cpuUsedNs, long memoryUsedBytes);
 
   /**
    * Represents remaining budget stats.
    */
-  public static class BudgetStats {

Review Comment:
   The 'static' modifier is redundant here: a class nested in an interface is implicitly static.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to