gortiz commented on code in PR #11205: URL: https://github.com/apache/pinot/pull/11205#discussion_r1277731284
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java: ########## @@ -51,16 +51,30 @@ public class ReceivingMailbox { // TODO: Revisit if this is the correct way to apply back pressure private final BlockingQueue<TransferableBlock> _blocks = new ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS); private final AtomicReference<TransferableBlock> _errorBlock = new AtomicReference<>(); + @Nullable + private volatile Reader _reader; public ReceivingMailbox(String id, Consumer<OpChainId> receiveMailCallback) { _id = id; _receiveMailCallback = receiveMailCallback; } + public void registeredReader(Reader reader) { Review Comment: This is my main contribution over @walterddr's code. Instead of storing a map in the scheduler that maps op chains to their blocking objects (in his case a blocking list of 1 element) here I'm registering the ReceivingOperator on the mailbox itself. By doing that we decouple the receiving operation from the scheduler. The ReceivingMailbox will notify the reader (if any) whenever a message is received. The older `_receiveMailCallback` is maintained because some tests use it, but we may decide to change it in a way that it doesn't need to allocate OpChainIds for each message sent. You can see BaseMailboxReceiveOperator.onData to understand how this callback is implemented. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org