gortiz commented on code in PR #11205: URL: https://github.com/apache/pinot/pull/11205#discussion_r1277707830
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -144,17 +150,30 @@ public SendingMailbox getSendingMailbox(String hostname, int port, String mailbo
     }
   }
 
-  /**
-   * Returns the receiving mailbox for the given mailbox id.
-   */
-  public ReceivingMailbox getReceivingMailbox(String mailboxId) {
+  public ReceivingMailbox getReceivingMailbox(String mailboxId, @Nullable Consumer<OpChainId> extraCallback) {
     try {
-      return _receivingMailboxCache.get(mailboxId, () -> new ReceivingMailbox(mailboxId, _unblockOpChainCallback));
+      Consumer<OpChainId> callback;
+      if (extraCallback == null) {
+        callback = _unblockOpChainCallback;
+      } else {
+        callback = opChainId -> {
+          extraCallback.accept(opChainId);

Review Comment:
   This is my main contribution over @walterddr's code. Instead of storing a map in the scheduler that maps op chains to their blocking objects (in his case, a blocking list of one element), here I'm sending a message to the op chain itself. That decouples the receiving operation from the scheduler. See `BaseMailboxReceiveOperator.onData` to understand what it does.
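
   For readers without the full diff at hand, here is a minimal, standalone sketch of the callback composition described in this comment. It is not the PR's code: `CallbackComposition` and `compose` are hypothetical names, a plain `String` stands in for `OpChainId`, and the call to the scheduler callback after `extraCallback` is an assumption, since the hunk above is cut off right after `extraCallback.accept(opChainId)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration only; not part of Pinot.
class CallbackComposition {

  // Builds the callback handed to a receiving mailbox.
  // A String stands in for OpChainId in this sketch.
  static Consumer<String> compose(Consumer<String> extraCallback, Consumer<String> unblockCallback) {
    if (extraCallback == null) {
      // No operator-level listener: fall back to the scheduler callback alone.
      return unblockCallback;
    }
    return opChainId -> {
      // Notify the op chain (the receive operator) first...
      extraCallback.accept(opChainId);
      // ...then wake the scheduler. This ordering is an assumption; the diff
      // above is truncated before the second call.
      unblockCallback.accept(opChainId);
    };
  }

  public static void main(String[] args) {
    List<String> events = new ArrayList<>();
    Consumer<String> unblock = id -> events.add("scheduler unblocked: " + id);
    Consumer<String> onData = id -> events.add("operator notified: " + id);

    compose(onData, unblock).accept("op-1");
    compose(null, unblock).accept("op-2");

    events.forEach(System.out::println);
  }
}
```

   With this shape the scheduler no longer needs a map from op chains to blocking objects; whoever creates the receiving mailbox decides what else should happen when data arrives.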