Jackie-Jiang commented on code in PR #16728:
URL: https://github.com/apache/pinot/pull/16728#discussion_r2373777256
##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/WorkloadAggregator.java:
##########
@@ -18,165 +18,134 @@
*/
package org.apache.pinot.core.accounting;
-import java.util.ArrayList;
+import com.google.common.collect.Sets;
+import it.unimi.dsi.fastutil.longs.LongLongMutablePair;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.pinot.spi.accounting.WorkloadBudgetManager;
import org.apache.pinot.spi.config.instance.InstanceType;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.trace.Tracing;
-import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.query.QueryExecutionContext;
+import org.apache.pinot.spi.query.QueryThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WorkloadAggregator implements ResourceAggregator {
private static final Logger LOGGER =
LoggerFactory.getLogger(WorkloadAggregator.class);
- private final boolean _isThreadCPUSamplingEnabled;
- private final boolean _isThreadMemorySamplingEnabled;
- private final PinotConfiguration _config;
- private final InstanceType _instanceType;
- private final String _instanceId;
-
- // For one time concurrent update of stats. This is to provide stats
collection for parts that are not
- // performance sensitive and workload_name is not known earlier (eg: broker
inbound netty request)
- private final ConcurrentHashMap<String, Long>
_concurrentTaskCPUStatsAggregator = new ConcurrentHashMap<>();
- private final ConcurrentHashMap<String, Long>
_concurrentTaskMemStatsAggregator = new ConcurrentHashMap<>();
+ // Tracks the CPU and memory usage for workloads not tracked by any thread.
+ // E.g. request ser/de where thread execution context cannot be set up
beforehand; tasks already finished.
+ private final ConcurrentHashMap<String, LongLongMutablePair>
_untrackedCpuMemUsage = new ConcurrentHashMap<>();
- private final int _sleepTimeMs;
- private final boolean _enableEnforcement;
+ // Tracks the queries under each workload.
+ private final Map<String, Set<QueryExecutionContext>> _workloadQueriesMap =
new HashMap<>();
- WorkloadBudgetManager _workloadBudgetManager;
+ private final String _instanceId;
+ private final InstanceType _instanceType;
+ private final boolean _cpuSamplingEnabled;
+ private final boolean _memorySamplingEnabled;
Review Comment:
True. I kept them here so that it is consistent with `QueryAggregator`. They
might be useful in the future
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]