gortiz commented on code in PR #11205:
URL: https://github.com/apache/pinot/pull/11205#discussion_r1285457564


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -51,16 +51,30 @@ public class ReceivingMailbox {
   // TODO: Revisit if this is the correct way to apply back pressure
   private final BlockingQueue<TransferableBlock> _blocks = new ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
   private final AtomicReference<TransferableBlock> _errorBlock = new AtomicReference<>();
+  @Nullable
+  private volatile Reader _reader;
 
   public ReceivingMailbox(String id, Consumer<OpChainId> receiveMailCallback) {
     _id = id;
     _receiveMailCallback = receiveMailCallback;
   }
 
+  public void registeredReader(Reader reader) {

Review Comment:
   > At that moment the reader is not registered but there are contents within the mailbox
   
   Is that something that can only happen in `InMemorySendingMailbox`? I thought it could also happen in the gRPC one. Anyway, this supports the following pattern:
   1. If the reader is created before the writer, there is no problem. It will block on the queue and be registered as a listener.
   2. Otherwise, the writer will add the block to the queue and notify nobody. It will keep producing messages until the stream is closed or the buffer is full (the same conditions we had before).
   3. Once the reader is registered, it won't see the previous notifications (because the sender never sent them), but it will read the queue first.
   4. Once the queue is completely consumed, instead of blocking on the queue, we block on an operator-defined structure (which is also a queue, but that is not relevant). That structure is shared by the operator, which may read from several mailboxes, and it unblocks whenever one of them has data. In fact, this part is very similar to what we had before. But instead of storing that structure in a map in the dispatcher/scheduler (which sounds strange now that there is nothing to schedule), it is stored in the operator.
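   The four cases above can be sketched in a single Java file. This is a minimal sketch under stated assumptions, not the code in this PR: `Mailbox`, `Reader`, `registerReader`, `notifyData`, `awaitData` and `readNext` are hypothetical stand-ins (the PR's actual `Reader` API is not shown here), the queue capacity of 5 is arbitrary, and the operator-level structure is modeled as a `LinkedBlockingQueue` of signals so that a notification sent between the "all queues empty" check and the wait is not lost.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MailboxReaderSketch {

  /** Hypothetical operator-level structure shared by every mailbox the operator reads from. */
  static final class Reader {
    private final BlockingQueue<Boolean> _signals = new LinkedBlockingQueue<>();

    /** Called by a writer after it enqueues a block. */
    void notifyData() {
      _signals.offer(Boolean.TRUE);
    }

    /** Blocks until some mailbox signals new data; returns false on timeout. */
    boolean awaitData(long timeout, TimeUnit unit) throws InterruptedException {
      return _signals.poll(timeout, unit) != null;
    }
  }

  /** Hypothetical, simplified receiving mailbox (not Pinot's ReceivingMailbox). */
  static final class Mailbox<T> {
    private final BlockingQueue<T> _blocks = new ArrayBlockingQueue<>(5);
    private volatile Reader _reader; // null until a reader registers

    /** Writer side: enqueue first, then notify the reader only if one is registered (case 2). */
    boolean offer(T block) {
      boolean added = _blocks.offer(block);
      Reader reader = _reader;
      if (added && reader != null) {
        reader.notifyData();
      }
      return added;
    }

    void registerReader(Reader reader) {
      _reader = reader;
    }

    /** Reader side: non-blocking poll of this mailbox's queue. */
    T poll() {
      return _blocks.poll();
    }
  }

  /** Drains the queues first (case 3); when all are empty, blocks on the shared Reader (case 4). */
  static <T> T readNext(List<Mailbox<T>> mailboxes, Reader reader) throws InterruptedException {
    while (true) {
      for (Mailbox<T> mailbox : mailboxes) {
        T block = mailbox.poll();
        if (block != null) {
          return block;
        }
      }
      if (!reader.awaitData(5, TimeUnit.SECONDS)) {
        return null; // timed out with no data in any mailbox
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Mailbox<String> early = new Mailbox<>();
    Mailbox<String> late = new Mailbox<>();
    early.offer("written-before-registration"); // case 2: nobody is notified
    Reader reader = new Reader();
    early.registerReader(reader);
    late.registerReader(reader);
    List<Mailbox<String>> mailboxes = List.of(early, late);
    System.out.println(readNext(mailboxes, reader)); // case 3: the queue is read first
    Thread writer = new Thread(() -> late.offer("written-after-registration"));
    writer.start();
    System.out.println(readNext(mailboxes, reader)); // case 4: woken up by the shared signal
    writer.join();
  }
}
```

   Because the writer enqueues before reading `_reader` and the reader registers before polling, a block written concurrently with registration is either found by the poll or produces a notification. Extra signals only cost one more pass over the mailboxes.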
   
   > Our use pattern is not standard, so not sure if we should put a general framework.
   
   That is not correct. First, it is quite standard. In fact, reactive-streams implements the same feature (in an even more generic fashion, letting you choose between cold and hot streams). I would even suggest using reactive patterns in the future to make the code simpler on our side.
   
   It is also not correct within our own project. Right now, in master, both `MailboxReceiveOperator` and `SortedMailboxReceiveOperator` share this code (in the algorithmic form used in master, with no-ops and delegating to the dispatcher/scheduler to block). Also, as shown in the previous commits, the pipeline breaker could be implemented using the same pattern (but using blocking streams instead of non-blocking ones). I've removed that because you had some concerns, but we use this pattern in at least two places. Given how hard it is to reason about concurrency, I think it is much better to use a single, generic structure that can be tested in isolation and then used in different places.
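   To make the "single structure, two consumption modes" argument concrete, here is a minimal sketch of one generic buffer that a receive operator can read in non-blocking mode (yielding instead of waiting) and a pipeline breaker can read in blocking mode. `DualModeBuffer`, `poll` and `pollBlocking` are hypothetical names, not Pinot classes, and real mailboxes also carry error and end-of-stream blocks, which are omitted here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical bounded buffer (not a Pinot class) readable in non-blocking or blocking mode. */
public final class DualModeBuffer<T> {
  private final BlockingQueue<T> _items;

  public DualModeBuffer(int capacity) {
    _items = new ArrayBlockingQueue<>(capacity);
  }

  /** Producer side: returns false when the buffer is full, which is where back pressure applies. */
  public boolean offer(T item) {
    return _items.offer(item);
  }

  /** Non-blocking mode: returns null at once when empty, so an operator can yield instead of waiting. */
  public T poll() {
    return _items.poll();
  }

  /** Blocking mode: waits up to the timeout for an item, returning null if none arrives. */
  public T pollBlocking(long timeout, TimeUnit unit) throws InterruptedException {
    return _items.poll(timeout, unit);
  }

  public static void main(String[] args) throws Exception {
    DualModeBuffer<String> buffer = new DualModeBuffer<>(2);
    System.out.println(buffer.poll()); // empty buffer: the non-blocking read returns immediately
    Thread producer = new Thread(() -> buffer.offer("block-1"));
    producer.start();
    System.out.println(buffer.pollBlocking(5, TimeUnit.SECONDS)); // the blocking read waits for the producer
    producer.join();
  }
}
```

   Keeping both modes on one class means the concurrency behavior is unit-tested once rather than re-derived in each operator.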
   
   If you are really concerned about the lack of reusability, we can roll back the changes introduced in c936badf3f8f158568d51793ba59fe657acf7a13, where I added this class in order to reuse the code between the receiving mailbox and the pipeline breaker, and go back to the version we had in 650e4a88c032c1e9e0345cf82c577474fdd357e8, where mostly the same code was in `BaseMailboxReceiveOperator` and was therefore shared only by `MailboxReceiveOperator` and `SortedMailboxReceiveOperator`.
   
   Alternatively, you may be asking to go even further back and recover the algorithm where the blocking code lived in the dispatcher/scheduler. We can discuss that, but I think it is an incorrect pattern that introduces dependencies between the scheduler and the operators. We can go back there if you think there is a correctness issue with this proposed new structure, but I don't think reusability is a strong reason to recover that code.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

