gortiz commented on code in PR #15245:
URL: https://github.com/apache/pinot/pull/15245#discussion_r1989329010

##########
pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java:
##########
@@ -61,6 +61,13 @@ public StatMap(Class<K> keyClass) {
     _map = Collections.synchronizedMap(new EnumMap<>(keyClass));
   }

+  /// A copy constructor for the class.

Review Comment:
   By the way, I'm using Markdown documentation comments (https://openjdk.org/jeps/467) here. IntelliJ already supports them even when compiling with Java 11, and they are shorter and easier to read in general.


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -481,47 +483,32 @@ void addResultsBlock(BaseResultsBlock resultsBlock)
     }
   }

-  // TODO: Revisit the stats aggregation logic
-  private void aggregateExecutionStats(Map<String, String> stats1, Map<String, String> stats2) {
-    for (Map.Entry<String, String> entry : stats2.entrySet()) {
-      String k2 = entry.getKey();
-      String v2 = entry.getValue();
-      stats1.merge(k2, v2, (val1, val2) -> {
-        try {
-          return Long.toString(Long.parseLong(val1) + Long.parseLong(val2));
-        } catch (Exception e) {
-          return val1 + "\n" + val2;
-        }
-      });
-    }
-  }
-

Review Comment:
   This method wasn't called at all.


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/MseBlock.java:
##########
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Block;
+
+
+/// Blocks used by [MultiStageOperator][org.apache.pinot.query.runtime.operator.MultiStageOperator] to share
+/// information.
+///
+/// Blocks always go from upstream (the children of the operator) to downstream (the parent of the operator) and can be
+/// classified in the following categories:
+/// - [Data] blocks: contain data that can be processed by the operator.
+/// - [Eos] blocks: signal the end of a stream. These blocks can be either [successful][SuccessMseBlock] or
+/// [error][ErrorMseBlock].
+///
+/// ## The MseBlock API
+/// A MseBlock itself is not very useful, as they have almost no methods.
+/// Instead, they are used as a common sub-interface for [data][Data] and [end-of-stream][Eos] blocks,
+/// which are then subclassed to provide the actual functionality.
+/// This pattern follows the principles of Java 17 sealed interfaces and the intention is implement them as such once
+/// Pinot source code is migrated to Java 17 or newer, specially in Java 21 where pattern matching can also be used,
+/// removing the need for the [Visitor] pattern.
+///
+/// Meanwhile, the API force callers to do some castings, but it is a trade-off to have a more robust and maintainable
+/// codebase given that we can relay on Java typesystem to verify some important properties at compile time instead of
+/// adding runtime checks.
+///

Review Comment:
   Done


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java:
##########
@@ -94,31 +94,33 @@ public String toExplainString() {
   }

   @Override
-  protected TransferableBlock getNextBlock() {
+  protected MseBlock getNextBlock() {
     // Keep reading the input blocks until we find a match row or all blocks are processed.
     // TODO: Consider batching the rows to improve performance.
     while (true) {
-      TransferableBlock block = _input.nextBlock();
-      if (block.isErrorBlock()) {
+      MseBlock block = _input.nextBlock();

Review Comment:
   What do you mean by _this_?


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -244,10 +245,10 @@ public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map
     // Send error block to all the receivers if pipeline breaker fails
     if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() != null) {
-      TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock();
+      ErrorMseBlock errorBlock = pipelineBreakerResult.getErrorBlock();
       int stageId = stageMetadata.getStageId();
       LOGGER.error("Error executing pipeline breaker for request: {}, stage: {}, sending error block: {}", requestId,

Review Comment:
   Yes and no. The older TransferableBlock overrides toString, and I've modified the MseBlocks to do the same. Anyway, the logged error is pretty much the same, given that the default toString includes the class name, which is mostly what we want to log.


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -176,16 +174,25 @@ protected TransferableBlock getNextBlock()
     // Terminate when receiving exception block
     Map<Integer, String> exceptions = _exceptions;
     if (exceptions != null) {

Review Comment:
   I did something different: I created that static method in QueryErrorCode.

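
For readers following the MseBlock Javadoc quoted in this thread, the closed hierarchy plus visitor that it describes can be sketched roughly as follows. This is a simplified illustration and not the code in the PR: `Block`, `RowBlock`, `SuccessBlock`, `ErrorBlock`, `BlockDescriber`, and `BlockSketchDemo` are hypothetical names, and the real interfaces carry more methods. The `///` comments compile on Java 11 because javac treats them as ordinary line comments.

```java
import java.util.ArrayList;
import java.util.List;

/// Hypothetical, simplified stand-in for MseBlock; not Pinot's actual API.
interface Block {
  boolean isData();

  <R> R accept(Visitor<R> visitor);

  /// Blocks that carry rows.
  interface Data extends Block {
    int numRows();

    @Override
    default boolean isData() {
      return true;
    }
  }

  /// Blocks that signal the end of the stream, successfully or not.
  interface Eos extends Block {
    @Override
    default boolean isData() {
      return false;
    }
  }

  /// Stands in for exhaustive pattern matching until sealed interfaces can be used.
  interface Visitor<R> {
    R visitData(Data block);

    R visitSuccess(SuccessBlock block);

    R visitError(ErrorBlock block);
  }
}

final class RowBlock implements Block.Data {
  private final List<Object[]> _rows;

  RowBlock(List<Object[]> rows) {
    _rows = rows;
  }

  @Override
  public int numRows() {
    return _rows.size();
  }

  @Override
  public <R> R accept(Block.Visitor<R> visitor) {
    return visitor.visitData(this);
  }

  // Overriding toString keeps log lines readable, as discussed for the error block above.
  @Override
  public String toString() {
    return "RowBlock{numRows=" + _rows.size() + "}";
  }
}

final class SuccessBlock implements Block.Eos {
  static final SuccessBlock INSTANCE = new SuccessBlock();

  private SuccessBlock() {
  }

  @Override
  public <R> R accept(Block.Visitor<R> visitor) {
    return visitor.visitSuccess(this);
  }
}

final class ErrorBlock implements Block.Eos {
  private final String _message;

  ErrorBlock(String message) {
    _message = message;
  }

  String getMessage() {
    return _message;
  }

  @Override
  public <R> R accept(Block.Visitor<R> visitor) {
    return visitor.visitError(this);
  }

  @Override
  public String toString() {
    return "ErrorBlock{message=" + _message + "}";
  }
}

/// A visitor must handle every block kind, so a missing case is a compile error, not a runtime surprise.
final class BlockDescriber implements Block.Visitor<String> {
  @Override
  public String visitData(Block.Data block) {
    return "data block with " + block.numRows() + " rows";
  }

  @Override
  public String visitSuccess(SuccessBlock block) {
    return "successful end of stream";
  }

  @Override
  public String visitError(ErrorBlock block) {
    return "failed end of stream: " + block.getMessage();
  }
}

final class BlockSketchDemo {
  static String describe(Block block) {
    return block.accept(new BlockDescriber());
  }

  public static void main(String[] args) {
    List<Object[]> rows = new ArrayList<>();
    rows.add(new Object[]{1, "a"});
    System.out.println(describe(new RowBlock(rows)));
    System.out.println(describe(SuccessBlock.INSTANCE));
    System.out.println(describe(new ErrorBlock("boom")));
  }
}
```

With sealed interfaces and pattern matching for `switch`, the `Visitor` interface in this sketch could in principle be replaced by a `switch` over the block type, which is the direction the Javadoc mentions.
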

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowHeapDataBlock.java:
##########
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+
+/// A block that contains data in row heap format.
+///
+/// This class is a subclass of [MseBlock.Data] and is used to store data in row heap format.
+/// This is probably the less efficient way to store data, but it is also the easiest to work with.
+/// As the day this comment was written, this class is used almost everytime we need to read or create data blocks.
+/// The only place where this class is not used is when we need to shuffle data through the network, in which case
+/// we use [SerializedDataBlock].
+public class RowHeapDataBlock implements MseBlock, MseBlock.Data {
+  private final DataSchema _dataSchema;
+  private final List<Object[]> _rows;
+  @Nullable
+  @SuppressWarnings("rawtypes")
+  private final AggregationFunction[] _aggFunctions;
+

Review Comment:
   TBH I don't know what they are. To me they look like a break in abstraction. Maybe @Jackie-Jiang knows more about them.


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/MseBlock.java:
##########
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Block;
+
+
+/// Blocks used by [MultiStageOperator][org.apache.pinot.query.runtime.operator.MultiStageOperator] to share
+/// information.
+///
+/// Blocks always go from upstream (the children of the operator) to downstream (the parent of the operator) and can be
+/// classified in the following categories:
+/// - [Data] blocks: contain data that can be processed by the operator.
+/// - [Eos] blocks: signal the end of a stream. These blocks can be either [successful][SuccessMseBlock] or
+/// [error][ErrorMseBlock].
+///
+/// ## The MseBlock API
+/// A MseBlock itself is not very useful, as they have almost no methods.
+/// Instead, they are used as a common sub-interface for [data][Data] and [end-of-stream][Eos] blocks,
+/// which are then subclassed to provide the actual functionality.
+/// This pattern follows the principles of Java 17 sealed interfaces and the intention is implement them as such once
+/// Pinot source code is migrated to Java 17 or newer, specially in Java 21 where pattern matching can also be used,
+/// removing the need for the [Visitor] pattern.
+///
+/// Meanwhile, the API force callers to do some castings, but it is a trade-off to have a more robust and maintainable

Review Comment:
   Done


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowHeapDataBlock.java:
##########
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+
+/// A block that contains data in row heap format.
+///
+/// This class is a subclass of [MseBlock.Data] and is used to store data in row heap format.
+/// This is probably the less efficient way to store data, but it is also the easiest to work with.

Review Comment:
   Done


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java:
##########
@@ -156,15 +156,15 @@ public Void visitJoin(JoinNode node, ServerPlanRequestContext context) {
     if (visit(left, context)) {
       PipelineBreakerResult pipelineBreakerResult = context.getPipelineBreakerResult();
       int resultMapId = pipelineBreakerResult.getNodeIdMap().get(right);
-      List<TransferableBlock> transferableBlocks =
-          pipelineBreakerResult.getResultMap().getOrDefault(resultMapId, Collections.emptyList());
+      List<MseBlock> blocks = pipelineBreakerResult.getResultMap().getOrDefault(resultMapId, Collections.emptyList());
       List<Object[]> resultDataContainer = new ArrayList<>();
       DataSchema dataSchema = right.getDataSchema();

Review Comment:
   I think this comment should be a bit below, right? I think you meant:
   ```java
   for (TransferableBlock block : transferableBlocks) {
     if (block.getType() == DataBlock.Type.ROW) {
       resultDataContainer.addAll(block.getContainer());
     }
   }
   ```

   Which is now:
   ```java
   for (MseBlock block : blocks) {
     if (block.isData()) {
       resultDataContainer.addAll(((MseBlock.Data) block).asRowHeap().getRows());
     }
   }
   ```

   I don't think we could receive column-based blocks there. In fact, I didn't see a single place where column blocks were used.
   Specifically in this case, I hope we weren't receiving any columnar block, otherwise we would just skip it and return an incorrect value
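
The concern about silently skipped blocks can be made concrete with a small sketch. It assumes hypothetical stand-in types (`SketchBlock`, `SketchRowBlock`, `SketchColumnarBlock`, `SketchEosBlock`, `RowCollector`), none of which are Pinot classes. The loop collects rows the way the old `getType() == DataBlock.Type.ROW` check did, but throws on an unexpected data block type instead of skipping it. The PR itself takes a different route by calling `asRowHeap()` on every data block.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not Pinot's block classes.
interface SketchBlock {
  boolean isData();
}

final class SketchRowBlock implements SketchBlock {
  private final List<Object[]> _rows;

  SketchRowBlock(List<Object[]> rows) {
    _rows = rows;
  }

  @Override
  public boolean isData() {
    return true;
  }

  List<Object[]> getRows() {
    return _rows;
  }
}

// A data block in some other layout that the collector does not know how to read.
final class SketchColumnarBlock implements SketchBlock {
  @Override
  public boolean isData() {
    return true;
  }
}

final class SketchEosBlock implements SketchBlock {
  @Override
  public boolean isData() {
    return false;
  }
}

final class RowCollector {
  private RowCollector() {
  }

  // Gathers all rows, failing loudly rather than dropping data it cannot interpret.
  static List<Object[]> collectRows(List<SketchBlock> blocks) {
    List<Object[]> rows = new ArrayList<>();
    for (SketchBlock block : blocks) {
      if (!block.isData()) {
        continue; // end-of-stream blocks carry no rows
      }
      if (block instanceof SketchRowBlock) {
        rows.addAll(((SketchRowBlock) block).getRows());
      } else {
        throw new IllegalStateException("Unexpected data block type: " + block.getClass().getSimpleName());
      }
    }
    return rows;
  }

  public static void main(String[] args) {
    List<Object[]> rows = new ArrayList<>();
    rows.add(new Object[]{1, "a"});
    rows.add(new Object[]{2, "b"});
    List<SketchBlock> blocks = new ArrayList<>();
    blocks.add(new SketchRowBlock(rows));
    blocks.add(new SketchEosBlock());
    System.out.println("Collected rows: " + collectRows(blocks).size());
  }
}
```

Throwing turns a wrong assumption about block layout into a visible failure, whereas the skip-on-mismatch behavior described above would produce an incomplete result with no signal.
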