Jackie-Jiang commented on code in PR #12704:
URL: https://github.com/apache/pinot/pull/12704#discussion_r1575464681
##########
pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java:
##########
@@ -51,6 +59,7 @@
     "segmentStatistics", "traceInfo", "partialResult"
 })
 public class BrokerResponseNative implements BrokerResponse {

Review Comment:
   I'm trying to follow the changes in this class. This class represents the response for v1 queries. Currently (before this PR), we have a `BrokerResponseNativeV2` which extends the v1 query response with just an additional list of per-stage stats. I feel we should use this opportunity to clean them up and decouple the v2 response format from the v1 format, because a lot of things do not apply to v2, and with v2 stats we don't really need the extra fields in this class. I guess the intention here is to keep things compatible with the current interface. Shall we keep the changes within `BrokerResponseNativeV2` if possible? We can make it directly implement `BrokerResponse` so that we can easily decouple v1 and v2 in the future.

##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java:
##########
@@ -276,6 +264,161 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
     return brokerResponse;
   }

+  private void fillOldBrokerResponseStats(BrokerResponseNativeV2 brokerResponse,
+      List<MultiStageQueryStats.StageStats.Closed> queryStats, DispatchableSubPlan dispatchableSubPlan) {
+    for (int i = 0; i < queryStats.size(); i++) {
+      MultiStageQueryStats.StageStats.Closed stageStats = queryStats.get(i);
+      if (stageStats == null) {

Review Comment:
   For my understanding, will this ever contain a `null` value? Is this for the broker stage (stage 0)? If so, some comments would help in understanding the code.

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -72,11 +81,53 @@ public String getId() {
     return _id;
   }

+  /**
+   * Offers a raw block into the mailbox within the timeout specified, returns whether the block is successfully added.
+   * If the block is not added, an error block is added to the mailbox.
+   * <p>
+   * Contrary to {@link #offer(TransferableBlock, long)}, the block may be an
+   * {@link TransferableBlock#isErrorBlock() error block}.
+   */
+  public ReceivingMailboxStatus offerRaw(ByteBuffer byteBuffer, long timeoutMs)
+      throws IOException {
+    TransferableBlock block;
+    long now = System.currentTimeMillis();
+    _stats.merge(StatKey.WAIT_CPU_TIME_MS, now - _lastArriveTime);
+    _lastArriveTime = now;
+    _stats.merge(StatKey.DESERIALIZED_BYTES, byteBuffer.remaining());
+    _stats.merge(StatKey.DESERIALIZED_MESSAGES, 1);
+
+    now = System.currentTimeMillis();
+    DataBlock dataBlock = DataBlockUtils.getDataBlock(byteBuffer);
+    _stats.merge(StatKey.DESERIALIZATION_TIME_MS, System.currentTimeMillis() - now);
+
+    if (dataBlock instanceof MetadataBlock) {
+      Map<Integer, String> exceptions = dataBlock.getExceptions();
+      if (exceptions.isEmpty()) {
+        block = TransferableBlockUtils.wrap(dataBlock);
+      } else {
+        setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(exceptions));
+        return ReceivingMailboxStatus.FIRST_ERROR;

Review Comment:
   `FIRST_ERROR` is a little bit confusing. I think this means the error block is successfully set? Maybe make it more explicit, like `ERROR_BLOCK_SET`?

##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/V1MetadataBlock.java:
##########
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.datablock;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * The datablock used before the introduction of {@link org.apache.pinot.common.datatable.StatMap}.
+ * <p>
+ * This version stored the metadata in a {@code Map<String, String>} which was encoded as JSON and stored in the
+ * variable size data buffer.
+ * <p>
+ * Instances of this class are not actually seen by the operators. Instead, they are converted to {@link MetadataBlock}
+ * in {@link MetadataBlock#deserialize(ByteBuffer, int)}.
+ * <p>
+ * The reason to keep it here is mostly for backwards compatibility and testing. In order to simplify the code, the
+ * stats engine just ignores the metadata of these objects, but we need to be able to deserialize them anyway.
+ */
+public class V1MetadataBlock extends BaseDataBlock {

Review Comment:
   Shall we mark it deprecated? Or maybe move it to the test package, since it is just for testing purposes?
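If the class stays in main sources, marking it deprecated could look roughly like the sketch below. It is a hypothetical, simplified stand-in (`LegacyMetadataBlock` and `legacyMetadata()` are illustrative names, and it does not extend `BaseDataBlock`); the point is only the pairing of the `@Deprecated` annotation, which makes the compiler warn callers, with a `@deprecated` Javadoc tag explaining why the class still exists.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified stand-in for a legacy metadata block (not the real V1MetadataBlock).
 *
 * @deprecated Kept only so that blocks produced by older servers can still be deserialized.
 *     The stats engine ignores this metadata; new code should use the current metadata block format.
 */
@Deprecated
final class LegacyMetadataBlock {
  // Old format: metadata was a String-to-String map (JSON-encoded on the wire).
  private final Map<String, String> _legacyMetadata;

  LegacyMetadataBlock(Map<String, String> legacyMetadata) {
    // Defensive copy so later changes to the caller's map are not visible here.
    _legacyMetadata = Collections.unmodifiableMap(new HashMap<>(legacyMetadata));
  }

  /** Exposed for tests that need to build or inspect old-format blocks. */
  Map<String, String> legacyMetadata() {
    return _legacyMetadata;
  }
}
```

Whether moving the class to test sources is viable instead depends on whether production code still has to deserialize old-format blocks, which the quoted Javadoc suggests it does.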
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java:
##########
@@ -276,6 +264,161 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
     return brokerResponse;
   }

+  private void fillOldBrokerResponseStats(BrokerResponseNativeV2 brokerResponse,
+      List<MultiStageQueryStats.StageStats.Closed> queryStats, DispatchableSubPlan dispatchableSubPlan) {
+    for (int i = 0; i < queryStats.size(); i++) {
+      MultiStageQueryStats.StageStats.Closed stageStats = queryStats.get(i);
+      if (stageStats == null) {
+        brokerResponse.addStageStats(JsonUtils.newObjectNode());
+      } else {
+        stageStats.forEach((type, stats) -> type.mergeInto(brokerResponse, stats));
+
+        DispatchablePlanFragment dispatchablePlanFragment = dispatchableSubPlan.getQueryStageList().get(i);
+        MultiStageStatsTreeBuilder treeBuilder = new MultiStageStatsTreeBuilder(stageStats);
+        PlanNode fragmentRoot = dispatchablePlanFragment.getPlanFragment().getFragmentRoot();
+        JsonNode node = fragmentRoot.visit(treeBuilder, null);
+        brokerResponse.addStageStats(node);
+      }
+    }
+  }
+
+  public static class MultiStageStatsTreeBuilder implements PlanNodeVisitor<JsonNode, Void> {
+    private final MultiStageQueryStats.StageStats.Closed _stageStats;
+    private int _index;
+    private static final String CHILDREN_KEY = "children";
+
+    public MultiStageStatsTreeBuilder(MultiStageQueryStats.StageStats.Closed stageStats) {
+      _stageStats = stageStats;
+      _index = stageStats.getLastOperatorIndex();
+    }
+
+    private ObjectNode selfNode(MultiStageOperator.Type type) {
+      ObjectNode json = JsonUtils.newObjectNode();
+      json.put("type", type.toString());
+      Iterator<Map.Entry<String, JsonNode>> statsIt = _stageStats.getOperatorStats(_index).asJson().fields();
+      while (statsIt.hasNext()) {
+        Map.Entry<String, JsonNode> entry = statsIt.next();
+        json.set(entry.getKey(), entry.getValue());
+      }
+      return json;
+    }
+
+    private JsonNode recursiveCase(AbstractPlanNode node, MultiStageOperator.Type expectedType) {
+      MultiStageOperator.Type type = _stageStats.getOperatorType(_index);
+      if (type != expectedType) {

Review Comment:
   Is this a regular case? Some comments would help explain the logic.

##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java:
##########
@@ -276,6 +264,161 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
     return brokerResponse;
   }

+  private void fillOldBrokerResponseStats(BrokerResponseNativeV2 brokerResponse,
+      List<MultiStageQueryStats.StageStats.Closed> queryStats, DispatchableSubPlan dispatchableSubPlan) {
+    for (int i = 0; i < queryStats.size(); i++) {
+      MultiStageQueryStats.StageStats.Closed stageStats = queryStats.get(i);
+      if (stageStats == null) {
+        brokerResponse.addStageStats(JsonUtils.newObjectNode());
+      } else {
+        stageStats.forEach((type, stats) -> type.mergeInto(brokerResponse, stats));
+
+        DispatchablePlanFragment dispatchablePlanFragment = dispatchableSubPlan.getQueryStageList().get(i);
+        MultiStageStatsTreeBuilder treeBuilder = new MultiStageStatsTreeBuilder(stageStats);
+        PlanNode fragmentRoot = dispatchablePlanFragment.getPlanFragment().getFragmentRoot();
+        JsonNode node = fragmentRoot.visit(treeBuilder, null);
+        brokerResponse.addStageStats(node);
+      }
+    }
+  }
+
+  public static class MultiStageStatsTreeBuilder implements PlanNodeVisitor<JsonNode, Void> {
+    private final MultiStageQueryStats.StageStats.Closed _stageStats;
+    private int _index;
+    private static final String CHILDREN_KEY = "children";
+
+    public MultiStageStatsTreeBuilder(MultiStageQueryStats.StageStats.Closed stageStats) {
+      _stageStats = stageStats;
+      _index = stageStats.getLastOperatorIndex();
+    }
+
+    private ObjectNode selfNode(MultiStageOperator.Type type) {
+      ObjectNode json = JsonUtils.newObjectNode();
+      json.put("type", type.toString());
+      Iterator<Map.Entry<String, JsonNode>> statsIt = _stageStats.getOperatorStats(_index).asJson().fields();
+      while (statsIt.hasNext()) {
+        Map.Entry<String, JsonNode> entry = statsIt.next();
+        json.set(entry.getKey(), entry.getValue());
+      }
+      return json;
+    }
+
+    private JsonNode recursiveCase(AbstractPlanNode node, MultiStageOperator.Type expectedType) {
+      MultiStageOperator.Type type = _stageStats.getOperatorType(_index);
+      if (type != expectedType) {
+        if (type == MultiStageOperator.Type.LEAF) {
+          // Leaf nodes compile the plan node into a single operator and therefore return a single stat
+          return selfNode(type);
+        }
+        int childrenSize = node.getInputs().size();
+        switch (childrenSize) {
+          case 0:
+            LOGGER.warn("Skipping unexpected node {} when stat of type {} was found at index {}",
+                node.getClass(), type, _index);
+            return JsonUtils.newObjectNode();
+          case 1:
+            LOGGER.warn("Skipping unexpected node {} when stat of type {} was found at index {}",
+                node.getClass(), type, _index);
+            return node.getInputs().get(0).visit(this, null);
+          default:
+            throw new IllegalStateException("Expected operator type: " + expectedType + ", but got: " + type + " with "

Review Comment:
   Do we want to fail the whole query when stats processing fails?
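One way to avoid failing the query, as the question above hints, is to treat stats-tree construction as best effort. The sketch below is hypothetical (`LenientStatsBuilder` and `buildOrEmpty` are illustrative names), and to stay self-contained it uses plain `java.util` maps instead of Jackson nodes and `java.util.logging` instead of SLF4J. It catches the `IllegalStateException` of the kind thrown by the `default` branch, logs it, and returns empty stats for that stage, similar to what `fillOldBrokerResponseStats` already does for `null` stages.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper: builds a stage's stats tree on a best-effort basis. */
final class LenientStatsBuilder {
  private static final Logger LOGGER = Logger.getLogger(LenientStatsBuilder.class.getName());

  private LenientStatsBuilder() {
  }

  /**
   * Runs the given stats-building step. If it throws IllegalStateException (for example, an unexpected
   * operator type while walking the plan), logs a warning and returns an empty stats map, so the caller
   * can still return the query result.
   */
  static Map<String, Object> buildOrEmpty(int stageId, Supplier<Map<String, Object>> builder) {
    try {
      return builder.get();
    } catch (IllegalStateException e) {
      LOGGER.log(Level.WARNING, "Could not build stats tree for stage " + stageId + "; returning empty stats", e);
      return Collections.emptyMap();
    }
  }
}
```

The trade-off is that a mismatch between the plan tree and the recorded operator types becomes a warning rather than a failure, so tests would need to assert on the stats output to catch such regressions.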
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java:
##########
@@ -19,44 +19,50 @@
 package org.apache.pinot.query.runtime.blocks;

 import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.datablock.ColumnarDataBlock;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.datablock.RowDataBlock;
-import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.datablock.DataBlockBuilder;
 import org.apache.pinot.core.util.DataBlockExtractUtils;
-import org.apache.pinot.query.runtime.operator.OperatorStats;
-import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
-
+import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;

 /**
  * A {@code TransferableBlock} is a wrapper around {@link DataBlock} for transferring data using
  * {@link org.apache.pinot.common.proto.Mailbox}.
  */
 public class TransferableBlock implements Block {
   private final DataBlock.Type _type;
+  @Nullable
   private final DataSchema _dataSchema;
   private final int _numRows;
   private List<Object[]> _container;
   private DataBlock _dataBlock;
   private Map<Integer, String> _errCodeToExceptionMap;
+  @Nullable
+  private final MultiStageQueryStats _queryStats;

-  public TransferableBlock(List<Object[]> container, DataSchema dataSchema, DataBlock.Type type) {
+  public TransferableBlock(List<Object[]> container, @Nullable DataSchema dataSchema, DataBlock.Type type) {

Review Comment:
   We shouldn't use this constructor when the schema doesn't exist.

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -72,11 +81,53 @@ public String getId() {
     return _id;
   }

+  /**
+   * Offers a raw block into the mailbox within the timeout specified, returns whether the block is successfully added.
+   * If the block is not added, an error block is added to the mailbox.
+   * <p>
+   * Contrary to {@link #offer(TransferableBlock, long)}, the block may be an
+   * {@link TransferableBlock#isErrorBlock() error block}.
+   */
+  public ReceivingMailboxStatus offerRaw(ByteBuffer byteBuffer, long timeoutMs)
+      throws IOException {
+    TransferableBlock block;
+    long now = System.currentTimeMillis();
+    _stats.merge(StatKey.WAIT_CPU_TIME_MS, now - _lastArriveTime);
+    _lastArriveTime = now;
+    _stats.merge(StatKey.DESERIALIZED_BYTES, byteBuffer.remaining());
+    _stats.merge(StatKey.DESERIALIZED_MESSAGES, 1);
+
+    now = System.currentTimeMillis();
+    DataBlock dataBlock = DataBlockUtils.getDataBlock(byteBuffer);
+    _stats.merge(StatKey.DESERIALIZATION_TIME_MS, System.currentTimeMillis() - now);
+
+    if (dataBlock instanceof MetadataBlock) {
+      Map<Integer, String> exceptions = dataBlock.getExceptions();
+      if (exceptions.isEmpty()) {
+        block = TransferableBlockUtils.wrap(dataBlock);
+      } else {
+        setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(exceptions));
+        return ReceivingMailboxStatus.FIRST_ERROR;

Review Comment:
   Also, this message doesn't seem to apply to the in-memory mailbox; some comments would help explain it.

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java:
##########
@@ -0,0 +1,668 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.avro.util.ByteBufferInputStream;
+import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.query.runtime.operator.BaseMailboxReceiveOperator;
+import org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator;
+import org.apache.pinot.query.runtime.operator.LiteralValueOperator;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The stats of a given query.
+ * <p>
+ * For the same query, multiple instances of this class may exist. Each of them will have a partial view of the stats.
+ * Specifically, while the query is being executed, each operator will return its own partial view of the stats when
+ * EOS block is sent.
+ * <p>
+ * Simple operations with a single upstream, like filters or transforms, would just add their own information to the
+ * stats. More complex operations, like joins or receiving mailboxes, will merge the stats from all their upstreams and
+ * add their own stats.
+ * <p>
+ * The complete stats for the query are obtained in the execution root (usually the broker) by merging the partial
+ * views.
+ * <p>
+ * In order to reduce allocation, this class is mutable. Some operators may create their own stats, but most of them
+ * will receive a stats object from the upstream operator and modify it by adding their own stats and sometimes merging
+ * them with other upstream stats.
+ */
+public class MultiStageQueryStats {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MultiStageQueryStats.class);
+  private final int _currentStageId;
+  private final StageStats.Open _currentStats;
+  /**
+   * Known stats for stages whose id is higher than the current one.
+   * <p>
+   * A stage may not know all the stats whose id is higher than the current one, so this list may contain null values.
+   * It may also grow in size when different merge methods are called.
+   * <p>
+   * For example the stats of the left hand side of a join may know stats of stages 3 and 4 and the right side may know
+   * stats of stages 5. When merging the stats of the join, the stats of stages 5 will be added to this list.
+   *
+   * @see #mergeUpstream(List)
+   * @see #mergeUpstream(MultiStageQueryStats)
+   * @see #mergeInOrder(MultiStageQueryStats, MultiStageOperator.Type, StatMap)
+   */
+  private final ArrayList<StageStats.Closed> _closedStats;
+  private static final MultiStageOperator.Type[] ALL_TYPES = MultiStageOperator.Type.values();
+
+  private MultiStageQueryStats(int stageId) {
+    _currentStageId = stageId;
+    _currentStats = new StageStats.Open();
+    _closedStats = new ArrayList<>();
+  }
+
+  private static MultiStageQueryStats create(int stageId, MultiStageOperator.Type type, @Nullable StatMap<?> opStats) {
+    MultiStageQueryStats multiStageQueryStats = new MultiStageQueryStats(stageId);
+    multiStageQueryStats.getCurrentStats().addLastOperator(type, opStats);
+    return multiStageQueryStats;
+  }
+
+  public static MultiStageQueryStats emptyStats(int stageId) {
+    return new MultiStageQueryStats(stageId);
+  }
+
+  public static MultiStageQueryStats createLeaf(int stageId,
+      StatMap<LeafStageTransferableBlockOperator.StatKey> opStats) {
+    return create(stageId, MultiStageOperator.Type.LEAF, opStats);
+  }
+
+  public static MultiStageQueryStats createLiteral(int stageId, StatMap<LiteralValueOperator.StatKey> statMap) {
+    return create(stageId, MultiStageOperator.Type.LITERAL, statMap);
+  }
+
+  public static MultiStageQueryStats createCancelledSend(int stageId,
+      StatMap<MailboxSendOperator.StatKey> statMap) {
+    return create(stageId, MultiStageOperator.Type.MAILBOX_SEND, statMap);
+  }
+
+  public static MultiStageQueryStats createReceive(int stageId, StatMap<BaseMailboxReceiveOperator.StatKey> stats) {
+    return create(stageId, MultiStageOperator.Type.MAILBOX_RECEIVE, stats);
+  }
+
+  public int getCurrentStageId() {
+    return _currentStageId;
+  }
+
+  /**
+   * Serialize the current stats in a way it is compatible with {@link #mergeUpstream(List)}.
+   * <p>
+   * The serialized stats are returned in a list where the index is the stage id. Stages downstream or not related to
+   * the current one will be null.
+   */
+  public List<ByteBuffer> serialize()
+      throws IOException {
+
+    ArrayList<ByteBuffer> serializedStats = new ArrayList<>(getMaxStageId());
+    for (int i = 0; i < _currentStageId; i++) {
+      serializedStats.add(null);
+    }
+
+    try (UnsynchronizedByteArrayOutputStream baos = new UnsynchronizedByteArrayOutputStream.Builder().get();
+        DataOutputStream output = new DataOutputStream(baos)) {
+
+      _currentStats.serialize(output);
+      ByteBuffer currentBuf = ByteBuffer.wrap(baos.toByteArray());
+
+      serializedStats.add(currentBuf);
+
+      for (StageStats.Closed closedStats : _closedStats) {
+        if (closedStats == null) {
+          serializedStats.add(null);
+          continue;
+        }
+        baos.reset();
+        closedStats.serialize(output);
+        ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
+        serializedStats.add(buf);
+      }
+    }
+    Preconditions.checkState(serializedStats.size() == getMaxStageId() + 1,
+        "Serialized stats size is different from expected size. Expected %s, got %s",
+        getMaxStageId() + 1, serializedStats.size());
+    return serializedStats;
+  }
+
+  public StageStats.Open getCurrentStats() {
+    return _currentStats;
+  }
+
+  /**
+   * Returns the higher stage id known by this object.
+   */
+  public int getMaxStageId() {
+    return _currentStageId + _closedStats.size();
+  }
+
+  /**
+   * Get the stats of a stage whose id is higher than the current one.
+   * <p>
+   * This method returns null in case the stage id is unknown by this stage or no stats are stored for it.
+   */
+  @Nullable
+  public StageStats.Closed getUpstreamStageStats(int stageId) {
+    if (stageId <= _currentStageId) {
+      throw new IllegalArgumentException("Stage " + stageId + " cannot be upstream of current stage "
+          + _currentStageId);
+    }
+
+    int index = stageId - _currentStageId - 1;
+    if (index >= _closedStats.size()) {
+      return null;
+    }
+    return _closedStats.get(index);
+  }
+
+  public void mergeInOrder(MultiStageQueryStats otherStats, MultiStageOperator.Type type,
+      StatMap<?> statMap) {
+    Preconditions.checkArgument(_currentStageId == otherStats._currentStageId,
+        "Cannot merge stats from different stages (%s and %s)", _currentStageId, otherStats._currentStageId);
+    mergeUpstream(otherStats);
+    StageStats.Open currentStats = getCurrentStats();
+    currentStats.concat(otherStats.getCurrentStats());
+    currentStats.addLastOperator(type, statMap);
+  }
+
+  private void growUpToStage(int stageId) {
+    _closedStats.ensureCapacity(stageId - _currentStageId);
+    while (getMaxStageId() < stageId) {
+      _closedStats.add(null);
+    }
+  }
+
+  /**
+   * Merge upstream stats from another MultiStageQueryStats object into this one.
+   * <p>
+   * Only the stages whose id is higher than the current one are merged. The reason to do so is that upstream stats
+   * should be already closed while current stage may need some extra tuning.
+   * <p>
+   * For example set operations may need to merge the stats from all its upstreams before concatenating stats of the
+   * current stage.
+   */
+  public void mergeUpstream(MultiStageQueryStats otherStats) {
+    Preconditions.checkArgument(_currentStageId <= otherStats._currentStageId,
+        "Cannot merge stats from early stage %s into stats of later stage %s",
+        otherStats._currentStageId, _currentStageId);
+
+    growUpToStage(otherStats.getMaxStageId());
+
+    int currentDiff = otherStats._currentStageId - _currentStageId;
+    if (currentDiff > 0) {
+      StageStats.Closed close = otherStats._currentStats.close();
+      int selfIdx = currentDiff - 1;
+      StageStats.Closed myStats = _closedStats.get(selfIdx);
+      if (myStats == null) {
+        _closedStats.set(selfIdx, close);
+      } else {
+        myStats.merge(close);
+      }
+    }
+
+    for (int i = 0; i < otherStats._closedStats.size(); i++) {
+      StageStats.Closed otherStatsForStage = otherStats._closedStats.get(i);
+      if (otherStatsForStage == null) {
+        continue;
+      }
+      int selfIdx = i + currentDiff;
+      StageStats.Closed myStats = _closedStats.get(selfIdx);
+      try {
+        if (myStats == null) {
+          _closedStats.set(selfIdx, otherStatsForStage);
+          assert getUpstreamStageStats(i + otherStats._currentStageId + 1) == otherStatsForStage;
+        } else {
+          myStats.merge(otherStatsForStage);
+        }
+      } catch (IllegalArgumentException | IllegalStateException ex) {
+        LOGGER.warn("Error merging stats on stage " + i + ". Ignoring the new stats", ex);
+      }
+    }
+  }
+
+  public void mergeUpstream(List<ByteBuffer> otherStats) {
+    for (int i = 0; i <= _currentStageId && i < otherStats.size(); i++) {
+      if (otherStats.get(i) != null) {
+        throw new IllegalArgumentException("Cannot merge stats from early stage " + i + " into stats of "
+            + "later stage " + _currentStageId);
+      }
+    }
+    growUpToStage(otherStats.size() - 1);
+
+    for (int i = _currentStageId + 1; i < otherStats.size(); i++) {
+      ByteBuffer otherBuf = otherStats.get(i);
+      if (otherBuf != null) {
+        StageStats.Closed myStats = getUpstreamStageStats(i);
+        try (InputStream is = new ByteBufferInputStream(Collections.singletonList(otherBuf));
+            DataInputStream dis = new DataInputStream(is)) {
+          if (myStats == null) {
+            StageStats.Closed deserialized = StageStats.Closed.deserialize(dis);
+            _closedStats.set(i - _currentStageId - 1, deserialized);
+            assert getUpstreamStageStats(i) == deserialized;
+          } else {
+            myStats.merge(dis);
+          }
+        } catch (IOException ex) {
+          boolean assertOn = false;
+          // *assigns* true if assertions are on.
+          //CHECKSTYLE:OFF
+          assert assertOn = true;
+          if (assertOn) {
+            throw new UncheckedIOException("Error deserializing stats on stage " + i, ex);
+          }
+          //CHECKSTYLE:ON

Review Comment:
   I feel it is safer to have consistent behavior in tests and production. In the test, we can check the stats values to verify that the error is handled correctly. I don't have a strong opinion, so either way is fine.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
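For context on the last review comment: the `assert assertOn = true;` idiom in the diff detects whether the JVM runs with assertions enabled (`-ea`), because the assignment inside the `assert` statement is only evaluated in that case. Test runners such as Maven Surefire typically enable assertions, so the quoted code throws in tests and silently skips the stage in production. The hypothetical sketch below (`StatsMergeErrorHandler`, `onDeserializationError`, and `getFailedStages` are illustrative names, not Pinot APIs) shows both the idiom and the consistent alternative the reviewer describes: always log and skip, and count skipped stages so a test can assert on the counter instead of relying on an exception.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

final class StatsMergeErrorHandler {
  private static final Logger LOGGER = Logger.getLogger(StatsMergeErrorHandler.class.getName());

  // Number of stages whose upstream stats were skipped because they could not be deserialized.
  private int _failedStages;

  /** The idiom from the diff: the assignment only runs when assertions are enabled (-ea). */
  static boolean assertionsEnabled() {
    boolean enabled = false;
    assert enabled = true;
    return enabled;
  }

  /** Same behavior in tests and production: log, count, and ignore the broken stage. */
  void onDeserializationError(int stageId, IOException ex) {
    _failedStages++;
    LOGGER.log(Level.WARNING, "Error deserializing stats on stage " + stageId + ". Ignoring them", ex);
  }

  int getFailedStages() {
    return _failedStages;
  }
}
```

With this shape, a test can feed a corrupted stats buffer and check that the failure counter went up and that the remaining stages' stats are intact, which keeps the production code path identical to the tested one.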