ankitsultana commented on code in PR #10322:
URL: https://github.com/apache/pinot/pull/10322#discussion_r1128162136


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java:
##########
@@ -18,45 +18,62 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import io.grpc.ManagedChannel;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import org.apache.pinot.common.proto.Mailbox;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.proto.PinotMailboxGrpc;
 import org.apache.pinot.query.mailbox.channel.ChannelManager;
 import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * GRPC-based implementation of {@link MailboxService}.
+ * GRPC-based implementation of {@link MailboxService}. Note that there can be 
cases where the ReceivingMailbox
+ * and/or the underlying connection can be leaked:
  *
- * <p>It maintains a collection of connected mailbox servers and clients to 
remote hosts. All indexed by the
- * mailboxID in the format of: 
<code>"jobId:partitionKey:senderHost:senderPort:receiverHost:receiverPort"</code>
+ * <ol>
+ *   <li>When the OpChain corresponding to the receiver was never 
registered.</li>
+ *   <li>When the receiving OpChain exited before data was sent for the first 
time by the sender.</li>
+ * </ol>
  *
- * <p>Connections are established/initiated from the sender side and only 
tier-down from the sender side as well.
- * In the event of exception or timed out, the connection is cloased based on 
a mutually agreed upon timeout period
- * after the last successful message sent/received.
- *
- * <p>Noted that:
- * <ul>
- *   <li>the latter part of the mailboxID consist of the channelID.</li>
- *   <li>the job_id should be uniquely identifying a send/receving pair, for 
example if one bundle job requires
- *   to open 2 mailboxes, they should use {job_id}_1 and {job_id}_2 to 
distinguish the 2 different mailbox.</li>
- * </ul>
+ * To handle these cases, we store the {@link ReceivingMailbox} entries in a 
time-expiring cache. If there was a
+ * leak, the entry would be evicted, and in that case we also issue a cancel 
to ensure the underlying stream is also
+ * released.
  */
 public class GrpcMailboxService implements MailboxService<TransferableBlock> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GrpcMailboxService.class);
   // channel manager
+  private static final Duration DANGLING_RECEIVING_MAILBOX_EXPIRY = 
Duration.ofMinutes(5);
   private final ChannelManager _channelManager;
   private final String _hostname;
   private final int _mailboxPort;
 
-  // maintaining a list of registered mailboxes.
-  private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> 
_receivingMailboxMap =
-      new ConcurrentHashMap<>();
+  // We use a cache to ensure that the receiving mailbox and the underlying 
gRPC stream are not leaked in the cases
+  // where the corresponding OpChain is either never registered or died before 
the sender sent data for the first time.
+  private final Cache<String, GrpcReceivingMailbox> _receivingMailboxCache =
+      
CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY.toMinutes(),
 TimeUnit.MINUTES)
+          .removalListener(new RemovalListener<String, GrpcReceivingMailbox>() 
{
+            @Override
+            public void onRemoval(RemovalNotification<String, 
GrpcReceivingMailbox> notification) {
+              if (notification.wasEvicted()) {
+                // TODO: This should be tied with query deadline, but for that 
we need to know the query deadline
+                //  when the GrpcReceivingMailbox is initialized in 
MailboxContentStreamObserver.
+                LOGGER.info("Removing dangling GrpcReceivingMailbox: {}", 
notification.getKey());

Review Comment:
   Yeah, we can change the log level to WARN.
   
   The cache-based cleanup only covers cases where no OpChain ever consumed the 
receiving mailbox: either the OpChain died before any data was received, or the 
OpChain was never registered at all.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to