gortiz commented on code in PR #15694: URL: https://github.com/apache/pinot/pull/15694#discussion_r2073162113
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java: ########## @@ -319,4 +277,75 @@ static List<ByteString> toByteStrings(List<ByteBuffer> bytes, int maxByteStringS return result; } + + private static abstract class Sender { + protected final GrpcSendingMailbox _mailbox; + + protected Sender(GrpcSendingMailbox mailbox) { + _mailbox = mailbox; + } + + void send(MseBlock block, List<DataBuffer> serializedStats) + throws IOException { + _mailbox._statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1); + long start = System.currentTimeMillis(); + try { + DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, serializedStats); + int sizeInBytes = processAndSend(dataBlock); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Serialized block: {} to {} bytes", block, sizeInBytes); + } + _mailbox._statMap.merge(MailboxSendOperator.StatKey.SERIALIZED_BYTES, sizeInBytes); + } catch (Throwable t) { + LOGGER.warn("Caught exception while serializing block: {}", block, t); + throw t; + } finally { + _mailbox._statMap.merge(MailboxSendOperator.StatKey.SERIALIZATION_TIME_MS, System.currentTimeMillis() - start); + } + } + + protected abstract int processAndSend(DataBlock dataBlock) + throws IOException; + } + + private static class SplitSender extends Sender { + private final int _maxByteStringSize; + + public SplitSender(GrpcSendingMailbox mailbox, int maxByteStringSize) { + super(mailbox); + _maxByteStringSize = maxByteStringSize; + } + + @Override + protected int processAndSend(DataBlock dataBlock) + throws IOException { + List<ByteString> byteStrings = toByteStrings(dataBlock, _maxByteStringSize); + int sizeInBytes = 0; + for (ByteString byteString : byteStrings) { + sizeInBytes += byteString.size(); + } + Iterator<ByteString> byteStringIt = byteStrings.iterator(); + while (byteStringIt.hasNext()) { + ByteString byteString = byteStringIt.next(); + boolean waitForMore = byteStringIt.hasNext(); + 
_mailbox.sendContent(byteString, waitForMore); + } + return sizeInBytes; Review Comment: I don't know if this is a Codecov bug or not, but the Codecov Chrome plugin reports that these lines are not covered by tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org