This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 53b7ebb5285 Add memory and GC time tracking to execution statistics
(#16576)
53b7ebb5285 is described below
commit 53b7ebb52855e7306fd7d9cf37ea4fc9c835aa45
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Wed Sep 10 16:27:35 2025 +0200
Add memory and GC time tracking to execution statistics (#16576)
---
.../org/apache/pinot/common/datatable/StatMap.java | 2 +-
.../common/utils/config/QueryOptionsUtils.java | 5 +++++
.../pinot/query/mailbox/ReceivingMailbox.java | 4 +++-
.../query/runtime/operator/AggregateOperator.java | 16 ++++++++++----
.../query/runtime/operator/BaseJoinOperator.java | 16 ++++++++++----
.../operator/BaseMailboxReceiveOperator.java | 16 ++++++++++----
.../query/runtime/operator/ErrorOperator.java | 4 +++-
.../query/runtime/operator/FilterOperator.java | 16 ++++++++++----
.../pinot/query/runtime/operator/LeafOperator.java | 25 +++++++++++++++++++---
.../runtime/operator/LiteralValueOperator.java | 16 ++++++++++----
.../query/runtime/operator/LookupJoinOperator.java | 20 +++++++++++------
.../runtime/operator/MailboxSendOperator.java | 16 ++++++++++----
.../query/runtime/operator/MultiStageOperator.java | 19 ++++++++++++++--
.../pinot/query/runtime/operator/SortOperator.java | 16 ++++++++++----
.../query/runtime/operator/TransformOperator.java | 16 ++++++++++----
.../runtime/operator/WindowAggregateOperator.java | 16 ++++++++++----
.../runtime/operator/exchange/BlockExchange.java | 2 +-
.../query/runtime/operator/set/SetOperator.java | 16 ++++++++++----
.../plan/pipeline/PipelineBreakerOperator.java | 14 ++++++++++--
.../operator/BlockListMultiStageOperator.java | 2 +-
.../pinot/query/runtime/operator/OpChainTest.java | 2 +-
.../accounting/ThreadResourceUsageProvider.java | 15 +++++++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 1 +
.../pinot/tools/ColocatedJoinQuickStart.java | 3 +++
24 files changed, 219 insertions(+), 59 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java
index 55a8d9bdc23..b64b281d5f3 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java
@@ -45,7 +45,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
* <li>Change the name of the keys</li>
* </ul>
*
- * Any other change (like changing the type of key, changing their literal
order are not supported or removing keys)
+ * Other changes (like changing the type of a key, changing the order of the key literals or removing keys)
* are backward incompatible changes.
* @param <K>
*/
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 43b9036e741..b723c15e4c6 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -163,6 +163,11 @@ public class QueryOptionsUtils {
return
"false".equalsIgnoreCase(queryOptions.get(QueryOptionKey.USE_SCAN_REORDER_OPTIMIZATION));
}
+ public static boolean isCollectGcStats(Map<String, String> queryOptions) {
+ // Disabled by default
+ return
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.COLLECT_GC_STATS));
+ }
+
@Nullable
public static Map<String, Set<FieldConfig.IndexType>>
getSkipIndexes(Map<String, String> queryOptions) {
// Example config: skipIndexes='col1=inverted,range&col2=inverted'
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index 6588e1cf76a..f1e4d5fd253 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -281,7 +281,9 @@ public class ReceivingMailbox {
},
IN_MEMORY_MESSAGES(StatMap.Type.INT),
OFFER_CPU_TIME_MS(StatMap.Type.LONG),
- WAIT_CPU_TIME_MS(StatMap.Type.LONG);
+ WAIT_CPU_TIME_MS(StatMap.Type.LONG),
+ ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+ GC_TIME_MS(StatMap.Type.LONG);
private final StatMap.Type _type;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index da39a78fea6..eaf32a96ca6 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -172,9 +172,11 @@ public class AggregateOperator extends MultiStageOperator {
}
@Override
- public void registerExecution(long time, int numRows) {
+ public void registerExecution(long time, int numRows, long memoryUsedBytes,
long gcTimeMs) {
_statMap.merge(StatKey.EXECUTION_TIME_MS, time);
_statMap.merge(StatKey.EMITTED_ROWS, numRows);
+ _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+ _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
}
@Override
@@ -453,7 +455,6 @@ public class AggregateOperator extends MultiStageOperator {
}
public enum StatKey implements StatMap.Key {
- //@formatter:off
EXECUTION_TIME_MS(StatMap.Type.LONG) {
@Override
public boolean includeDefaultInJson() {
@@ -468,8 +469,15 @@ public class AggregateOperator extends MultiStageOperator {
},
GROUPS_TRIMMED(StatMap.Type.BOOLEAN),
NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
- NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN);
- //@formatter:on
+ NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN),
+ /**
+ * Allocated memory in bytes for this operator or its children in the same
stage.
+ */
+ ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+ /**
+ * Time spent on GC while this operator or its children in the same stage
were running.
+ */
+ GC_TIME_MS(StatMap.Type.LONG);
private final StatMap.Type _type;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
index ce8b959ff2e..063e692b79d 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
@@ -159,9 +159,11 @@ public abstract class BaseJoinOperator extends
MultiStageOperator {
}
@Override
- public void registerExecution(long time, int numRows) {
+ public void registerExecution(long time, int numRows, long memoryUsedBytes,
long gcTimeMs) {
_statMap.merge(StatKey.EXECUTION_TIME_MS, time);
_statMap.merge(StatKey.EMITTED_ROWS, numRows);
+ _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+ _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
}
@Override
@@ -371,7 +373,6 @@ public abstract class BaseJoinOperator extends
MultiStageOperator {
}
public enum StatKey implements StatMap.Key {
- //@formatter:off
EXECUTION_TIME_MS(StatMap.Type.LONG) {
@Override
public boolean includeDefaultInJson() {
@@ -388,8 +389,15 @@ public abstract class BaseJoinOperator extends
MultiStageOperator {
/**
* How long (CPU time) has been spent on building the hash table.
*/
- TIME_BUILDING_HASH_TABLE_MS(StatMap.Type.LONG);
- //@formatter:on
+ TIME_BUILDING_HASH_TABLE_MS(StatMap.Type.LONG),
+ /**
+ * Allocated memory in bytes for this operator or its children in the same
stage.
+ */
+ ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+ /**
+ * Time spent on GC while this operator or its children in the same stage
were running.
+ */
+ GC_TIME_MS(StatMap.Type.LONG);
private final StatMap.Type _type;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index f6cf8e578c7..753ec9744b6 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -141,9 +141,11 @@ public abstract class BaseMailboxReceiveOperator extends
MultiStageOperator {
}
@Override
- public void registerExecution(long time, int numRows) {
+ public void registerExecution(long time, int numRows, long memoryUsedBytes,
long gcTimeMs) {
_statMap.merge(StatKey.EXECUTION_TIME_MS, time);
_statMap.merge(StatKey.EMITTED_ROWS, numRows);
+ _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+ _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
}
private void addReceivingStats(StatMap<ReceivingMailbox.StatKey> from) {
@@ -202,7 +204,6 @@ public abstract class BaseMailboxReceiveOperator extends
MultiStageOperator {
}
public enum StatKey implements StatMap.Key {
- //@formatter:off
EXECUTION_TIME_MS(StatMap.Type.LONG) {
@Override
public boolean includeDefaultInJson() {
@@ -255,8 +256,15 @@ public abstract class BaseMailboxReceiveOperator extends
MultiStageOperator {
/**
* How long (in CPU time) it took to wait for the messages to be offered
to downstream operator.
*/
- UPSTREAM_WAIT_MS(StatMap.Type.LONG);
- //@formatter:on
+ UPSTREAM_WAIT_MS(StatMap.Type.LONG),
+ /**
+ * Allocated memory in bytes for this operator or its children in the same
stage.
+ */
+ ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+ /**
+ * Time spent on GC while this operator or its children in the same stage
were running.
+ */
+ GC_TIME_MS(StatMap.Type.LONG);
private final StatMap.Type _type;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/ErrorOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/ErrorOperator.java
index fbe9c8f99a3..926fec585ed 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/ErrorOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/ErrorOperator.java
@@ -81,9 +81,11 @@ public class ErrorOperator extends MultiStageOperator {
}
@Override
- public void registerExecution(long time, int numRows) {
+ public void registerExecution(long time, int numRows, long memoryUsedBytes,
long gcTimeMs) {
_statMap.merge(LiteralValueOperator.StatKey.EXECUTION_TIME_MS, time);
_statMap.merge(LiteralValueOperator.StatKey.EMITTED_ROWS, numRows);
+ _statMap.merge(LiteralValueOperator.StatKey.ALLOCATED_MEMORY_BYTES,
memoryUsedBytes);
+ _statMap.merge(LiteralValueOperator.StatKey.GC_TIME_MS, gcTimeMs);
}
@Override
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index a2886f8405a..e98b9e2b245 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -68,9 +68,11 @@ public class FilterOperator extends MultiStageOperator {
}
@Override
- public void registerExecution(long time, int numRows) {
+ public void registerExecution(long time, int numRows, long memoryUsedBytes,
long gcTimeMs) {
_statMap.merge(StatKey.EXECUTION_TIME_MS, time);
_statMap.merge(StatKey.EMITTED_ROWS, numRows);
+ _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+ _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
}
@Override
@@ -122,7 +124,6 @@ public class FilterOperator extends MultiStageOperator {
}
public enum StatKey implements StatMap.Key {
- //@formatter:off
EXECUTION_TIME_MS(StatMap.Type.LONG) {
@Override
public boolean includeDefaultInJson() {
@@ -134,8 +135,15 @@ public class FilterOperator extends MultiStageOperator {
public boolean includeDefaultInJson() {
return true;
}
- };
- //@formatter:on
+ },
+ /**
+ * Allocated memory in bytes for this operator or its children in the same
stage.
+ */
+ ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+ /**
+ * Time spent on GC while this operator or its children in the same stage
were running.
+ */
+ GC_TIME_MS(StatMap.Type.LONG);
private final StatMap.Type _type;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
index 2d0939c85e8..42356ce74c1 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
@@ -129,9 +129,11 @@ public class LeafOperator extends MultiStageOperator {
}
@Override
- public void registerExecution(long time, int numRows) {
+ public void registerExecution(long time, int numRows, long memoryUsedBytes,
long gcTimeMs) {
_statMap.merge(StatKey.EXECUTION_TIME_MS, time);
_statMap.merge(StatKey.EMITTED_ROWS, numRows);
+ _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+ _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
}
@Override
@@ -714,7 +716,19 @@ public class LeafOperator extends MultiStageOperator {
public String getStatName() {
return "responseSerializationCpuTimeNs";
}
- };
+ },
+ /**
+ * Allocated memory in bytes for this operator or its children in the same
stage.
+ */
+ ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG, null),
+ /**
+ * Time spent on GC while this operator or its children in the same stage
were running.
+ */
+ GC_TIME_MS(StatMap.Type.LONG, null);
+ // IMPORTANT: When adding new StatKeys, make sure to either create the same key in BrokerResponseNativeV2.StatKey or
+ // call the constructor that accepts a @Nullable BrokerResponseNativeV2.StatKey as last argument and pass null.
+ // Otherwise BrokerResponseNativeV2.StatKey.valueOf() will throw an IllegalArgumentException while this enum is
+ // initialized. The single-argument constructor logs and rethrows it, so it propagates to the caller, causing the
+ // query to time out.
private final StatMap.Type _type;
@Nullable
@@ -722,7 +736,12 @@ public class LeafOperator extends MultiStageOperator {
StatKey(StatMap.Type type) {
_type = type;
- _brokerKey = BrokerResponseNativeV2.StatKey.valueOf(name());
+ try {
+ _brokerKey = BrokerResponseNativeV2.StatKey.valueOf(name());
+ } catch (IllegalArgumentException e) {
+ LOGGER.error("Failed to map StatKey: {} to
BrokerResponseNativeV2.StatKey", name(), e);
+ throw e;
+ }
}
StatKey(StatMap.Type type, @Nullable BrokerResponseNativeV2.StatKey
brokerKey) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
index 0dd666f9e11..a2b00827749 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
@@ -50,9 +50,11 @@ public class LiteralValueOperator extends MultiStageOperator
{
}
@Override
- public void registerExecution(long time, int numRows) {
+ public void registerExecution(long time, int numRows, long memoryUsedBytes,
long gcTimeMs) {
_statMap.merge(StatKey.EXECUTION_TIME_MS, time);
_statMap.merge(StatKey.EMITTED_ROWS, numRows);
+ _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+ _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
}
@Override
@@ -103,10 +105,16 @@ public class LiteralValueOperator extends
MultiStageOperator {
}
public enum StatKey implements StatMap.Key {
- //@formatter:off
EXECUTION_TIME_MS(StatMap.Type.LONG),
- EMITTED_ROWS(StatMap.Type.LONG);
- //@formatter:on
+ EMITTED_ROWS(StatMap.Type.LONG),
+ /**
+ * Allocated memory in bytes for this operator or its children in the same
stage.
+ */
+ ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+ /**
+ * Time spent on GC while this operator or its children in the same stage
were running.
+ */
+ GC_TIME_MS(StatMap.Type.LONG);
private final StatMap.Type _type;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java
index 6c9f2935b9f..8deec263954 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LookupJoinOperator.java
@@ -100,9 +100,11 @@ public class LookupJoinOperator extends MultiStageOperator
{
}
@Override
- public void registerExecution(long time, int numRows) {
- _statMap.merge(LookupJoinOperator.StatKey.EXECUTION_TIME_MS, time);
- _statMap.merge(LookupJoinOperator.StatKey.EMITTED_ROWS, numRows);
+ public void registerExecution(long time, int numRows, long memoryUsedBytes,
long gcTimeMs) {
+ _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
+ _statMap.merge(StatKey.EMITTED_ROWS, numRows);
+ _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+ _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
}
@Override
@@ -240,7 +242,6 @@ public class LookupJoinOperator extends MultiStageOperator {
}
public enum StatKey implements StatMap.Key {
- //@formatter:off
EXECUTION_TIME_MS(StatMap.Type.LONG) {
@Override
public boolean includeDefaultInJson() {
@@ -252,8 +253,15 @@ public class LookupJoinOperator extends MultiStageOperator
{
public boolean includeDefaultInJson() {
return true;
}
- };
- //@formatter:on
+ },
+ /**
+ * Allocated memory in bytes for this operator or its children in the same
stage.
+ */
+ ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+ /**
+ * Time spent on GC while this operator or its children in the same stage
were running.
+ */
+ GC_TIME_MS(StatMap.Type.LONG);
private final StatMap.Type _type;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index cfdfecedb95..aa723360851 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -178,9 +178,11 @@ public class MailboxSendOperator extends
MultiStageOperator {
}
@Override
- public void registerExecution(long time, int numRows) {
+ public void registerExecution(long time, int numRows, long memoryUsedBytes,
long gcTimeMs) {
_statMap.merge(StatKey.EXECUTION_TIME_MS, time);
_statMap.merge(StatKey.EMITTED_ROWS, numRows);
+ _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+ _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
}
@Override
@@ -317,7 +319,6 @@ public class MailboxSendOperator extends MultiStageOperator
{
}
public enum StatKey implements StatMap.Key {
- //@formatter:off
EXECUTION_TIME_MS(StatMap.Type.LONG) {
@Override
public boolean includeDefaultInJson() {
@@ -387,8 +388,15 @@ public class MailboxSendOperator extends
MultiStageOperator {
public boolean includeDefaultInJson() {
return true;
}
- };
- //@formatter:on
+ },
+ /**
+ * Allocated memory in bytes for this operator or its children in the same
stage.
+ */
+ ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+ /**
+ * Time spent on GC while this operator or its children in the same stage
were running.
+ */
+ GC_TIME_MS(StatMap.Type.LONG);
private final StatMap.Type _type;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index fc7bb41106b..269e83aa869 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.plan.ExplainInfo;
import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
@@ -41,6 +42,8 @@ import
org.apache.pinot.query.runtime.operator.set.SetOperator;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerOperator;
+import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.trace.InvocationScope;
@@ -70,7 +73,7 @@ public abstract class MultiStageOperator implements
Operator<MseBlock>, AutoClos
public abstract Type getOperatorType();
- public abstract void registerExecution(long time, int numRows);
+ public abstract void registerExecution(long time, int numRows, long
memoryUsedBytes, long gcTimeMs);
/// By default, it uses the active deadline, which is the one that should be
used for most operators, but if the
/// operator does not actively process data (ie both mailbox operators), it
should override this method to use the
@@ -113,6 +116,9 @@ public abstract class MultiStageOperator implements
Operator<MseBlock>, AutoClos
if (logger().isDebugEnabled()) {
logger().debug("Operator {}: Reading next block", _operatorId);
}
+
+ ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot();
+ long preBlockGcTime = getGcTimeMillis();
try (InvocationScope ignored =
Tracing.getTracer().createScope(getClass())) {
MseBlock nextBlock;
Stopwatch executeStopwatch = Stopwatch.createStarted();
@@ -124,7 +130,9 @@ public abstract class MultiStageOperator implements
Operator<MseBlock>, AutoClos
nextBlock = ErrorMseBlock.fromException(e);
}
int numRows = nextBlock instanceof MseBlock.Data ? ((MseBlock.Data)
nextBlock).getNumRows() : 0;
- registerExecution(executeStopwatch.elapsed(TimeUnit.MILLISECONDS),
numRows);
+ long memoryUsedBytes = resourceSnapshot.getAllocatedBytes();
+ long gcTimeMs = getGcTimeMillis() - preBlockGcTime;
+ registerExecution(executeStopwatch.elapsed(TimeUnit.MILLISECONDS),
numRows, memoryUsedBytes, gcTimeMs);
if (logger().isDebugEnabled()) {
logger().debug("Operator {}. Block {} ready to send", _operatorId,
nextBlock);
@@ -220,6 +228,13 @@ public abstract class MultiStageOperator implements
Operator<MseBlock>, AutoClos
return Collections.emptyMap();
}
+ private long getGcTimeMillis() {
+ if (!QueryOptionsUtils.isCollectGcStats(_context.getOpChainMetadata())) {
+ return -1;
+ }
+ return ThreadResourceUsageProvider.getGcTime();
+ }
+
/**
* This enum is used to identify the operation type.
* <p>
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 798d3c356dd..c3b3ad2f9b9 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -86,9 +86,11 @@ public class SortOperator extends MultiStageOperator {
}
@Override
- public void registerExecution(long time, int numRows) {
+ public void registerExecution(long time, int numRows, long memoryUsedBytes,
long gcTimeMs) {
_statMap.merge(StatKey.EXECUTION_TIME_MS, time);
_statMap.merge(StatKey.EMITTED_ROWS, numRows);
+ _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+ _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
}
@Override
@@ -193,7 +195,6 @@ public class SortOperator extends MultiStageOperator {
}
public enum StatKey implements StatMap.Key {
- //@formatter:off
EXECUTION_TIME_MS(StatMap.Type.LONG) {
@Override
public boolean includeDefaultInJson() {
@@ -205,8 +206,15 @@ public class SortOperator extends MultiStageOperator {
public boolean includeDefaultInJson() {
return true;
}
- };
- //@formatter:on
+ },
+ /**
+ * Allocated memory in bytes for this operator or its children in the same
stage.
+ */
+ ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+ /**
+ * Time spent on GC while this operator or its children in the same stage
were running.
+ */
+ GC_TIME_MS(StatMap.Type.LONG);
private final StatMap.Type _type;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index 14aac5e86fb..4d919ffe7d0 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -68,9 +68,11 @@ public class TransformOperator extends MultiStageOperator {
}
@Override
- public void registerExecution(long time, int numRows) {
+ public void registerExecution(long time, int numRows, long memoryUsedBytes,
long gcTimeMs) {
_statMap.merge(StatKey.EXECUTION_TIME_MS, time);
_statMap.merge(StatKey.EMITTED_ROWS, numRows);
+ _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+ _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
}
@Override
@@ -118,15 +120,21 @@ public class TransformOperator extends MultiStageOperator
{
}
public enum StatKey implements StatMap.Key {
- //@formatter:off
EXECUTION_TIME_MS(StatMap.Type.LONG) {
@Override
public boolean includeDefaultInJson() {
return true;
}
},
- EMITTED_ROWS(StatMap.Type.LONG);
- //@formatter:on
+ EMITTED_ROWS(StatMap.Type.LONG),
+ /**
+ * Allocated memory in bytes for this operator or its children in the same
stage.
+ */
+ ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+ /**
+ * Time spent on GC while this operator or its children in the same stage
were running.
+ */
+ GC_TIME_MS(StatMap.Type.LONG);
private final StatMap.Type _type;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
index 0edaa21d3b9..908604c21f4 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
@@ -147,9 +147,11 @@ public class WindowAggregateOperator extends
MultiStageOperator {
}
@Override
- public void registerExecution(long time, int numRows) {
+ public void registerExecution(long time, int numRows, long memoryUsedBytes,
long gcTimeMs) {
_statMap.merge(StatKey.EXECUTION_TIME_MS, time);
_statMap.merge(StatKey.EMITTED_ROWS, numRows);
+ _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+ _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
}
@Override
@@ -286,7 +288,6 @@ public class WindowAggregateOperator extends
MultiStageOperator {
}
public enum StatKey implements StatMap.Key {
- //@formatter:off
EXECUTION_TIME_MS(StatMap.Type.LONG) {
@Override
public boolean includeDefaultInJson() {
@@ -299,8 +300,15 @@ public class WindowAggregateOperator extends
MultiStageOperator {
return true;
}
},
- MAX_ROWS_IN_WINDOW_REACHED(StatMap.Type.BOOLEAN);
- //@formatter:on
+ MAX_ROWS_IN_WINDOW_REACHED(StatMap.Type.BOOLEAN),
+ /**
+ * Allocated memory in bytes for this operator or its children in the same
stage.
+ */
+ ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+ /**
+ * Time spent on GC while this operator or its children in the same stage
were running.
+ */
+ GC_TIME_MS(StatMap.Type.LONG);
private final StatMap.Type _type;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index 2518a31b761..ccc170fad27 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -48,7 +48,7 @@ public abstract class BlockExchange {
private final List<SendingMailbox> _sendingMailboxes;
private final BlockSplitter _splitter;
- private final Function<List<SendingMailbox>, Integer> _statsIndexChooser;
+ private final Function<List<SendingMailbox>, Integer> _statsIndexChooser;
protected static final Function<List<SendingMailbox>, Integer>
RANDOM_INDEX_CHOOSER =
(mailboxes) -> ThreadLocalRandom.current().nextInt(mailboxes.size());
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/set/SetOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/set/SetOperator.java
index a19487e35bd..dcfeb9397cc 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/set/SetOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/set/SetOperator.java
@@ -58,9 +58,11 @@ public abstract class SetOperator extends MultiStageOperator
{
}
@Override
- public void registerExecution(long time, int numRows) {
+ public void registerExecution(long time, int numRows, long memoryUsedBytes,
long gcTimeMs) {
_statMap.merge(StatKey.EXECUTION_TIME_MS, time);
_statMap.merge(StatKey.EMITTED_ROWS, numRows);
+ _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+ _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
}
@Override
@@ -127,7 +129,6 @@ public abstract class SetOperator extends
MultiStageOperator {
}
public enum StatKey implements StatMap.Key {
- //@formatter:off
EXECUTION_TIME_MS(StatMap.Type.LONG) {
@Override
public boolean includeDefaultInJson() {
@@ -139,8 +140,15 @@ public abstract class SetOperator extends
MultiStageOperator {
public boolean includeDefaultInJson() {
return true;
}
- };
- //@formatter:on
+ },
+ /**
+ * Allocated memory in bytes for this operator or its children in the same
stage.
+ */
+ ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+ /**
+ * Time spent on GC while this operator or its children in the same stage
were running.
+ */
+ GC_TIME_MS(StatMap.Type.LONG);
private final StatMap.Type _type;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
index c58bbfd26b4..4ed8dfe3d1c 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
@@ -63,9 +63,11 @@ public class PipelineBreakerOperator extends
MultiStageOperator {
}
@Override
- public void registerExecution(long time, int numRows) {
+ public void registerExecution(long time, int numRows, long memoryUsedBytes,
long gcTimeMs) {
_statMap.merge(StatKey.EXECUTION_TIME_MS, time);
_statMap.merge(StatKey.EMITTED_ROWS, numRows);
+ _statMap.merge(StatKey.ALLOCATED_MEMORY_BYTES, memoryUsedBytes);
+ _statMap.merge(StatKey.GC_TIME_MS, gcTimeMs);
}
@Override
@@ -152,7 +154,15 @@ public class PipelineBreakerOperator extends
MultiStageOperator {
public enum StatKey implements StatMap.Key {
EXECUTION_TIME_MS(StatMap.Type.LONG),
- EMITTED_ROWS(StatMap.Type.LONG);
+ EMITTED_ROWS(StatMap.Type.LONG),
+ /**
+ * Allocated memory in bytes for this operator or its children in the same
stage.
+ */
+ ALLOCATED_MEMORY_BYTES(StatMap.Type.LONG),
+ /**
+ * Time spent on GC while this operator or its children in the same stage
were running.
+ */
+ GC_TIME_MS(StatMap.Type.LONG);
private final StatMap.Type _type;
StatKey(StatMap.Type type) {
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/BlockListMultiStageOperator.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/BlockListMultiStageOperator.java
index e72de377755..6cef4969bb6 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/BlockListMultiStageOperator.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/BlockListMultiStageOperator.java
@@ -63,7 +63,7 @@ public class BlockListMultiStageOperator extends
MultiStageOperator {
}
@Override
- public void registerExecution(long time, int numRows) {
+ public void registerExecution(long time, int numRows, long memoryUsedBytes,
long gcTimeMs) {
_statMap.merge(LiteralValueOperator.StatKey.EXECUTION_TIME_MS, time);
_statMap.merge(LiteralValueOperator.StatKey.EMITTED_ROWS, numRows);
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index 328020498eb..7d5a89baed0 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -165,7 +165,7 @@ public class OpChainTest {
}
@Override
- public void registerExecution(long time, int numRows) {
+ public void registerExecution(long time, int numRows, long
memoryUsedBytes, long gcTimeMs) {
_statMap.merge(LiteralValueOperator.StatKey.EXECUTION_TIME_MS, time);
_statMap.merge(LiteralValueOperator.StatKey.EMITTED_ROWS, numRows);
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
index 44fb9f6cea3..998dffda02d 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
@@ -18,10 +18,12 @@
*/
package org.apache.pinot.spi.accounting;
+import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,6 +78,19 @@ public class ThreadResourceUsageProvider {
}
}
+ /// Returns an approximation of the total garbage collection time in milliseconds.
+ ///
+ /// Sums `GarbageCollectorMXBean.getCollectionTime()` over every collector returned by
+ /// `ManagementFactory.getGarbageCollectorMXBeans()`. Non-positive readings are skipped because
+ /// the JDK reports `-1` for collectors that do not support this measurement.
+ ///
+ /// The beans report accumulated, JVM-wide figures (all threads, all collectors), not
+ /// per-thread ones. NOTE(review): callers presumably take the difference between two calls to
+ /// attribute GC time to an operator's execution window; confirm at the call sites.
+ public static long getGcTime() {
+ long totalGCTime = 0;
+ List<GarbageCollectorMXBean> gcBeans =
ManagementFactory.getGarbageCollectorMXBeans();
+ for (GarbageCollectorMXBean gcBean : gcBeans) {
+ long gcTime = gcBean.getCollectionTime();
+ // Ignore the -1 "undefined" sentinel (and zero) so it cannot reduce the total.
+ if (gcTime > 0) {
+ totalGCTime += gcTime;
+ }
+ }
+ return totalGCTime;
+ }
+
+
public static boolean isThreadCpuTimeMeasurementEnabled() {
return _isThreadCpuTimeMeasurementEnabled;
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 9b5302cea6d..4473c0ad4f9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -583,6 +583,7 @@ public class CommonConstants {
public static final String ROUTING_OPTIONS = "routingOptions";
public static final String USE_SCAN_REORDER_OPTIMIZATION =
"useScanReorderOpt";
public static final String MAX_EXECUTION_THREADS =
"maxExecutionThreads";
+ public static final String COLLECT_GC_STATS = "collectGCStats";
// For group-by queries with order-by clause, the tail groups are
trimmed off to reduce the memory footprint. To
// ensure the accuracy of the result, {@code max(limit * 5,
minTrimSize)} groups are retained. When
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/ColocatedJoinQuickStart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/ColocatedJoinQuickStart.java
index ffb1603f9e6..14181c31228 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/ColocatedJoinQuickStart.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/ColocatedJoinQuickStart.java
@@ -89,6 +89,9 @@ public class ColocatedJoinQuickStart extends
MultistageEngineQuickStart {
Map<String, Object> overrides = new HashMap<>(super.getConfigOverrides());
// This is actually not required anymore, but we are keeping it as
reference
overrides.put(CommonConstants.Broker.CONFIG_OF_ENABLE_PARTITION_METADATA_MANAGER,
"true");
+
+
overrides.put(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
true);
+
overrides.put(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
true);
return overrides;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]