walterddr commented on code in PR #10322:
URL: https://github.com/apache/pinot/pull/10322#discussion_r1126724681

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -50,21 +55,51 @@
   int getMailboxPort();
 
   /**
-   * Look up a receiving mailbox by {@link MailboxIdentifier}.
-   *
-   * <p>the acquired {@link ReceivingMailbox} will be constructed if not exist already, but it might not have been
-   * initialized.
+   * Return a {@link ReceivingMailbox} for the given {@link MailboxIdentifier}.
    *
    * @param mailboxId mailbox identifier.
    * @return a receiving mailbox.
    */
   ReceivingMailbox<T> getReceivingMailbox(MailboxIdentifier mailboxId);
 
   /**
-   * Look up a sending mailbox by {@link MailboxIdentifier}.
+   * Same as {@link #getReceivingMailbox} but this would return null if the mailbox isn't already created.
+   */
+  @Nullable
+  ReceivingMailbox<T> getReceivingMailboxIfPresent(MailboxIdentifier mailboxId);
+
+  /**
+   * Return a sending-mailbox for the given {@link MailboxIdentifier}. The returned {@link SendingMailbox} is
+   * uninitialized, i.e. it will not open the underlying channel or acquire any additional resources. Instead the
+   * {@link SendingMailbox} will initialize lazily when the data is sent for the first time through it.
    *
    * @param mailboxId mailbox identifier.
+   * @param deadlineMs deadline in milliseconds, which is usually the same as the query deadline.
    * @return a sending mailbox.
    */
-  SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId);
+  SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId, long deadlineMs);
+
+  /**
+   * A {@link ReceivingMailbox} for a given {@link OpChain} may be created before the OpChain is even registered.
+   * Reason being that the sender starts sending data, and the receiver starts receiving the same without waiting for
+   * the OpChain to be registered. The ownership for the ReceivingMailbox hence lies with the MailboxService and not
+   * the OpChain. There are two ways in which a MailboxService may release its references to a ReceivingMailbox and
+   * the underlying resources:
+   *
+   * <ol>
+   *   <li>
+   *     If the OpChain corresponding to a ReceivingMailbox was closed or cancelled. In that case,
+   *     {@link MailboxReceiveOperator} will call this method as part of its close/cancel call. This is the main
+   *     reason why this method exists.
+   *   </li>
+   *   <li>
+   *     There can be cases where the corresponding OpChain was never registered with the scheduler. In that case, it
+   *     is up to the {@link MailboxService} to ensure that there are no leaks of resources. E.g. it could setup a
+   *     periodic job to detect such mailbox and do any clean-up. Note that for this case, it is not mandatory for
+   *     the {@link MailboxService} to use this method. It can use any internal method it needs to do the clean-up.
+   *   </li>
+   * </ol>
+   * @param mailboxId
+   */
+  void releaseReceivingMailbox(MailboxIdentifier mailboxId);

Review Comment:
   On the contrary, on the sending side the MailboxService NEVER owns the GRPCSendingMailbox, because it is always constructed and initialized by the `MailboxSendObserver`. The mailbox send observer therefore holds the reference to the StatusStreamObserver and is responsible for calling close / cancel on it.
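
The ownership split discussed above can be sketched roughly as follows. This is a minimal illustration, not Pinot's actual implementation: every class name here (`MailboxOwnershipSketch`, `SketchMailboxService`, `SketchReceivingMailbox`, `SketchSendingMailbox`) is hypothetical, mailbox ids are plain strings instead of `MailboxIdentifier`, and no gRPC channel is involved. It assumes the service keeps receiving mailboxes in a map it owns, while sending mailboxes are handed out without being tracked and initialize lazily on first send.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the ownership model; not the real Pinot classes. */
public class MailboxOwnershipSketch {

  /** Stand-in for a receiving mailbox; it only tracks whether it was closed. */
  static final class SketchReceivingMailbox {
    private final String _mailboxId;
    private volatile boolean _closed;

    SketchReceivingMailbox(String mailboxId) {
      _mailboxId = mailboxId;
    }

    void close() {
      _closed = true;
    }

    boolean isClosed() {
      return _closed;
    }
  }

  /** Stand-in for a sending mailbox that initializes lazily on the first send. */
  static final class SketchSendingMailbox {
    private final String _mailboxId;
    private final long _deadlineMs;
    private boolean _initialized;

    SketchSendingMailbox(String mailboxId, long deadlineMs) {
      _mailboxId = mailboxId;
      _deadlineMs = deadlineMs;
    }

    void send(String payload) {
      if (!_initialized) {
        // A real implementation would open the underlying channel here.
        _initialized = true;
      }
      // Payload delivery is omitted in this sketch.
    }

    boolean isInitialized() {
      return _initialized;
    }
  }

  /** The service owns receiving mailboxes, but never tracks sending mailboxes. */
  static final class SketchMailboxService {
    private final Map<String, SketchReceivingMailbox> _receivingMailboxes = new ConcurrentHashMap<>();

    /** Creates the receiving mailbox if it does not exist yet. */
    SketchReceivingMailbox getReceivingMailbox(String mailboxId) {
      return _receivingMailboxes.computeIfAbsent(mailboxId, SketchReceivingMailbox::new);
    }

    /** Returns null if the receiving mailbox has not been created. */
    SketchReceivingMailbox getReceivingMailboxIfPresent(String mailboxId) {
      return _receivingMailboxes.get(mailboxId);
    }

    /** Returns an uninitialized sending mailbox; the caller owns it and must close it. */
    SketchSendingMailbox getSendingMailbox(String mailboxId, long deadlineMs) {
      return new SketchSendingMailbox(mailboxId, deadlineMs);
    }

    /** Drops the service's reference and closes the mailbox, if one exists. */
    void releaseReceivingMailbox(String mailboxId) {
      SketchReceivingMailbox mailbox = _receivingMailboxes.remove(mailboxId);
      if (mailbox != null) {
        mailbox.close();
      }
    }
  }

  public static void main(String[] args) {
    SketchMailboxService service = new SketchMailboxService();
    SketchReceivingMailbox receiving = service.getReceivingMailbox("mailbox-1");
    service.releaseReceivingMailbox("mailbox-1");
    System.out.println("closed after release: " + receiving.isClosed());
    System.out.println("still present: " + (service.getReceivingMailboxIfPresent("mailbox-1") != null));
  }
}
```

In this sketch the receive operator would call `releaseReceivingMailbox` from its close/cancel path, whereas whoever obtained the sending mailbox is the only party that can clean it up, since the service keeps no reference to it.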