gortiz commented on code in PR #15245: URL: https://github.com/apache/pinot/pull/15245#discussion_r2024217488
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BlockSplitter.java: ########## @@ -18,20 +18,80 @@ */ package org.apache.pinot.query.runtime.blocks; +import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; +import org.apache.pinot.common.datablock.DataBlock; +import org.apache.pinot.common.utils.DataSchema; /** - * Interface for splitting transferable blocks. This is used for ensuring + * Interface for splitting MSE blocks. This is used for ensuring * that the blocks that are sent along the wire play nicely with the * underlying transport. */ public interface BlockSplitter { BlockSplitter NO_OP = (block, maxBlockSize) -> Iterators.singletonIterator(block); + BlockSplitter DEFAULT = new Default(); /** * @return a list of blocks that was split from the original {@code block} */ - Iterator<TransferableBlock> split(TransferableBlock block, int maxBlockSize); + Iterator<? extends MseBlock.Data> split(MseBlock.Data block, int maxBlockSize); + + /** + * Split a block into multiple block so that each block size is within maxBlockSize. Currently, + * <ul> + * <li>For row data block, we split for row type dataBlock.</li> + * <li>For columnar data block, exceptions are thrown.</li> + * <li>For metadata block, split is not supported.</li> + * </ul> + * + * @return a list of data block chunks + */ + class Default implements BlockSplitter, MseBlock.Data.Visitor<Iterator<MseBlock.Data>, Integer> { + private static final int MEDIAN_COLUMN_SIZE_BYTES = 8; + + private Default() { + } + + @Override + public Iterator<MseBlock.Data> split(MseBlock.Data block, int maxBlockSize) { + return block.accept(this, maxBlockSize); + } + + @Override + public Iterator<MseBlock.Data> visit(RowHeapDataBlock block, Integer maxBlockSize) { + // Use estimated row size, this estimate is not accurate and is used to estimate numRowsPerChunk only. 
+ DataSchema dataSchema = block.getDataSchema(); + assert dataSchema != null; + int estimatedRowSizeInBytes = dataSchema.getColumnNames().length * MEDIAN_COLUMN_SIZE_BYTES; + int numRowsPerChunk = maxBlockSize / estimatedRowSizeInBytes; + Preconditions.checkState(numRowsPerChunk > 0, "row size too large for query engine to handle, abort!"); + + List<Object[]> rows = block.getRows(); + int numRows = rows.size(); + int numChunks = (numRows + numRowsPerChunk - 1) / numRowsPerChunk; + if (numChunks == 1) { + return Iterators.singletonIterator(block); + } + List<MseBlock.Data> blockChunks = new ArrayList<>(numChunks); + for (int fromIndex = 0; fromIndex < numRows; fromIndex += numRowsPerChunk) { + int toIndex = Math.min(fromIndex + numRowsPerChunk, numRows); + blockChunks.add(new RowHeapDataBlock(rows.subList(fromIndex, toIndex), dataSchema, block.getAggFunctions())); + } + return blockChunks.iterator(); + } + + @Override + public Iterator<MseBlock.Data> visit(SerializedDataBlock block, Integer maxBlockSize + ) { + if (block.getDataBlock().getDataBlockType() == DataBlock.Type.ROW) { Review Comment: Yes, I didn't find any usage of columnar mode in the current MSE code. If we end up using it in the future, we can add it as a new MseBlock.Data implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org