This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 5366635 Instrument combine operators query execution code with thread cpu time. (#6680) 5366635 is described below commit 536663564b76a7878e1680833b6e61b1bd89da8d Author: Liang Mingqiang <mili...@linkedin.com> AuthorDate: Thu Mar 18 00:31:49 2021 -0700 Instrument combine operators query execution code with thread cpu time. (#6680) Instrument the query execution code in combine operators to measure the thread cpu time. The measurement is disabled by default and can be enabled using an instance level config. Co-authored-by: Siddharth Teotia <siddharthteo...@gmail.com> --- .../apache/pinot/common/metrics/ServerGauge.java | 2 +- .../apache/pinot/common/utils/CommonConstants.java | 4 ++ .../org/apache/pinot/common/utils/DataTable.java | 1 + .../core/operator/InstanceResponseOperator.java | 15 ++++- .../operator/blocks/IntermediateResultsBlock.java | 9 +++ .../core/operator/combine/BaseCombineOperator.java | 14 ++++- .../StreamingSelectionOnlyCombineOperator.java | 1 + .../query/executor/ServerQueryExecutorV1Impl.java | 1 + .../core/query/request/context/ThreadTimer.java | 72 ++++++++++++++++++++++ .../pinot/core/query/scheduler/QueryScheduler.java | 17 +++-- .../operator/combine/CombineSlowOperatorsTest.java | 6 +- .../pinot/integration/tests/ClusterTest.java | 3 + .../server/starter/helix/HelixServerStarter.java | 6 ++ 13 files changed, 142 insertions(+), 9 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java index 8d566a3..1b629dd 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java @@ -42,7 +42,7 @@ public enum ServerGauge implements AbstractMetrics.Gauge { REALTIME_SEGMENT_NUM_PARTITIONS("realtimeSegmentNumPartitions", false), LLC_SIMULTANEOUS_SEGMENT_BUILDS("llcSimultaneousSegmentBuilds", true), RESIZE_TIME_MS("milliseconds", false), - + EXECUTION_THREAD_CPU_TIME_NS("nanoseconds", false), // Upsert metrics UPSERT_PRIMARY_KEYS_COUNT("upsertPrimaryKeysCount", false); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java index 191ae93..678aea1 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java @@ -317,6 +317,10 @@ public class CommonConstants { public static final String ACCESS_CONTROL_FACTORY_CLASS = "pinot.server.admin.access.control.factory.class"; public static final String DEFAULT_ACCESS_CONTROL_FACTORY_CLASS = "org.apache.pinot.server.api.access.AllowAllAccessFactory"; + + public static final String CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT = + "pinot.server.instance.enableThreadCpuTimeMeasurement"; + public static final boolean DEFAULT_ENABLE_THREAD_CPU_TIME_MEASUREMENT = false; } public static class Controller { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java index 181dc5a..7e82e56 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java @@ -44,6 +44,7 @@ public interface DataTable { String REQUEST_ID_METADATA_KEY = "requestId"; String NUM_RESIZES_METADATA_KEY = "numResizes"; String RESIZE_TIME_MS_METADATA_KEY = "resizeTimeMs"; + String EXECUTION_THREAD_CPU_TIME_NS_METADATA_KEY = "executionThreadCpuTimeNs"; void addException(ProcessingException processingException); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java index 31da495..02fa1b5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java @@ -18,9 +18,11 @@ */ package org.apache.pinot.core.operator; +import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.query.request.context.ThreadTimer; public class InstanceResponseOperator extends BaseOperator<InstanceResponseBlock> { @@ -34,7 +36,18 @@ public class InstanceResponseOperator extends BaseOperator<InstanceResponseBlock @Override protected InstanceResponseBlock getNextBlock() { - return new InstanceResponseBlock((IntermediateResultsBlock) _operator.nextBlock()); + ThreadTimer mainThreadTimer = new ThreadTimer(); + mainThreadTimer.start(); + + IntermediateResultsBlock intermediateResultsBlock = (IntermediateResultsBlock) _operator.nextBlock(); + InstanceResponseBlock instanceResponseBlock = new InstanceResponseBlock(intermediateResultsBlock); + DataTable dataTable = instanceResponseBlock.getInstanceResponseDataTable(); + + mainThreadTimer.stop(); + long totalThreadCpuTimeNs = intermediateResultsBlock.getThreadCpuTimeNs() + mainThreadTimer.getThreadTimeNs(); + dataTable.getMetadata().put(DataTable.EXECUTION_THREAD_CPU_TIME_NS_METADATA_KEY, String.valueOf(totalThreadCpuTimeNs)); + + return instanceResponseBlock; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java index 09784fa..37f395f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java @@ -69,6 +69,7 @@ public class IntermediateResultsBlock implements Block { private boolean _numGroupsLimitReached; private int _numResizes; private long _resizeTimeMs; + private long _threadCpuTimeNs; private Table _table; @@ -231,6 +232,14 @@ public class IntermediateResultsBlock implements Block { _resizeTimeMs = resizeTimeMs; } + public void setThreadCpuTimeNs(long threadCpuTimeNanos) { + _threadCpuTimeNs = threadCpuTimeNanos; + } + + public long getThreadCpuTimeNs() { + return _threadCpuTimeNs; + } + @VisibleForTesting public long getNumDocsScanned() { return _numDocsScanned; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java index aef6cd1..417f449 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -26,12 +26,14 @@ import java.util.concurrent.Future; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; import org.apache.pinot.core.query.exception.EarlyTerminationException; import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.ThreadTimer; import org.apache.pinot.core.util.trace.TraceRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +61,7 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul protected final Phaser _phaser = new Phaser(1); // Use a _blockingQueue to store the per-segment result protected final BlockingQueue<IntermediateResultsBlock> _blockingQueue; + private final AtomicLong totalWorkerThreadCpuTimeNs = new AtomicLong(0); protected int _numThreads; protected Future[] _futures; @@ -88,12 +91,21 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul _futures[i] = _executorService.submit(new TraceRunnable() { @Override public void runJob() { + ThreadTimer executionThreadTimer = new ThreadTimer(); + executionThreadTimer.start(); + processSegments(threadIndex); + + totalWorkerThreadCpuTimeNs.addAndGet(executionThreadTimer.stopAndGetThreadTimeNs()); } }); } - IntermediateResultsBlock mergedBlock = mergeResultsFromSegments(); + /* + * TODO: setThreadTime logic can be put into CombineOperatorUtils.setExecutionStatistics(), + * after we extends StreamingSelectionOnlyCombineOperator from BaseCombineOperator. + */ + mergedBlock.setThreadCpuTimeNs(totalWorkerThreadCpuTimeNs.get()); CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators); return mergedBlock; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java index 619bcc5..e84e084 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java @@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory; /** * Combine operator for selection only streaming queries. + * TODO: extend StreamingSelectionOnlyCombineOperator from BaseCombineOperator. */ @SuppressWarnings({"rawtypes", "unchecked"}) public class StreamingSelectionOnlyCombineOperator extends BaseOperator<IntermediateResultsBlock> { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index 194e57f..88b64be 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -77,6 +77,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { private PlanMaker _planMaker; private long _defaultTimeOutMs = CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS; private ServerMetrics _serverMetrics; + boolean _enableThreadCpuTimeInstrument; @Override public synchronized void init(PinotConfiguration config, InstanceDataManager instanceDataManager, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/ThreadTimer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/ThreadTimer.java new file mode 100644 index 0000000..f29b7fb --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/ThreadTimer.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.request.context; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The {@code ThreadTimer} class providing the functionality of measuring the CPU time for the current thread. + */ +public class ThreadTimer { + private static final ThreadMXBean MX_BEAN = ManagementFactory.getThreadMXBean(); + private static final boolean IS_CURRENT_THREAD_CPU_TIME_SUPPORTED = MX_BEAN.isCurrentThreadCpuTimeSupported(); + private static final Logger LOGGER = LoggerFactory.getLogger(ThreadTimer.class); + private static boolean IS_THREAD_CPU_TIME_MEASUREMENT_ENABLED = false; + private long _startTimeNs = -1; + private long _endTimeNs = -1; + + public ThreadTimer() { + } + + public static void setThreadCpuTimeMeasurementEnabled(boolean enable) { + IS_THREAD_CPU_TIME_MEASUREMENT_ENABLED = enable && IS_CURRENT_THREAD_CPU_TIME_SUPPORTED; + } + + public void start() { + if (IS_THREAD_CPU_TIME_MEASUREMENT_ENABLED) { + _startTimeNs = MX_BEAN.getCurrentThreadCpuTime(); + } + } + + public void stop() { + if (IS_THREAD_CPU_TIME_MEASUREMENT_ENABLED) { + _endTimeNs = MX_BEAN.getCurrentThreadCpuTime(); + } + } + + public long getThreadTimeNs() { + if (_startTimeNs == -1 || _endTimeNs == -1) { + return 0; + } + return _endTimeNs - _startTimeNs; + } + + public long stopAndGetThreadTimeNs() { + stop(); + return getThreadTimeNs(); + } + + static { + LOGGER.info("Current thread cpu time measurement supported: {}", IS_CURRENT_THREAD_CPU_TIME_SUPPORTED); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java index 50c2434..bda3fc9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java @@ -168,8 +168,6 @@ public abstract class QueryScheduler { Map<String, String> dataTableMetadata = dataTable.getMetadata(); dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY, Long.toString(requestId)); - byte[] responseData = serializeDataTable(queryRequest, dataTable); - // Log the statistics String tableNameWithType = queryRequest.getTableNameWithType(); long numDocsScanned = @@ -188,6 +186,7 @@ public abstract class QueryScheduler { Long.parseLong(dataTableMetadata.getOrDefault(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS, INVALID_FRESHNESS_MS)); int numResizes = Integer.parseInt(dataTableMetadata.getOrDefault(DataTable.NUM_RESIZES_METADATA_KEY, INVALID_NUM_RESIZES)); long resizeTimeMs = Long.parseLong(dataTableMetadata.getOrDefault(DataTable.RESIZE_TIME_MS_METADATA_KEY, INVALID_RESIZE_TIME_MS)); + long executionThreadCpuTimeNs = Long.parseLong(dataTableMetadata.getOrDefault(DataTable.EXECUTION_THREAD_CPU_TIME_NS_METADATA_KEY, "0")); if (numDocsScanned > 0) { serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_DOCS_SCANNED, numDocsScanned); @@ -206,6 +205,9 @@ public abstract class QueryScheduler { if (resizeTimeMs > 0) { serverMetrics.addValueToTableGauge(tableNameWithType, ServerGauge.RESIZE_TIME_MS, resizeTimeMs); } + if (executionThreadCpuTimeNs > 0) { + serverMetrics.addValueToTableGauge(tableNameWithType, ServerGauge.EXECUTION_THREAD_CPU_TIME_NS, executionThreadCpuTimeNs); + } TimerContext timerContext = queryRequest.getTimerContext(); int numSegmentsQueried = queryRequest.getSegmentsToQuery().size(); @@ -216,13 +218,14 @@ public abstract class QueryScheduler { if (queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned)) { LOGGER.info("Processed requestId={},table={},segments(queried/processed/matched/consuming)={}/{}/{}/{}," + "schedulerWaitMs={},reqDeserMs={},totalExecMs={},resSerMs={},totalTimeMs={},minConsumingFreshnessMs={},broker={}," - + "numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}", requestId, tableNameWithType, + + "numDocsScanned={},scanInFilter={},scanPostFilter={},sched={},executionThreadCpuTimeNs={}", requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, numSegmentsConsuming, schedulerWaitMs, timerContext.getPhaseDurationMs(ServerQueryPhase.REQUEST_DESERIALIZATION), timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING), timerContext.getPhaseDurationMs(ServerQueryPhase.RESPONSE_SERIALIZATION), timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), minConsumingFreshnessMs, - queryRequest.getBrokerId(), numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name()); + queryRequest.getBrokerId(), numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name(), + executionThreadCpuTimeNs); // Limit the dropping log message at most once per second. if (numDroppedLogRateLimiter.tryAcquire()) { @@ -247,6 +250,12 @@ public abstract class QueryScheduler { serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PROCESSED, numSegmentsProcessed); serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_MATCHED, numSegmentsMatched); + /** + * TODO: Currently not send "executionThreadCpuTimeNs" as part of metadata to broker. Revisit this when follow-up + * work of data table serialization cost measurement is done. + */ + dataTableMetadata.remove(DataTable.EXECUTION_THREAD_CPU_TIME_NS_METADATA_KEY); + byte[] responseData = serializeDataTable(queryRequest, dataTable); return responseData; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java index 6dcf556..ec4ad2e 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java @@ -65,7 +65,8 @@ public class CombineSlowOperatorsTest { public void testSelectionOnlyCombineOperator() { List<Operator> operators = getOperators(); SelectionOnlyCombineOperator combineOperator = new SelectionOnlyCombineOperator(operators, - QueryContextConverterUtils.getQueryContextFromPQL("SELECT * FROM table"), _executorService, TIMEOUT_MS); + QueryContextConverterUtils.getQueryContextFromPQL("SELECT * FROM table"), + _executorService, TIMEOUT_MS); testCombineOperator(operators, combineOperator); } @@ -76,7 +77,8 @@ public class CombineSlowOperatorsTest { public void testAggregationOnlyCombineOperator() { List<Operator> operators = getOperators(); AggregationOnlyCombineOperator combineOperator = new AggregationOnlyCombineOperator(operators, - QueryContextConverterUtils.getQueryContextFromPQL("SELECT COUNT(*) FROM table"), _executorService, TIMEOUT_MS); + QueryContextConverterUtils.getQueryContextFromPQL("SELECT COUNT(*) FROM table"), _executorService, + TIMEOUT_MS); testCombineOperator(operators, combineOperator); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java index ed8e4da..c16c765 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java @@ -165,6 +165,9 @@ public abstract class ClusterTest extends ControllerTest { .setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, Server.DEFAULT_INSTANCE_SEGMENT_TAR_DIR + "-" + i); configuration.setProperty(Server.CONFIG_OF_ADMIN_API_PORT, baseAdminApiPort - i); configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort + i); + // Thread time measurement is disabled by default, enable it in integration tests. + // TODO: this can be removed when we eventually enable thread time measurement by default. + configuration.setProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true); HelixServerStarter helixServerStarter = new HelixServerStarter(getHelixClusterName(), zkStr, configuration); _serverStarters.add(helixServerStarter); helixServerStarter.start(); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java index 509e8f7..2d9758b 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java @@ -60,6 +60,7 @@ import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.common.utils.ServiceStatus.Status; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.query.request.context.ThreadTimer; import org.apache.pinot.core.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState; import org.apache.pinot.core.segment.memory.PinotDataBuffer; import org.apache.pinot.core.transport.ListenerConfig; @@ -140,6 +141,11 @@ public class HelixServerStarter implements ServiceStartable { _instanceConfigScope = new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT, _helixClusterName).forParticipant(_instanceId) .build(); + + // Enable/disable thread CPU time measurement through instance config. + ThreadTimer.setThreadCpuTimeMeasurementEnabled(_serverConf + .getProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, + Server.DEFAULT_ENABLE_THREAD_CPU_TIME_MEASUREMENT)); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org