gortiz commented on code in PR #15571: URL: https://github.com/apache/pinot/pull/15571#discussion_r2052080756
##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -230,6 +230,28 @@ public static ByteString toByteString(DataBlock dataBlock)
     return byteString;
   }

+  public static List<ByteString> toByteStrings(DataBlock dataBlock, int maxBlockSize)
+      throws IOException {
+    List<ByteBuffer> bytes = dataBlock.serialize();
+    if (bytes.isEmpty()) {
+      return List.of(ByteString.EMPTY);
+    }
+
+    List<ByteString> byteStrings = new ArrayList<>();
+    ByteString current = UnsafeByteOperations.unsafeWrap(bytes.get(0));
+    for (int i = 1; i < bytes.size(); i++) {
+      ByteBuffer bb = bytes.get(i);
+      if (current.size() + bb.remaining() > maxBlockSize) {
+        byteStrings.add(current);
+        current = UnsafeByteOperations.unsafeWrap(bb);
+      } else {
+        current = current.concat(UnsafeByteOperations.unsafeWrap(bb));
+      }
+    }

Review Comment:
Correct me if I'm wrong, but here we are assuming that each page in `bytes` is always going to be smaller than `maxBlockSize`, right? Are we sure that will always be the case? Can we prepare the code for that situation? If we don't, we need to document that assumption properly.
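To make the oversized-page case concrete, here is a minimal sketch of one way to handle it: slice every page into pieces of at most the limit before running the concatenation loop. It only uses `java.nio.ByteBuffer`; `PageSplitter`, `splitOversized` and `maxChunkSize` are hypothetical names, not existing Pinot code, and I have not checked how this interacts with the rest of the PR.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Pinot.
final class PageSplitter {
  private PageSplitter() {
  }

  /**
   * Returns views over the given pages so that no returned buffer has more than
   * maxChunkSize remaining bytes. Nothing is copied and the position and limit of
   * the input buffers are left untouched. Empty pages are dropped.
   */
  static List<ByteBuffer> splitOversized(List<ByteBuffer> pages, int maxChunkSize) {
    if (maxChunkSize <= 0) {
      throw new IllegalArgumentException("maxChunkSize must be positive: " + maxChunkSize);
    }
    List<ByteBuffer> result = new ArrayList<>();
    for (ByteBuffer page : pages) {
      // duplicate() shares the content but has its own position and limit
      ByteBuffer view = page.duplicate();
      while (view.remaining() > maxChunkSize) {
        ByteBuffer head = view.duplicate();
        head.limit(head.position() + maxChunkSize);
        result.add(head.slice());
        view.position(view.position() + maxChunkSize);
      }
      if (view.hasRemaining()) {
        result.add(view.slice());
      }
    }
    return result;
  }
}
```

If `toByteStrings` iterated over `splitOversized(bytes, maxBlockSize)` instead of `bytes`, every buffer entering the loop would fit in the limit, so no element of the returned list could exceed it.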
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -157,19 +167,28 @@ private StreamObserver<MailboxContent> getContentObserver() {
         .open(_statusObserver);
   }

-  private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats)
+  private List<MailboxContent> toMailboxContents(MseBlock block, List<DataBuffer> serializedStats)
       throws IOException {
     _statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
     long start = System.currentTimeMillis();
     try {
       DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats);
-      ByteString byteString = DataBlockUtils.toByteString(dataBlock);
-      int sizeInBytes = byteString.size();
+      // so far we ensure payload is not bigger than maxBlockSize/2, we can fine tune this later
+      List<ByteString> byteStrings = DataBlockUtils.toByteStrings(dataBlock, getMaxBlockSize() / 2);

Review Comment:
`getMaxBlockSize()` won't change once the app is started, right? Do we need to read configs and parse strings to int once per block sent?
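What I have in mind is roughly the following: parse the value once when the mailbox is created and keep it in a final field. This is only a sketch; the class name, the config key and the default value are made up for illustration and are not the real Pinot ones.

```java
import java.util.Map;

// Hypothetical stand-in for the sending mailbox; only the caching idea matters here.
final class SendingMailboxSketch {
  // Made-up key and default, used only for this example.
  static final String MAX_BLOCK_SIZE_KEY = "hypothetical.max.block.size.bytes";
  static final int DEFAULT_MAX_BLOCK_SIZE = 16 * 1024 * 1024;

  // Computed once in the constructor instead of once per block sent.
  private final int _maxByteStringSize;

  SendingMailboxSketch(Map<String, String> config) {
    int maxBlockSize =
        Integer.parseInt(config.getOrDefault(MAX_BLOCK_SIZE_KEY, String.valueOf(DEFAULT_MAX_BLOCK_SIZE)));
    // Same halving as in the PR, applied a single time.
    _maxByteStringSize = maxBlockSize / 2;
  }

  int getMaxByteStringSize() {
    return _maxByteStringSize;
  }
}
```

With something like this, `toMailboxContents` would only read a field on the hot path.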
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -157,19 +167,28 @@ private StreamObserver<MailboxContent> getContentObserver() {
         .open(_statusObserver);
   }

-  private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats)
+  private List<MailboxContent> toMailboxContents(MseBlock block, List<DataBuffer> serializedStats)
       throws IOException {
     _statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
     long start = System.currentTimeMillis();
     try {
       DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats);
-      ByteString byteString = DataBlockUtils.toByteString(dataBlock);
-      int sizeInBytes = byteString.size();
+      // so far we ensure payload is not bigger than maxBlockSize/2, we can fine tune this later
+      List<ByteString> byteStrings = DataBlockUtils.toByteStrings(dataBlock, getMaxBlockSize() / 2);
+      int sizeInBytes = byteStrings.stream().map(ByteString::size).reduce(0, Integer::sum);

Review Comment:
Use `mapToInt` instead of `map` here. It avoids boxing each size into an `Integer`, so it may be more efficient.

##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -230,6 +230,28 @@ public static ByteString toByteString(DataBlock dataBlock)
     return byteString;
   }

+  public static List<ByteString> toByteStrings(DataBlock dataBlock, int maxBlockSize)

Review Comment:
I think this method really needs some javadoc. Also, `maxBlockSize` may be confusing. AFAIK it is the max size of each element in the returned list, but given that this method receives blocks, callers may think this parameter is the max size of the first parameter.
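Regarding the `mapToInt` suggestion on `sizeInBytes` above, this is the shape I mean. The sketch sums `ByteBuffer::remaining` so that it compiles without protobuf on the classpath; in the PR the equivalent would presumably be `mapToInt(ByteString::size).sum()`. `SizeSum` and `totalBytes` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical helper used only to illustrate the stream shape.
final class SizeSum {
  private SizeSum() {
  }

  /** Sums the remaining bytes of all buffers on an IntStream, so no Integer is boxed. */
  static int totalBytes(List<ByteBuffer> buffers) {
    return buffers.stream().mapToInt(ByteBuffer::remaining).sum();
  }
}
```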
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -157,19 +167,28 @@ private StreamObserver<MailboxContent> getContentObserver() {
         .open(_statusObserver);
   }

-  private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats)
+  private List<MailboxContent> toMailboxContents(MseBlock block, List<DataBuffer> serializedStats)

Review Comment:
nit: you can even return an iterable/stream here, so we don't need to allocate the ArrayList to store `contents`.

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java:
##########
@@ -58,10 +61,18 @@ public void onNext(MailboxContent mailboxContent) {
     if (_mailbox == null) {
       _mailbox = _mailboxService.getReceivingMailbox(mailboxId);
     }
+    if (_mailboxBuffers == null) {
+      _mailboxBuffers = new ArrayList<>();
+    }

Review Comment:
nit: What is the advantage of this lazy initialization instead of doing it eagerly?