albertobastos commented on code in PR #15571: URL: https://github.com/apache/pinot/pull/15571#discussion_r2063163586
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java: ########## @@ -229,4 +254,64 @@ public DataBlock visit(ErrorMseBlock block, List<DataBuffer> serializedStats) { } } } + + @VisibleForTesting + public static List<ByteString> toByteStrings(DataBlock dataBlock, int maxByteStringSize) Review Comment: Yeah that's enough, forgot to change it after moving them from the Utils class. ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java: ########## @@ -229,4 +254,64 @@ public DataBlock visit(ErrorMseBlock block, List<DataBuffer> serializedStats) { } } } + + @VisibleForTesting + public static List<ByteString> toByteStrings(DataBlock dataBlock, int maxByteStringSize) + throws IOException { + return toByteStrings(dataBlock.serialize(), maxByteStringSize); + } + + @VisibleForTesting + public static List<ByteString> toByteStrings(List<ByteBuffer> bytes, int maxByteStringSize) { + if (bytes.isEmpty()) { + return EMPTY_BYTEBUFFER_LIST; + } + + int totalBytes = 0; + for (ByteBuffer bb : bytes) { + totalBytes += bb.remaining(); + } + int initialCapacity = (totalBytes / maxByteStringSize) + bytes.size(); + List<ByteString> result = new ArrayList<>(initialCapacity); + + ByteString acc = ByteString.EMPTY; + int available = maxByteStringSize; + + for (ByteBuffer bb: bytes) { + int from = bb.position(); + int remaining = bb.limit() - from; + while (remaining > 0) { + if (remaining <= available) { + acc = acc.concat(UnsafeByteOperations.unsafeWrap(sliceByteBuffer(bb, from, from + remaining))); + available -= remaining; + remaining = 0; + } else { + acc = acc.concat(UnsafeByteOperations.unsafeWrap(sliceByteBuffer(bb, from, from + available))); + from += available; + remaining -= available; + result.add(acc); + acc = ByteString.EMPTY; + available = maxByteStringSize; + } + } + } + result.add(acc); + + return result; + } + + // polyfill because ByteBuffer.slice(pos, lim) is not available until Java 13 + private static ByteBuffer sliceByteBuffer(ByteBuffer bb, int position, int limit) { + int oldPosition = bb.position(); + int oldLimit = bb.limit(); + + try { + bb.position(position); + bb.limit(limit); + return bb.slice(); + } finally { + bb.position(oldPosition); + bb.limit(oldLimit); + } Review Comment: As we talked offline, it depends on what we want exactly to achieve during the splitting process. If we want to optimize the amount of output chunks (filling chunks of `maxByteStringSize` until the very last chunk) it doesn't get much better than this. If we only want to bother splitting when a `ByteBuffer` exceeds the `maxByteStringSize` and we're ok with just pushing as-is the ones smaller than that, we could get rid of the slicing (although we will still need to update the buffer internal limit and position during the splitting). I'm not convinced yet about changing it. Talking about performance, wouldn't say that updating limit and position cursors are too expensive (internally is just some simple assignations and comparisons). Talking about readibility... I believe current code is quite readable, to be honest 😅 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org