ankitsultana commented on code in PR #10322: URL: https://github.com/apache/pinot/pull/10322#discussion_r1128158528
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java:
##########
@@ -18,45 +18,62 @@
  */
 package org.apache.pinot.query.mailbox;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import io.grpc.ManagedChannel;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import org.apache.pinot.common.proto.Mailbox;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.proto.PinotMailboxGrpc;
 import org.apache.pinot.query.mailbox.channel.ChannelManager;
 import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 /**
- * GRPC-based implementation of {@link MailboxService}.
+ * GRPC-based implementation of {@link MailboxService}. Note that there can be cases where the ReceivingMailbox
+ * and/or the underlying connection can be leaked:
  *
- * <p>It maintains a collection of connected mailbox servers and clients to remote hosts. All indexed by the
- * mailboxID in the format of: <code>"jobId:partitionKey:senderHost:senderPort:receiverHost:receiverPort"</code>
+ * <ol>
+ * <li>When the OpChain corresponding to the receiver was never registered.</li>
+ * <li>When the receiving OpChain exited before data was sent for the first time by the sender.</li>
+ * </ol>
  *
- * <p>Connections are established/initiated from the sender side and only tier-down from the sender side as well.
- * In the event of exception or timed out, the connection is cloased based on a mutually agreed upon timeout period
- * after the last successful message sent/received.
- *
- * <p>Noted that:
- * <ul>
- * <li>the latter part of the mailboxID consist of the channelID.</li>
- * <li>the job_id should be uniquely identifying a send/receving pair, for example if one bundle job requires
- * to open 2 mailboxes, they should use {job_id}_1 and {job_id}_2 to distinguish the 2 different mailbox.</li>
- * </ul>
+ * To handle these cases, we store the {@link ReceivingMailbox} entries in a time-expiring cache. If there was a
+ * leak, the entry would be evicted, and in that case we also issue a cancel to ensure the underlying stream is also
+ * released.
  */
 public class GrpcMailboxService implements MailboxService<TransferableBlock> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GrpcMailboxService.class);
   // channel manager
+  private static final Duration DANGLING_RECEIVING_MAILBOX_EXPIRY = Duration.ofMinutes(5);
   private final ChannelManager _channelManager;
   private final String _hostname;
   private final int _mailboxPort;
-  // maintaining a list of registered mailboxes.
-  private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap =
-      new ConcurrentHashMap<>();
+  // We use a cache to ensure that the receiving mailbox and the underlying gRPC stream are not leaked in the cases
+  // where the corresponding OpChain is either never registered or died before the sender sent data for the first time.
+  private final Cache<String, GrpcReceivingMailbox> _receivingMailboxCache =
+      CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY.toMinutes(), TimeUnit.MINUTES)
+          .removalListener(new RemovalListener<String, GrpcReceivingMailbox>() {
+            @Override
+            public void onRemoval(RemovalNotification<String, GrpcReceivingMailbox> notification) {
+              if (notification.wasEvicted()) {
+                // TODO: This should be tied with query deadline, but for that we need to know the query deadline
+                // when the GrpcReceivingMailbox is initialized in MailboxContentStreamObserver.
+                LOGGER.info("Removing dangling GrpcReceivingMailbox: {}", notification.getKey());
+                notification.getValue().cancel();

Review Comment:
Yeah that's required. I'll add it to the interface javadoc.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org