gortiz commented on code in PR #15694:
URL: https://github.com/apache/pinot/pull/15694#discussion_r2073162113


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -319,4 +277,75 @@ static List<ByteString> toByteStrings(List<ByteBuffer> 
bytes, int maxByteStringS
 
     return result;
   }
+
+  private static abstract class Sender {
+    protected final GrpcSendingMailbox _mailbox;
+
+    protected Sender(GrpcSendingMailbox mailbox) {
+      _mailbox = mailbox;
+    }
+
+    void send(MseBlock block, List<DataBuffer> serializedStats)
+        throws IOException {
+      _mailbox._statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
+      long start = System.currentTimeMillis();
+      try {
+        DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, 
serializedStats);
+        int sizeInBytes = processAndSend(dataBlock);
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("Serialized block: {} to {} bytes", block, sizeInBytes);
+        }
+        _mailbox._statMap.merge(MailboxSendOperator.StatKey.SERIALIZED_BYTES, 
sizeInBytes);
+      } catch (Throwable t) {
+        LOGGER.warn("Caught exception while serializing block: {}", block, t);
+        throw t;
+      } finally {
+        
_mailbox._statMap.merge(MailboxSendOperator.StatKey.SERIALIZATION_TIME_MS, 
System.currentTimeMillis() - start);
+      }
+    }
+
+    protected abstract int processAndSend(DataBlock dataBlock)
+        throws IOException;
+  }
+
+  private static class SplitSender extends Sender {
+    private final int _maxByteStringSize;
+
+    public SplitSender(GrpcSendingMailbox mailbox, int maxByteStringSize) {
+      super(mailbox);
+      _maxByteStringSize = maxByteStringSize;
+    }
+
+    @Override
+    protected int processAndSend(DataBlock dataBlock)
+        throws IOException {
+      List<ByteString> byteStrings = toByteStrings(dataBlock, 
_maxByteStringSize);
+      int sizeInBytes = 0;
+      for (ByteString byteString : byteStrings) {
+        sizeInBytes += byteString.size();
+      }
+      Iterator<ByteString> byteStringIt = byteStrings.iterator();
+      while (byteStringIt.hasNext()) {
+        ByteString byteString = byteStringIt.next();
+        boolean waitForMore = byteStringIt.hasNext();
+        _mailbox.sendContent(byteString, waitForMore);
+      }
+      return sizeInBytes;

Review Comment:
   I don't know whether this is a Codecov bug, but the Codecov Chrome plugin 
reports these lines as not covered by tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to