Jackie-Jiang commented on code in PR #15571: URL: https://github.com/apache/pinot/pull/15571#discussion_r2069226305
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java: ########## @@ -156,19 +176,30 @@ private StreamObserver<MailboxContent> getContentObserver() { .open(_statusObserver); } - private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats) + private void splitAndSend(MseBlock block, List<DataBuffer> serializedStats) throws IOException { _statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1); long start = System.currentTimeMillis(); try { DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats); - ByteString byteString = DataBlockUtils.toByteString(dataBlock); - int sizeInBytes = byteString.size(); + List<ByteString> byteStrings; + if (_splitBlocks) { Review Comment: Consider moving this check one level up (into `sendInternal()`) so that the split and non-split paths are handled separately, which avoids adding overhead to the non-split case. ########## pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java: ########## @@ -402,7 +403,14 @@ public List<ByteBuffer> serialize() if (_serialized == null) { _serialized = DataBlockUtils.serialize(this); } - return _serialized; + // Return a copy of the serialized data to avoid external modification. + List<ByteBuffer> copy = new ArrayList<>(_serialized.size()); + for (ByteBuffer page: _serialized) { + ByteBuffer pageCopy = page.duplicate(); + pageCopy.order(page.order()); + copy.add(pageCopy); + } + return copy; Review Comment: This is adding overhead to every single data block. Do we need to cache the serialized buffers? Are we ever serializing the same block twice? 
########## pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java: ########## @@ -252,7 +252,7 @@ public static DataBlock readFrom(ByteBuffer buffer) public static DataBlock deserialize(List<ByteBuffer> buffers) throws IOException { List<DataBuffer> dataBuffers = buffers.stream() - .map(PinotByteBuffer::wrap) + .map(PinotByteBuffer::slice) Review Comment: This is adding overhead. Can you share which test is failing because of this? ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java: ########## @@ -52,24 +59,36 @@ public class GrpcSendingMailbox implements SendingMailbox { private static final Logger LOGGER = LoggerFactory.getLogger(GrpcSendingMailbox.class); + private static final List<ByteString> EMPTY_BYTEBUFFER_LIST = Collections.emptyList(); private final String _id; private final ChannelManager _channelManager; private final String _hostname; private final int _port; private final long _deadlineMs; private final StatMap<MailboxSendOperator.StatKey> _statMap; private final MailboxStatusObserver _statusObserver = new MailboxStatusObserver(); + private final boolean _splitBlocks; + private final int _maxByteStringSize; private StreamObserver<MailboxContent> _contentObserver; - public GrpcSendingMailbox(String id, ChannelManager channelManager, String hostname, int port, long deadlineMs, + public GrpcSendingMailbox( + PinotConfiguration config, String id, ChannelManager channelManager, String hostname, int port, long deadlineMs, StatMap<MailboxSendOperator.StatKey> statMap) { _id = id; _channelManager = channelManager; _hostname = hostname; _port = port; _deadlineMs = deadlineMs; _statMap = statMap; + _splitBlocks = config.getProperty( Review Comment: We should move the config parsing logic into the `MailboxService` constructor to avoid parsing the config for every mailbox. ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java: 
########## @@ -47,11 +48,13 @@ public class MailboxContentObserver implements StreamObserver<MailboxContent> { private final MailboxService _mailboxService; private final StreamObserver<MailboxStatus> _responseObserver; + private final List<ByteBuffer> _mailboxBuffers; private transient ReceivingMailbox _mailbox; public MailboxContentObserver(MailboxService mailboxService, StreamObserver<MailboxStatus> responseObserver) { _mailboxService = mailboxService; _responseObserver = responseObserver; + _mailboxBuffers = new ArrayList<>(); Review Comment: (MAJOR) Is this thread-safe? Are we sharing the same observer across multiple queries? ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java: ########## @@ -156,19 +176,30 @@ private StreamObserver<MailboxContent> getContentObserver() { .open(_statusObserver); } - private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats) + private void splitAndSend(MseBlock block, List<DataBuffer> serializedStats) throws IOException { _statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1); long start = System.currentTimeMillis(); try { DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats); - ByteString byteString = DataBlockUtils.toByteString(dataBlock); - int sizeInBytes = byteString.size(); + List<ByteString> byteStrings; + if (_splitBlocks) { + byteStrings = toByteStrings(dataBlock, _maxByteStringSize); + } else { + byteStrings = List.of(DataBlockUtils.toByteString(dataBlock)); + } + int sizeInBytes = byteStrings.stream().mapToInt(ByteString::size).reduce(0, Integer::sum); Review Comment: I'd avoid using the functional (stream) API at the per-block level. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org