albertobastos commented on code in PR #15571:
URL: https://github.com/apache/pinot/pull/15571#discussion_r2055538832


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -230,6 +230,28 @@ public static ByteString toByteString(DataBlock dataBlock)
     return byteString;
   }
 
+  public static List<ByteString> toByteStrings(DataBlock dataBlock, int maxBlockSize)
+      throws IOException {
+    List<ByteBuffer> bytes = dataBlock.serialize();
+    if (bytes.isEmpty()) {
+      return List.of(ByteString.EMPTY);

Review Comment:
   This was copy-pasted from the previous version, which returned a single 
`ByteString.EMPTY`. Now that we are dealing with a list, I guess it is OK to 
return an empty list instead.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -157,19 +167,28 @@ private StreamObserver<MailboxContent> getContentObserver() {
         .open(_statusObserver);
   }
 
-  private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats)
+  private List<MailboxContent> toMailboxContents(MseBlock block, List<DataBuffer> serializedStats)

Review Comment:
   Let's go with the stream approach for once.



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -230,6 +230,28 @@ public static ByteString toByteString(DataBlock dataBlock)
     return byteString;
   }
 
+  public static List<ByteString> toByteStrings(DataBlock dataBlock, int maxBlockSize)
+      throws IOException {
+    List<ByteBuffer> bytes = dataBlock.serialize();
+    if (bytes.isEmpty()) {
+      return List.of(ByteString.EMPTY);
+    }
+
+    List<ByteString> byteStrings = new ArrayList<>();
+    ByteString current = UnsafeByteOperations.unsafeWrap(bytes.get(0));
+    for (int i = 1; i < bytes.size(); i++) {
+      ByteBuffer bb = bytes.get(i);
+      if (current.size() + bb.remaining() > maxBlockSize) {
+        byteStrings.add(current);
+        current = UnsafeByteOperations.unsafeWrap(bb);
+      } else {
+        current = current.concat(UnsafeByteOperations.unsafeWrap(bb));
+      }
+    }

Review Comment:
   Yeah, we are assuming that no `ByteBuffer` returned by block serialization 
exceeds the given maximum. If one does, that oversized buffer is converted to a 
`ByteString` as-is.
   
   I thought of ways to handle that case, but all of them require array copies 
instead of relying on unsafe wrap operations. I believe that would add 
undesired overhead, so for now I just documented the current behaviour.
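
   To make the behaviour described above concrete, here is a minimal sketch of 
the same grouping idea using only `java.nio.ByteBuffer`, so that it runs without 
the protobuf dependency. `BufferChunker`, `chunk` and `maxChunkSize` are 
hypothetical names for illustration and are not part of the PR. Unlike the hunk 
under review, this sketch returns an empty list for empty input.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not the PR's implementation.
final class BufferChunker {
  private BufferChunker() {
  }

  /**
   * Groups buffers, in order, so that each group holds at most maxChunkSize bytes.
   * Buffers are never copied or split: a single buffer larger than maxChunkSize
   * ends up alone in a group that exceeds the limit.
   */
  static List<List<ByteBuffer>> chunk(List<ByteBuffer> buffers, int maxChunkSize) {
    List<List<ByteBuffer>> chunks = new ArrayList<>();
    List<ByteBuffer> current = new ArrayList<>();
    int currentSize = 0;
    for (ByteBuffer bb : buffers) {
      // Close the current group if adding this buffer would exceed the limit.
      if (!current.isEmpty() && currentSize + bb.remaining() > maxChunkSize) {
        chunks.add(current);
        current = new ArrayList<>();
        currentSize = 0;
      }
      current.add(bb);
      currentSize += bb.remaining();
    }
    // Flush the last, possibly partial, group.
    if (!current.isEmpty()) {
      chunks.add(current);
    }
    return chunks;
  }
}
```

   With buffers of 4, 4, 20 and 3 bytes and a limit of 10, this yields three 
groups: the two 4-byte buffers together, the 20-byte buffer alone (over the 
limit), and the 3-byte buffer. Splitting the 20-byte buffer would mean slicing 
or copying it, which is the overhead the comment above is trying to avoid.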



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -230,6 +230,28 @@ public static ByteString toByteString(DataBlock dataBlock)
     return byteString;
   }
 
+  public static List<ByteString> toByteStrings(DataBlock dataBlock, int maxBlockSize)
+      throws IOException {
+    List<ByteBuffer> bytes = dataBlock.serialize();
+    if (bytes.isEmpty()) {
+      return List.of(ByteString.EMPTY);
+    }
+
+    List<ByteString> byteStrings = new ArrayList<>();

Review Comment:
   Got it.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -157,19 +167,28 @@ private StreamObserver<MailboxContent> getContentObserver() {
         .open(_statusObserver);
   }
 
-  private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats)
+  private List<MailboxContent> toMailboxContents(MseBlock block, List<DataBuffer> serializedStats)
       throws IOException {
     _statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
     long start = System.currentTimeMillis();
     try {
       DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats);
-      ByteString byteString = DataBlockUtils.toByteString(dataBlock);
-      int sizeInBytes = byteString.size();
+      // so far we ensure payload is not bigger than maxBlockSize/2, we can fine tune this later
+      List<ByteString> byteStrings = DataBlockUtils.toByteStrings(dataBlock, getMaxBlockSize() / 2);
+      int sizeInBytes = byteStrings.stream().map(ByteString::size).reduce(0, Integer::sum);

Review Comment:
   Done.



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -230,6 +230,28 @@ public static ByteString toByteString(DataBlock dataBlock)
     return byteString;
   }
 
+  public static List<ByteString> toByteStrings(DataBlock dataBlock, int maxBlockSize)

Review Comment:
   Yeah, I see that. Renamed to `maxByteStringSize` and added some simple doc.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -157,19 +167,28 @@ private StreamObserver<MailboxContent> getContentObserver() {
         .open(_statusObserver);
   }
 
-  private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats)
+  private List<MailboxContent> toMailboxContents(MseBlock block, List<DataBuffer> serializedStats)
       throws IOException {
     _statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
     long start = System.currentTimeMillis();
     try {
       DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats);
-      ByteString byteString = DataBlockUtils.toByteString(dataBlock);
-      int sizeInBytes = byteString.size();
+      // so far we ensure payload is not bigger than maxBlockSize/2, we can fine tune this later
+      List<ByteString> byteStrings = DataBlockUtils.toByteStrings(dataBlock, getMaxBlockSize() / 2);

Review Comment:
   You're right. Moved to the mailbox initialization.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java:
##########
@@ -58,10 +61,18 @@ public void onNext(MailboxContent mailboxContent) {
     if (_mailbox == null) {
       _mailbox = _mailboxService.getReceivingMailbox(mailboxId);
     }
+    if (_mailboxBuffers == null) {
+      _mailboxBuffers = new ArrayList<>();
+    }

Review Comment:
   The idea was just to avoid allocating the list until the observer is 
actually used, but I guess we will rarely create an observer and not use it. 
Changed it to eager allocation.
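
   For reference, a minimal sketch of what eager allocation looks like in an 
observer-like class. `ChunkCollector`, `onNext` and `drain` are hypothetical 
names for illustration and do not mirror the real `MailboxContentObserver` API. 
The field is initialized at construction, so the receive path needs no null 
check.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an observer that accumulates chunks; illustration only.
final class ChunkCollector {
  // Eagerly allocated at construction, so onNext never has to check for null.
  private final List<ByteBuffer> _buffers = new ArrayList<>();

  /** Records one received chunk. */
  void onNext(ByteBuffer chunk) {
    _buffers.add(chunk);
  }

  /** Returns a copy of the chunks received so far and resets the collector. */
  List<ByteBuffer> drain() {
    List<ByteBuffer> result = new ArrayList<>(_buffers);
    _buffers.clear();
    return result;
  }
}
```

   The cost of being eager is small: in current OpenJDK releases a 
default-constructed `ArrayList` does not allocate its backing array until the 
first element is added.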



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

