This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 53b7ebb5285 Add memory and GC time tracking to execution statistics (#16576)
53b7ebb5285 is described below

commit 53b7ebb52855e7306fd7d9cf37ea4fc9c835aa45
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Wed Sep 10 16:27:35 2025 +0200

    Add memory and GC time tracking to execution statistics (#16576)
---
 .../org/apache/pinot/common/datatable/StatMap.java |  2 +-
 .../common/utils/config/QueryOptionsUtils.java     |  5 +++++
 .../pinot/query/mailbox/ReceivingMailbox.java      |  4 +++-
 .../query/runtime/operator/AggregateOperator.java  | 16 ++++++++++----
 .../query/runtime/operator/BaseJoinOperator.java   | 16 ++++++++++----
 .../operator/BaseMailboxReceiveOperator.java       | 16 ++++++++++----
 .../query/runtime/operator/ErrorOperator.java      |  4 +++-
 .../query/runtime/operator/FilterOperator.java     | 16 ++++++++++----
 .../pinot/query/runtime/operator/LeafOperator.java | 25 +++++++++++++++++++---
 .../runtime/operator/LiteralValueOperator.java     | 16 ++++++++++----
 .../query/runtime/operator/LookupJoinOperator.java | 20 +++++++++++------
 .../runtime/operator/MailboxSendOperator.java      | 16 ++++++++++----
 .../query/runtime/operator/MultiStageOperator.java | 19 ++++++++++++++--
 .../pinot/query/runtime/operator/SortOperator.java | 16 ++++++++++----
 .../query/runtime/operator/TransformOperator.java  | 16 ++++++++++----
 .../runtime/operator/WindowAggregateOperator.java  | 16 ++++++++++----
 .../runtime/operator/exchange/BlockExchange.java   |  2 +-
 .../query/runtime/operator/set/SetOperator.java    | 16 ++++++++++----
 .../plan/pipeline/PipelineBreakerOperator.java     | 14 ++++++++++--
 .../operator/BlockListMultiStageOperator.java      |  2 +-
 .../pinot/query/runtime/operator/OpChainTest.java  |  2 +-
 .../accounting/ThreadResourceUsageProvider.java    | 15 +++++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |  1 +
 .../pinot/tools/ColocatedJoinQuickStart.java       |  3 +++
 24 files changed, 219 insertions(+), 59 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java 
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java
index 55a8d9bdc23..b64b281d5f3 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java
@@ -45,7 +45,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
  *   <li>Change the name of the keys</li>
  * </ul>
  *
- * Any other change (like changing the type of key, changing their literal order are not supported or removing keys)
+ * Any other change (like changing the type of key, changing their literal order or removing keys)
  * are backward incompatible changes.
  * @param <K>
  */
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 43b9036e741..b723c15e4c6 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -163,6 +163,11 @@ public class QueryOptionsUtils {
     return 
"false".equalsIgnoreCase(queryOptions.get(QueryOptionKey.USE_SCAN_REORDER_OPTIMIZATION));
   }
 
+  public static boolean isCollectGcStats(Map<String, String> queryOptions) {
+    // Disabled by default
+    return 
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.COLLECT_GC_STATS));
+  }
+
   @Nullable
   public static Map<String, Set<FieldConfig.IndexType>> 
