gortiz commented on code in PR #15245:
URL: https://github.com/apache/pinot/pull/15245#discussion_r2024217488


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BlockSplitter.java:
##########
@@ -18,20 +18,80 @@
  */
 package org.apache.pinot.query.runtime.blocks;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.utils.DataSchema;
 
 
 /**
- * Interface for splitting transferable blocks. This is used for ensuring
+ * Interface for splitting MSE blocks. This is used for ensuring
  * that the blocks that are sent along the wire play nicely with the
  * underlying transport.
  */
 public interface BlockSplitter {
   BlockSplitter NO_OP = (block, maxBlockSize) -> 
Iterators.singletonIterator(block);
+  BlockSplitter DEFAULT = new Default();
 
   /**
    * @return a list of blocks that was split from the original {@code block}
    */
-  Iterator<TransferableBlock> split(TransferableBlock block, int maxBlockSize);
+  Iterator<? extends MseBlock.Data> split(MseBlock.Data block, int 
maxBlockSize);
+
+  /**
+   *  Split a block into multiple block so that each block size is within 
maxBlockSize. Currently,
+   *  <ul>
+   *    <li>For row data block, we split for row type dataBlock.</li>
+   *    <li>For columnar data block, exceptions are thrown.</li>
+   *    <li>For metadata block, split is not supported.</li>
+   *  </ul>
+   *
+   * @return a list of data block chunks
+   */
+  class Default implements BlockSplitter, 
MseBlock.Data.Visitor<Iterator<MseBlock.Data>, Integer> {
+    private static final int MEDIAN_COLUMN_SIZE_BYTES = 8;
+
+    private Default() {
+    }
+
+    @Override
+    public Iterator<MseBlock.Data> split(MseBlock.Data block, int 
maxBlockSize) {
+      return block.accept(this, maxBlockSize);
+    }
+
+    @Override
+    public Iterator<MseBlock.Data> visit(RowHeapDataBlock block, Integer 
maxBlockSize) {
+      // Use estimated row size, this estimate is not accurate and is used to 
estimate numRowsPerChunk only.
+      DataSchema dataSchema = block.getDataSchema();
+      assert dataSchema != null;
+      int estimatedRowSizeInBytes = dataSchema.getColumnNames().length * 
MEDIAN_COLUMN_SIZE_BYTES;
+      int numRowsPerChunk = maxBlockSize / estimatedRowSizeInBytes;
+      Preconditions.checkState(numRowsPerChunk > 0, "row size too large for 
query engine to handle, abort!");
+
+      List<Object[]> rows = block.getRows();
+      int numRows = rows.size();
+      int numChunks = (numRows + numRowsPerChunk - 1) / numRowsPerChunk;
+      if (numChunks == 1) {
+        return Iterators.singletonIterator(block);
+      }
+      List<MseBlock.Data> blockChunks = new ArrayList<>(numChunks);
+      for (int fromIndex = 0; fromIndex < numRows; fromIndex += 
numRowsPerChunk) {
+        int toIndex = Math.min(fromIndex + numRowsPerChunk, numRows);
+        blockChunks.add(new RowHeapDataBlock(rows.subList(fromIndex, toIndex), 
dataSchema, block.getAggFunctions()));
+      }
+      return blockChunks.iterator();
+    }
+
+    @Override
+    public Iterator<MseBlock.Data> visit(SerializedDataBlock block, Integer 
maxBlockSize
+    ) {
+      if (block.getDataBlock().getDataBlockType() == DataBlock.Type.ROW) {

Review Comment:
   Yes, I didn't find any usage of columnar mode in the current MSE code. If we 
end up using it in the future, we can add it as a new MseBlock.Data 
implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to