walterddr commented on code in PR #10322: URL: https://github.com/apache/pinot/pull/10322#discussion_r1126672929
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java: ########## @@ -18,45 +18,62 @@ */ package org.apache.pinot.query.mailbox; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import io.grpc.ManagedChannel; -import io.grpc.stub.StreamObserver; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; +import java.time.Duration; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import org.apache.pinot.common.proto.Mailbox; +import javax.annotation.Nullable; import org.apache.pinot.common.proto.PinotMailboxGrpc; import org.apache.pinot.query.mailbox.channel.ChannelManager; import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.spi.env.PinotConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * GRPC-based implementation of {@link MailboxService}. + * GRPC-based implementation of {@link MailboxService}. Note that there can be cases where the ReceivingMailbox + * and/or the underlying connection can be leaked: * - * <p>It maintains a collection of connected mailbox servers and clients to remote hosts. All indexed by the - * mailboxID in the format of: <code>"jobId:partitionKey:senderHost:senderPort:receiverHost:receiverPort"</code> + * <ol> + * <li>When the OpChain corresponding to the receiver was never registered.</li> + * <li>When the receiving OpChain exited before data was sent for the first time by the sender.</li> + * </ol> * - * <p>Connections are established/initiated from the sender side and only tier-down from the sender side as well. 
- * In the event of exception or timed out, the connection is cloased based on a mutually agreed upon timeout period - * after the last successful message sent/received. - * - * <p>Noted that: - * <ul> - * <li>the latter part of the mailboxID consist of the channelID.</li> - * <li>the job_id should be uniquely identifying a send/receving pair, for example if one bundle job requires - * to open 2 mailboxes, they should use {job_id}_1 and {job_id}_2 to distinguish the 2 different mailbox.</li> - * </ul> + * To handle these cases, we store the {@link ReceivingMailbox} entries in a time-expiring cache. If there was a + * leak, the entry would be evicted, and in that case we also issue a cancel to ensure the underlying stream is also + * released. */ public class GrpcMailboxService implements MailboxService<TransferableBlock> { + private static final Logger LOGGER = LoggerFactory.getLogger(GrpcMailboxService.class); // channel manager + private static final Duration DANGLING_RECEIVING_MAILBOX_EXPIRY = Duration.ofMinutes(5); private final ChannelManager _channelManager; private final String _hostname; private final int _mailboxPort; - // maintaining a list of registered mailboxes. - private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap = - new ConcurrentHashMap<>(); + // We use a cache to ensure that the receiving mailbox and the underlying gRPC stream are not leaked in the cases + // where the corresponding OpChain is either never registered or died before the sender sent data for the first time. 
+ private final Cache<String, GrpcReceivingMailbox> _receivingMailboxCache = + CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY.toMinutes(), TimeUnit.MINUTES) + .removalListener(new RemovalListener<String, GrpcReceivingMailbox>() { + @Override + public void onRemoval(RemovalNotification<String, GrpcReceivingMailbox> notification) { + if (notification.wasEvicted()) { + // TODO: This should be tied with query deadline, but for that we need to know the query deadline + // when the GrpcReceivingMailbox is initialized in MailboxContentStreamObserver. + LOGGER.info("Removing dangling GrpcReceivingMailbox: {}", notification.getKey()); + notification.getValue().cancel(); Review Comment: cancel needs to be idempotent. there's a chance it got called multiple times. ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java: ########## @@ -18,45 +18,62 @@ */ package org.apache.pinot.query.mailbox; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import io.grpc.ManagedChannel; -import io.grpc.stub.StreamObserver; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; +import java.time.Duration; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import org.apache.pinot.common.proto.Mailbox; +import javax.annotation.Nullable; import org.apache.pinot.common.proto.PinotMailboxGrpc; import org.apache.pinot.query.mailbox.channel.ChannelManager; import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.spi.env.PinotConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * GRPC-based implementation of {@link MailboxService}. 
+ * GRPC-based implementation of {@link MailboxService}. Note that there can be cases where the ReceivingMailbox + * and/or the underlying connection can be leaked: * - * <p>It maintains a collection of connected mailbox servers and clients to remote hosts. All indexed by the - * mailboxID in the format of: <code>"jobId:partitionKey:senderHost:senderPort:receiverHost:receiverPort"</code> + * <ol> + * <li>When the OpChain corresponding to the receiver was never registered.</li> + * <li>When the receiving OpChain exited before data was sent for the first time by the sender.</li> + * </ol> * - * <p>Connections are established/initiated from the sender side and only tier-down from the sender side as well. - * In the event of exception or timed out, the connection is cloased based on a mutually agreed upon timeout period - * after the last successful message sent/received. - * - * <p>Noted that: - * <ul> - * <li>the latter part of the mailboxID consist of the channelID.</li> - * <li>the job_id should be uniquely identifying a send/receving pair, for example if one bundle job requires - * to open 2 mailboxes, they should use {job_id}_1 and {job_id}_2 to distinguish the 2 different mailbox.</li> - * </ul> + * To handle these cases, we store the {@link ReceivingMailbox} entries in a time-expiring cache. If there was a + * leak, the entry would be evicted, and in that case we also issue a cancel to ensure the underlying stream is also + * released. */ public class GrpcMailboxService implements MailboxService<TransferableBlock> { + private static final Logger LOGGER = LoggerFactory.getLogger(GrpcMailboxService.class); // channel manager + private static final Duration DANGLING_RECEIVING_MAILBOX_EXPIRY = Duration.ofMinutes(5); private final ChannelManager _channelManager; private final String _hostname; private final int _mailboxPort; - // maintaining a list of registered mailboxes. 
- private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap = - new ConcurrentHashMap<>(); + // We use a cache to ensure that the receiving mailbox and the underlying gRPC stream are not leaked in the cases + // where the corresponding OpChain is either never registered or died before the sender sent data for the first time. + private final Cache<String, GrpcReceivingMailbox> _receivingMailboxCache = + CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY.toMinutes(), TimeUnit.MINUTES) + .removalListener(new RemovalListener<String, GrpcReceivingMailbox>() { + @Override + public void onRemoval(RemovalNotification<String, GrpcReceivingMailbox> notification) { + if (notification.wasEvicted()) { + // TODO: This should be tied with query deadline, but for that we need to know the query deadline + // when the GrpcReceivingMailbox is initialized in MailboxContentStreamObserver. + LOGGER.info("Removing dangling GrpcReceivingMailbox: {}", notification.getKey()); Review Comment: log level to warning? Ideally the opChain should cancel mailboxes. gradually we should make it less depend on the cache removal. ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java: ########## @@ -50,21 +55,51 @@ int getMailboxPort(); /** - * Look up a receiving mailbox by {@link MailboxIdentifier}. - * - * <p>the acquired {@link ReceivingMailbox} will be constructed if not exist already, but it might not have been - * initialized. + * Return a {@link ReceivingMailbox} for the given {@link MailboxIdentifier}. * * @param mailboxId mailbox identifier. * @return a receiving mailbox. */ ReceivingMailbox<T> getReceivingMailbox(MailboxIdentifier mailboxId); /** - * Look up a sending mailbox by {@link MailboxIdentifier}. + * Same as {@link #getReceivingMailbox} but this would return null if the mailbox isn't already created. 
+ */ + @Nullable + ReceivingMailbox<T> getReceivingMailboxIfPresent(MailboxIdentifier mailboxId); + + /** + * Return a sending-mailbox for the given {@link MailboxIdentifier}. The returned {@link SendingMailbox} is + * uninitialized, i.e. it will not open the underlying channel or acquire any additional resources. Instead the + * {@link SendingMailbox} will initialize lazily when the data is sent for the first time through it. * * @param mailboxId mailbox identifier. + * @param deadlineMs deadline in milliseconds, which is usually the same as the query deadline. * @return a sending mailbox. */ - SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId); + SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId, long deadlineMs); + + /** + * A {@link ReceivingMailbox} for a given {@link OpChain} may be created before the OpChain is even registered. + * Reason being that the sender starts sending data, and the receiver starts receiving the same without waiting for + * the OpChain to be registered. The ownership for the ReceivingMailbox hence lies with the MailboxService and not + * the OpChain. There are two ways in which a MailboxService may release its references to a ReceivingMailbox and + * the underlying resources: + * + * <ol> + * <li> + * If the OpChain corresponding to a ReceivingMailbox was closed or cancelled. In that case, + * {@link MailboxReceiveOperator} will call this method as part of its close/cancel call. This is the main + * reason why this method exists. + * </li> + * <li> + * There can be cases where the corresponding OpChain was never registered with the scheduler. In that case, it + * is up to the {@link MailboxService} to ensure that there are no leaks of resources. E.g. it could setup a + * periodic job to detect such mailbox and do any clean-up. Note that for this case, it is not mandatory for + * the {@link MailboxService} to use this method. It can use any internal method it needs to do the clean-up. 
+ * </li> + * </ol> + * @param mailboxId + */ + void releaseReceivingMailbox(MailboxIdentifier mailboxId); Review Comment: IMO. 1. `MailboxService` ALWAYS owns the `MailboxContentStreamObserver` (and thus the `GRPCReceivingMailbox`) ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java: ########## @@ -50,21 +55,51 @@ int getMailboxPort(); /** - * Look up a receiving mailbox by {@link MailboxIdentifier}. - * - * <p>the acquired {@link ReceivingMailbox} will be constructed if not exist already, but it might not have been - * initialized. + * Return a {@link ReceivingMailbox} for the given {@link MailboxIdentifier}. * * @param mailboxId mailbox identifier. * @return a receiving mailbox. */ ReceivingMailbox<T> getReceivingMailbox(MailboxIdentifier mailboxId); /** - * Look up a sending mailbox by {@link MailboxIdentifier}. + * Same as {@link #getReceivingMailbox} but this would return null if the mailbox isn't already created. + */ + @Nullable + ReceivingMailbox<T> getReceivingMailboxIfPresent(MailboxIdentifier mailboxId); Review Comment: This API is not needed. make it private and annotate the above method as Nullable. ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java: ########## @@ -50,21 +55,51 @@ int getMailboxPort(); /** - * Look up a receiving mailbox by {@link MailboxIdentifier}. - * - * <p>the acquired {@link ReceivingMailbox} will be constructed if not exist already, but it might not have been - * initialized. + * Return a {@link ReceivingMailbox} for the given {@link MailboxIdentifier}. * * @param mailboxId mailbox identifier. * @return a receiving mailbox. */ ReceivingMailbox<T> getReceivingMailbox(MailboxIdentifier mailboxId); /** - * Look up a sending mailbox by {@link MailboxIdentifier}. + * Same as {@link #getReceivingMailbox} but this would return null if the mailbox isn't already created. 
+ */ + @Nullable + ReceivingMailbox<T> getReceivingMailboxIfPresent(MailboxIdentifier mailboxId); + + /** + * Return a sending-mailbox for the given {@link MailboxIdentifier}. The returned {@link SendingMailbox} is + * uninitialized, i.e. it will not open the underlying channel or acquire any additional resources. Instead the + * {@link SendingMailbox} will initialize lazily when the data is sent for the first time through it. * * @param mailboxId mailbox identifier. + * @param deadlineMs deadline in milliseconds, which is usually the same as the query deadline. * @return a sending mailbox. */ - SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId); + SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId, long deadlineMs); + + /** + * A {@link ReceivingMailbox} for a given {@link OpChain} may be created before the OpChain is even registered. + * Reason being that the sender starts sending data, and the receiver starts receiving the same without waiting for + * the OpChain to be registered. The ownership for the ReceivingMailbox hence lies with the MailboxService and not + * the OpChain. There are two ways in which a MailboxService may release its references to a ReceivingMailbox and + * the underlying resources: + * + * <ol> + * <li> + * If the OpChain corresponding to a ReceivingMailbox was closed or cancelled. In that case, + * {@link MailboxReceiveOperator} will call this method as part of its close/cancel call. This is the main + * reason why this method exists. + * </li> + * <li> + * There can be cases where the corresponding OpChain was never registered with the scheduler. In that case, it + * is up to the {@link MailboxService} to ensure that there are no leaks of resources. E.g. it could setup a + * periodic job to detect such mailbox and do any clean-up. Note that for this case, it is not mandatory for + * the {@link MailboxService} to use this method. It can use any internal method it needs to do the clean-up. 
+ * </li> + * </ol> + * @param mailboxId + */ + void releaseReceivingMailbox(MailboxIdentifier mailboxId); Review Comment: I think releaseReceivingMailbox should only invalidate the mailbox cached inside the MailboxService. MailboxOperator will have 1. info regarding the actual mailbox 2. the mailbox service that caches the mailboxID -> mailbox object mapping entry. We need to make the ownership relation a bit clearer. To me, with the current model, cancel/close needs to be idempotent and state needs to be kept, even though it is only done once per mailbox. Each call into these state transitions will have to be guarded by locks, which is not ideal. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org