getSkipIndexes(Map<String, String> queryOptions) {
     // Example config:  skipIndexes='col1=inverted,range&col2=inverted'
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index 6588e1cf76a..f1e4d5fd253 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -281,7 +281,9 @@ public class ReceivingMailbox {
     },
     IN_MEMORY_MESSAGES(StatMap.Type.INT),
     OFFER_CPU_TIME_MS(StatMap.Type.LONG),
-    WAIT_CPU_TIME_MS(StatMap.Type.LONG);
+    WAIT_CPU_TIME_MS(StatMap.Type.LONG),
+    ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+    GC_TIME_MS(StatMap.Type.LONG);
 
     private final StatMap.Type _type;
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index da39a78fea6..eaf32a96ca6 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -172,9 +172,11 @@ public class AggregateOperator extends MultiStageOperator {
   }
 
   @Override
-  public void registerExecution(long time, int numRows) {
+  public void registerExecution(long time, int numRows, long memoryUsedBytes, 
long gcTimeMs) {
     _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
     _statMap.merge(StatKey.EMITTED_ROWS, numRows);
+    _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+    _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
   }
 
   @Override
@@ -453,7 +455,6 @@ public class AggregateOperator extends MultiStageOperator {
   }
 
   public enum StatKey implements StatMap.Key {
-    //@formatter:off
     EXECUTION_TIME_MS(StatMap.Type.LONG) {
       @Override
       public boolean includeDefaultInJson() {
@@ -468,8 +469,15 @@ public class AggregateOperator extends MultiStageOperator {
     },
     GROUPS_TRIMMED(StatMap.Type.BOOLEAN),
     NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
-    NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN);
-    //@formatter:on
+    NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN),
+    /**
+     * Allocated memory in bytes for this operator or its children in the same 
stage.
+     */
+    ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+    /**
+     * Time spent on GC while this operator or its children in the same stage 
were running.
+     */
+    GC_TIME_MS(StatMap.Type.LONG);
 
     private final StatMap.Type _type;
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
index ce8b959ff2e..063e692b79d 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
@@ -159,9 +159,11 @@ public abstract class BaseJoinOperator extends 
MultiStageOperator {
   }
 
   @Override
-  public void registerExecution(long time, int numRows) {
+  public void registerExecution(long time, int numRows, long memoryUsedBytes, 
long gcTimeMs) {
     _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
     _statMap.merge(StatKey.EMITTED_ROWS, numRows);
+    _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+    _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
   }
 
   @Override
@@ -371,7 +373,6 @@ public abstract class BaseJoinOperator extends 
MultiStageOperator {
   }
 
   public enum StatKey implements StatMap.Key {
-    //@formatter:off
     EXECUTION_TIME_MS(StatMap.Type.LONG) {
       @Override
       public boolean includeDefaultInJson() {
@@ -388,8 +389,15 @@ public abstract class BaseJoinOperator extends 
MultiStageOperator {
     /**
      * How long (CPU time) has been spent on building the hash table.
      */
-    TIME_BUILDING_HASH_TABLE_MS(StatMap.Type.LONG);
-    //@formatter:on
+    TIME_BUILDING_HASH_TABLE_MS(StatMap.Type.LONG),
+    /**
+     * Allocated memory in bytes for this operator or its children in the same 
stage.
+     */
+    ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+    /**
+     * Time spent on GC while this operator or its children in the same stage 
were running.
+     */
+    GC_TIME_MS(StatMap.Type.LONG);
 
     private final StatMap.Type _type;
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index f6cf8e578c7..753ec9744b6 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -141,9 +141,11 @@ public abstract class BaseMailboxReceiveOperator extends 
MultiStageOperator {
   }
 
   @Override
-  public void registerExecution(long time, int numRows) {
+  public void registerExecution(long time, int numRows, long memoryUsedBytes, 
long gcTimeMs) {
     _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
     _statMap.merge(StatKey.EMITTED_ROWS, numRows);
+    _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+    _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
   }
 
   private void addReceivingStats(StatMap<ReceivingMailbox.StatKey> from) {
@@ -202,7 +204,6 @@ public abstract class BaseMailboxReceiveOperator extends 
MultiStageOperator {
   }
 
   public enum StatKey implements StatMap.Key {
-    //@formatter:off
     EXECUTION_TIME_MS(StatMap.Type.LONG) {
       @Override
       public boolean includeDefaultInJson() {
@@ -255,8 +256,15 @@ public abstract class BaseMailboxReceiveOperator extends 
MultiStageOperator {
     /**
      * How long (in CPU time) it took to wait for the messages to be offered 
to downstream operator.
      */
-    UPSTREAM_WAIT_MS(StatMap.Type.LONG);
-    //@formatter:on
+    UPSTREAM_WAIT_MS(StatMap.Type.LONG),
+    /**
+     * Allocated memory in bytes for this operator or its children in the same 
stage.
+     */
+    ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+    /**
+     * Time spent on GC while this operator or its children in the same stage 
were running.
+     */
+    GC_TIME_MS(StatMap.Type.LONG);
 
     private final StatMap.Type _type;
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/ErrorOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/ErrorOperator.java
index fbe9c8f99a3..926fec585ed 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/ErrorOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/ErrorOperator.java
@@ -81,9 +81,11 @@ public class ErrorOperator extends MultiStageOperator {
   }
 
   @Override
-  public void registerExecution(long time, int numRows) {
+  public void registerExecution(long time, int numRows, long memoryUsedBytes, 
long gcTimeMs) {
     _statMap.merge(LiteralValueOperator.StatKey.EXECUTION_TIME_MS, time);
     _statMap.merge(LiteralValueOperator.StatKey.EMITTED_ROWS, numRows);
+    _statMap.merge(LiteralValueOperator.StatKey.ALLOCATED_MEMORY_BYTES, 
memoryUsedBytes);
+    _statMap.merge(LiteralValueOperator.StatKey.GC_TIME_MS, gcTimeMs);
   }
 
   @Override
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index a2886f8405a..e98b9e2b245 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -68,9 +68,11 @@ public class FilterOperator extends MultiStageOperator {
   }
 
   @Override
-  public void registerExecution(long time, int numRows) {
+  public void registerExecution(long time, int numRows, long memoryUsedBytes, 
long gcTimeMs) {
     _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
     _statMap.merge(StatKey.EMITTED_ROWS, numRows);
+    _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+    _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
   }
 
   @Override
@@ -122,7 +124,6 @@ public class FilterOperator extends MultiStageOperator {
   }
 
   public enum StatKey implements StatMap.Key {
-    //@formatter:off
     EXECUTION_TIME_MS(StatMap.Type.LONG) {
       @Override
       public boolean includeDefaultInJson() {
@@ -134,8 +135,15 @@ public class FilterOperator extends MultiStageOperator {
       public boolean includeDefaultInJson() {
         return true;
       }
-    };
-    //@formatter:on
+    },
+    /**
+     * Allocated memory in bytes for this operator or its children in the same 
stage.
+     */
+    ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+    /**
+     * Time spent on GC while this operator or its children in the same stage 
were running.
+     */
+    GC_TIME_MS(StatMap.Type.LONG);
 
     private final StatMap.Type _type;
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
index 2d0939c85e8..42356ce74c1 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
@@ -129,9 +129,11 @@ public class LeafOperator extends MultiStageOperator {
   }
 
   @Override
-  public void registerExecution(long time, int numRows) {
+  public void registerExecution(long time, int numRows, long memoryUsedBytes, 
long gcTimeMs) {
     _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
     _statMap.merge(StatKey.EMITTED_ROWS, numRows);
+    _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+    _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
   }
 
   @Override
@@ -714,7 +716,19 @@ public class LeafOperator extends MultiStageOperator {
       public String getStatName() {
         return "responseSerializationCpuTimeNs";
       }
-    };
+    },
+    /**
+     * Allocated memory in bytes for this operator or its children in the same 
stage.
+     */
+    ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG, null),
+    /**
+     * Time spent on GC while this operator or its children in the same stage 
were running.
+     */
+    GC_TIME_MS(StatMap.Type.LONG, null);
+    // IMPORTANT: When adding new StatKeys, make sure to either create the same key in BrokerResponseNativeV2.StatKey or
+    //  call the constructor that accepts a String as last argument and set it to null.
+    //  Otherwise the constructor will fail with an IllegalArgumentException which will not be caught and will
+    //  propagate to the caller, causing the query to timeout.
 
     private final StatMap.Type _type;
     @Nullable
@@ -722,7 +736,12 @@ public class LeafOperator extends MultiStageOperator {
 
     StatKey(StatMap.Type type) {
       _type = type;
-      _brokerKey = BrokerResponseNativeV2.StatKey.valueOf(name());
+      try {
+        _brokerKey = BrokerResponseNativeV2.StatKey.valueOf(name());
+      } catch (IllegalArgumentException e) {
+        LOGGER.error("Failed to map StatKey: {} to BrokerResponseNativeV2.StatKey", name(), e);
+        throw e;
+      }
     }
 
     StatKey(StatMap.Type type, @Nullable BrokerResponseNativeV2.StatKey 
brokerKey) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
index 0dd666f9e11..a2b00827749 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
@@ -50,9 +50,11 @@ public class LiteralValueOperator extends MultiStageOperator 
{
   }
 
   @Override
-  public void registerExecution(long time, int numRows) {
+  public void registerExecution(long time, int numRows, long memoryUsedBytes, 
long gcTimeMs) {
     _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
     _statMap.merge(StatKey.EMITTED_ROWS, numRows);
+    _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+    _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
   }
 
   @Override
@@ -103,10 +105,16 @@ public class LiteralValueOperator extends 
MultiStageOperator {
   }
 
   public enum StatKey implements StatMap.Key {
-    //@formatter:off
     EXECUTION_TIME_MS(StatMap.Type.LONG),
-    EMITTED_ROWS(StatMap.Type.LONG);
-    //@formatter:on
+    EMITTED_ROWS(StatMap.Type.LONG),
+    /**
+     * Allocated memory in bytes for this operator or its children in the same 
stage.
+     */
+    ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+    /**
+     * Time spent on GC while this operator or its children in the same stage 
were running.
+     */
+    GC_TIME_MS(StatMap.Type.LONG);
 
     private final StatMap.Type _type;
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java
index 6c9f2935b9f..8deec263954 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java
@@ -100,9 +100,11 @@ public class LookupJoinOperator extends MultiStageOperator 
{
   }
 
   @Override
-  public void registerExecution(long time, int numRows) {
-    _statMap.merge(LookupJoinOperator.StatKey.EXECUTION_TIME_MS, time);
-    _statMap.merge(LookupJoinOperator.StatKey.EMITTED_ROWS, numRows);
+  public void registerExecution(long time, int numRows, long memoryUsedBytes, 
long gcTimeMs) {
+    _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
+    _statMap.merge(StatKey.EMITTED_ROWS, numRows);
+    _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+    _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
   }
 
   @Override
@@ -240,7 +242,6 @@ public class LookupJoinOperator extends MultiStageOperator {
   }
 
   public enum StatKey implements StatMap.Key {
-    //@formatter:off
     EXECUTION_TIME_MS(StatMap.Type.LONG) {
       @Override
       public boolean includeDefaultInJson() {
@@ -252,8 +253,15 @@ public class LookupJoinOperator extends MultiStageOperator 
{
       public boolean includeDefaultInJson() {
         return true;
       }
-    };
-    //@formatter:on
+    },
+    /**
+     * Allocated memory in bytes for this operator or its children in the same 
stage.
+     */
+    ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+    /**
+     * Time spent on GC while this operator or its children in the same stage 
were running.
+     */
+    GC_TIME_MS(StatMap.Type.LONG);
 
     private final StatMap.Type _type;
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index cfdfecedb95..aa723360851 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -178,9 +178,11 @@ public class MailboxSendOperator extends 
MultiStageOperator {
   }
 
   @Override
-  public void registerExecution(long time, int numRows) {
+  public void registerExecution(long time, int numRows, long memoryUsedBytes, 
long gcTimeMs) {
     _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
     _statMap.merge(StatKey.EMITTED_ROWS, numRows);
+    _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+    _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
   }
 
   @Override
@@ -317,7 +319,6 @@ public class MailboxSendOperator extends MultiStageOperator 
{
   }
 
   public enum StatKey implements StatMap.Key {
-    //@formatter:off
     EXECUTION_TIME_MS(StatMap.Type.LONG) {
       @Override
       public boolean includeDefaultInJson() {
@@ -387,8 +388,15 @@ public class MailboxSendOperator extends 
MultiStageOperator {
       public boolean includeDefaultInJson() {
         return true;
       }
-    };
-    //@formatter:on
+    },
+    /**
+     * Allocated memory in bytes for this operator or its children in the same 
stage.
+     */
+    ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+    /**
+     * Time spent on GC while this operator or its children in the same stage 
were running.
+     */
+    GC_TIME_MS(StatMap.Type.LONG);
 
     private final StatMap.Type _type;
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index fc7bb41106b..269e83aa869 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.proto.Plan;
 import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.plan.ExplainInfo;
 import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
@@ -41,6 +42,8 @@ import 
org.apache.pinot.query.runtime.operator.set.SetOperator;
 import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerOperator;
+import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.spi.trace.InvocationScope;
@@ -70,7 +73,7 @@ public abstract class MultiStageOperator implements 
Operator<MseBlock>, AutoClos
 
   public abstract Type getOperatorType();
 
-  public abstract void registerExecution(long time, int numRows);
+  public abstract void registerExecution(long time, int numRows, long 
memoryUsedBytes, long gcTimeMs);
 
   /// By default, it uses the active deadline, which is the one that should be 
used for most operators, but if the
   /// operator does not actively process data (ie both mailbox operators), it 
should override this method to use the
@@ -113,6 +116,9 @@ public abstract class MultiStageOperator implements 
Operator<MseBlock>, AutoClos
     if (logger().isDebugEnabled()) {
       logger().debug("Operator {}: Reading next block", _operatorId);
     }
+
+    ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot();
+    long preBlockGcTime = getGcTimeMillis();
     try (InvocationScope ignored = 
Tracing.getTracer().createScope(getClass())) {
       MseBlock nextBlock;
       Stopwatch executeStopwatch = Stopwatch.createStarted();
@@ -124,7 +130,9 @@ public abstract class MultiStageOperator implements 
Operator<MseBlock>, AutoClos
         nextBlock = ErrorMseBlock.fromException(e);
       }
       int numRows = nextBlock instanceof MseBlock.Data ? ((MseBlock.Data) 
nextBlock).getNumRows() : 0;
-      registerExecution(executeStopwatch.elapsed(TimeUnit.MILLISECONDS), 
numRows);
+      long memoryUsedBytes = resourceSnapshot.getAllocatedBytes();
+      long gcTimeMs = getGcTimeMillis() - preBlockGcTime;
+      registerExecution(executeStopwatch.elapsed(TimeUnit.MILLISECONDS), 
numRows, memoryUsedBytes, gcTimeMs);
 
       if (logger().isDebugEnabled()) {
         logger().debug("Operator {}. Block {} ready to send", _operatorId, 
nextBlock);
@@ -220,6 +228,13 @@ public abstract class MultiStageOperator implements 
Operator<MseBlock>, AutoClos
     return Collections.emptyMap();
   }
 
+  private long getGcTimeMillis() {
+    if (!QueryOptionsUtils.isCollectGcStats(_context.getOpChainMetadata())) {
+      return -1;
+    }
+    return ThreadResourceUsageProvider.getGcTime();
+  }
+
   /**
    * This enum is used to identify the operation type.
    * <p>
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 798d3c356dd..c3b3ad2f9b9 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -86,9 +86,11 @@ public class SortOperator extends MultiStageOperator {
   }
 
   @Override
-  public void registerExecution(long time, int numRows) {
+  public void registerExecution(long time, int numRows, long memoryUsedBytes, 
long gcTimeMs) {
     _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
     _statMap.merge(StatKey.EMITTED_ROWS, numRows);
+    _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+    _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
   }
 
   @Override
@@ -193,7 +195,6 @@ public class SortOperator extends MultiStageOperator {
   }
 
   public enum StatKey implements StatMap.Key {
-    //@formatter:off
     EXECUTION_TIME_MS(StatMap.Type.LONG) {
       @Override
       public boolean includeDefaultInJson() {
@@ -205,8 +206,15 @@ public class SortOperator extends MultiStageOperator {
       public boolean includeDefaultInJson() {
         return true;
       }
-    };
-    //@formatter:on
+    },
+    /**
+     * Allocated memory in bytes for this operator or its children in the same 
stage.
+     */
+    ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+    /**
+     * Time spent on GC while this operator or its children in the same stage 
were running.
+     */
+    GC_TIME_MS(StatMap.Type.LONG);
 
     private final StatMap.Type _type;
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index 14aac5e86fb..4d919ffe7d0 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -68,9 +68,11 @@ public class TransformOperator extends MultiStageOperator {
   }
 
   @Override
-  public void registerExecution(long time, int numRows) {
+  public void registerExecution(long time, int numRows, long memoryUsedBytes, 
long gcTimeMs) {
     _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
     _statMap.merge(StatKey.EMITTED_ROWS, numRows);
+    _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+    _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
   }
 
   @Override
@@ -118,15 +120,21 @@ public class TransformOperator extends MultiStageOperator 
{
   }
 
   public enum StatKey implements StatMap.Key {
-    //@formatter:off
     EXECUTION_TIME_MS(StatMap.Type.LONG) {
       @Override
       public boolean includeDefaultInJson() {
         return true;
       }
     },
-    EMITTED_ROWS(StatMap.Type.LONG);
-    //@formatter:on
+    EMITTED_ROWS(StatMap.Type.LONG),
+    /**
+     * Allocated memory in bytes for this operator or its children in the same 
stage.
+     */
+    ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+    /**
+     * Time spent on GC while this operator or its children in the same stage 
were running.
+     */
+    GC_TIME_MS(StatMap.Type.LONG);
 
     private final StatMap.Type _type;
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
index 0edaa21d3b9..908604c21f4 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
@@ -147,9 +147,11 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
   }
 
   @Override
-  public void registerExecution(long time, int numRows) {
+  public void registerExecution(long time, int numRows, long memoryUsedBytes, 
long gcTimeMs) {
     _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
     _statMap.merge(StatKey.EMITTED_ROWS, numRows);
+    _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+    _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
   }
 
   @Override
@@ -286,7 +288,6 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
   }
 
   public enum StatKey implements StatMap.Key {
-    //@formatter:off
     EXECUTION_TIME_MS(StatMap.Type.LONG) {
       @Override
       public boolean includeDefaultInJson() {
@@ -299,8 +300,15 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
         return true;
       }
     },
-    MAX_ROWS_IN_WINDOW_REACHED(StatMap.Type.BOOLEAN);
-    //@formatter:on
+    MAX_ROWS_IN_WINDOW_REACHED(StatMap.Type.BOOLEAN),
+    /**
+     * Allocated memory in bytes for this operator or its children in the same 
stage.
+     */
+    ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+    /**
+     * Time spent on GC while this operator or its children in the same stage 
were running.
+     */
+    GC_TIME_MS(StatMap.Type.LONG);
 
     private final StatMap.Type _type;
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index 2518a31b761..ccc170fad27 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -48,7 +48,7 @@ public abstract class BlockExchange {
 
   private final List<SendingMailbox> _sendingMailboxes;
   private final BlockSplitter _splitter;
- private final Function<List<SendingMailbox>, Integer> _statsIndexChooser;
+  private final Function<List<SendingMailbox>, Integer> _statsIndexChooser;
 
   protected static final Function<List<SendingMailbox>, Integer> 
RANDOM_INDEX_CHOOSER =
       (mailboxes) -> ThreadLocalRandom.current().nextInt(mailboxes.size());
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/set/SetOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/set/SetOperator.java
index a19487e35bd..dcfeb9397cc 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/set/SetOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/set/SetOperator.java
@@ -58,9 +58,11 @@ public abstract class SetOperator extends MultiStageOperator 
{
   }
 
   @Override
-  public void registerExecution(long time, int numRows) {
+  public void registerExecution(long time, int numRows, long memoryUsedBytes, 
long gcTimeMs) {
     _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
     _statMap.merge(StatKey.EMITTED_ROWS, numRows);
+    _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+    _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
   }
 
   @Override
@@ -127,7 +129,6 @@ public abstract class SetOperator extends MultiStageOperator {
   }
 
   public enum StatKey implements StatMap.Key {
-    //@formatter:off
     EXECUTION_TIME_MS(StatMap.Type.LONG) {
       @Override
       public boolean includeDefaultInJson() {
@@ -139,8 +140,15 @@ public abstract class SetOperator extends MultiStageOperator {
       public boolean includeDefaultInJson() {
         return true;
       }
-    };
-    //@formatter:on
+    },
+    /**
+     * Allocated memory in bytes for this operator or its children in the same stage.
+     */
+    ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+    /**
+     * Time spent on GC while this operator or its children in the same stage were running.
+     */
+    GC_TIME_MS(StatMap.Type.LONG);
 
     private final StatMap.Type _type;
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
index c58bbfd26b4..4ed8dfe3d1c 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
@@ -63,9 +63,11 @@ public class PipelineBreakerOperator extends MultiStageOperator {
   }
 
   @Override
-  public void registerExecution(long time, int numRows) {
+  public void registerExecution(long time, int numRows, long memoryUsedBytes, long gcTimeMs) {
     _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
     _statMap.merge(StatKey.EMITTED_ROWS, numRows);
+    _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+    _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
   }
 
   @Override
@@ -152,7 +154,15 @@ public class PipelineBreakerOperator extends MultiStageOperator {
 
   public enum StatKey implements StatMap.Key {
     EXECUTION_TIME_MS(StatMap.Type.LONG),
-    EMITTED_ROWS(StatMap.Type.LONG);
+    EMITTED_ROWS(StatMap.Type.LONG),
+    /**
+     * Allocated memory in bytes for this operator or its children in the same stage.
+     */
+    ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+    /**
+     * Time spent on GC while this operator or its children in the same stage were running.
+     */
+    GC_TIME_MS(StatMap.Type.LONG);
     private final StatMap.Type _type;
 
     StatKey(StatMap.Type type) {
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/BlockListMultiStageOperator.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/BlockListMultiStageOperator.java
index e72de377755..6cef4969bb6 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/BlockListMultiStageOperator.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/BlockListMultiStageOperator.java
@@ -63,7 +63,7 @@ public class BlockListMultiStageOperator extends MultiStageOperator {
   }
 
   @Override
-  public void registerExecution(long time, int numRows) {
+  public void registerExecution(long time, int numRows, long memoryUsedBytes, long gcTimeMs) {
     _statMap.merge(LiteralValueOperator.StatKey.EXECUTION_TIME_MS, time);
     _statMap.merge(LiteralValueOperator.StatKey.EMITTED_ROWS, numRows);
   }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index 328020498eb..7d5a89baed0 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -165,7 +165,7 @@ public class OpChainTest {
     }
 
     @Override
-    public void registerExecution(long time, int numRows) {
+    public void registerExecution(long time, int numRows, long memoryUsedBytes, long gcTimeMs) {
       _statMap.merge(LiteralValueOperator.StatKey.EXECUTION_TIME_MS, time);
       _statMap.merge(LiteralValueOperator.StatKey.EMITTED_ROWS, numRows);
     }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
index 44fb9f6cea3..998dffda02d 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
@@ -18,10 +18,12 @@
  */
 package org.apache.pinot.spi.accounting;
 
+import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadMXBean;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,6 +78,19 @@ public class ThreadResourceUsageProvider {
     }
   }
 
+  /// Returns an approximation of the total garbage collection time in milliseconds.
+  public static long getGcTime() {
+    long totalGCTime = 0;
+    List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
+    for (GarbageCollectorMXBean gcBean : gcBeans) {
+      long gcTime = gcBean.getCollectionTime();
+      if (gcTime > 0) {
+        totalGCTime += gcTime;
+      }
+    }
+    return totalGCTime;
+  }
+
   public static boolean isThreadCpuTimeMeasurementEnabled() {
     return _isThreadCpuTimeMeasurementEnabled;
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 9b5302cea6d..4473c0ad4f9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -583,6 +583,7 @@ public class CommonConstants {
         public static final String ROUTING_OPTIONS = "routingOptions";
         public static final String USE_SCAN_REORDER_OPTIMIZATION = "useScanReorderOpt";
         public static final String MAX_EXECUTION_THREADS = "maxExecutionThreads";
+        public static final String COLLECT_GC_STATS = "collectGCStats";
 
         // For group-by queries with order-by clause, the tail groups are trimmed off to reduce the memory footprint. To
         // ensure the accuracy of the result, {@code max(limit * 5, minTrimSize)} groups are retained. When
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/ColocatedJoinQuickStart.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/ColocatedJoinQuickStart.java
index ffb1603f9e6..14181c31228 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/ColocatedJoinQuickStart.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/ColocatedJoinQuickStart.java
@@ -89,6 +89,9 @@ public class ColocatedJoinQuickStart extends MultistageEngineQuickStart {
     Map<String, Object> overrides = new HashMap<>(super.getConfigOverrides());
     // This is actually not required anymore, but we are keeping it as reference
     overrides.put(CommonConstants.Broker.CONFIG_OF_ENABLE_PARTITION_METADATA_MANAGER, "true");
+
+    overrides.put(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, true);
+    overrides.put(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, true);
     return overrides;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to