This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 35c89c87a5 [Multi-stage] Reduce the stats transfered (#12517) 35c89c87a5 is described below commit 35c89c87a5473465ca273b576d5bf1e554cb0c35 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Thu Feb 29 00:07:41 2024 -0800 [Multi-stage] Reduce the stats transfered (#12517) --- .../MultiStageBrokerRequestHandler.java | 20 +++++++---- .../pinot/common/datablock/DataBlockUtils.java | 2 -- .../runtime/blocks/TransferableBlockUtils.java | 3 +- .../runtime/operator/exchange/BlockExchange.java | 39 ++++++++++++++++------ .../query/service/dispatch/QueryDispatcher.java | 25 ++++++-------- 5 files changed, 55 insertions(+), 34 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 05fb1ddd52..35aff7efd2 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -19,8 +19,9 @@ package org.apache.pinot.broker.requesthandler; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Maps; import java.util.ArrayList; -import java.util.HashMap; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -61,6 +62,7 @@ import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.QueryEnvironment; import org.apache.pinot.query.catalog.PinotCatalog; import org.apache.pinot.query.mailbox.MailboxService; +import org.apache.pinot.query.planner.physical.DispatchablePlanFragment; import org.apache.pinot.query.planner.physical.DispatchableSubPlan; import org.apache.pinot.query.routing.WorkerManager; import org.apache.pinot.query.service.dispatch.QueryDispatcher; @@ -179,14 +181,20 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { Map<String, String> queryOptions = sqlNodeAndOptions.getOptions(); boolean traceEnabled = Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE)); - - ResultTable queryResults; - Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>(); - for (int stageId = 0; stageId < dispatchableSubPlan.getQueryStageList().size(); stageId++) { - stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(traceEnabled)); + Map<Integer, ExecutionStatsAggregator> stageIdStatsMap; + if (!traceEnabled) { + stageIdStatsMap = Collections.singletonMap(0, new ExecutionStatsAggregator(false)); + } else { + List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList(); + int numStages = stagePlans.size(); + stageIdStatsMap = Maps.newHashMapWithExpectedSize(numStages); + for (int stageId = 0; stageId < numStages; stageId++) { + stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(true)); + } } long executionStartTimeNs = System.nanoTime(); + ResultTable queryResults; try { queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimeoutMs, queryOptions, stageIdStatsMap); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java index 5d38168712..27f1140328 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java @@ -59,12 +59,10 @@ public final class DataBlockUtils { } public static MetadataBlock getEndOfStreamDataBlock() { - // TODO: add query statistics metadata for the block. return new MetadataBlock(MetadataBlock.MetadataBlockType.EOS); } public static MetadataBlock getEndOfStreamDataBlock(Map<String, String> stats) { - // TODO: add query statistics metadata for the block. return new MetadataBlock(MetadataBlock.MetadataBlockType.EOS, stats); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java index 01c5fd7ddd..355b6fe294 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java @@ -30,13 +30,14 @@ import org.apache.pinot.common.datablock.DataBlockUtils; public final class TransferableBlockUtils { private static final int MEDIAN_COLUMN_SIZE_BYTES = 8; + private static final TransferableBlock EMPTY_EOS = new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock()); private TransferableBlockUtils() { // do not instantiate. } public static TransferableBlock getEndOfStreamTransferableBlock() { - return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock()); + return EMPTY_EOS; } public static TransferableBlock getEndOfStreamTransferableBlock(Map<String, String> statsMap) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java index 453288ecb3..f8d49b6328 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java @@ -21,6 +21,7 @@ package org.apache.pinot.query.runtime.operator.exchange; import com.google.common.base.Preconditions; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import org.apache.calcite.rel.RelDistribution; import org.apache.pinot.common.datablock.DataBlock; @@ -28,6 +29,7 @@ import org.apache.pinot.query.mailbox.SendingMailbox; import org.apache.pinot.query.planner.partitioning.KeySelectorFactory; import org.apache.pinot.query.runtime.blocks.BlockSplitter; import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; /** @@ -75,24 +77,39 @@ public abstract class BlockExchange { */ public boolean send(TransferableBlock block) throws Exception { - if (block.isEndOfStreamBlock()) { + if (block.isErrorBlock()) { + // Send error block to all mailboxes to propagate the error for (SendingMailbox sendingMailbox : _sendingMailboxes) { sendBlock(sendingMailbox, block); } return false; - } else { - boolean isEarlyTerminated = true; - for (SendingMailbox sendingMailbox : _sendingMailboxes) { - if (!sendingMailbox.isEarlyTerminated()) { - isEarlyTerminated = false; - break; - } + } + + if (block.isSuccessfulEndOfStreamBlock()) { + // Send metadata to only one randomly picked mailbox, and empty EOS block to other mailboxes + int numMailboxes = _sendingMailboxes.size(); + int mailboxIdToSendMetadata = ThreadLocalRandom.current().nextInt(numMailboxes); + for (int i = 0; i < numMailboxes; i++) { + SendingMailbox sendingMailbox = _sendingMailboxes.get(i); + TransferableBlock blockToSend = + i == mailboxIdToSendMetadata ? block : TransferableBlockUtils.getEndOfStreamTransferableBlock(); + sendBlock(sendingMailbox, blockToSend); } - if (!isEarlyTerminated) { - route(_sendingMailboxes, block); + return false; + } + + assert block.isDataBlock(); + boolean isEarlyTerminated = true; + for (SendingMailbox sendingMailbox : _sendingMailboxes) { + if (!sendingMailbox.isEarlyTerminated()) { + isEarlyTerminated = false; + break; } - return isEarlyTerminated; } + if (!isEarlyTerminated) { + route(_sendingMailboxes, block); + } + return isEarlyTerminated; } protected void sendBlock(SendingMailbox sendingMailbox, TransferableBlock block) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index f217c9b01a..c13d6f6aee 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import org.apache.calcite.util.Pair; +import org.apache.commons.collections.MapUtils; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.proto.Worker; import org.apache.pinot.common.response.broker.ResultTable; @@ -89,7 +90,7 @@ public class QueryDispatcher { } public ResultTable submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan, long timeoutMs, - Map<String, String> queryOptions, Map<Integer, ExecutionStatsAggregator> executionStatsAggregator) + Map<String, String> queryOptions, @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregator) throws Exception { long requestId = context.getRequestId(); try { @@ -278,20 +279,16 @@ public class QueryDispatcher { } private static void collectStats(DispatchableSubPlan dispatchableSubPlan, OpChainStats opChainStats, - @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap) { - if (executionStatsAggregatorMap != null) { - LOGGER.info("Extracting broker query execution stats, Runtime: {}ms", opChainStats.getExecutionTime()); - for (Map.Entry<String, OperatorStats> entry : opChainStats.getOperatorStatsMap().entrySet()) { - OperatorStats operatorStats = entry.getValue(); - ExecutionStatsAggregator rootStatsAggregator = executionStatsAggregatorMap.get(0); - ExecutionStatsAggregator stageStatsAggregator = executionStatsAggregatorMap.get(operatorStats.getStageId()); - rootStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>()); + @Nullable Map<Integer, ExecutionStatsAggregator> statsAggregatorMap) { + if (MapUtils.isNotEmpty(statsAggregatorMap)) { + for (OperatorStats operatorStats : opChainStats.getOperatorStatsMap().values()) { + ExecutionStatsAggregator rootStatsAggregator = statsAggregatorMap.get(0); + rootStatsAggregator.aggregate(null, operatorStats.getExecutionStats(), new HashMap<>()); + ExecutionStatsAggregator stageStatsAggregator = statsAggregatorMap.get(operatorStats.getStageId()); if (stageStatsAggregator != null) { - if (dispatchableSubPlan != null) { - OperatorUtils.recordTableName(operatorStats, - dispatchableSubPlan.getQueryStageList().get(operatorStats.getStageId())); - } - stageStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>()); + OperatorUtils.recordTableName(operatorStats, + dispatchableSubPlan.getQueryStageList().get(operatorStats.getStageId())); + stageStatsAggregator.aggregate(null, operatorStats.getExecutionStats(), new HashMap<>()); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org