albertobastos commented on code in PR #15571:
URL: https://github.com/apache/pinot/pull/15571#discussion_r2071398844


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -156,19 +176,30 @@ private StreamObserver<MailboxContent> getContentObserver() {
         .open(_statusObserver);
   }
 
-  private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats)
+  private void splitAndSend(MseBlock block, List<DataBuffer> serializedStats)
       throws IOException {
     _statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
     long start = System.currentTimeMillis();
     try {
       DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats);
-      ByteString byteString = DataBlockUtils.toByteString(dataBlock);
-      int sizeInBytes = byteString.size();
+      List<ByteString> byteStrings;
+      if (_splitBlocks) {

Review Comment:
   I like that. I will create `SplitSender` and `NonSplitSender` classes to
minimize checks at runtime. I won't worry too much about avoiding redundant
code, because this is a temporary flag that only matters during cluster
upgrades.
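
   A minimal sketch of that idea, under stated assumptions: the `BlockSender`
interface, the `create` factory, and the use of `byte[]` in place of
`ByteString` are hypothetical and chosen only to keep the example
self-contained. Only the names `SplitSender` and `NonSplitSender` come from
this thread. The flag is read once when the sender is created, so the send
path has no branch on it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SenderSketch {
  /** Hypothetical strategy interface; the actual PR may shape this differently. */
  interface BlockSender {
    List<byte[]> toChunks(byte[] serializedBlock);
  }

  /** Legacy behavior: the whole serialized block travels as a single message. */
  static final class NonSplitSender implements BlockSender {
    @Override
    public List<byte[]> toChunks(byte[] serializedBlock) {
      return List.of(serializedBlock);
    }
  }

  /** New behavior: the block is cut into chunks of at most maxChunkSize bytes. */
  static final class SplitSender implements BlockSender {
    private final int _maxChunkSize;

    SplitSender(int maxChunkSize) {
      if (maxChunkSize <= 0) {
        throw new IllegalArgumentException("maxChunkSize must be positive");
      }
      _maxChunkSize = maxChunkSize;
    }

    @Override
    public List<byte[]> toChunks(byte[] serializedBlock) {
      List<byte[]> chunks = new ArrayList<>();
      int from = 0;
      while (from < serializedBlock.length) {
        int len = Math.min(_maxChunkSize, serializedBlock.length - from);
        chunks.add(Arrays.copyOfRange(serializedBlock, from, from + len));
        from += len;
      }
      return chunks;
    }
  }

  /** The flag is evaluated here, once, instead of on every send. */
  static BlockSender create(boolean splitBlocks, int maxChunkSize) {
    return splitBlocks ? new SplitSender(maxChunkSize) : new NonSplitSender();
  }
}
```

   In this sketch the mailbox would hold a single `BlockSender` field assigned
in its constructor; removing the temporary flag later means deleting
`NonSplitSender` and the factory branch.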



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -156,19 +176,30 @@ private StreamObserver<MailboxContent> getContentObserver() {
         .open(_statusObserver);
   }
 
-  private MailboxContent toMailboxContent(MseBlock block, List<DataBuffer> serializedStats)
+  private void splitAndSend(MseBlock block, List<DataBuffer> serializedStats)
       throws IOException {
     _statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
     long start = System.currentTimeMillis();
     try {
       DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats);
-      ByteString byteString = DataBlockUtils.toByteString(dataBlock);
-      int sizeInBytes = byteString.size();
+      List<ByteString> byteStrings;
+      if (_splitBlocks) {
+        byteStrings = toByteStrings(dataBlock, _maxByteStringSize);
+      } else {
+        byteStrings = List.of(DataBlockUtils.toByteString(dataBlock));
+      }
+      int sizeInBytes = byteStrings.stream().mapToInt(ByteString::size).reduce(0, Integer::sum);

Review Comment:
   I'm always on the fence about using it; any call to `.stream()` makes me
frown. Will change it to a classic loop.
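
   For illustration, the classic-loop version could look roughly like the
sketch below. The `SizeSum` class and `totalSize` method are hypothetical
names, and `byte[]` stands in for `ByteString` so the example compiles
without protobuf; in the mailbox code the loop body would call
`ByteString.size()` instead of reading `length`.

```java
import java.util.List;

final class SizeSum {
  /** Sums the chunk sizes with a plain loop, avoiding stream setup on the hot path. */
  static int totalSize(List<byte[]> chunks) {
    int total = 0;
    for (byte[] chunk : chunks) {
      total += chunk.length;
    }
    return total;
  }
}
```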



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -52,24 +59,36 @@
 public class GrpcSendingMailbox implements SendingMailbox {
   private static final Logger LOGGER = LoggerFactory.getLogger(GrpcSendingMailbox.class);
 
+  private static final List<ByteString> EMPTY_BYTEBUFFER_LIST = Collections.emptyList();
   private final String _id;
   private final ChannelManager _channelManager;
   private final String _hostname;
   private final int _port;
   private final long _deadlineMs;
   private final StatMap<MailboxSendOperator.StatKey> _statMap;
   private final MailboxStatusObserver _statusObserver = new MailboxStatusObserver();
+  private final boolean _splitBlocks;
+  private final int _maxByteStringSize;
 
   private StreamObserver<MailboxContent> _contentObserver;
 
-  public GrpcSendingMailbox(String id, ChannelManager channelManager, String hostname, int port, long deadlineMs,
+  public GrpcSendingMailbox(
+      PinotConfiguration config, String id, ChannelManager channelManager, String hostname, int port, long deadlineMs,
       StatMap<MailboxSendOperator.StatKey> statMap) {
     _id = id;
     _channelManager = channelManager;
     _hostname = hostname;
     _port = port;
     _deadlineMs = deadlineMs;
     _statMap = statMap;
+    _splitBlocks = config.getProperty(

Review Comment:
   Thanks, will do.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java:
##########
@@ -47,11 +48,13 @@ public class MailboxContentObserver implements StreamObserver<MailboxContent> {
   private final MailboxService _mailboxService;
   private final StreamObserver<MailboxStatus> _responseObserver;
 
+  private final List<ByteBuffer> _mailboxBuffers;
   private transient ReceivingMailbox _mailbox;
 
   public MailboxContentObserver(MailboxService mailboxService, StreamObserver<MailboxStatus> responseObserver) {
     _mailboxService = mailboxService;
     _responseObserver = responseObserver;
+    _mailboxBuffers = new ArrayList<>();

Review Comment:
   As we discussed offline, a `MailboxContentObserver` is not shared among
concurrent queries, so it is safe to simply accumulate blocks until a block
with `waitForMore=false` is received.
   
   Anyway, we will add an integration test with a small buffer and concurrent 
queries to verify that.
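
   A rough sketch of the accumulation logic described here, assuming one
observer instance per stream and that its callbacks are never invoked
concurrently. `ChunkAccumulator` and its `onNext(payload, waitForMore)`
signature are hypothetical; the real observer receives a `MailboxContent`
message and hands the completed block to the receiving mailbox.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class ChunkAccumulator {
  // Not thread-safe on purpose: relies on the observer not being shared.
  private final List<ByteBuffer> _buffers = new ArrayList<>();

  /**
   * Stores the chunk. Returns all chunks of the block once the last one
   * (waitForMore == false) arrives, and an empty Optional otherwise.
   */
  Optional<List<ByteBuffer>> onNext(ByteBuffer payload, boolean waitForMore) {
    _buffers.add(payload);
    if (waitForMore) {
      return Optional.empty();
    }
    List<ByteBuffer> complete = new ArrayList<>(_buffers);
    _buffers.clear(); // ready for the next block on the same stream
    return Optional.of(complete);
  }
}
```

   If the observer were ever shared between streams, chunks from different
blocks would interleave in `_buffers`, which is the scenario the planned
integration test is meant to rule out.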



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

