gortiz commented on code in PR #15571:
URL: https://github.com/apache/pinot/pull/15571#discussion_r2052080756


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -230,6 +230,28 @@ public static ByteString toByteString(DataBlock dataBlock)
     return byteString;
   }
 
+  public static List<ByteString> toByteStrings(DataBlock dataBlock, int maxBlockSize)
+      throws IOException {
+    List<ByteBuffer> bytes = dataBlock.serialize();
+    if (bytes.isEmpty()) {
+      return List.of(ByteString.EMPTY);
+    }
+
+    List<ByteString> byteStrings = new ArrayList<>();
+    ByteString current = UnsafeByteOperations.unsafeWrap(bytes.get(0));
+    for (int i = 1; i < bytes.size(); i++) {
+      ByteBuffer bb = bytes.get(i);
+      if (current.size() + bb.remaining() > maxBlockSize) {
+        byteStrings.add(current);
+        current = UnsafeByteOperations.unsafeWrap(bb);
+      } else {
+        current = current.concat(UnsafeByteOperations.unsafeWrap(bb));
+      }
+    }

Review Comment:
   Correct me if I'm wrong, but here we are assuming each page in `bytes` is
always going to be no larger than `maxBlockSize`, right? Are we sure that will
always be the case? Can we prepare the code for that situation? If we don't, we
need to document that properly.
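Here is one way to prepare the code for an oversized page, sketched under a few assumptions:

- Plain `java.nio.ByteBuffer` stands in for protobuf `ByteString` so the example has no dependencies.
- `PageChunker` and `maxChunkSize` are hypothetical names.
- Cutting a page at an arbitrary byte offset is only safe if the receiver concatenates all chunks back into one byte stream before deserializing. The quoted diff does not show whether it does.

Pages are still merged at page boundaries, as in the PR. Only a page that exceeds the limit on its own is cut into limit-sized slices.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class PageChunker {
  private PageChunker() {
  }

  // Groups pages into chunks of at most maxChunkSize bytes, keeping page boundaries where possible.
  // A page that alone exceeds maxChunkSize is cut into maxChunkSize-sized slices (zero-copy views).
  static List<ByteBuffer> toChunks(List<ByteBuffer> pages, int maxChunkSize) {
    if (maxChunkSize <= 0) {
      throw new IllegalArgumentException("maxChunkSize must be positive: " + maxChunkSize);
    }
    List<ByteBuffer> chunks = new ArrayList<>();
    List<ByteBuffer> current = new ArrayList<>(); // pages waiting to be merged into the next chunk
    int currentSize = 0;
    for (ByteBuffer page : pages) {
      ByteBuffer view = page.duplicate(); // leave the caller's position and limit untouched
      if (currentSize > 0 && currentSize + view.remaining() > maxChunkSize) {
        chunks.add(merge(current, currentSize));
        current.clear();
        currentSize = 0;
      }
      // Only a page that is oversized on its own can still exceed the limit: emit full-size slices.
      while (view.remaining() > maxChunkSize) {
        ByteBuffer slice = view.slice();
        slice.limit(maxChunkSize);
        chunks.add(slice);
        view.position(view.position() + maxChunkSize);
      }
      current.add(view); // the remainder always fits
      currentSize += view.remaining();
    }
    if (currentSize > 0 || chunks.isEmpty()) {
      chunks.add(merge(current, currentSize)); // empty input yields one empty chunk, like the PR
    }
    return chunks;
  }

  // Copies several parts into one buffer; an explicit stand-in for protobuf ByteString.concat.
  private static ByteBuffer merge(List<ByteBuffer> parts, int size) {
    if (parts.size() == 1) {
      return parts.get(0);
    }
    ByteBuffer merged = ByteBuffer.allocate(size);
    for (ByteBuffer part : parts) {
      merged.put(part.duplicate());
    }
    merged.flip();
    return merged;
  }
}
```

Take pages of 3, 10 and 2 bytes with a limit of 4. This produces chunks of 3, 4, 4 and 4 bytes, so every chunk stays within the limit.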



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -157,19 +167,28 @@ private StreamObserver<MailboxContent> getContentObserver() {
         .open(_statusObserver);
   }
 
-  private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats)
+  private List<MailboxContent> toMailboxContents(MseBlock block, List<DataBuffer> serializedStats)
       throws IOException {
     _statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
     long start = System.currentTimeMillis();
     try {
      DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats);
-      ByteString byteString = DataBlockUtils.toByteString(dataBlock);
-      int sizeInBytes = byteString.size();
+      // so far we ensure payload is not bigger than maxBlockSize/2, we can fine tune this later
+      List<ByteString> byteStrings = DataBlockUtils.toByteStrings(dataBlock, getMaxBlockSize() / 2);

Review Comment:
   `getMaxBlockSize()` won't change once the app is started, right? Do we need
to read the config and parse the string to an int for every block we send?
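Here is a minimal sketch of reading the setting once. It assumes the value comes from a string-valued config map. The class, the key name and the 4 MiB default are all hypothetical, because the review does not show how `getMaxBlockSize()` is implemented. The value is parsed and halved once, in the constructor, so sending a block only reads a `final int` field.

```java
import java.util.Map;

final class SendingMailboxSketch {
  // Hypothetical key and default: the real config lookup behind getMaxBlockSize() is not shown.
  static final String MAX_BLOCK_SIZE_KEY = "example.mailbox.max.block.size";
  static final int DEFAULT_MAX_BLOCK_SIZE = 4 * 1024 * 1024;

  // Parsed once when the mailbox is created, then reused for every block sent.
  private final int _maxChunkSize;

  SendingMailboxSketch(Map<String, String> config) {
    String raw = config.get(MAX_BLOCK_SIZE_KEY);
    int maxBlockSize = raw == null ? DEFAULT_MAX_BLOCK_SIZE : Integer.parseInt(raw.trim());
    // The same halving the PR applies per block ("maxBlockSize/2"), done a single time here.
    _maxChunkSize = maxBlockSize / 2;
  }

  int maxChunkSize() {
    return _maxChunkSize;
  }
}
```

A `static final` constant would also work if the setting can never differ between mailboxes. An instance field keeps it configurable per instance, for example in tests.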



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -157,19 +167,28 @@ private StreamObserver<MailboxContent> getContentObserver() {
         .open(_statusObserver);
   }
 
-  private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats)
+  private List<MailboxContent> toMailboxContents(MseBlock block, List<DataBuffer> serializedStats)
       throws IOException {
     _statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
     long start = System.currentTimeMillis();
     try {
      DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats);
-      ByteString byteString = DataBlockUtils.toByteString(dataBlock);
-      int sizeInBytes = byteString.size();
+      // so far we ensure payload is not bigger than maxBlockSize/2, we can fine tune this later
+      List<ByteString> byteStrings = DataBlockUtils.toByteStrings(dataBlock, getMaxBlockSize() / 2);
+      int sizeInBytes = byteStrings.stream().map(ByteString::size).reduce(0, Integer::sum);

Review Comment:
   Use `mapToInt` instead of `map` here. It avoids boxing every size into an
`Integer`, so it may be more efficient.
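Here is a small comparison of the two forms. `ByteBuffer::remaining` stands in for `ByteString::size` to keep the sketch dependency-free, and the `PayloadSize` class is hypothetical. In the PR, the suggested line would read `byteStrings.stream().mapToInt(ByteString::size).sum()`.

```java
import java.nio.ByteBuffer;
import java.util.List;

final class PayloadSize {
  private PayloadSize() {
  }

  // Form used in the PR: every size is boxed into an Integer before being summed.
  static int boxedSum(List<ByteBuffer> chunks) {
    return chunks.stream().map(ByteBuffer::remaining).reduce(0, Integer::sum);
  }

  // Suggested form: IntStream works on primitive ints, so no Integer objects are involved.
  static int primitiveSum(List<ByteBuffer> chunks) {
    return chunks.stream().mapToInt(ByteBuffer::remaining).sum();
  }
}
```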



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -230,6 +230,28 @@ public static ByteString toByteString(DataBlock dataBlock)
     return byteString;
   }
 
+  public static List<ByteString> toByteStrings(DataBlock dataBlock, int maxBlockSize)

Review Comment:
   I think this method really needs some javadoc. Also, `maxBlockSize` may be
confusing. AFAIK it is the max size of each element in the returned list, but
given this method receives blocks, callers may think this parameter is the max
size of the first parameter.
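Here is a sketch of what that javadoc could say. It describes the behavior of the quoted code, with the parameter renamed to the hypothetical `maxChunkSize`. `ByteBuffer` stands in for `ByteString`. The final append after the loop is an assumption, because the quoted hunk ends before it.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class DataBlockChunks {
  private DataBlockChunks() {
  }

  /**
   * Groups the serialized pages of one data block into chunks that are sent as separate messages.
   *
   * <p>Consecutive pages are merged while the merged size stays within {@code maxChunkSize}.
   * Pages are never split: a page that is larger than {@code maxChunkSize} on its own is
   * returned as a single chunk that exceeds the limit.
   *
   * @param pages the serialized pages of one data block, in order
   * @param maxChunkSize the maximum size in bytes of each returned chunk, not of the whole block
   * @return the chunks in order; a single empty chunk when {@code pages} is empty
   */
  static List<ByteBuffer> toChunks(List<ByteBuffer> pages, int maxChunkSize) {
    if (pages.isEmpty()) {
      return List.of(ByteBuffer.allocate(0));
    }
    List<ByteBuffer> chunks = new ArrayList<>();
    List<ByteBuffer> current = new ArrayList<>();
    int currentSize = 0;
    for (ByteBuffer page : pages) {
      // Close the current chunk when appending this page would push it over the limit.
      if (!current.isEmpty() && currentSize + page.remaining() > maxChunkSize) {
        chunks.add(concat(current, currentSize));
        current.clear();
        currentSize = 0;
      }
      current.add(page);
      currentSize += page.remaining();
    }
    chunks.add(concat(current, currentSize)); // assumed: the last chunk is appended after the loop
    return chunks;
  }

  // Copies the parts into one buffer; an explicit stand-in for protobuf ByteString.concat.
  private static ByteBuffer concat(List<ByteBuffer> parts, int size) {
    ByteBuffer out = ByteBuffer.allocate(size);
    for (ByteBuffer part : parts) {
      out.put(part.duplicate());
    }
    out.flip();
    return out;
  }
}
```

Documenting the oversized-page case this way is the "document it properly" alternative raised in the first comment on this method.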



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -157,19 +167,28 @@ private StreamObserver<MailboxContent> getContentObserver() {
         .open(_statusObserver);
   }
 
-  private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats)
+  private List<MailboxContent> toMailboxContents(MseBlock block, List<DataBuffer> serializedStats)

Review Comment:
   nit: you can even return an iterable/stream here, so we don't need to 
allocate the ArrayList to store `contents`
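Here is a sketch of the stream-returning variant. `LazyMailboxContents` and its `Content` class are hypothetical stand-ins for the real method and the protobuf `MailboxContent` message, whose fields the review does not show. Each message is created only when the caller consumes the stream, for example with `toContents(chunks).forEach(observer::onNext)`, so no intermediate list is built.

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class LazyMailboxContents {
  private LazyMailboxContents() {
  }

  // Hypothetical stand-in for the message built for each chunk.
  static final class Content {
    final int chunkIndex;
    final ByteBuffer payload;

    Content(int chunkIndex, ByteBuffer payload) {
      this.chunkIndex = chunkIndex;
      this.payload = payload;
    }
  }

  // Returns a lazy Stream: each Content is constructed only when the consumer pulls it,
  // so no List<Content> is allocated to hold them all.
  static Stream<Content> toContents(List<ByteBuffer> chunks) {
    return IntStream.range(0, chunks.size()).mapToObj(i -> new Content(i, chunks.get(i)));
  }
}
```

There are two trade-offs:

- Work deferred into the stream runs after `toMailboxContents` returns, so any timing or stats recorded inside that method would no longer cover it.
- Checked exceptions such as `IOException` cannot be thrown from the lambda.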



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java:
##########
@@ -58,10 +61,18 @@ public void onNext(MailboxContent mailboxContent) {
     if (_mailbox == null) {
       _mailbox = _mailboxService.getReceivingMailbox(mailboxId);
     }
+    if (_mailboxBuffers == null) {
+      _mailboxBuffers = new ArrayList<>();
+    }

Review Comment:
   nit: What is the advantage of this lazy initialization instead of doing it 
eagerly?
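For comparison, here is a sketch of the eager alternative. The field is created with the observer and declared `final`, so it can never be null. Only `_mailboxBuffers` comes from the diff; `ChunkAccumulator`, `onChunk` and `drain` are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class ChunkAccumulator {
  // Eagerly created and final: never null, so the per-message path needs no null check.
  private final List<ByteBuffer> _mailboxBuffers = new ArrayList<>();

  // Called once per received chunk.
  void onChunk(ByteBuffer chunk) {
    _mailboxBuffers.add(chunk);
  }

  int pendingChunks() {
    return _mailboxBuffers.size();
  }

  // Hands the buffered chunks to the caller (e.g. to rebuild one block) and resets for the next one.
  List<ByteBuffer> drain() {
    List<ByteBuffer> drained = new ArrayList<>(_mailboxBuffers);
    _mailboxBuffers.clear();
    return drained;
  }
}
```

Lazy initialization saves one small `ArrayList` for an observer that never receives a message, at the cost of a null check on every `onNext`. Because `clear()` keeps the list's capacity, the eager list can also be reused across blocks.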



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: commits-h...@pinot.apache.org
