gortiz commented on code in PR #11205: URL: https://github.com/apache/pinot/pull/11205#discussion_r1285457564
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -51,16 +51,30 @@ public class ReceivingMailbox {
   // TODO: Revisit if this is the correct way to apply back pressure
   private final BlockingQueue<TransferableBlock> _blocks = new ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
   private final AtomicReference<TransferableBlock> _errorBlock = new AtomicReference<>();
+  @Nullable
+  private volatile Reader _reader;

   public ReceivingMailbox(String id, Consumer<OpChainId> receiveMailCallback) {
     _id = id;
     _receiveMailCallback = receiveMailCallback;
   }

+  public void registeredReader(Reader reader) {

Review Comment:
> At that moment the reader is not registered but there are contents within the mailbox

Is that something that can only happen in `InMemorySendingMailbox`? I thought it could also happen in the gRPC one. Anyway, this change supports the following pattern:

1. If the reader is created before the writer, there is no problem. It will be blocked in the queue and registered as a listener.
2. In the other case, the writer will add the block to the queue and will notify nobody. It will continue producing messages until the stream is closed or the buffer is full (the same conditions we had before).
3. Once the reader is registered, it won't see the previous notifications (because the sender didn't send them), but it will first read the queue.
4. Once the queue is completely consumed, instead of blocking on the queue, we block on an operator-defined structure (which is also a queue, but that is not relevant). That structure is shared across the operator, which may have several receiving mailboxes, and it unblocks whenever one of them has data.

In fact this part is very similar to what we had before. But instead of storing that structure in a map in the dispatcher/scheduler (which sounds strange now that there is nothing to schedule), it is stored in the operator.

> Our use pattern is not standard, so not sure if we should put a general framework.

That is not correct.
First, it is quite standard. In fact reactive-streams implements the same feature (in an even more generic fashion, letting you choose between cold and hot streams). I would even suggest using reactive patterns in the future to make the code simpler on our side.

It is also not correct within our own project. Right now, in master, both `MailboxReceiveOperator` and `SortedMailboxReceiveOperator` share this code (in the algorithmic form used in master, with no-ops and delegating to the dispatcher/scheduler to block). Also, as shown in the previous commits, the pipeline breaker could be implemented using the same pattern (but using blocking streams instead of non-blocking ones). I removed that because you had some concerns, but we still use the pattern in at least 2 places. Given how complex it is to reason about concurrency, I think it is much better to use a single, generic structure that can be tested in isolation and then used in different places.

If you are really concerned about the lack of reusability, then we can roll back the changes introduced in c936badf3f8f158568d51793ba59fe657acf7a13, where I added this class in order to reuse the code between the receiving mailbox and the pipeline breaker, and go back to the version we had in 650e4a88c032c1e9e0345cf82c577474fdd357e8, where mostly the same code was in `BaseMailboxReceiveOperator` and was therefore shared only by `MailboxReceiveOperator` and `SortedMailboxReceiveOperator`.

Alternatively, you may be asking to go even further back and recover the algorithm where the blocking code was in the dispatcher/scheduler. We can discuss that, but I think it is an incorrect pattern that introduces dependencies between the scheduler and the operators. We can go back there if you think there is a correctness issue with this proposed new structure, but I don't think reusability is a strong reason to recover that code.
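
To make the four-step pattern from this comment concrete, here is a minimal sketch. It is not the code in this PR: `MailboxSketch`, `SketchMailbox`, `SketchOperator`, `registerReader`, `onData` and `next` are hypothetical names, blocks are plain strings, and the capacity of 5 is arbitrary. It only assumes what is described above: the writer enqueues and notifies a reader if one is registered, and the reader drains the mailboxes first and then blocks on a structure owned by the operator.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: every name here is hypothetical, not Pinot's API.
public class MailboxSketch {

  /** Hypothetical listener notified when a mailbox receives a block. */
  public interface Reader {
    void onData();
  }

  public static final class SketchMailbox {
    private final BlockingQueue<String> blocks = new ArrayBlockingQueue<>(5);
    private volatile Reader reader; // null until a reader registers

    public void registerReader(Reader r) {
      reader = r;
    }

    /** Writer side: enqueue first, then notify the reader if there is one. */
    public boolean offer(String block) {
      boolean added = blocks.offer(block);
      Reader r = reader;
      if (added && r != null) {
        r.onData();
      }
      return added;
    }

    public String poll() {
      return blocks.poll();
    }
  }

  public static final class SketchOperator implements Reader {
    private final List<SketchMailbox> mailboxes;
    // Shared wake-up structure owned by the operator, not by a scheduler.
    // One pending signal is enough, because every wake-up rescans all mailboxes.
    private final BlockingQueue<Boolean> signals = new ArrayBlockingQueue<>(1);

    public SketchOperator(List<SketchMailbox> mailboxes) {
      this.mailboxes = mailboxes;
      for (SketchMailbox m : mailboxes) {
        m.registerReader(this);
      }
    }

    @Override
    public void onData() {
      signals.offer(Boolean.TRUE); // dropped if a signal is already pending
    }

    /** Returns the next block, or null if none arrives within the timeout. */
    public String next(long timeoutMs) {
      long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
      while (true) {
        // Step 3: read whatever is already queued, including blocks that were
        // written before this operator registered itself.
        for (SketchMailbox m : mailboxes) {
          String block = m.poll();
          if (block != null) {
            return block;
          }
        }
        // Step 4: nothing queued, so block on the shared structure.
        long remaining = deadline - System.nanoTime();
        try {
          if (remaining <= 0 || signals.poll(remaining, TimeUnit.NANOSECONDS) == null) {
            return null;
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return null;
        }
      }
    }
  }

  public static void main(String[] args) {
    SketchMailbox a = new SketchMailbox();
    SketchMailbox b = new SketchMailbox();
    a.offer("early"); // step 2: written before any reader exists, nobody notified
    SketchOperator op = new SketchOperator(Arrays.asList(a, b));
    System.out.println(op.next(100)); // found by the initial scan
    b.offer("late"); // the reader is registered now, so it is signalled
    System.out.println(op.next(100));
  }
}
```

The writer enqueues before reading `reader`, and the reader registers before scanning, so a block is either seen by the scan or produces a signal. A stale signal only costs one extra scan.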