gortiz commented on code in PR #16728:
URL: https://github.com/apache/pinot/pull/16728#discussion_r2371297218


##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -323,19 +320,21 @@ protected BrokerResponse handleRequest(long requestId, 
String query, SqlNodeAndO
       JsonNode request, @Nullable RequesterIdentity requesterIdentity, 
RequestContext requestContext,
       @Nullable HttpHeaders httpHeaders, AccessControl accessControl)
       throws Exception {
-    QueryThreadContext.setQueryEngine("sse");
     _queryLogger.log(requestId, query);
 
-    //Start instrumentation context. This must not be moved further below 
interspersed into the code.
+    String cid = extractClientRequestId(sqlNodeAndOptions);
+    if (cid == null) {
+      cid = Long.toString(requestId);
+    }
     String workloadName = 
QueryOptionsUtils.getWorkloadName(sqlNodeAndOptions.getOptions());
-    _resourceUsageAccountant.setupRunner(QueryThreadContext.getCid(), 
ThreadExecutionContext.TaskType.SSE,
-        workloadName);
 
-    try {
+    // TODO: Revisit whether we should set deadline here

Review Comment:
   Why shouldn't we set it here? What alternatives do we have?



##########
pinot-common/src/main/java/org/apache/pinot/common/function/scalar/InternalFunctions.java:
##########
@@ -72,28 +71,54 @@ public static long startTime(String input) {
   /// This is mostly useful for test and internal usage
   @ScalarFunction
   public static long endTime(String input) {
-    return QueryThreadContext.getActiveDeadlineMs();
+    return 
QueryThreadContext.get().getExecutionContext().getActiveDeadlineMs();
   }
 
-  ///  Returns the [broker id][QueryThreadContext#getBrokerId] of the query.
+  ///  Returns the broker id of the query.
   ///
   /// The input value is not directly used. Instead it is here to control 
whether the function is called during query
   /// optimization or execution. In order to do the latter, a non-constant 
value (like a column) should be passed as
   /// input.
   ///
   /// This is mostly useful for test and internal usage
   @ScalarFunction
-  @Nullable
   public static String brokerId(String input) {
-    return QueryThreadContext.getBrokerId();
+    return QueryThreadContext.get().getExecutionContext().getBrokerId();

Review Comment:
   Isn't it better to provide a static getBrokerId() on ExecutionContext?
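
   A rough sketch of the kind of shortcut I mean, using hypothetical stand-in classes (`ThreadContext` and `ExecutionContext` here are not the Pinot types). One caveat: Java does not allow a static and an instance method with the same signature in one class, so a static `getBrokerId()` cannot sit next to an instance `getBrokerId()`. In this sketch the static lives on the thread-bound class instead.

```java
// Hypothetical stand-ins for the thread context and execution context; not the Pinot classes.
class StaticAccessorSketch {
  static final class ExecutionContext {
    private final String _brokerId;

    ExecutionContext(String brokerId) {
      _brokerId = brokerId;
    }

    String getBrokerId() {
      return _brokerId;
    }
  }

  static final class ThreadContext {
    private static final ThreadLocal<ThreadContext> CURRENT = new ThreadLocal<>();
    private final ExecutionContext _executionContext;

    private ThreadContext(ExecutionContext executionContext) {
      _executionContext = executionContext;
    }

    // Binds a context to the calling thread.
    static void open(ExecutionContext executionContext) {
      CURRENT.set(new ThreadContext(executionContext));
    }

    // Unbinds the context from the calling thread.
    static void close() {
      CURRENT.remove();
    }

    // Returns the context bound to the calling thread, or fails if there is none.
    static ThreadContext get() {
      ThreadContext context = CURRENT.get();
      if (context == null) {
        throw new IllegalStateException("No context bound to this thread");
      }
      return context;
    }

    ExecutionContext getExecutionContext() {
      return _executionContext;
    }

    // Static shortcut so call sites do not repeat the get().getExecutionContext() chain.
    static String getBrokerId() {
      return get().getExecutionContext().getBrokerId();
    }
  }
}
```

   With something like this, the call site in `brokerId(String input)` would shrink to a single static call.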



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java:
##########
@@ -92,26 +96,122 @@ protected BaseCombineOperator(ResultsBlockMerger<T> 
resultsBlockMerger, List<Ope
     _futures = new Future[_numTasks];
   }
 
+  @Override
+  public List<Operator> getChildOperators() {
+    return _operators;
+  }
+
+  /// Do not check termination in combine operator to ensure [#nextBlock()] 
returns a results block instead of
+  /// throwing exception.
+  @Override
+  protected void checkTermination() {
+  }
+
+  /// Replaces the error block with the terminate exception if exists, and 
attaches the execution statistics to the
+  /// results block.
+  protected BaseResultsBlock 
checkTerminateExceptionAndAttachExecutionStats(BaseResultsBlock resultsBlock) {
+    // For exception results block, check terminate exception and use it as 
the results block if exists. We want to
+    // return the termination reason when query is explicitly terminated.
+    if (resultsBlock instanceof ExceptionResultsBlock) {
+      TerminationException terminateException = 
QueryThreadContext.getTerminateException();
+      if (terminateException != null) {
+        resultsBlock = new ExceptionResultsBlock(terminateException);
+      }
+    }
+    return attachExecutionStats(resultsBlock);
+  }
+
+  /// Creates an exception results block from the given throwable, and 
attaches the execution statistics to it.
+  protected BaseResultsBlock 
createExceptionResultsBlockAndAttachExecutionStats(Exception e, String context) 
{
+    // First check terminate exception and use it as the results block if 
exists. We want to return the termination
+    // reason when query is explicitly terminated.
+    ExceptionResultsBlock resultsBlock;
+    TerminationException terminateException = 
QueryThreadContext.getTerminateException();
+    if (terminateException != null) {
+      resultsBlock = new ExceptionResultsBlock(terminateException);
+    } else {
+      if (e instanceof QueryException) {

Review Comment:
   nit: Couldn't this be simplified with an `else if`?
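
   For illustration, a minimal sketch of the two shapes. The branch bodies are not visible in this hunk, so the types and return values below are hypothetical stand-ins.

```java
// Hypothetical stand-ins; the real branch bodies and Pinot types are not shown in the hunk.
class ElseIfSketch {
  static class QueryException extends RuntimeException {
    QueryException(String message) {
      super(message);
    }
  }

  // Nested form: an else block whose only statement is another if.
  static String describeNested(Exception terminateException, Exception e) {
    String result;
    if (terminateException != null) {
      result = "terminated: " + terminateException.getMessage();
    } else {
      if (e instanceof QueryException) {
        result = "query error: " + e.getMessage();
      } else {
        result = "unexpected: " + e.getMessage();
      }
    }
    return result;
  }

  // Flattened form: same branches and same order of checks, one nesting level less.
  static String describeFlat(Exception terminateException, Exception e) {
    String result;
    if (terminateException != null) {
      result = "terminated: " + terminateException.getMessage();
    } else if (e instanceof QueryException) {
      result = "query error: " + e.getMessage();
    } else {
      result = "unexpected: " + e.getMessage();
    }
    return result;
  }
}
```

   Both methods return the same value for every input, so the change would be purely cosmetic.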



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryAggregator.java:
##########
@@ -48,8 +47,6 @@
 /**
  * Aggregator that computes resource aggregation for queries. Most of the 
logic from PerQueryCPUMemAccountantFactory is
  * retained here for backward compatibility.
- *
- * TODO: Integrate recent changes in PerQueryCPUMemAccountantFactory
  */
 public class QueryAggregator implements ResourceAggregator {

Review Comment:
   Not part of this PR, but since queries themselves usually aggregate data, the name QueryAggregator may be confusing. Can we rename it to QueryResourceAggregator?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -96,106 +100,127 @@ public OpChainSchedulerService(ExecutorService 
executorService, int opStatsCache
   }
 
   public void register(OpChain operatorChain) {
+    QueryExecutionContext executionContext = 
QueryThreadContext.get().getExecutionContext();
+    // Check if query is already terminated before acquiring the read lock.
+    checkTermination(operatorChain, executionContext);
     // Acquire read lock for the query to ensure that the query is not 
cancelled while scheduling the operator chain.
     long requestId = operatorChain.getId().getRequestId();
     Lock readLock = getQueryLock(requestId).readLock();
     readLock.lock();
     try {
+      // Check if query is already terminated again after acquiring the read 
lock.
+      checkTermination(operatorChain, executionContext);
       // Do not schedule the operator chain if the query has been cancelled.
       if (_cancelledQueryCache.getIfPresent(requestId) != null) {
         LOGGER.debug("({}): Query has been cancelled", operatorChain);
+        executionContext.terminate(QueryErrorCode.QUERY_CANCELLATION, 
"Cancelled by user");
         throw new QueryCancelledException(
             "Query has been cancelled before op-chain: " + 
operatorChain.getId() + " being scheduled");
       } else {
-        registerInternal(operatorChain);
+        registerInternal(operatorChain, executionContext);
       }
     } finally {
       readLock.unlock();
     }
   }
 
-  private void registerInternal(OpChain operatorChain) {
+  private void checkTermination(OpChain operatorChain, QueryExecutionContext 
executionContext) {
+    TerminationException terminateException = 
executionContext.getTerminateException();
+    if (terminateException != null) {
+      LOGGER.debug("({}): Query has been terminated", operatorChain, 
terminateException);
+      if (terminateException.getErrorCode() == 
QueryErrorCode.QUERY_CANCELLATION) {
+        throw new QueryCancelledException(
+            "Query has been cancelled before op-chain: " + 
operatorChain.getId() + " being scheduled");
+      } else {
+        throw new QueryCancelledException(
+            "Query has been terminated before op-chain: " + 
operatorChain.getId() + " being scheduled: "
+                + terminateException.getErrorCode() + " - " + 
terminateException.getMessage(), terminateException);
+      }
+    }
+  }
+
+  private void registerInternal(OpChain operatorChain, QueryExecutionContext 
executionContext) {
     OpChainId opChainId = operatorChain.getId();
     MultiStageOperator rootOperator = operatorChain.getRoot();
-    Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() {
+    _opChainCache.put(opChainId, Pair.of(rootOperator, executionContext));
+
+    // Create a ListenableFutureTask to ensure the opChain is cancelled even 
if the task is not scheduled
+    ListenableFutureTask<Void> listenableFutureTask = 
ListenableFutureTask.create(new TraceRunnable() {

Review Comment:
   nit: this can be done without Guava with CompletableFutures, but 
ListenableFuture is fine as well
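
   A minimal sketch of the JDK-only alternative, assuming the goal is that cleanup runs even when the task is cancelled before it starts. `submitWithCleanup` is a hypothetical helper, not code from this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical helper; it only illustrates the CompletableFuture shape.
class CompletableFutureSketch {
  // Runs task on executor and registers cleanup to run once the future completes,
  // whether normally, exceptionally, or by cancellation before the task started.
  static CompletableFuture<Void> submitWithCleanup(Runnable task, Executor executor, Runnable cleanup) {
    CompletableFuture<Void> future = CompletableFuture.runAsync(task, executor);
    // The callback is attached to the original future, so cancelling it triggers cleanup too.
    future.whenComplete((ignored, error) -> cleanup.run());
    return future;
  }
}
```

   Two differences from a `FutureTask` are worth keeping in mind: `CompletableFuture.cancel(true)` does not interrupt a task that is already running, and in that case the cleanup callback can fire while the task body is still executing.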



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java:
##########
@@ -92,26 +96,122 @@ protected BaseCombineOperator(ResultsBlockMerger<T> 
resultsBlockMerger, List<Ope
     _futures = new Future[_numTasks];
   }
 
+  @Override
+  public List<Operator> getChildOperators() {
+    return _operators;
+  }
+
+  /// Do not check termination in combine operator to ensure [#nextBlock()] 
returns a results block instead of
+  /// throwing exception.

Review Comment:
   I assume this would apply to any operator. Can we add why this is important 
for BaseCombineOperator?



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java:
##########
@@ -59,6 +53,14 @@ public ExplainInfo getExplainInfo() {
     return new ExplainInfo(getExplainName(), attributeBuilder.build(), 
getChildrenExplainInfo());
   }
 
+  protected void checkTermination() {
+    QueryThreadContext.checkTermination(this::getExplainName);
+  }
+
+  protected void checkTerminationAndSampleUsage() {
+    QueryThreadContext.checkTerminationAndSampleUsage(this::getExplainName);
+  }

Review Comment:
   Could we add overloads of `checkTerminationAndSampleUsage` and `checkTermination` that accept a String instead of a supplier? That way we could call these methods without allocating a supplier on each call.
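
   Roughly what I have in mind, as a sketch with a hypothetical termination flag standing in for the real query state. A method reference bound to `this` captures the receiver, so each call may allocate a new object unless the JIT optimizes it away; the String overload avoids that when the name is already available.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the termination check; not the Pinot implementation.
class TerminationCheckSketch {
  // Hypothetical flag standing in for the query's termination state.
  static volatile boolean _terminated = false;

  // Supplier variant, matching the call shape in the hunk: the name is resolved only on failure.
  static void checkTermination(Supplier<String> scopeName) {
    if (_terminated) {
      throw new IllegalStateException("Query terminated in " + scopeName.get());
    }
  }

  // String overload: the caller passes a name it already holds, so no supplier is created.
  static void checkTermination(String scopeName) {
    if (_terminated) {
      throw new IllegalStateException("Query terminated in " + scopeName);
    }
  }
}
```

   The trade-off is that the String overload only helps when the name is cheap to obtain, for example a constant. If building the name is expensive, the supplier variant still defers that work to the failure path.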



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/WorkloadAggregator.java:
##########
@@ -18,165 +18,134 @@
  */
 package org.apache.pinot.core.accounting;
 
-import java.util.ArrayList;
+import com.google.common.collect.Sets;
+import it.unimi.dsi.fastutil.longs.LongLongMutablePair;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.pinot.spi.accounting.WorkloadBudgetManager;
 import org.apache.pinot.spi.config.instance.InstanceType;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.trace.Tracing;
-import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.query.QueryExecutionContext;
+import org.apache.pinot.spi.query.QueryThreadContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 public class WorkloadAggregator implements ResourceAggregator {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(WorkloadAggregator.class);
 
-  private final boolean _isThreadCPUSamplingEnabled;
-  private final boolean _isThreadMemorySamplingEnabled;
-  private final PinotConfiguration _config;
-  private final InstanceType _instanceType;
-  private final String _instanceId;
-
-  // For one time concurrent update of stats. This is to provide stats 
collection for parts that are not
-  // performance sensitive and workload_name is not known earlier (eg: broker 
inbound netty request)
-  private final ConcurrentHashMap<String, Long> 
_concurrentTaskCPUStatsAggregator = new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<String, Long> 
_concurrentTaskMemStatsAggregator = new ConcurrentHashMap<>();
+  // Tracks the CPU and memory usage for workloads not tracked by any thread.
+  // E.g. request ser/de where thread execution context cannot be set up 
beforehand; tasks already finished.
+  private final ConcurrentHashMap<String, LongLongMutablePair> 
_untrackedCpuMemUsage = new ConcurrentHashMap<>();
 
-  private final int _sleepTimeMs;
-  private final boolean _enableEnforcement;
+  // Tracks the queries under each workload.
+  private final Map<String, Set<QueryExecutionContext>> _workloadQueriesMap = 
new HashMap<>();
 
-  WorkloadBudgetManager _workloadBudgetManager;
+  private final String _instanceId;
+  private final InstanceType _instanceType;
+  private final boolean _cpuSamplingEnabled;
+  private final boolean _memorySamplingEnabled;

Review Comment:
   These two attributes are not used.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java:
##########
@@ -92,26 +96,122 @@ protected BaseCombineOperator(ResultsBlockMerger<T> 
resultsBlockMerger, List<Ope
     _futures = new Future[_numTasks];
   }
 
+  @Override
+  public List<Operator> getChildOperators() {
+    return _operators;
+  }
+
+  /// Do not check termination in combine operator to ensure [#nextBlock()] 
returns a results block instead of
+  /// throwing exception.
+  @Override
+  protected void checkTermination() {
+  }
+
+  /// Replaces the error block with the terminate exception if exists, and 
attaches the execution statistics to the
+  /// results block.
+  protected BaseResultsBlock 
checkTerminateExceptionAndAttachExecutionStats(BaseResultsBlock resultsBlock) {
+    // For exception results block, check terminate exception and use it as 
the results block if exists. We want to
+    // return the termination reason when query is explicitly terminated.
+    if (resultsBlock instanceof ExceptionResultsBlock) {
+      TerminationException terminateException = 
QueryThreadContext.getTerminateException();

Review Comment:
   Why don't we do this when the received ExceptionResultsBlock is created, instead of here?



##########
pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkAggregateGroupByOrderByQueriesSSE.java:
##########
@@ -1,557 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.perf;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.IntStream;
-import javax.annotation.Nullable;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.datatable.DataTable;
-import org.apache.pinot.common.datatable.DataTableFactory;
-import org.apache.pinot.common.metrics.BrokerMetrics;
-import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.request.PinotQuery;
-import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
-import org.apache.pinot.core.plan.Plan;
-import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
-import org.apache.pinot.core.plan.maker.PlanMaker;
-import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
-import org.apache.pinot.core.query.optimizer.QueryOptimizer;
-import org.apache.pinot.core.query.reduce.BrokerReduceService;
-import org.apache.pinot.core.query.request.context.QueryContext;
-import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
-import org.apache.pinot.core.transport.ServerRoutingInstance;
-import org.apache.pinot.core.util.GapfillUtils;
-import org.apache.pinot.queries.StatisticalQueriesTest;
-import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.segment.spi.SegmentContext;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.spi.config.table.FieldConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
-import org.apache.pinot.sql.parsers.CalciteSqlParser;
-import org.intellij.lang.annotations.Language;
-import org.mockito.Mockito;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Measurement;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.OutputTimeUnit;
-import org.openjdk.jmh.annotations.Param;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.annotations.Warmup;
-import org.openjdk.jmh.runner.Runner;
-import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
-import org.openjdk.jmh.runner.options.OptionsBuilder;
-
-
-@BenchmarkMode(Mode.AverageTime)
-@OutputTimeUnit(TimeUnit.MILLISECONDS)
-@Fork(1)
-@Warmup(iterations = 3, time = 1)
-@Measurement(iterations = 3, time = 5)
-@State(Scope.Benchmark)
-public class BenchmarkAggregateGroupByOrderByQueriesSSE {

Review Comment:
   Aren't this and the other deleted benchmarks useful?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/accounting/TrackingScope.java:
##########
@@ -18,9 +18,7 @@
  */
 package org.apache.pinot.spi.accounting;
 
-/**
- * Scope for tracking resources in ThreadResourceUsageAccountant.
- */
+/// Scope for tracking resources in ThreadAccountant.
 public enum TrackingScope {
   QUERY,
   WORKLOAD

Review Comment:
   What is the difference between the two scopes?



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryAggregator.java:
##########
@@ -58,83 +55,23 @@ enum TriggeringLevel {
     Normal, HeapMemoryAlarmingVerbose, CPUTimeBasedKilling, 
HeapMemoryCritical, HeapMemoryPanic
   }
 
-  // For one time concurrent update of stats. This is to provide stats 
collection for parts that are not
-  // performance sensitive and query_id is not known beforehand (e.g. broker 
inbound netty thread)
-  private final ConcurrentHashMap<String, Long> 
_concurrentTaskCPUStatsAggregator = new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<String, Long> 
_concurrentTaskMemStatsAggregator = new ConcurrentHashMap<>();
-
-  // for stats aggregation of finished (worker) threads when the runner is 
still running.
-  private final HashMap<String, Long> _finishedTaskCPUStatsAggregator = new 
HashMap<>();
-  private final HashMap<String, Long> _finishedTaskMemStatsAggregator = new 
HashMap<>();
-
-  private final boolean _isThreadCPUSamplingEnabled;
-  private final boolean _isThreadMemorySamplingEnabled;
+  // Tracks the CPU and memory usage for queries not tracked by any thread.
+  // E.g. request ser/de where thread execution context cannot be set up 
beforehand; tasks already finished.
+  private final ConcurrentHashMap<String, LongLongMutablePair> 
_untrackedCpuMemUsage = new ConcurrentHashMap<>();
 
-  private final Set<String> _inactiveQuery;
-  private final PinotConfiguration _config;
+  // Tracks the id of the inactive queries, which is used to clean up entries 
in _untrackedMemCpuUsage.
+  private final Set<String> _inactiveQueries = new HashSet<>();
 
-  private final InstanceType _instanceType;
   private final String _instanceId;
+  private final InstanceType _instanceType;
+  private final boolean _cpuSamplingEnabled;
+  private final boolean _memorySamplingEnabled;
+  private final AtomicReference<QueryMonitorConfig> _queryMonitorConfig;
 
-  // max heap usage, Xmx
-  private final long _maxHeapSize = ResourceUsageUtils.getMaxHeapSize();
-
-  // don't kill a query if its memory footprint is below some ratio of 
_maxHeapSize
-  private final long _minMemoryFootprintForKill;
-
-  // kill all queries if heap usage exceeds this
-  private final long _panicLevel;
-
-  // kill the most expensive query if heap usage exceeds this
-  private final long _criticalLevel;
-
-  // if after gc the heap usage is still above this, kill the most expensive 
query
-  // use this to prevent heap size oscillation and repeatedly triggering gc
-  private final long _criticalLevelAfterGC;
-
-  // trigger gc if consecutively kill more than some number of queries
-  // set this to 0 to always trigger gc before killing a query to give gc a 
second chance
-  // as would minimize the chance of false positive killing in some usecases
-  // should consider use -XX:+ExplicitGCInvokesConcurrent to avoid STW for 
some gc algorithms
-  private final int _gcBackoffCount;
-
-  // start to sample more frequently if heap usage exceeds this
-  private final long _alarmingLevel;
-
-  // normal sleep time
-  private final int _normalSleepTime;
-
-  // wait for gc to complete, according to system.gc() javadoc, when control 
returns from the method call,
-  // the Java Virtual Machine has made a best effort to reclaim space from all 
discarded objects.
-  // Therefore, we default this to 0.
-  // Tested with Shenandoah GC and G1GC, with -XX:+ExplicitGCInvokesConcurrent
-  private final int _gcWaitTime;
-
-  // alarming sleep time denominator, should be > 1 to sample more frequent at 
alarming level
-  private final int _alarmingSleepTimeDenominator;
-
-  // alarming sleep time
-  private final int _alarmingSleepTime;
-
-  // the framework would not commit to kill any query if this is disabled
-  private final boolean _oomKillQueryEnabled;
-
-  // if we want to publish the heap usage
-  private final boolean _publishHeapUsageMetric;
-
-  // if we want kill query based on CPU time
-  private final boolean _isCPUTimeBasedKillingEnabled;
-
-  // CPU time based killing threshold
-  private final long _cpuTimeBasedKillingThresholdNS;
-
-  private final boolean _isQueryKilledMetricEnabled;
-
-  private long _usedBytes;
+  private volatile long _usedBytes;
   private int _sleepTime;
-  private int _numQueriesKilledConsecutively = 0;
-  protected Map<String, AggregatedStats> _aggregatedUsagePerActiveQuery;
-  private TriggeringLevel _triggeringLevel;
+  private Map<String, QueryResourceTrackerImpl> _queryResourceUsages;

Review Comment:
   Can we add a comment indicating the key is the query id?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


