This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 397874e548 Add Debug APIs that return Thread and Query Resource Usage 
(#13583)
397874e548 is described below

commit 397874e548a8ba06ff95a0777c10909393f400c2
Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com>
AuthorDate: Mon Jul 22 19:43:26 2024 +0530

    Add Debug APIs that return Thread and Query Resource Usage (#13583)
    
    * Checkpoint
    
    * Checkpoint
    
    * Add javadocs for new interfaces.
    
    * Code linting and java docs.
    
    * Add two quick starts for resource tracking.
    
    * Add a javadoc
    
    * Dont serialize thread info
    
    * Add auth to query debug resource
---
 .../broker/api/resources/PinotBrokerDebug.java     | 26 ++++++
 .../CPUMemThreadLevelAccountingObjects.java        | 23 +++++-
 .../PerQueryCPUMemAccountantFactory.java           | 85 ++++++++++++++++++--
 .../java/org/apache/pinot/core/auth/Actions.java   |  1 +
 .../pinot/server/api/resources/DebugResource.java  | 26 ++++++
 .../pinot/spi/accounting/QueryResourceTracker.java | 46 +++++++++++
 .../spi/accounting/ThreadResourceTracker.java      | 52 ++++++++++++
 .../accounting/ThreadResourceUsageAccountant.java  | 14 ++++
 .../java/org/apache/pinot/spi/trace/Tracing.java   | 15 ++++
 .../tools/MultiStageResourceTrackerQuickStart.java | 56 +++++++++++++
 .../pinot/tools/MultistageEngineQuickStart.java    | 24 +++---
 .../SingleStageResourceTrackingQuickStart.java     | 93 ++++++++++++++++++++++
 .../apache/pinot/tools/utils/PinotConfigUtils.java | 22 +++++
 .../apache/pinot/tools/utils/SampleQueries.java    | 42 ++++++++++
 14 files changed, 503 insertions(+), 22 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
index 11e35eb6f7..8311b5de87 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
@@ -27,6 +27,7 @@ import io.swagger.annotations.ApiResponses;
 import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -57,7 +58,11 @@ import org.apache.pinot.core.routing.RoutingTable;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.core.transport.ServerInstance;
 import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
+import org.apache.pinot.spi.accounting.QueryResourceTracker;
+import org.apache.pinot.spi.accounting.ThreadResourceTracker;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
 
@@ -269,4 +274,25 @@ public class PinotBrokerDebug {
   private long getRequestId() {
     return _requestIdGenerator.getAndIncrement();
   }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/debug/threads/resourceUsage")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.DEBUG_RESOURCE_USAGE)
+  @ApiOperation(value = "Get resource usage of threads")
+  public Collection<? extends ThreadResourceTracker> getThreadResourceUsage() {
+    ThreadResourceUsageAccountant threadAccountant = 
Tracing.getThreadAccountant();
+    return threadAccountant.getThreadResources();
+  }
+
+  @GET
+  @Path("debug/queries/resourceUsage")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.DEBUG_RESOURCE_USAGE)
+  @ApiOperation(value = "Get current resource usage of queries in this 
service", notes = "This is a debug endpoint, "
+      + "and won't maintain backward compatibility")
+  public Collection<? extends QueryResourceTracker> getQueryUsage() {
+    ThreadResourceUsageAccountant threadAccountant = 
Tracing.getThreadAccountant();
+    return threadAccountant.getQueryResources().values();
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
index 431643942a..6a375b95bb 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
@@ -18,10 +18,12 @@
  */
 package org.apache.pinot.core.accounting;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadResourceTracker;
 import org.apache.pinot.spi.utils.CommonConstants;
 
 
@@ -34,7 +36,7 @@ public class CPUMemThreadLevelAccountingObjects {
    * Entry to track the task execution status and usage stats of a Thread
    * (including but not limited to server worker thread, runner thread, broker 
jetty thread, or broker netty thread)
    */
-  public static class ThreadEntry {
+  public static class ThreadEntry implements ThreadResourceTracker {
     // current query_id, task_id of the thread; this field is accessed by the 
thread itself and the accountant
     AtomicReference<TaskEntry> _currentThreadTaskStatus = new 
AtomicReference<>();
     // current sample of thread memory usage/cputime ; this field is accessed 
by the thread itself and the accountant
@@ -77,11 +79,30 @@ public class CPUMemThreadLevelAccountingObjects {
      *
      * @return the current query id on the thread, {@code null} if idle
      */
+    @JsonIgnore
     @Nullable
     public TaskEntry getCurrentThreadTaskStatus() {
       return _currentThreadTaskStatus.get();
     }
 
+    public long getCPUTimeMS() {
+      return _currentThreadCPUTimeSampleMS;
+    }
+
+    public long getAllocatedBytes() {
+      return _currentThreadMemoryAllocationSampleBytes;
+    }
+
+    public String getQueryId() {
+      TaskEntry taskEntry = _currentThreadTaskStatus.get();
+      return taskEntry == null ? "" : taskEntry.getQueryId();
+    }
+
+    public int getTaskId() {
+      TaskEntry taskEntry = _currentThreadTaskStatus.get();
+      return taskEntry == null ? -1 : taskEntry.getTaskId();
+    }
+
     public void setThreadTaskStatus(@Nonnull String queryId, int taskId, 
@Nonnull Thread anchorThread) {
       _currentThreadTaskStatus.set(new TaskEntry(queryId, taskId, 
anchorThread));
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index 598b68b344..34e496d3bf 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.core.accounting;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -38,8 +40,10 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.accounting.QueryResourceTracker;
 import org.apache.pinot.spi.accounting.ThreadAccountantFactory;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadResourceTracker;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.config.instance.InstanceType;
@@ -162,6 +166,65 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
       _watcherTask = new WatcherTask();
     }
 
+    @Override
+    public Collection<? extends ThreadResourceTracker> getThreadResources() {
+      return _threadEntriesMap.values();
+    }
+
+    /**
+     * This function aggregates resource usage from all active threads and 
groups by queryId.
+     * It is inspired by {@link 
PerQueryCPUMemResourceUsageAccountant#aggregate}. The major difference is that
+     * it only reads from thread entries and does not update them.
+     * @return A map of query id, QueryResourceTracker.
+     */
+    @Override
+    public Map<String, ? extends QueryResourceTracker> getQueryResources() {
+      HashMap<String, AggregatedStats> ret = new HashMap<>();
+
+      // for each {pqr, pqw}
+      for (Map.Entry<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> 
entry : _threadEntriesMap.entrySet()) {
+        // sample current usage
+        CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = 
entry.getValue();
+        long currentCPUSample = _isThreadCPUSamplingEnabled ? 
threadEntry._currentThreadCPUTimeSampleMS : 0;
+        long currentMemSample =
+            _isThreadMemorySamplingEnabled ? 
threadEntry._currentThreadMemoryAllocationSampleBytes : 0;
+        // sample current running task status
+        CPUMemThreadLevelAccountingObjects.TaskEntry currentTaskStatus = 
threadEntry.getCurrentThreadTaskStatus();
+        Thread thread = entry.getKey();
+        LOGGER.trace("tid: {}, task: {}", thread.getId(), currentTaskStatus);
+
+        // if current thread is not idle
+        if (currentTaskStatus != null) {
+          // extract query id from queryTask string
+          String queryId = currentTaskStatus.getQueryId();
+          if (queryId != null) {
+            Thread anchorThread = currentTaskStatus.getAnchorThread();
+            boolean isAnchorThread = currentTaskStatus.isAnchorThread();
+            ret.compute(queryId,
+                (k, v) -> v == null ? new AggregatedStats(currentCPUSample, 
currentMemSample, anchorThread,
+                    isAnchorThread, threadEntry._errorStatus, queryId)
+                    : v.merge(currentCPUSample, currentMemSample, 
isAnchorThread, threadEntry._errorStatus));
+          }
+        }
+      }
+
+      // accumulate stats of finished and concurrent tasks of each active query
+      for (Map.Entry<String, AggregatedStats> queryIdResult : ret.entrySet()) {
+        String activeQueryId = queryIdResult.getKey();
+        long accumulatedCPUValue =
+            _isThreadCPUSamplingEnabled ? 
_finishedTaskCPUStatsAggregator.getOrDefault(activeQueryId, 0L) : 0;
+        long concurrentCPUValue =
+            _isThreadCPUSamplingEnabled ? 
_concurrentTaskCPUStatsAggregator.getOrDefault(activeQueryId, 0L) : 0;
+        long accumulatedMemValue =
+            _isThreadMemorySamplingEnabled ? 
_finishedTaskMemStatsAggregator.getOrDefault(activeQueryId, 0L) : 0;
+        long concurrentMemValue =
+            _isThreadMemorySamplingEnabled ? 
_concurrentTaskMemStatsAggregator.getOrDefault(activeQueryId, 0L) : 0;
+        queryIdResult.getValue()
+            .merge(accumulatedCPUValue + concurrentCPUValue, 
accumulatedMemValue + concurrentMemValue, false, null);
+      }
+      return ret;
+    }
+
     @Override
     public void sampleUsage() {
       sampleThreadBytesAllocated();
@@ -402,7 +465,7 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
     /**
      * aggregated usage of a query, _thread is the runner
      */
-    protected static class AggregatedStats {
+    protected static class AggregatedStats implements QueryResourceTracker {
       final String _queryId;
       final Thread _anchorThread;
       boolean _isAnchorThread;
@@ -432,14 +495,22 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
             + '}';
       }
 
-      public long getCpuNS() {
-        return _cpuNS;
+      @Override
+      public String getQueryId() {
+        return _queryId;
       }
 
+      @Override
       public long getAllocatedBytes() {
         return _allocatedBytes;
       }
 
+      @Override
+      public long getCpuTimeNs() {
+        return _cpuNS;
+      }
+
+      @JsonIgnore
       public Thread getAnchorThread() {
         return _anchorThread;
       }
@@ -804,7 +875,7 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
           }
         } else {
           maxUsageTuple = 
Collections.max(_aggregatedUsagePerActiveQuery.values(),
-              Comparator.comparing(AggregatedStats::getCpuNS));
+              Comparator.comparing(AggregatedStats::getCpuTimeNs));
           if (_oomKillQueryEnabled) {
             maxUsageTuple._exceptionAtomicReference
                 .set(new RuntimeException(String.format(
@@ -828,12 +899,12 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
           AggregatedStats value = entry.getValue();
           if (value._cpuNS > _cpuTimeBasedKillingThresholdNS) {
             LOGGER.error("Current task status recorded is {}. Query {} got 
picked because using {} ns of cpu time,"
-                    + " greater than threshold {}", _threadEntriesMap, 
value._queryId, value.getCpuNS(),
+                    + " greater than threshold {}", _threadEntriesMap, 
value._queryId, value.getCpuTimeNs(),
                 _cpuTimeBasedKillingThresholdNS);
             value._exceptionAtomicReference.set(new RuntimeException(
                 String.format("Query %s got killed on %s: %s because using %d "
-                        + "CPU time exceeding limit of %d ns CPU time",
-                    value._queryId, _instanceType, _instanceId, 
value.getCpuNS(), _cpuTimeBasedKillingThresholdNS)));
+                        + "CPU time exceeding limit of %d ns CPU time", 
value._queryId, _instanceType, _instanceId,
+                    value.getCpuTimeNs(), _cpuTimeBasedKillingThresholdNS)));
             interruptRunnerThread(value.getAnchorThread());
           }
         }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java 
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index 877041eb7d..51cab16711 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -34,6 +34,7 @@ public class Actions {
     public static final String CREATE_TENANT = "CreateTenant";
     public static final String CREATE_USER = "CreateUser";
     public static final String DEBUG_TASK = "DebugTask";
+    public static final String DEBUG_RESOURCE_USAGE = "DebugResourceUsage";
     public static final String DELETE_CLUSTER_CONFIG = "DeleteClusterConfig";
     public static final String DELETE_INSTANCE = "DeleteInstance";
     public static final String DELETE_TASK = "DeleteTask";
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
index b90f4b42a4..23f7d9a6cc 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
@@ -26,6 +26,7 @@ import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -52,8 +53,12 @@ import 
org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.spi.accounting.QueryResourceTracker;
+import org.apache.pinot.spi.accounting.ThreadResourceTracker;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
+import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 import static org.apache.pinot.spi.utils.CommonConstants.DATABASE;
@@ -123,6 +128,27 @@ public class DebugResource {
     }
   }
 
+  @GET
+  @Path("threads/resourceUsage")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get current resource usage of threads",
+      notes = "This is a debug endpoint, and won't maintain backward 
compatibility")
+  public Collection<? extends ThreadResourceTracker> getThreadUsage() {
+    ThreadResourceUsageAccountant threadAccountant = 
Tracing.getThreadAccountant();
+    return threadAccountant.getThreadResources();
+  }
+
+  @GET
+  @Path("queries/resourceUsage")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get current resource usage of queries in this 
service",
+      notes = "This is a debug endpoint, and won't maintain backward 
compatibility")
+  public Collection<? extends QueryResourceTracker> getQueryUsage() {
+    ThreadResourceUsageAccountant threadAccountant = 
Tracing.getThreadAccountant();
+    Collection<? extends QueryResourceTracker> resources = 
threadAccountant.getQueryResources().values();
+    return resources;
+  }
+
   private List<SegmentServerDebugInfo> getSegmentServerDebugInfo(String 
tableNameWithType, TableType tableType) {
     List<SegmentServerDebugInfo> segmentServerDebugInfos = new ArrayList<>();
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/QueryResourceTracker.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/QueryResourceTracker.java
new file mode 100644
index 0000000000..869528bb75
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/QueryResourceTracker.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.accounting;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+
+/**
+ * Tracks allocated bytes and CPU time for a query in a server or a broker.
+ */
+@JsonSerialize
+public interface QueryResourceTracker {
+  /**
+   * QueryId tracked by the implementation.
+   * @return a string containing the query id.
+   */
+  String getQueryId();
+
+  /**
+   * Allocated bytes for a query in a server or broker
+   * @return A long containing the number of bytes allocated to execute the 
query.
+   */
+  long getAllocatedBytes();
+
+  /**
+   * Total execution CPU Time(nanoseconds) of a query in a server or broker.
+   * @return A long containing the nanoseconds.
+   */
+  long getCpuTimeNs();
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
new file mode 100644
index 0000000000..ff78a0d33c
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.accounting;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+
+/**
+ * Tracks allocated bytes and CPU time by a thread when executing a task of a 
query.
+ */
+@JsonSerialize
+public interface ThreadResourceTracker {
+  /**
+   * Total execution CPU Time(nanoseconds) of a thread when executing a query 
task in a server or broker.
+   * @return A long containing the nanoseconds.
+   */
+  long getCPUTimeMS();
+
+  /**
+   * Allocated bytes for a query task in a server or broker
+   * @return A long containing the number of bytes allocated to execute the 
query task.
+   */
+  long getAllocatedBytes();
+
+  /**
+   * QueryId of the task the thread is executing.
+   * @return a string containing the query id.
+   */
+  String getQueryId();
+
+  /**
+   * TaskId of the task the thread is executing.
+   * @return an int containing the task id.
+   */
+  int getTaskId();
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
index 41bbd59110..8be0632e6b 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.spi.accounting;
 
+import java.util.Collection;
+import java.util.Map;
 import javax.annotation.Nullable;
 
 
@@ -74,4 +76,16 @@ public interface ThreadResourceUsageAccountant {
    * @return empty string if N/A
    */
   Exception getErrorStatus();
+
+  /**
+   * Get all the ThreadResourceTrackers for all threads executing query tasks
+   * @return A collection of ThreadResourceTracker objects
+   */
+  Collection<? extends ThreadResourceTracker> getThreadResources();
+
+  /**
+   * Get all the QueryResourceTrackers for all the queries executing in a 
broker or server.
+   * @return A Map of String, QueryResourceTracker for all the queries.
+   */
+  Map<String, ? extends QueryResourceTracker> getQueryResources();
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index a853b03c20..910ebc35cf 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -20,9 +20,14 @@ package org.apache.pinot.spi.trace;
 
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.MethodType;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.spi.accounting.QueryResourceTracker;
 import org.apache.pinot.spi.accounting.ThreadAccountantFactory;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadResourceTracker;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -226,6 +231,16 @@ public class Tracing {
     public Exception getErrorStatus() {
       return null;
     }
+
+    @Override
+    public Collection<? extends ThreadResourceTracker> getThreadResources() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public Map<String, ? extends QueryResourceTracker> getQueryResources() {
+      return Collections.emptyMap();
+    }
   }
 
   /**
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/MultiStageResourceTrackerQuickStart.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/MultiStageResourceTrackerQuickStart.java
new file mode 100644
index 0000000000..a669011a18
--- /dev/null
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/MultiStageResourceTrackerQuickStart.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.tools.admin.PinotAdministrator;
+import org.apache.pinot.tools.utils.SampleQueries;
+
+
+public class MultiStageResourceTrackerQuickStart extends 
SingleStageResourceTrackingQuickStart {
+  @Override
+  protected List<String> getQueries() {
+    return List.of(SampleQueries.COUNT_BASEBALL_STATS, 
SampleQueries.BASEBALL_STATS_SELF_JOIN,
+        SampleQueries.BASEBALL_JOIN_DIM_BASEBALL_TEAMS);
+  }
+
+  @Override
+  protected Map<String, String> getQueryOptions() {
+    return Collections.singletonMap("queryOptions",
+        CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE + 
"=true");
+  }
+
+  @Override
+  public List<String> types() {
+    return Collections.singletonList("MULTI_STAGE_RESOURCE_TRACKING");
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+    List<String> arguments = new ArrayList<>();
+    arguments.addAll(Arrays.asList("QuickStart", "-type", 
"MULTI_STAGE_RESOURCE_TRACKING"));
+    arguments.addAll(Arrays.asList(args));
+    PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
+  }
+}
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java
index 658aa694dc..e249c37c0b 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.tools.admin.PinotAdministrator;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
+import org.apache.pinot.tools.utils.SampleQueries;
 
 
 public class MultistageEngineQuickStart extends Quickstart {
@@ -58,28 +59,23 @@ public class MultistageEngineQuickStart extends Quickstart {
     printStatus(Quickstart.Color.YELLOW, "***** Multi-stage engine quickstart 
setup complete *****");
     Map<String, String> queryOptions = Collections.singletonMap("queryOptions",
         CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE + 
"=true");
-    String q1 = "SELECT count(*) FROM baseballStats_OFFLINE LIMIT 10";
     printStatus(Quickstart.Color.YELLOW, "Total number of documents in the 
table");
-    printStatus(Quickstart.Color.CYAN, "Query : " + q1);
-    printStatus(Quickstart.Color.YELLOW, 
prettyPrintResponse(runner.runQuery(q1, queryOptions)));
+    printStatus(Quickstart.Color.CYAN, "Query : " + 
SampleQueries.COUNT_BASEBALL_STATS);
+    printStatus(Quickstart.Color.YELLOW,
+        
prettyPrintResponse(runner.runQuery(SampleQueries.COUNT_BASEBALL_STATS, 
queryOptions)));
     printStatus(Quickstart.Color.GREEN, 
"***************************************************");
 
-    String q2 = "SELECT a.playerID, a.runs, a.yearID, b.runs, b.yearID"
-        + " FROM baseballStats_OFFLINE AS a JOIN baseballStats_OFFLINE AS b ON 
a.playerID = b.playerID"
-        + " WHERE a.runs > 160 AND b.runs < 2 LIMIT 10";
     printStatus(Quickstart.Color.YELLOW, "Correlate the same player(s) with 
more than 160-run some year(s) and"
         + " with less than 2-run some other year(s)");
-    printStatus(Quickstart.Color.CYAN, "Query : " + q2);
-    printStatus(Quickstart.Color.YELLOW, 
prettyPrintResponse(runner.runQuery(q2, queryOptions)));
+    printStatus(Quickstart.Color.CYAN, "Query : " + 
SampleQueries.BASEBALL_STATS_SELF_JOIN);
+    printStatus(Quickstart.Color.YELLOW,
+        
prettyPrintResponse(runner.runQuery(SampleQueries.BASEBALL_STATS_SELF_JOIN, 
queryOptions)));
     printStatus(Quickstart.Color.GREEN, 
"***************************************************");
 
-    String q3 = "SELECT a.playerName, a.teamID, b.teamName \n"
-        + "FROM baseballStats_OFFLINE AS a\n"
-        + "JOIN dimBaseballTeams_OFFLINE AS b\n"
-        + "ON a.teamID = b.teamID LIMIT 10";
     printStatus(Quickstart.Color.YELLOW, "Baseball Stats with joined team 
names");
-    printStatus(Quickstart.Color.CYAN, "Query : " + q3);
-    printStatus(Quickstart.Color.YELLOW, 
prettyPrintResponse(runner.runQuery(q3, queryOptions)));
+    printStatus(Quickstart.Color.CYAN, "Query : " + 
SampleQueries.BASEBALL_JOIN_DIM_BASEBALL_TEAMS);
+    printStatus(Quickstart.Color.YELLOW,
+        
prettyPrintResponse(runner.runQuery(SampleQueries.BASEBALL_JOIN_DIM_BASEBALL_TEAMS,
 queryOptions)));
     printStatus(Quickstart.Color.GREEN, 
"***************************************************");
 
     String q4 =
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/SingleStageResourceTrackingQuickStart.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/SingleStageResourceTrackingQuickStart.java
new file mode 100644
index 0000000000..8fbc4fbdf6
--- /dev/null
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/SingleStageResourceTrackingQuickStart.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.pinot.tools.admin.PinotAdministrator;
+import org.apache.pinot.tools.admin.command.QuickstartRunner;
+import org.apache.pinot.tools.utils.PinotConfigUtils;
+import org.apache.pinot.tools.utils.SampleQueries;
+
+/**
+ * Quickstart variant that adds the resource-tracking configuration overrides from
+ * {@link PinotConfigUtils#getResourceTrackingConf()} and, as its sample workload, keeps issuing the queries from
+ * {@link #getQueries()} on a pool of background threads.
+ */
+public class SingleStageResourceTrackingQuickStart extends Quickstart {
+  // Quickstart type name; types() and main() must agree on it.
+  private static final String QUICKSTART_TYPE = "SINGLE_STAGE_RESOURCE_TRACKING";
+  // Size of the worker pool and number of query loops submitted to it; one loop per pool thread.
+  private static final int NUM_QUERY_THREADS = 10;
+  // Pause between two passes over the query list, in milliseconds.
+  private static final long SLEEP_BETWEEN_ROUNDS_MS = 10L;
+
+  /**
+   * Returns the parent overrides merged with the resource-tracking overrides. On a key clash the resource-tracking
+   * value wins because it is applied last.
+   */
+  @Override
+  protected Map<String, Object> getConfigOverrides() {
+    // Copy into a fresh HashMap: Quickstart.getConfigOverrides may return an ImmutableMap.
+    Map<String, Object> configOverrides = new HashMap<>(super.getConfigOverrides());
+    configOverrides.putAll(PinotConfigUtils.getResourceTrackingConf());
+    return configOverrides;
+  }
+
+  /**
+   * Starts {@code NUM_QUERY_THREADS} background loops that run every query from {@link #getQueries()} over and
+   * over, then returns without waiting for them.
+   *
+   * @param runner runner used to execute each query
+   * @throws Exception declared by the overridden method; this implementation reports query failures from the
+   *     background loops instead of propagating them
+   */
+  @Override
+  public void runSampleQueries(QuickstartRunner runner)
+      throws Exception {
+    List<String> queries = getQueries();
+    Map<String, String> queryOptions = getQueryOptions();
+
+    printStatus(Color.YELLOW, "***** Running queries for eternity *****");
+    ExecutorService service = Executors.newFixedThreadPool(NUM_QUERY_THREADS);
+
+    for (int i = 0; i < NUM_QUERY_THREADS; i++) {
+      service.submit(() -> runQueriesForever(runner, queries, queryOptions));
+    }
+    // All loops are submitted. shutdown() leaves them running but lets each pool thread exit once its loop ends,
+    // instead of idling forever as a non-daemon thread.
+    service.shutdown();
+  }
+
+  /**
+   * Body of one background loop: runs all queries, sleeps briefly, repeats. A failure is reported through
+   * {@code printStatus} and ends this loop only.
+   */
+  private void runQueriesForever(QuickstartRunner runner, List<String> queries, Map<String, String> queryOptions) {
+    try {
+      while (true) {
+        for (String query : queries) {
+          runner.runQuery(query, queryOptions);
+        }
+        Thread.sleep(SLEEP_BETWEEN_ROUNDS_MS);
+      }
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so the executor thread running this loop still sees the interruption.
+      Thread.currentThread().interrupt();
+      printStatus(Color.CYAN, e.getMessage());
+    } catch (Exception e) {
+      printStatus(Color.CYAN, e.getMessage());
+    }
+  }
+
+  /**
+   * Returns the queries each background loop executes, in iteration order. Subclasses may override.
+   */
+  protected List<String> getQueries() {
+    return List.of(SampleQueries.COUNT_BASEBALL_STATS,
+        SampleQueries.BASEBALL_SUM_RUNS_Q1,
+        SampleQueries.BASEBALL_SUM_RUNS_Q2,
+        SampleQueries.BASEBALL_SUM_RUNS_Q3,
+        SampleQueries.BASEBALL_ORDER_BY_YEAR);
+  }
+
+  /**
+   * Returns the query options passed with every query; empty here. Subclasses may override.
+   */
+  protected Map<String, String> getQueryOptions() {
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Returns the single type name of this quickstart, the same value {@link #main(String[])} passes to
+   * {@code -type}.
+   */
+  @Override
+  public List<String> types() {
+    return Collections.singletonList(QUICKSTART_TYPE);
+  }
+
+  /**
+   * Launches this quickstart through {@link PinotAdministrator}, prepending {@code QuickStart -type <type>} to the
+   * caller-supplied arguments.
+   */
+  public static void main(String[] args)
+      throws Exception {
+    List<String> arguments = new ArrayList<>();
+    arguments.addAll(Arrays.asList("QuickStart", "-type", QUICKSTART_TYPE));
+    arguments.addAll(Arrays.asList(args));
+    PinotAdministrator.main(arguments.toArray(new String[0]));
+  }
+}
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
index 77d0eee664..0a9e1cd31e 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
@@ -32,6 +32,7 @@ import 
org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf;
+import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
 import org.apache.pinot.spi.env.CommonsConfigurationUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.NetUtils;
@@ -215,6 +216,27 @@ public class PinotConfigUtils {
     return properties;
   }
 
+  /**
+   * Builds the configuration overrides that switch on thread-level resource measurement and per-query accounting.
+   *
+   * @return a new mutable map from configuration key to override value
+   */
+  public static Map<String, Object> getResourceTrackingConf() {
+    final String schedulerPrefix = CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + ".";
+    final Map<String, Object> overrides = new HashMap<>();
+
+    // Thread CPU-time and allocated-bytes measurement, for server and broker.
+    overrides.put(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true);
+    overrides.put(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, true);
+    overrides.put(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true);
+    overrides.put(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, true);
+
+    // Accounting sampling flags under their bare (un-prefixed) keys.
+    overrides.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, true);
+    overrides.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, true);
+
+    // Accounting settings under the query-scheduler prefix.
+    // NOTE(review): CPU sampling is false here but true under the bare key above - confirm this is intended.
+    overrides.put(schedulerPrefix + CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
+        PerQueryCPUMemAccountantFactory.class.getCanonicalName());
+    overrides.put(schedulerPrefix + CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, true);
+    overrides.put(schedulerPrefix + CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, false);
+    overrides.put(schedulerPrefix + CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, true);
+    return overrides;
+  }
+
   public static int getAvailablePort() {
     try {
       try (ServerSocket socket = new ServerSocket(0)) {
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/SampleQueries.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/SampleQueries.java
new file mode 100644
index 0000000000..8247de78b7
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/SampleQueries.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.utils;
+
+/**
+ * Holder for the sample SQL strings used by the quickstarts. Constants only; the private constructor prevents
+ * instantiation.
+ */
+public class SampleQueries {
+  /** Row count over {@code baseballStats_OFFLINE}. */
+  public static final String COUNT_BASEBALL_STATS = "SELECT count(*) FROM baseballStats_OFFLINE LIMIT 10";
+
+  /** Self join of {@code baseballStats_OFFLINE} on {@code playerID}. */
+  public static final String BASEBALL_STATS_SELF_JOIN =
+      "SELECT a.playerID, a.runs, a.yearID, b.runs, b.yearID"
+          + " FROM baseballStats_OFFLINE AS a JOIN baseballStats_OFFLINE AS b ON a.playerID = b.playerID"
+          + " WHERE a.runs > 160 AND b.runs < 2 LIMIT 10";
+
+  /** Join of {@code baseballStats_OFFLINE} with {@code dimBaseballTeams_OFFLINE} on {@code teamID}. */
+  public static final String BASEBALL_JOIN_DIM_BASEBALL_TEAMS =
+      "SELECT a.playerName, a.teamID, b.teamName \n"
+          + "FROM baseballStats_OFFLINE AS a\n"
+          + "JOIN dimBaseballTeams_OFFLINE AS b\n"
+          + "ON a.teamID = b.teamID LIMIT 10";
+
+  /** Top 5 players by total runs. */
+  public static final String BASEBALL_SUM_RUNS_Q1 =
+      "select playerName, sum(runs) from baseballStats"
+          + " group by playerName order by sum(runs) desc limit 5";
+
+  /** Top 5 players by total runs where {@code yearID=2000}. */
+  public static final String BASEBALL_SUM_RUNS_Q2 =
+      "select playerName, sum(runs) from baseballStats where yearID=2000"
+          + " group by playerName order by sum(runs) desc limit 5";
+
+  /** Top 10 players by total runs where {@code yearID>=2000}. */
+  public static final String BASEBALL_SUM_RUNS_Q3 =
+      "select playerName, sum(runs) from baseballStats where yearID>=2000"
+          + " group by playerName order by sum(runs) desc limit 10";
+
+  /** First 10 rows ordered by {@code yearID}. */
+  public static final String BASEBALL_ORDER_BY_YEAR =
+      "select playerName, runs, homeRuns from baseballStats order by yearID limit 10";
+
+  private SampleQueries() {
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to