albertobastos commented on code in PR #15571: URL: https://github.com/apache/pinot/pull/15571#discussion_r2071398844
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -156,19 +176,30 @@ private StreamObserver<MailboxContent> getContentObserver() {
         .open(_statusObserver);
   }

-  private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats)
+  private void splitAndSend(MseBlock block, List<DataBuffer> serializedStats)
       throws IOException {
     _statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
     long start = System.currentTimeMillis();
     try {
       DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats);
-      ByteString byteString = DataBlockUtils.toByteString(dataBlock);
-      int sizeInBytes = byteString.size();
+      List<ByteString> byteStrings;
+      if (_splitBlocks) {

Review Comment:
   I like that. I will create a pair of `SplitSender` and `NonSplitSender` classes to minimize checks at runtime. I won't worry too much about avoiding redundant code, because this is a temporary flag used during cluster upgrades.

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -156,19 +176,30 @@ private StreamObserver<MailboxContent> getContentObserver() {
         .open(_statusObserver);
   }

-  private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats)
+  private void splitAndSend(MseBlock block, List<DataBuffer> serializedStats)
       throws IOException {
     _statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
     long start = System.currentTimeMillis();
     try {
       DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats);
-      ByteString byteString = DataBlockUtils.toByteString(dataBlock);
-      int sizeInBytes = byteString.size();
+      List<ByteString> byteStrings;
+      if (_splitBlocks) {
+        byteStrings = toByteStrings(dataBlock, _maxByteStringSize);
+      } else {
+        byteStrings = List.of(DataBlockUtils.toByteString(dataBlock));
+      }
+      int sizeInBytes = byteStrings.stream().mapToInt(ByteString::size).reduce(0, Integer::sum);

Review Comment:
   I'm always on the fence about using it; any call to `.stream()` makes me frown. I will change it to a classic loop.

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -52,24 +59,36 @@ public class GrpcSendingMailbox implements SendingMailbox {
   private static final Logger LOGGER = LoggerFactory.getLogger(GrpcSendingMailbox.class);
+  private static final List<ByteString> EMPTY_BYTEBUFFER_LIST = Collections.emptyList();

   private final String _id;
   private final ChannelManager _channelManager;
   private final String _hostname;
   private final int _port;
   private final long _deadlineMs;
   private final StatMap<MailboxSendOperator.StatKey> _statMap;
   private final MailboxStatusObserver _statusObserver = new MailboxStatusObserver();
+  private final boolean _splitBlocks;
+  private final int _maxByteStringSize;

   private StreamObserver<MailboxContent> _contentObserver;

-  public GrpcSendingMailbox(String id, ChannelManager channelManager, String hostname, int port, long deadlineMs,
+  public GrpcSendingMailbox(
+      PinotConfiguration config, String id, ChannelManager channelManager, String hostname, int port, long deadlineMs,
       StatMap<MailboxSendOperator.StatKey> statMap) {
     _id = id;
     _channelManager = channelManager;
     _hostname = hostname;
     _port = port;
     _deadlineMs = deadlineMs;
     _statMap = statMap;
+    _splitBlocks = config.getProperty(

Review Comment:
   Thanks, will do.
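
For illustration only, the two sender-side replies above (choose the sender once instead of checking the flag per block, and sum chunk sizes with a plain loop) might look roughly like the sketch below. It is not the code from the PR: `SplitSketch`, `Splitter`, `create`, and `totalSize` are hypothetical names, and `java.nio.ByteBuffer` stands in for protobuf's `ByteString` so the example has no external dependencies.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; none of these names are taken from the PR. */
final class SplitSketch {
  private SplitSketch() {
  }

  /** Strategy chosen once at construction time, so the send path has no per-block flag check. */
  interface Splitter {
    List<ByteBuffer> split(ByteBuffer payload);
  }

  /** Emits the payload as a single chunk, whatever its size. */
  static final class NonSplitSender implements Splitter {
    @Override
    public List<ByteBuffer> split(ByteBuffer payload) {
      return List.of(payload.duplicate());
    }
  }

  /** Cuts the payload into zero-copy views of at most maxChunkSize bytes each. */
  static final class SplitSender implements Splitter {
    private final int _maxChunkSize;

    SplitSender(int maxChunkSize) {
      if (maxChunkSize <= 0) {
        throw new IllegalArgumentException("maxChunkSize must be positive");
      }
      _maxChunkSize = maxChunkSize;
    }

    @Override
    public List<ByteBuffer> split(ByteBuffer payload) {
      List<ByteBuffer> chunks = new ArrayList<>();
      ByteBuffer remaining = payload.duplicate();
      // Note: an empty payload produces an empty list in this sketch.
      while (remaining.hasRemaining()) {
        int length = Math.min(_maxChunkSize, remaining.remaining());
        ByteBuffer chunk = remaining.slice();
        chunk.limit(length);
        chunks.add(chunk);
        remaining.position(remaining.position() + length);
      }
      return chunks;
    }
  }

  /** Reads the (temporary) flag once and returns the matching strategy. */
  static Splitter create(boolean splitBlocks, int maxChunkSize) {
    return splitBlocks ? new SplitSender(maxChunkSize) : new NonSplitSender();
  }

  /** Classic loop in place of stream().mapToInt(...).reduce(0, Integer::sum). */
  static int totalSize(List<ByteBuffer> chunks) {
    int total = 0;
    for (ByteBuffer chunk : chunks) {
      total += chunk.remaining();
    }
    return total;
  }
}
```

The small amount of duplication between the two senders is deliberate here, matching the remark that the flag is temporary and both classes can be collapsed once it is removed.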
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java:
##########
@@ -47,11 +48,13 @@ public class MailboxContentObserver implements StreamObserver<MailboxContent> {
   private final MailboxService _mailboxService;
   private final StreamObserver<MailboxStatus> _responseObserver;
+  private final List<ByteBuffer> _mailboxBuffers;

   private transient ReceivingMailbox _mailbox;

   public MailboxContentObserver(MailboxService mailboxService, StreamObserver<MailboxStatus> responseObserver) {
     _mailboxService = mailboxService;
     _responseObserver = responseObserver;
+    _mailboxBuffers = new ArrayList<>();

Review Comment:
   As we discussed offline, the mailbox observer is not shared among concurrent queries, so it is safe to just accumulate blocks until a block with `waitForMore=false` is received. In any case, we will add an integration test with a small buffer and concurrent queries to verify that.
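
As a rough illustration of the receiver-side behavior described in the last comment, the sketch below accumulates chunks until one arrives with `waitForMore=false`. It assumes, as that comment states, that each observer instance serves a single stream, so no synchronization is used. `ChunkAccumulator` and `onChunk` are hypothetical names and are not part of the PR.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical receiver-side sketch; not thread-safe by design (one instance per stream). */
final class ChunkAccumulator {
  private final List<ByteBuffer> _pending = new ArrayList<>();

  /**
   * Buffers one chunk. Returns an empty Optional while more chunks are expected;
   * once waitForMore is false, returns all buffered chunks in arrival order and
   * resets the internal state for the next block.
   */
  Optional<List<ByteBuffer>> onChunk(ByteBuffer chunk, boolean waitForMore) {
    _pending.add(chunk);
    if (waitForMore) {
      return Optional.empty();
    }
    List<ByteBuffer> complete = new ArrayList<>(_pending);
    _pending.clear();
    return Optional.of(complete);
  }
}
```

If an observer were ever shared across streams, chunks from different blocks would interleave in `_pending`, which is the scenario the proposed integration test with a small buffer and concurrent queries is meant to rule out.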