gortiz commented on code in PR #11205:
URL: https://github.com/apache/pinot/pull/11205#discussion_r1277731284


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -51,16 +51,30 @@ public class ReceivingMailbox {
   // TODO: Revisit if this is the correct way to apply back pressure
   private final BlockingQueue<TransferableBlock> _blocks = new 
ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
   private final AtomicReference<TransferableBlock> _errorBlock = new 
AtomicReference<>();
+  @Nullable
+  private volatile Reader _reader;
 
   public ReceivingMailbox(String id, Consumer<OpChainId> receiveMailCallback) {
     _id = id;
     _receiveMailCallback = receiveMailCallback;
   }
 
+  public void registeredReader(Reader reader) {

Review Comment:
   This is my main contribution over @walterddr's code. Instead of storing a 
map in the scheduler that maps op chains to their blocking objects (in his case 
a blocking list of 1 element) here I'm registering the ReceivingOperator on the 
mailbox itself. By doing that we decouple the receiving operation from the 
scheduler.
   
   
   
   The ReceivingMailbox will notify the reader (if any) whenever a message is 
received. The older `_receiveMailCallback` is maintained because some tests use 
it, but we may decide to change it in a way that it doesn't need to allocate 
OpChainIds for each message sent.
   
   You can see BaseMailboxReceiveOperator.onData to understand how this 
callback is implemented.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to