gortiz commented on code in PR #11205:
URL: https://github.com/apache/pinot/pull/11205#discussion_r1277707830


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -144,17 +150,30 @@ public SendingMailbox getSendingMailbox(String hostname, int port, String mailbo
     }
   }
 
-  /**
-   * Returns the receiving mailbox for the given mailbox id.
-   */
-  public ReceivingMailbox getReceivingMailbox(String mailboxId) {
+  public ReceivingMailbox getReceivingMailbox(String mailboxId, @Nullable Consumer<OpChainId> extraCallback) {
     try {
-      return _receivingMailboxCache.get(mailboxId, () -> new ReceivingMailbox(mailboxId, _unblockOpChainCallback));
+      Consumer<OpChainId> callback;
+      if (extraCallback == null) {
+        callback = _unblockOpChainCallback;
+      } else {
+        callback = opChainId -> {
+          extraCallback.accept(opChainId);

Review Comment:
   This is my main contribution over @walterddr's code. Instead of storing a
map in the scheduler that maps op chains to their blocking objects (in his
case, a blocking list of one element), here I send a message to the op chain
itself. That decouples the receiving operation from the scheduler.
   
   You can see `BaseMailboxReceiveOperator.onData` to understand what it does.
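
To make the idea concrete, here is a minimal, self-contained sketch of the callback composition, not the actual Pinot code. The class `CallbackCompositionSketch`, the `compose` helper, and the use of `String` in place of `OpChainId` are all hypothetical. The hunk above is cut off after `extraCallback.accept(opChainId)`, so the sketch assumes the default unblock callback still runs afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical illustration only; String stands in for OpChainId. */
public class CallbackCompositionSketch {

  /**
   * Mirrors the null check in the diff: with no extra callback, the default
   * (scheduler-level) callback is used as-is.
   */
  static <T> Consumer<T> compose(Consumer<T> defaultCallback, Consumer<T> extraCallback) {
    if (extraCallback == null) {
      return defaultCallback;
    }
    return id -> {
      // Notify the receiver (for example the receive operator) first.
      extraCallback.accept(id);
      // Assumption: the default unblock callback still fires afterwards.
      defaultCallback.accept(id);
    };
  }

  /** Records the order in which the callbacks fire. */
  static List<String> demo() {
    List<String> events = new ArrayList<>();
    Consumer<String> unblock = id -> events.add("scheduler:unblock:" + id);
    Consumer<String> onData = id -> events.add("operator:onData:" + id);

    compose(unblock, null).accept("chain-1");
    compose(unblock, onData).accept("chain-2");
    return events;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

In this sketch the scheduler does not need a map from op chains to blocking objects: whoever requests the mailbox passes in its own notification hook.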




