yashmayya commented on code in PR #15245:
URL: https://github.com/apache/pinot/pull/15245#discussion_r2016415044


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowHeapDataBlock.java:
##########
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+
+/// A block that contains data in row heap format.
+///
+/// This class is a subclass of [MseBlock.Data] and is used to store data in row heap format.
+/// This is probably the less efficient way to store data, but it is also the easiest to work with.
+/// At the time of writing, this class is used almost every time we need to read or create data blocks.
+/// The only place where this class is not used is when we need to shuffle data through the network, in which case
+/// we use [SerializedDataBlock].
+public class RowHeapDataBlock implements MseBlock, MseBlock.Data {

Review Comment:
   Doesn't `MseBlock.Data` itself extend `MseBlock`?
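   A minimal sketch of the point, using hypothetical stand-in types (`Block`, `Block.Data`, `HeapBlock`; none of them are the real Pinot classes): when a nested sub-interface already extends its parent, naming only the sub-interface in `implements` should be enough, and the class is still an instance of the parent.

```java
// Hypothetical stand-ins for MseBlock / MseBlock.Data; not the real Pinot types.
interface Block {
  boolean isData();

  // The nested sub-interface already extends the enclosing interface.
  interface Data extends Block {
    @Override
    default boolean isData() {
      return true;
    }
  }
}

// Listing only Block.Data is sufficient: Block is inherited transitively.
final class HeapBlock implements Block.Data {
}

final class RedundantImplementsSketch {
  // True for any Block, including classes that only name Block.Data.
  static boolean isAlsoABlock(Object candidate) {
    return candidate instanceof Block;
  }
}
```

   Writing `implements Block, Block.Data` would also compile; it is just redundant.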



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowHeapDataBlock.java:
##########
@@ -0,0 +1,112 @@
+package org.apache.pinot.query.runtime.blocks;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+
+/// A block that contains data in row heap format.

Review Comment:
   I think it might be useful to add a sentence or two on what "row heap" means here for more context.
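   For readers who want the distinction spelled out: judging from the `List<Object[]>` rows that `RowHeapDataBlock` holds, "row heap" appears to mean rows kept as plain Java objects on the JVM heap, as opposed to rows packed into a serialized buffer. The sketch below contrasts the two. The fixed-width `(int, long)` encoding is a made-up illustration, not Pinot's `DataBlock` format, and all names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class RowHeapVsSerializedSketch {
  // "Row heap": every row is an Object[] of boxed values living on the JVM heap.
  static List<Object[]> heapRows() {
    List<Object[]> rows = new ArrayList<>();
    rows.add(new Object[]{1, 10L});
    rows.add(new Object[]{2, 20L});
    return rows;
  }

  // Hypothetical fixed-width encoding of (int, long) rows into one buffer.
  // This is NOT Pinot's wire format; it only illustrates the idea of a packed form.
  static ByteBuffer serialize(List<Object[]> rows) {
    ByteBuffer buffer = ByteBuffer.allocate(rows.size() * (Integer.BYTES + Long.BYTES));
    for (Object[] row : rows) {
      buffer.putInt((Integer) row[0]);
      buffer.putLong((Long) row[1]);
    }
    buffer.flip();
    return buffer;
  }

  // Going back to heap rows allocates a new Object[] (plus boxes) per row.
  static List<Object[]> deserialize(ByteBuffer buffer) {
    ByteBuffer view = buffer.duplicate();
    List<Object[]> rows = new ArrayList<>();
    while (view.hasRemaining()) {
      rows.add(new Object[]{view.getInt(), view.getLong()});
    }
    return rows;
  }
}
```

   The heap form is easy to read and mutate row by row; the packed form is compact and ready to ship, but every access goes through decoding.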



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/MseBlock.java:
##########
@@ -0,0 +1,178 @@
+package org.apache.pinot.query.runtime.blocks;
+
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Block;
+
+
+/// Blocks used by [MultiStageOperator][org.apache.pinot.query.runtime.operator.MultiStageOperator] to share
+/// information.
+///
+/// Blocks always go from upstream (the children of the operator) to downstream (the parent of the operator) and can be
+/// classified in the following categories:
+/// - [Data] blocks: contain data that can be processed by the operator.
+/// - [Eos] blocks: signal the end of a stream. These blocks can be either [successful][SuccessMseBlock] or
+/// [error][ErrorMseBlock].
+///
+/// ## The MseBlock API
+/// A MseBlock itself is not very useful, as they have almost no methods.
+/// Instead, they are used as a common sub-interface for [data][Data] and [end-of-stream][Eos] blocks,
+/// which are then subclassed to provide the actual functionality.
+/// This pattern follows the principles of Java 17 sealed interfaces and the intention is implement them as such once
+/// Pinot source code is migrated to Java 17 or newer, specially in Java 21 where pattern matching can also be used,
+/// removing the need for the [Visitor] pattern.
+///
+/// Meanwhile, the API force callers to do some castings, but it is a trade-off to have a more robust and maintainable
+/// codebase given that we can rely on Java typesystem to verify some important properties at compile time instead of
+/// adding runtime checks.
+///
+/// The alternative to this pattern would be to have a single class with all methods, adding runtime checks to verify
+/// whether it is acceptable to call a method or not. This is the approach that was used in the removed
+/// TransferableBlock class, which was used for all possible block type combinations. As a result each method
+/// had to include a runtime check to verify if it was legal to call it given some conditions imposed by its attributes.
+/// This approach was error-prone and hard to maintain, as it was easy to forget to add a check in a new method or to
+/// know which methods could be called at a given point in time.
+///
+/// ## MseBlock vs DataBlock
+/// MseBlock are conceptually close to [DataBlocks][org.apache.pinot.common.datablock.DataBlock].
+/// MseBlocks are sent from one operator to another while DataBlocks are a way to codify data. It is important to notice
+/// that MseBlocks don't store stats, while DataBlocks do.
+///
+/// When a MseBlock needs to be sent to another server, it will serialize it into a DataBlock. Then, when a DataBlock
+/// is received by another server, it will deserialize it into a MseBlock (plus stats if needed). This is done by
+/// [GrpcSendingMailbox][org.apache.pinot.query.mailbox.GrpcSendingMailbox] and
+/// [ReceivingMailbox][org.apache.pinot.query.mailbox.ReceivingMailbox].
+public interface MseBlock extends Block {
+  /// Whether the block is a [Data] block or otherwise an [Eos] block.
+  boolean isData();
+
+  /// Whether the block is an [Eos] block or otherwise a [Data] block.
+  default boolean isEos() {
+    return !isData();
+  }
+
+  /// Whether the block is an [Eos] that signals the end of a stream with no errors.
+  boolean isSuccess();
+
+  /// Whether the block is an [Eos] that signals the end of a stream with one or more errors.
+  boolean isError();
+
+  <R, A> R accept(Visitor<R, A> visitor, A arg);
+
+  /// A block that contains data.
+  /// These blocks can store data as [rows on heap][RowHeapDataBlock] or as
+  /// [DataBlocks][SerializedDataBlock].
+  interface Data extends MseBlock {
+    /// Returns the number of rows in the block.
+    int getNumRows();
+
+    /// Returns the schema of the data in the block.
+    DataSchema getDataSchema();
+
+    /// Returns the data in the block as a [RowHeapDataBlock].
+    /// This is a no-op if the block is already a [RowHeapDataBlock]} but is an CPU and memory intensive operation
+    /// if the block is a [SerializedDataBlock].
+    RowHeapDataBlock asRowHeap();
+    /// Returns the data in the block as a [SerializedDataBlock].
+    /// This is a no-op if the block is already a [SerializedDataBlock] but is a CPU and memory intensive operation
+    /// if the block is a [RowHeapDataBlock].
+    /// @throws java.io.UncheckedIOException if the block cannot be serialized.
+    SerializedDataBlock asSerialized();
+
+    /// Returns whether the block is a [RowHeapDataBlock].
+    boolean isRowHeap();
+    /// Returns whether the block is a [SerializedDataBlock].
+    default boolean isSerialized() {
+      return !isRowHeap();
+    }
+
+    @Override
+    default boolean isData() {
+      return true;
+    }
+
+    @Override
+    default boolean isSuccess() {
+      return false;
+    }
+
+    @Override
+    default boolean isError() {
+      return false;
+    }
+
+    <R, A> R accept(Visitor<R, A> visitor, A arg);
+
+    @Override
+    default <R, A> R accept(MseBlock.Visitor<R, A> visitor, A arg) {
+      return accept((MseBlock.Data.Visitor<R, A>) visitor, arg);
+    }

Review Comment:
   I don't quite follow the pattern here: why are there two separate visitor types, and how do they interchange?
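   One possible reading, sketched with hypothetical names (`Blk`, `HeapRows`, `Serialized`, `EndOfStream`). It assumes the broad visitor extends the data-only visitor, which the cast in the quoted `accept` suggests but this hunk does not show:

```java
// Hypothetical miniature of a two-level visitor; not the real Pinot interfaces.
interface Blk {
  <R, A> R accept(Blk.Visitor<R, A> visitor, A arg);

  // Full visitor: handles every block kind. Assumed to extend the data-only visitor.
  interface Visitor<R, A> extends Blk.Data.Visitor<R, A> {
    R visit(EndOfStream block, A arg);
  }

  interface Data extends Blk {
    // Narrow entry point for callers that only ever see data blocks.
    <R, A> R accept(Blk.Data.Visitor<R, A> visitor, A arg);

    // A full visitor is also a data visitor, so forward to the narrow overload.
    // The upcast only exists to select that overload.
    @Override
    default <R, A> R accept(Blk.Visitor<R, A> visitor, A arg) {
      return accept((Blk.Data.Visitor<R, A>) visitor, arg);
    }

    interface Visitor<R, A> {
      R visit(HeapRows block, A arg);
      R visit(Serialized block, A arg);
    }
  }
}

final class HeapRows implements Blk.Data {
  @Override
  public <R, A> R accept(Blk.Data.Visitor<R, A> visitor, A arg) {
    return visitor.visit(this, arg);
  }
}

final class Serialized implements Blk.Data {
  @Override
  public <R, A> R accept(Blk.Data.Visitor<R, A> visitor, A arg) {
    return visitor.visit(this, arg);
  }
}

final class EndOfStream implements Blk {
  @Override
  public <R, A> R accept(Blk.Visitor<R, A> visitor, A arg) {
    return visitor.visit(this, arg);
  }
}

// Code that may receive any block implements the full visitor.
final class DescribeVisitor implements Blk.Visitor<String, String> {
  @Override
  public String visit(HeapRows block, String prefix) {
    return prefix + "heap";
  }

  @Override
  public String visit(Serialized block, String prefix) {
    return prefix + "serialized";
  }

  @Override
  public String visit(EndOfStream block, String prefix) {
    return prefix + "eos";
  }
}

// Code that only handles data blocks implements the narrow visitor and never mentions EOS.
final class IsHeapVisitor implements Blk.Data.Visitor<Boolean, Void> {
  @Override
  public Boolean visit(HeapRows block, Void unused) {
    return true;
  }

  @Override
  public Boolean visit(Serialized block, Void unused) {
    return false;
  }
}
```

   Under that assumption, the two visitor types let data-only code (for example a splitter) skip the end-of-stream cases at compile time, while code holding an arbitrary block still gets exhaustive dispatch.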



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/SerializedDataBlock.java:
##########
@@ -0,0 +1,86 @@
+package org.apache.pinot.query.runtime.blocks;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.util.DataBlockExtractUtils;
+
+/// A block that contains data in serialized format.
+///
+/// This class is a subclass of [MseBlock.Data] and is used to store data in serialized format.
+/// This is the most efficient way to store data, but it is also the hardest to work with.
+/// As the day this comment was written, this class is only used when we need to shuffle data through the network.
+/// In all other cases, we use [RowHeapDataBlock].
+public class SerializedDataBlock implements MseBlock.Data {
+  private final DataBlock _dataBlock;
+
+  /// Creates a new block with the given data block.
+  /// @param dataBlock The data block to store in this block. It cannot be a metadata block.
+  /// @throws IllegalArgumentException If the data block is a metadata block.
+  public SerializedDataBlock(DataBlock dataBlock) {
+    Preconditions.checkArgument(dataBlock.getDataBlockType() != DataBlock.Type.METADATA,
+        "SerializedDataBlock cannot be used to decorate metadata block");
+    _dataBlock = dataBlock;
+  }
+
+  /// Returns the data block stored in this block.
+  /// It is guaranteed that the returned data block is not a metadata block.
+  public DataBlock getDataBlock() {
+    return _dataBlock;
+  }
+
+  @Override
+  public int getNumRows() {
+    return _dataBlock.getNumberOfRows();
+  }
+
+  @Override
+  public DataSchema getDataSchema() {
+    return _dataBlock.getDataSchema();
+  }
+
+  @Override
+  public RowHeapDataBlock asRowHeap() {
+    List<Object[]> rows = DataBlockExtractUtils.extractRows(_dataBlock);
+    return new RowHeapDataBlock(rows, _dataBlock.getDataSchema(), null);

Review Comment:
   So the aggregation functions are only used for serializing from row heap format to serialized format and we don't need to worry about them here because we won't convert from serialized -> row heap -> serialized?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowHeapDataBlock.java:
##########
@@ -0,0 +1,112 @@
+package org.apache.pinot.query.runtime.blocks;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+
+/// A block that contains data in row heap format.
+///
+/// This class is a subclass of [MseBlock.Data] and is used to store data in row heap format.
+/// This is probably the less efficient way to store data, but it is also the easiest to work with.
+/// At the time of writing, this class is used almost every time we need to read or create data blocks.
+/// The only place where this class is not used is when we need to shuffle data through the network, in which case
+/// we use [SerializedDataBlock].
+public class RowHeapDataBlock implements MseBlock, MseBlock.Data {
+  private final DataSchema _dataSchema;
+  private final List<Object[]> _rows;
+  @Nullable
+  @SuppressWarnings("rawtypes")
+  private final AggregationFunction[] _aggFunctions;
+
+  /// Creates a new block with the given rows and schema.
+  /// @param rows The rows in the block. Once received, the list should not be mutated from outside this class.
+  /// @param dataSchema The schema of the data in the block.
+  public RowHeapDataBlock(List<Object[]> rows, DataSchema dataSchema) {
+    this(rows, dataSchema, null);
+  }
+
+  /// Creates a new block with the given rows, schema and aggregation functions.
+  /// @param rows The rows in the block. Once received, the list should not be mutated from outside this class.
+  /// @param dataSchema The schema of the data in the block.
+  @SuppressWarnings("rawtypes")
+  public RowHeapDataBlock(List<Object[]> rows, DataSchema dataSchema,
+      @Nullable AggregationFunction[] aggFunctions) {
+    _dataSchema = dataSchema;
+    _rows = rows;
+    _aggFunctions = aggFunctions;
+  }
+
+  @Override
+  public int getNumRows() {
+    return _rows.size();
+  }
+
+  @Override
+  public DataSchema getDataSchema() {
+    return _dataSchema;
+  }
+
+  /// Returns the rows in the block.
+  /// The returned list should be considered immutable.
+  public List<Object[]> getRows() {
+    return _rows;
+  }
+
+  @Nullable
+  @SuppressWarnings("rawtypes")
+  public AggregationFunction[] getAggFunctions() {
+    return _aggFunctions;
+  }
+
+  /// Returns this same object.
+  @Override
+  public RowHeapDataBlock asRowHeap() {
+    return this;
+  }
+
+  @Override
+  public SerializedDataBlock asSerialized() {
+    try {
+      return new SerializedDataBlock(DataBlockBuilder.buildFromRows(_rows, _dataSchema, _aggFunctions));
+    } catch (IOException ex) {
+      throw new UncheckedIOException(ex);
+    }
+  }
+
+  @Override
+  public boolean isRowHeap() {
+    return true;
+  }
+
+  @Override
+  public <R, A> R accept(Data.Visitor<R, A> visitor, A arg) {
+    return visitor.visit(this, arg);
+  }
+
+  @Override
+  public String toString() {
+    return "{\"type\": \"rowHeap\", \"numRows\": " + getNumRows() + "}";

Review Comment:
   Why does this need to be JSON?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/MseBlock.java:
##########
@@ -0,0 +1,178 @@
+package org.apache.pinot.query.runtime.blocks;
+
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Block;
+
+
+/// Blocks used by [MultiStageOperator][org.apache.pinot.query.runtime.operator.MultiStageOperator] to share
+/// information.
+///
+/// Blocks always go from upstream (the children of the operator) to downstream (the parent of the operator) and can be
+/// classified in the following categories:
+/// - [Data] blocks: contain data that can be processed by the operator.
+/// - [Eos] blocks: signal the end of a stream. These blocks can be either [successful][SuccessMseBlock] or
+/// [error][ErrorMseBlock].
+///
+/// ## The MseBlock API
+/// A MseBlock itself is not very useful, as they have almost no methods.
+/// Instead, they are used as a common sub-interface for [data][Data] and [end-of-stream][Eos] blocks,
+/// which are then subclassed to provide the actual functionality.
+/// This pattern follows the principles of Java 17 sealed interfaces and the intention is implement them as such once
+/// Pinot source code is migrated to Java 17 or newer, specially in Java 21 where pattern matching can also be used,
+/// removing the need for the [Visitor] pattern.
+///
+/// Meanwhile, the API force callers to do some castings, but it is a trade-off to have a more robust and maintainable
+/// codebase given that we can rely on Java typesystem to verify some important properties at compile time instead of
+/// adding runtime checks.
+///
+/// The alternative to this pattern would be to have a single class with all methods, adding runtime checks to verify
+/// whether it is acceptable to call a method or not. This is the approach that was used in the removed
+/// TransferableBlock class, which was used for all possible block type combinations. As a result each method
+/// had to include a runtime check to verify if it was legal to call it given some conditions imposed by its attributes.
+/// This approach was error-prone and hard to maintain, as it was easy to forget to add a check in a new method or to
+/// know which methods could be called at a given point in time.
+///
+/// ## MseBlock vs DataBlock
+/// MseBlock are conceptually close to [DataBlocks][org.apache.pinot.common.datablock.DataBlock].
+/// MseBlocks are sent from one operator to another while DataBlocks are a way to codify data. It is important to notice
+/// that MseBlocks don't store stats, while DataBlocks do.
+///
+/// When a MseBlock needs to be sent to another server, it will serialize it into a DataBlock. Then, when a DataBlock
+/// is received by another server, it will deserialize it into a MseBlock (plus stats if needed). This is done by
Review Comment:
   We're separating stats from `MseBlock` mainly because data blocks should not have stats?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java:
##########
@@ -36,11 +38,19 @@ public interface SendingMailbox {
   boolean isLocal();
 
   /**
-   * Sends a block to the receiver. Note that SendingMailbox are required to acquire resources lazily in this call, and
-   * they should <b>not</b> acquire any resources when they are created. This method should throw if there was an error
-   * sending the data, since that would allow {@link BlockExchange} to exit early.
+   * Sends a data block to the receiver. Note that SendingMailbox are required to acquire resources lazily in this call,
+   * and they should <b>not</b> acquire any resources when they are created. This method should throw if there was a

Review Comment:
   nit: `an error` is the correct usage; not sure why this was changed to `a error`?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -275,8 +276,12 @@ public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map
       for (RoutingInfo routingInfo : routingInfos) {
         try {
           StatMap<MailboxSendOperator.StatKey> statMap = new StatMap<>(MailboxSendOperator.StatKey.class);
-          _mailboxService.getSendingMailbox(routingInfo.getHostname(), routingInfo.getPort(),
-              routingInfo.getMailboxId(), deadlineMs, statMap).send(errorBlock);
+          SendingMailbox sendingMailbox = _mailboxService.getSendingMailbox(routingInfo.getHostname(),
+                routingInfo.getPort(), routingInfo.getMailboxId(), deadlineMs, statMap);
+            // TODO: Here we are breaking the stats invariants, sending errors without including the stats of the

Review Comment:
   Could we include here _why_ we are doing this and how the stats system 
handles pipeline breakers?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -68,9 +78,20 @@ public boolean isLocal() {
   }
 
   @Override
-  public void send(TransferableBlock block)
+  public void send(MseBlock.Data data)
+      throws IOException, TimeoutException {
+    sendPrivate(data, List.of());
+  }
+
+  @Override
+  public void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
+      throws IOException, TimeoutException {
+    sendPrivate(block, serializedStats);
+  }
+
+  private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats)

Review Comment:
   nit: I think we usually use the `Internal` suffix instead of `Private` for such methods?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BlockSplitter.java:
##########
@@ -18,20 +18,80 @@
  */
 package org.apache.pinot.query.runtime.blocks;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.utils.DataSchema;
 
 
 /**
- * Interface for splitting transferable blocks. This is used for ensuring
+ * Interface for splitting MSE blocks. This is used for ensuring
  * that the blocks that are sent along the wire play nicely with the
  * underlying transport.
  */
 public interface BlockSplitter {
   BlockSplitter NO_OP = (block, maxBlockSize) -> Iterators.singletonIterator(block);
+  BlockSplitter DEFAULT = new Default();
 
   /**
    * @return a list of blocks that was split from the original {@code block}
    */
-  Iterator<TransferableBlock> split(TransferableBlock block, int maxBlockSize);
+  Iterator<? extends MseBlock.Data> split(MseBlock.Data block, int maxBlockSize);
+
+  /**
+   *  Split a block into multiple block so that each block size is within maxBlockSize. Currently,
+   *  <ul>
+   *    <li>For row data block, we split for row type dataBlock.</li>
+   *    <li>For columnar data block, exceptions are thrown.</li>
+   *    <li>For metadata block, split is not supported.</li>
+   *  </ul>
+   *
+   * @return a list of data block chunks
+   */
+  class Default implements BlockSplitter, MseBlock.Data.Visitor<Iterator<MseBlock.Data>, Integer> {
+    private static final int MEDIAN_COLUMN_SIZE_BYTES = 8;
+
+    private Default() {
+    }
+
+    @Override
+    public Iterator<MseBlock.Data> split(MseBlock.Data block, int maxBlockSize) {
+      return block.accept(this, maxBlockSize);
+    }
+
+    @Override
+    public Iterator<MseBlock.Data> visit(RowHeapDataBlock block, Integer maxBlockSize) {
+      // Use estimated row size, this estimate is not accurate and is used to estimate numRowsPerChunk only.
+      DataSchema dataSchema = block.getDataSchema();
+      assert dataSchema != null;
+      int estimatedRowSizeInBytes = dataSchema.getColumnNames().length * MEDIAN_COLUMN_SIZE_BYTES;
+      int numRowsPerChunk = maxBlockSize / estimatedRowSizeInBytes;
+      Preconditions.checkState(numRowsPerChunk > 0, "row size too large for query engine to handle, abort!");
+
+      List<Object[]> rows = block.getRows();
+      int numRows = rows.size();
+      int numChunks = (numRows + numRowsPerChunk - 1) / numRowsPerChunk;
+      if (numChunks == 1) {
+        return Iterators.singletonIterator(block);
+      }
+      List<MseBlock.Data> blockChunks = new ArrayList<>(numChunks);
+      for (int fromIndex = 0; fromIndex < numRows; fromIndex += numRowsPerChunk) {
+        int toIndex = Math.min(fromIndex + numRowsPerChunk, numRows);
+        blockChunks.add(new RowHeapDataBlock(rows.subList(fromIndex, toIndex), dataSchema, block.getAggFunctions()));
+      }
+      return blockChunks.iterator();
+    }
+
+    @Override
+    public Iterator<MseBlock.Data> visit(SerializedDataBlock block, Integer maxBlockSize
+    ) {
+      if (block.getDataBlock().getDataBlockType() == DataBlock.Type.ROW) {

Review Comment:
   Are columnar blocks essentially irrelevant?
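   For reference, the row-heap branch quoted above boils down to ceiling-division chunking over an estimated row size. A standalone sketch, assuming the same 8-bytes-per-column estimate; `RowChunkingSketch` and its parameters are hypothetical, and plain generic lists stand in for `RowHeapDataBlock`:

```java
import java.util.ArrayList;
import java.util.List;

final class RowChunkingSketch {
  // Assumed per-column size estimate, mirroring MEDIAN_COLUMN_SIZE_BYTES in the quoted diff.
  static final int ASSUMED_COLUMN_SIZE_BYTES = 8;

  // Splits rows into consecutive chunks whose estimated size stays within maxBlockSizeBytes.
  static <T> List<List<T>> chunk(List<T> rows, int numColumns, int maxBlockSizeBytes) {
    if (numColumns <= 0) {
      throw new IllegalArgumentException("numColumns must be positive");
    }
    int estimatedRowSizeBytes = numColumns * ASSUMED_COLUMN_SIZE_BYTES;
    int rowsPerChunk = maxBlockSizeBytes / estimatedRowSizeBytes;
    if (rowsPerChunk <= 0) {
      throw new IllegalStateException("estimated row size exceeds the maximum block size");
    }
    List<List<T>> chunks = new ArrayList<>();
    for (int from = 0; from < rows.size(); from += rowsPerChunk) {
      int to = Math.min(from + rowsPerChunk, rows.size());
      // subList returns a view, so no rows are copied.
      chunks.add(rows.subList(from, to));
    }
    return chunks;
  }
}
```

   With 10 rows, 2 columns and a 64-byte limit, the estimate is 16 bytes per row, so each chunk holds at most 4 rows and the split yields chunks of 4, 4 and 2 rows.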



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java:
##########
@@ -122,24 +123,37 @@ protected void earlyTerminate() {
     }
   }
 
+  @Override
+  public abstract List<MultiStageOperator> getChildOperators();
+
   /**
-   * Adds the current operator stats as the last operator in the open stats of the given holder.
+   * Calculates and returns the stats for the operator.
    *
-   * It is assumed that:
-   * <ol>
-   *   <li>The current stage of the holder is equal to the stage id of this operator.</li>
-   *   <li>The holder already contains the stats of the previous operators of the same stage in inorder</li>
-   * </ol>
+   * Each time this method is called, a new instance of the stats is created. This is because the stats are mutable and
+   * can be updated by the operator or the caller after the stats are returned.
    */
-  protected void addStats(MultiStageQueryStats holder, StatMap<?> statMap) {
-    Preconditions.checkArgument(holder.getCurrentStageId() == _context.getStageId(),
+  public final MultiStageQueryStats calculateStats() {
+    MultiStageQueryStats upstreamStats = calculateUpstreamStats();
+
+    Preconditions.checkArgument(upstreamStats.getCurrentStageId() == _context.getStageId(),
         "The holder's stage id should be the same as the current operator's stage id. Expected %s, got %s",
-        _context.getStageId(), holder.getCurrentStageId());
-    holder.getCurrentStats().addLastOperator(getOperatorType(), statMap);
+        _context.getStageId(), upstreamStats.getCurrentStageId());
+    upstreamStats.getCurrentStats().addLastOperator(getOperatorType(), copyStatMaps());
+    return upstreamStats;
   }
 
-  @Override
-  public abstract List<MultiStageOperator> getChildOperators();
+  protected MultiStageQueryStats calculateUpstreamStats() {

Review Comment:
   I think a comment explaining the merging logic here might be useful



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowHeapDataBlock.java:
##########
@@ -0,0 +1,107 @@
+package org.apache.pinot.query.runtime.blocks;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+
+/// A block that contains data in row heap format.
+///
+/// This class is a subclass of [MseBlock.Data] and is used to store data in row heap format.
+/// This is probably the less efficient way to store data, but it is also the easiest to work with.
+/// As the day this comment was written, this class is used almost everytime we need to read or create data blocks.
+/// The only place where this class is not used is when we need to shuffle data through the network, in which case
+/// we use [SerializedDataBlock].
+public class RowHeapDataBlock implements MseBlock, MseBlock.Data {
+  private final DataSchema _dataSchema;
+  private final List<Object[]> _rows;
+  @Nullable
+  @SuppressWarnings("rawtypes")
+  private final AggregationFunction[] _aggFunctions;
+

Review Comment:
   Yeah +1, the presence of these aggregation functions here is pretty confusing.



