albertobastos commented on code in PR #15571: URL: https://github.com/apache/pinot/pull/15571#discussion_r2055538832
##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -230,6 +230,28 @@ public static ByteString toByteString(DataBlock dataBlock)
     return byteString;
   }
+  public static List<ByteString> toByteStrings(DataBlock dataBlock, int maxBlockSize)
+      throws IOException {
+    List<ByteBuffer> bytes = dataBlock.serialize();
+    if (bytes.isEmpty()) {
+      return List.of(ByteString.EMPTY);

Review Comment:
   That was just copy-pasted from the previous version, where a `ByteString.EMPTY` was returned. But now that we are dealing with a list, I guess it is OK to return just an empty one.

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -157,19 +167,28 @@ private StreamObserver<MailboxContent> getContentObserver() {
         .open(_statusObserver);
   }
-  private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats)
+  private List<MailboxContent> toMailboxContents(MseBlock block, List<DataBuffer> serializedStats)

Review Comment:
   Let's go with the stream approach for once.
##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -230,6 +230,28 @@ public static ByteString toByteString(DataBlock dataBlock)
     return byteString;
   }
+  public static List<ByteString> toByteStrings(DataBlock dataBlock, int maxBlockSize)
+      throws IOException {
+    List<ByteBuffer> bytes = dataBlock.serialize();
+    if (bytes.isEmpty()) {
+      return List.of(ByteString.EMPTY);
+    }
+
+    List<ByteString> byteStrings = new ArrayList<>();
+    ByteString current = UnsafeByteOperations.unsafeWrap(bytes.get(0));
+    for (int i = 1; i < bytes.size(); i++) {
+      ByteBuffer bb = bytes.get(i);
+      if (current.size() + bb.remaining() > maxBlockSize) {
+        byteStrings.add(current);
+        current = UnsafeByteOperations.unsafeWrap(bb);
+      } else {
+        current = current.concat(UnsafeByteOperations.unsafeWrap(bb));
+      }
+    }

Review Comment:
   Yeah, we are assuming that no single `ByteBuffer` returned by block serialization will ever exceed the given maximum. If one does, that big buffer will be converted to a `ByteString` as-is. I thought of ways to optimize that, but all of them require array copies instead of relying on unsafe wrap operations. I believe that would add undesired overhead, so for now I just documented the current behaviour.

##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -230,6 +230,28 @@ public static ByteString toByteString(DataBlock dataBlock)
     return byteString;
   }
+  public static List<ByteString> toByteStrings(DataBlock dataBlock, int maxBlockSize)
+      throws IOException {
+    List<ByteBuffer> bytes = dataBlock.serialize();
+    if (bytes.isEmpty()) {
+      return List.of(ByteString.EMPTY);
+    }
+
+    List<ByteString> byteStrings = new ArrayList<>();

Review Comment:
   Got it.
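The grouping behaviour discussed for `toByteStrings` above, including the oversized-buffer case, can be illustrated with a small standalone sketch. This is not the Pinot code: the class `ByteBufferGroupingSketch` and its methods `group` and `sizeOf` are hypothetical names, and the sketch groups plain `ByteBuffer`s into lists instead of concatenating protobuf `ByteString`s so that it only needs the JDK. It also assumes that the last group is flushed after the loop, which the quoted hunk does not show.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the greedy grouping loop; not part of Pinot. */
public class ByteBufferGroupingSketch {

  /**
   * Greedily packs consecutive buffers into groups whose combined size stays
   * within maxGroupSize. A single buffer larger than maxGroupSize is never
   * split or copied: it simply ends up alone in its own oversized group.
   */
  public static List<List<ByteBuffer>> group(List<ByteBuffer> buffers, int maxGroupSize) {
    List<List<ByteBuffer>> groups = new ArrayList<>();
    if (buffers.isEmpty()) {
      return groups; // empty input yields an empty list
    }
    List<ByteBuffer> current = new ArrayList<>();
    int currentSize = 0;
    for (ByteBuffer bb : buffers) {
      // Close the current group if adding this buffer would exceed the limit.
      if (!current.isEmpty() && currentSize + bb.remaining() > maxGroupSize) {
        groups.add(current);
        current = new ArrayList<>();
        currentSize = 0;
      }
      current.add(bb);
      currentSize += bb.remaining();
    }
    groups.add(current); // assumed final flush of the last group
    return groups;
  }

  /** Total number of readable bytes in one group. */
  public static int sizeOf(List<ByteBuffer> group) {
    return group.stream().mapToInt(ByteBuffer::remaining).sum();
  }

  public static void main(String[] args) {
    List<ByteBuffer> buffers = List.of(
        ByteBuffer.allocate(3), ByteBuffer.allocate(4),
        ByteBuffer.allocate(20), ByteBuffer.allocate(5));
    for (List<ByteBuffer> g : group(buffers, 8)) {
      System.out.println(g.size() + " buffer(s), " + sizeOf(g) + " bytes");
    }
  }
}
```

With a limit of 8 bytes, the buffers of 3 and 4 bytes share a group, the 20-byte buffer passes through as a single group above the limit, and the 5-byte buffer starts a new group. Avoiding that oversized group would mean slicing or copying the large buffer, which is the overhead the comment above prefers not to pay.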
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -157,19 +167,28 @@ private StreamObserver<MailboxContent> getContentObserver() {
         .open(_statusObserver);
   }
-  private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats)
+  private List<MailboxContent> toMailboxContents(MseBlock block, List<DataBuffer> serializedStats)
       throws IOException {
     _statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
     long start = System.currentTimeMillis();
     try {
       DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats);
-      ByteString byteString = DataBlockUtils.toByteString(dataBlock);
-      int sizeInBytes = byteString.size();
+      // so far we ensure payload is not bigger than maxBlockSize/2, we can fine tune this later
+      List<ByteString> byteStrings = DataBlockUtils.toByteStrings(dataBlock, getMaxBlockSize() / 2);
+      int sizeInBytes = byteStrings.stream().map(ByteString::size).reduce(0, Integer::sum);

Review Comment:
   Done.

##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -230,6 +230,28 @@ public static ByteString toByteString(DataBlock dataBlock)
     return byteString;
   }
+  public static List<ByteString> toByteStrings(DataBlock dataBlock, int maxBlockSize)

Review Comment:
   Yeah, I see that. Renamed to `maxByteStringSize` and added some simple doc.
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -157,19 +167,28 @@ private StreamObserver<MailboxContent> getContentObserver() {
         .open(_statusObserver);
   }
-  private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats)
+  private List<MailboxContent> toMailboxContents(MseBlock block, List<DataBuffer> serializedStats)
       throws IOException {
     _statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
     long start = System.currentTimeMillis();
     try {
       DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats);
-      ByteString byteString = DataBlockUtils.toByteString(dataBlock);
-      int sizeInBytes = byteString.size();
+      // so far we ensure payload is not bigger than maxBlockSize/2, we can fine tune this later
+      List<ByteString> byteStrings = DataBlockUtils.toByteStrings(dataBlock, getMaxBlockSize() / 2);

Review Comment:
   You're right. Moved to the mailbox initialization.

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java:
##########
@@ -58,10 +61,18 @@ public void onNext(MailboxContent mailboxContent) {
     if (_mailbox == null) {
       _mailbox = _mailboxService.getReceivingMailbox(mailboxId);
     }
+    if (_mailboxBuffers == null) {
+      _mailboxBuffers = new ArrayList<>();
+    }

Review Comment:
   The idea was just to avoid allocating the list until the observer is actually used, but I guess we will rarely create an observer and not use it. Changed it to eager allocation.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org