walterddr commented on code in PR #10322:
URL: https://github.com/apache/pinot/pull/10322#discussion_r1126952976


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -18,79 +18,104 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
+import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.Mailbox.MailboxContent;
 import org.apache.pinot.query.mailbox.channel.ChannelUtils;
+import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * GRPC implementation of the {@link SendingMailbox}.
+ * gRPC implementation of the {@link SendingMailbox}. The gRPC stream is 
created on the first call to {@link #send}.
  */
 public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GrpcSendingMailbox.class);
   private final String _mailboxId;
   private final AtomicBoolean _initialized = new AtomicBoolean(false);
-  private final AtomicInteger _totalMsgSent = new AtomicInteger(0);
 
-  private final CountDownLatch _finishLatch;
-  private final StreamObserver<MailboxContent> _mailboxContentStreamObserver;
+  private StreamObserver<MailboxContent> _mailboxContentStreamObserver;
+  private final Function<Long, StreamObserver<MailboxContent>> 
_mailboxContentStreamObserverSupplier;
+  private final MailboxStatusStreamObserver _statusObserver;
+  private final long _deadlineMs;
 
-  public GrpcSendingMailbox(String mailboxId, StreamObserver<MailboxContent> 
mailboxContentStreamObserver,
-      CountDownLatch latch) {
+  public GrpcSendingMailbox(String mailboxId, MailboxStatusStreamObserver 
statusObserver,
+      Function<Long, StreamObserver<MailboxContent>> 
contentStreamObserverSupplier, long deadlineMs) {
     _mailboxId = mailboxId;
-    _mailboxContentStreamObserver = mailboxContentStreamObserver;
-    _finishLatch = latch;
-    _initialized.set(false);
+    _mailboxContentStreamObserverSupplier = contentStreamObserverSupplier;
+    _statusObserver = statusObserver;
+    _deadlineMs = deadlineMs;
   }
 
   @Override
   public void send(TransferableBlock block)
-      throws UnsupportedOperationException {
+      throws Exception {
     if (!_initialized.get()) {
-      // initialization is special
       open();
     }
+    Preconditions.checkState(!_statusObserver.isFinished(),
+        "Called send when stream is already closed for mailbox=" + _mailboxId);
     MailboxContent data = toMailboxContent(block.getDataBlock());
     _mailboxContentStreamObserver.onNext(data);
-    _totalMsgSent.incrementAndGet();
   }
 
   @Override
-  public void complete() {
+  public void complete()
+      throws Exception {
     _mailboxContentStreamObserver.onCompleted();
   }
 
   @Override
-  public void open() {
-    // TODO: Get rid of init call.
-    // send a begin-of-stream message.
-    
_mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId)
-        .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, 
"true").build());
-    _initialized.set(true);
+  public boolean isInitialized() {
+    return _initialized.get();
   }
 
+  /**
+   * As required by {@link SendingMailbox#isClosed()}, we return true only if 
this mailbox is done sending all the
+   * data and has released all underlying resources. To check whether all 
resources have been released, i.e. the
+   * underlying gRPC stream has been closed, we use {@link 
MailboxStatusStreamObserver#isFinished()}.
+   */
   @Override
-  public String getMailboxId() {
-    return _mailboxId;
+  public boolean isClosed() {

Review Comment:
   I don't think this API is needed.
   
   The mailbox should only expose send, complete, cancel, and close.
   Whether it is closed should be an internal
state of the mailbox.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java:
##########
@@ -70,29 +79,37 @@ public Consumer<MailboxIdentifier> 
init(MailboxContentStreamObserver streamObser
    *  2. If the received block from the sender is a data-block with 0 rows.
    * </p>
    */
+  @Nullable
   @Override
-  public TransferableBlock receive()
-      throws Exception {
+  public TransferableBlock receive() throws Exception {
     if (!waitForInitialize()) {
       return null;
     }
     MailboxContent mailboxContent = _contentStreamObserver.poll();
-    _totalMsgReceived.incrementAndGet();
     return mailboxContent == null ? null : fromMailboxContent(mailboxContent);
   }
 
   @Override
   public boolean isInitialized() {
-    return _initializationLatch.getCount() <= 0;
+    return _initializationLatch.getCount() == 0;
   }
 
   @Override
   public boolean isClosed() {
-    return isInitialized() && _contentStreamObserver.isCompleted();
+    return isInitialized() && _contentStreamObserver.hasConsumedAllData();
   }
 
   @Override
-  public void cancel(Throwable e) {
+  public void cancel() {
+    if (isInitialized()) {
+      try {
+        _statusStreamObserver.onError(Status.CANCELLED.asRuntimeException());
+      } catch (Exception e) {
+        // TODO: This can happen if the call is already closed. Consider 
removing this log altogether or find a way
+        //  to check if the stream is already closed.
+        LOGGER.info("Tried to cancel receiving mailbox", e);

Review Comment:
   Let's look for a better way to deal with the already-closed issue.
   I think I read an article regarding close/half-close conditions, but we
can follow up later.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -18,79 +18,104 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
+import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.Mailbox.MailboxContent;
 import org.apache.pinot.query.mailbox.channel.ChannelUtils;
+import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * GRPC implementation of the {@link SendingMailbox}.
+ * gRPC implementation of the {@link SendingMailbox}. The gRPC stream is 
created on the first call to {@link #send}.
  */
 public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GrpcSendingMailbox.class);
   private final String _mailboxId;
   private final AtomicBoolean _initialized = new AtomicBoolean(false);
-  private final AtomicInteger _totalMsgSent = new AtomicInteger(0);
 
-  private final CountDownLatch _finishLatch;
-  private final StreamObserver<MailboxContent> _mailboxContentStreamObserver;
+  private StreamObserver<MailboxContent> _mailboxContentStreamObserver;
+  private final Function<Long, StreamObserver<MailboxContent>> 
_mailboxContentStreamObserverSupplier;
+  private final MailboxStatusStreamObserver _statusObserver;
+  private final long _deadlineMs;
 
-  public GrpcSendingMailbox(String mailboxId, StreamObserver<MailboxContent> 
mailboxContentStreamObserver,
-      CountDownLatch latch) {
+  public GrpcSendingMailbox(String mailboxId, MailboxStatusStreamObserver 
statusObserver,
+      Function<Long, StreamObserver<MailboxContent>> 
contentStreamObserverSupplier, long deadlineMs) {
     _mailboxId = mailboxId;
-    _mailboxContentStreamObserver = mailboxContentStreamObserver;
-    _finishLatch = latch;
-    _initialized.set(false);
+    _mailboxContentStreamObserverSupplier = contentStreamObserverSupplier;
+    _statusObserver = statusObserver;
+    _deadlineMs = deadlineMs;
   }
 
   @Override
   public void send(TransferableBlock block)
-      throws UnsupportedOperationException {
+      throws Exception {
     if (!_initialized.get()) {
-      // initialization is special
       open();
     }
+    Preconditions.checkState(!_statusObserver.isFinished(),
+        "Called send when stream is already closed for mailbox=" + _mailboxId);
     MailboxContent data = toMailboxContent(block.getDataBlock());
     _mailboxContentStreamObserver.onNext(data);
-    _totalMsgSent.incrementAndGet();
   }
 
   @Override
-  public void complete() {
+  public void complete()
+      throws Exception {
     _mailboxContentStreamObserver.onCompleted();
   }
 
   @Override
-  public void open() {
-    // TODO: Get rid of init call.
-    // send a begin-of-stream message.
-    
_mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId)
-        .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, 
"true").build());
-    _initialized.set(true);
+  public boolean isInitialized() {
+    return _initialized.get();
   }
 
+  /**
+   * As required by {@link SendingMailbox#isClosed()}, we return true only if 
this mailbox is done sending all the
+   * data and has released all underlying resources. To check whether all 
resources have been released, i.e. the
+   * underlying gRPC stream has been closed, we use {@link 
MailboxStatusStreamObserver#isFinished()}.
+   */
   @Override
-  public String getMailboxId() {
-    return _mailboxId;
+  public boolean isClosed() {
+    return _initialized.get() && _statusObserver.isFinished();
   }
 
   @Override
-  public void waitForFinish(long timeout, TimeUnit unit)
-      throws InterruptedException {
-    _finishLatch.await(timeout, unit);
+  public void cancel(Throwable t) {
+    if (_initialized.get() && !_statusObserver.isFinished()) {

Review Comment:
   Since you also check this here, there's no need to check isClosed().



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java:
##########
@@ -49,12 +48,20 @@
  */
 public class MailboxContentStreamObserver implements 
StreamObserver<Mailbox.MailboxContent> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MailboxContentStreamObserver.class);
+  private static final int DEFAULT_MAX_PENDING_MAILBOX_CONTENT = 5;
+  private static final long DEFAULT_QUEUE_POLL_TIMEOUT_MS = 120_000;

Review Comment:
   What's the reason for this, and how is this value determined?
   Since the ReceiveOperator controls when to move on to the next mailbox
if the current one doesn't have data, shouldn't we avoid waiting here and
simply return null so that the ReceiveOperator can decide what to do?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java:
##########
@@ -179,23 +158,38 @@ public void onNext(Mailbox.MailboxContent mailboxContent) 
{
 
   @Override
   public void onError(Throwable e) {
-    try {
-      _errorLock.writeLock().lock();
-      _errorContent = createErrorContent(e);
-      _gotMailCallback.accept(_mailboxId);
-      throw new RuntimeException(e);
-    } catch (IOException ioe) {
-      throw new RuntimeException("Unable to encode exception for cascade 
reporting: " + e, ioe);
-    } finally {
-      _errorLock.writeLock().unlock();
-      LOGGER.error("MaxBufferSize:", getMaxBufferSize(), " for mailbox:", 
_mailboxId);
-    }
+    _errorContent = createErrorContent(e);
+    _streamFinished = true;
+    _gotMailCallback.accept(_mailboxId);
   }
 
   @Override
   public void onCompleted() {
     _isCompleted.set(true);
+    _streamFinished = true;
     _responseObserver.onCompleted();
-    LOGGER.debug("MaxBufferSize:", getMaxBufferSize(), " for mailbox:", 
_mailboxId);
+  }
+
+  public boolean hasConsumedAllData() {
+    return _isCompleted.get() && _receivingBuffer.isEmpty();
+  }
+
+  public boolean hasStreamFinished() {
+    return _streamFinished;
+  }

Review Comment:
   javadoc please. 



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -68,6 +68,7 @@ protected void run()
         @Override
         public void runJob() {
           boolean isFinished = false;
+          boolean returnedErrorBlock = false;

Review Comment:
   IMO this file's change can be checked in separately



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -71,15 +74,19 @@ protected 
BlockExchange(List<SendingMailbox<TransferableBlock>> sendingMailboxes
     _splitter = splitter;
   }
 
-  public void send(TransferableBlock block) {
+  public void send(TransferableBlock block)
+      throws Exception {
     if (block.isEndOfStreamBlock()) {
-      _sendingMailboxes.forEach(destination -> sendBlock(destination, block));
+      for (SendingMailbox sendingMailbox : _sendingMailboxes) {

Review Comment:
   generic 
   ```suggestion
         for (SendingMailbox<TransferableBlock> sendingMailbox : 
_sendingMailboxes) {
   ```



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -186,4 +186,25 @@ protected TransferableBlock getNextBlock() {
             : TransferableBlockUtils.getEndOfStreamTransferableBlock();
     return block;
   }
+
+  @Override
+  public void close() {
+    super.close();
+    for (MailboxIdentifier sendingMailbox : _sendingMailbox) {
+      _mailboxService.releaseReceivingMailbox(sendingMailbox);
+    }
+  }
+
+  @Override
+  public void cancel(Throwable t) {
+    super.cancel(t);
+    for (MailboxIdentifier sendingMailbox : _sendingMailbox) {
+      ReceivingMailbox<TransferableBlock> receivingMailbox = _mailboxService
+          .getReceivingMailboxIfPresent(sendingMailbox);
+      if (receivingMailbox != null) {
+        receivingMailbox.cancel();

Review Comment:
   Don't call cancel here. Instead, rely on releaseReceivingMailbox to call
cancel.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -93,5 +100,25 @@ protected void sendBlock(SendingMailbox<TransferableBlock> 
sendingMailbox, Trans
     }
   }
 
-  protected abstract void route(List<SendingMailbox<TransferableBlock>> 
destinations, TransferableBlock block);
+  protected abstract void route(List<SendingMailbox<TransferableBlock>> 
destinations, TransferableBlock block)
+      throws Exception;
+
+  // Called when the OpChain gracefully returns.
+  // TODO: This is a no-op right now.
+  public void close() {
+    for (SendingMailbox sendingMailbox : _sendingMailboxes) {
+      if (sendingMailbox.isInitialized() && !sendingMailbox.isClosed()) {
+        LOGGER.info("SendingMailbox={} was not closed presumably because 
receiver hasn't completed processing",

Review Comment:
   With the suggested change, we don't need to log here at all.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -93,5 +100,25 @@ protected void sendBlock(SendingMailbox<TransferableBlock> 
sendingMailbox, Trans
     }
   }
 
-  protected abstract void route(List<SendingMailbox<TransferableBlock>> 
destinations, TransferableBlock block);
+  protected abstract void route(List<SendingMailbox<TransferableBlock>> 
destinations, TransferableBlock block)
+      throws Exception;
+
+  // Called when the OpChain gracefully returns.
+  // TODO: This is a no-op right now.
+  public void close() {
+    for (SendingMailbox sendingMailbox : _sendingMailboxes) {
+      if (sendingMailbox.isInitialized() && !sendingMailbox.isClosed()) {
+        LOGGER.info("SendingMailbox={} was not closed presumably because 
receiver hasn't completed processing",
+            sendingMailbox.getMailboxId());
+      }
+    }
+  }
+
+  public void cancel(Throwable t) {
+    for (SendingMailbox sendingMailbox : _sendingMailboxes) {
+      if (sendingMailbox.isInitialized() && !sendingMailbox.isClosed()) {
+        sendingMailbox.cancel(t);
+      }
+    }
+  }

Review Comment:
   From a symmetry point of view, we can also add a "releaseSendingMailbox"
method in MailboxService.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to