gortiz commented on code in PR #16903:
URL: https://github.com/apache/pinot/pull/16903#discussion_r2409860185


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -42,7 +42,15 @@ public class InMemorySendingMailbox implements SendingMailbox {
   private final long _deadlineMs;
 
   private ReceivingMailbox _receivingMailbox;
+  /**

Review Comment:
   Done



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -71,31 +79,42 @@ public void send(MseBlock.Eos block, List<DataBuffer> 
serializedStats)
     sendPrivate(block, serializedStats);
   }
 
-  private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats)
-      throws TimeoutException {
+  private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats) {
     if (isTerminated() || (isEarlyTerminated() && block.isData())) {
+      LOGGER.debug("Mailbox {} already terminated, ignoring block {}", _id, 
block);
       return;
     }
     if (_receivingMailbox == null) {
       _receivingMailbox = _mailboxService.getReceivingMailbox(_id);
     }
     _statMap.merge(MailboxSendOperator.StatKey.IN_MEMORY_MESSAGES, 1);
     long timeoutMs = _deadlineMs - System.currentTimeMillis();
-    ReceivingMailbox.ReceivingMailboxStatus status = 
_receivingMailbox.offer(block, serializedStats, timeoutMs);
-
+    ReceivingMailbox.ReceivingMailboxStatus status;
+    try {
+      status = _receivingMailbox.offer(block, serializedStats, timeoutMs);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();

Review Comment:
   We can skip this if and only if we guarantee that the code that catches the 
query exception won't block. Otherwise, the interruption would be lost and the 
thread would simply carry on with its work.
   
   The alternative (restoring the flag) means that if the catching logic blocks 
on any method that checks the flag, that call will fail. This is safer, because 
it makes sure the thread is actually interrupted.
   
   I would say that in the current code there is no difference between 
restoring and not restoring the flag. I would feel more confident about future 
code if we restore it, but I'll remove it if you prefer not to.
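   
   To illustrate the difference, here is a minimal sketch, not Pinot code. 
`InterruptFlagSketch` and `SketchQueryException` are hypothetical names, and a 
plain `ArrayBlockingQueue` stands in for the receiving mailbox. It only shows 
that the flag is lost once `InterruptedException` is thrown, unless we set it 
again before rethrowing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class InterruptFlagSketch {
  /** Hypothetical stand-in for the unchecked QueryException thrown by the sender. */
  public static class SketchQueryException extends RuntimeException {
    public SketchQueryException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** Offers a block and converts interruption into an unchecked exception. */
  public static void send(BlockingQueue<String> queue, String block, boolean restoreFlag) {
    try {
      if (!queue.offer(block, 1, TimeUnit.SECONDS)) {
        throw new SketchQueryException("Timed out while sending", null);
      }
    } catch (InterruptedException e) {
      // Throwing InterruptedException clears the thread's interrupt flag.
      if (restoreFlag) {
        Thread.currentThread().interrupt();
      }
      throw new SketchQueryException("Interrupted while sending", e);
    }
  }

  /** Returns whether the interrupt flag is still set once the exception has been caught. */
  public static boolean flagAfterInterruptedSend(boolean restoreFlag) {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    queue.add("filler"); // the queue is full, so a timed offer would have to wait
    Thread.currentThread().interrupt(); // a pending interrupt makes the timed offer throw
    try {
      send(queue, "block", restoreFlag);
    } catch (SketchQueryException e) {
      // This is where the catching logic would run.
    }
    return Thread.interrupted(); // reads the flag and clears it again
  }
}
```

   With `restoreFlag` set to false, any blocking call made by the catching 
logic would wait normally, because nothing tells it that the thread was 
interrupted.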



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -71,31 +79,42 @@ public void send(MseBlock.Eos block, List<DataBuffer> 
serializedStats)
     sendPrivate(block, serializedStats);
   }
 
-  private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats)
-      throws TimeoutException {
+  private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats) {
     if (isTerminated() || (isEarlyTerminated() && block.isData())) {
+      LOGGER.debug("Mailbox {} already terminated, ignoring block {}", _id, 
block);
       return;
     }
     if (_receivingMailbox == null) {
       _receivingMailbox = _mailboxService.getReceivingMailbox(_id);
     }
     _statMap.merge(MailboxSendOperator.StatKey.IN_MEMORY_MESSAGES, 1);
     long timeoutMs = _deadlineMs - System.currentTimeMillis();
-    ReceivingMailbox.ReceivingMailboxStatus status = 
_receivingMailbox.offer(block, serializedStats, timeoutMs);
-
+    ReceivingMailbox.ReceivingMailboxStatus status;
+    try {
+      status = _receivingMailbox.offer(block, serializedStats, timeoutMs);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new QueryException(QueryErrorCode.INTERNAL, "Interrupted while 
sending data to mailbox: " + _id, e);
+    } catch (TimeoutException e) {
+      throw new QueryException(QueryErrorCode.EXECUTION_TIMEOUT,
+          String.format("Timed out adding block into mailbox: %s with timeout: 
%dms", _id, timeoutMs));
+    }
+    _isEarlyTerminated = status != 
ReceivingMailbox.ReceivingMailboxStatus.SUCCESS;
     switch (status) {
       case SUCCESS:
+      case WAITING_EOS:
+        break;
+      case LAST_BLOCK:
+        _isTerminated = true;
         break;
-      case CANCELLED:
-        throw new QueryCancelledException(String.format("Mailbox: %s already 
cancelled from upstream", _id));
-      case ERROR:
-        throw new QueryException(QueryErrorCode.INTERNAL, String.format(
-            "Mailbox: %s already errored out (received error block before)", 
_id));
-      case TIMEOUT:
-        throw new QueryException(QueryErrorCode.EXECUTION_TIMEOUT,
-            String.format("Timed out adding block into mailbox: %s with 
timeout: %dms", _id, timeoutMs));
-      case EARLY_TERMINATED:
-        _isEarlyTerminated = true;
+      case ALREADY_TERMINATED:
+        if (_isTerminated) {
+          LOGGER.debug("Local mailbox received a late message once the stream 
was closed. This can happen due to "
+              + "race condition between sending the last block and closing the 
stream on the sender side");
+        } else {
+          throw new QueryException(QueryErrorCode.INTERNAL, String.format(

Review Comment:
   If we reach this code, the implementation is not correct. ALREADY_TERMINATED 
is only returned when the receiving mailbox is in FULL_CLOSED or 
UPSTREAM_FINISHED. This means it can only happen if the same thread sent an EOS 
and then more blocks, which should never happen. IIRC this is similar to the 
older ERROR case.
   
   What should we do in this situation? I think throwing an exception is the 
best option. In fact, I would even remove the `if (_isTerminated)` branch, as 
that is the impossible case: if `_isTerminated` is true, we don't even offer 
the block, because of the check at the beginning of this method.
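   
   A rough sketch of what I mean, under simplifying assumptions: 
`AlreadyTerminatedSketch` is a hypothetical class, the receiver is modelled as 
a plain `Function`, and `IllegalStateException` stands in for `QueryException`. 
It is not the actual mailbox code.

```java
import java.util.function.Function;

public class AlreadyTerminatedSketch {
  /** Mirrors the status names used in this PR. */
  public enum Status { SUCCESS, WAITING_EOS, LAST_BLOCK, ALREADY_TERMINATED }

  // Hypothetical stand-in for ReceivingMailbox.offer(...)
  private final Function<String, Status> _receiver;
  private boolean _isTerminated;
  private int _offers;

  public AlreadyTerminatedSketch(Function<String, Status> receiver) {
    _receiver = receiver;
  }

  public int getOffers() {
    return _offers;
  }

  public void send(String block) {
    if (_isTerminated) {
      return; // terminated senders never reach the receiver
    }
    _offers++;
    Status status = _receiver.apply(block);
    switch (status) {
      case SUCCESS:
      case WAITING_EOS:
        break;
      case LAST_BLOCK:
        _isTerminated = true;
        break;
      case ALREADY_TERMINATED:
        // Because of the guard above, _isTerminated is always false here,
        // so this status can only mean a bug and there is no branch to take.
        throw new IllegalStateException("Receiver was already closed for write");
      default:
        throw new IllegalStateException("Unexpected status: " + status);
    }
  }
}
```

   A block sent after LAST_BLOCK is dropped by the guard, so the 
ALREADY_TERMINATED case never sees `_isTerminated == true`.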



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -315,4 +279,377 @@ public List<DataBuffer> getSerializedStats() {
       return _serializedStats;
     }
   }
+
+  /// The state of the queue.
+  ///
+  /// ```
+  /// +-------------------+   offerEos    +-------------------+
+  /// |    FULL_OPEN      | ----------->  |  UPSTREAM_FINISHED|
+  /// +-------------------+               +-------------------+
+  ///       |                                 |
+  ///       | earlyTerminate                  | poll -- when all pending data 
is read
+  ///       v                                 v
+  /// +-------------------+   offerEos   +-------------------+
+  /// |   WAITING_EOS     | -----------> |   FULL_CLOSED     |
+  /// +-------------------+              +-------------------+
+  /// ```
+  private enum State {
+    /// The queue is open for both read and write.
+    ///
+    /// - [#poll()] returns the pending blocks in the queue, or null if the 
queue is empty.
+    /// - [#offer] accepts both data and EOS blocks.
+    ///
+    /// Transitions to [State#UPSTREAM_FINISHED] when an EOS block is offered 
or to [State#WAITING_EOS] when
+    /// [#earlyTerminate()] is called.
+    FULL_OPEN,
+    /// The downstream is not interested in reading more data but is waiting 
for an EOS block to get the stats.
+    ///
+    /// - [#poll()] returns null.
+    /// - [#offer] rejects all data blocks.
+    ///
+    /// Transitions to [State#FULL_CLOSED] when an EOS block is offered.
+    WAITING_EOS,
+    /// The upstream has indicated that no more data will be sent.
+    ///
+    /// - [#poll()] returns the pending blocks in the queue and then the EOS 
block.
+    /// - [#offer] rejects all blocks.
+    ///
+    /// Transitions to [State#FULL_CLOSED] when the EOS block is read by 
[#poll()].
+    UPSTREAM_FINISHED,
+    /// The queue is closed for both read and write.
+    ///
+    /// - [#poll()] always returns the EOS block, which is always not null.
+    /// - [#offer] rejects all blocks.
+    ///
+    /// No transitions out of this state.
+    FULL_CLOSED
+  }
+
+  /// This is a special bounded blocking queue implementation similar to 
ArrayBlockingQueue, but:
+  /// - Only accepts a single reader (aka downstream).
+  /// - Only accepts a multiple concurrent writers (aka upstream)
+  /// - Can be [closed for write][#closeForWrite(MseBlock.Eos, List)].
+  /// - Can be [#earlyTerminate()]d.
+  ///
+  /// Read the [State] enum to understand the different states and their 
transitions.
+  ///
+  /// All methods of this class are thread-safe and may block, although only 
[#offer] should block for a long time.
+  @ThreadSafe
+  private static class CancellableBlockingQueue {

Review Comment:
   I've added some tests, but it is difficult to cover every situation without 
resorting to brute force (i.e. running the test many times).
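   
   By brute force I mean something along these lines. This is only a sketch: 
`BruteForceQueueTestSketch` is a hypothetical name and `ArrayBlockingQueue` 
stands in for `CancellableBlockingQueue`, which is private. Several writers 
race against a single reader and the whole scenario is repeated, hoping that 
some run hits a problematic interleaving.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BruteForceQueueTestSketch {
  /** Runs one scenario and returns how many blocks the single reader received. */
  public static int runOnce(int writers, int blocksPerWriter) {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2); // small capacity forces blocking
    ExecutorService pool = Executors.newFixedThreadPool(writers);
    try {
      for (int w = 0; w < writers; w++) {
        pool.submit(() -> {
          for (int i = 0; i < blocksPerWriter; i++) {
            queue.put(i); // blocks while the queue is full
          }
          return null;
        });
      }
      int expected = writers * blocksPerWriter;
      int received = 0;
      while (received < expected) {
        if (queue.poll(5, TimeUnit.SECONDS) == null) {
          break; // a block was lost or a writer is stuck
        }
        received++;
      }
      return received;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return -1;
    } finally {
      pool.shutdownNow();
    }
  }

  /** Repeats the scenario and reports whether every run delivered every block. */
  public static boolean runManyTimes(int iterations) {
    for (int i = 0; i < iterations; i++) {
      if (runOnce(4, 50) != 200) {
        return false;
      }
    }
    return true;
  }
}
```

   Repetition raises the chance of hitting a bad interleaving but cannot prove 
that none exists, which is why I call it brute force.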



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java:
##########
@@ -42,6 +42,7 @@ public interface SendingMailbox {
    * and they should <b>not</b> acquire any resources when they are created. This method should throw if there was an
    * error sending the data, since that would allow {@link BlockExchange} to exit early.
    */
+  // TODO: Remove throws as we don't throw these exceptions anymore

Review Comment:
   I had already applied that change, but I reverted it to reduce the number of 
changes in this PR and keep the focus on the queue change, which is already 
complex.
   
   If you are OK with it, I can add that change back.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -55,11 +55,14 @@ public class MailboxService {
       
       CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY_SECONDS, TimeUnit.SECONDS)
           .removalListener((RemovalListener<String, ReceivingMailbox>) notification -> {
             if (notification.wasEvicted()) {
-              int numPendingBlocks = notification.getValue().getNumPendingBlocks();
+              ReceivingMailbox receivingMailbox = notification.getValue();
+              int numPendingBlocks = receivingMailbox.getNumPendingBlocks();
               if (numPendingBlocks > 0) {
                 LOGGER.warn("Evicting dangling receiving mailbox: {} with {} pending blocks", notification.getKey(),
                     numPendingBlocks);
               }
+              // In case there is a leak, we should cancel the mailbox to unblock any waiters and release resources.
+              receivingMailbox.cancel();

Review Comment:
   This probably won't ever be called now, given that we also unblock waiters 
when the receiver operator finishes (whether successfully or not), but it is 
safer to keep it in case we missed something.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -268,7 +216,23 @@ public interface Reader {
   }
 
   public enum ReceivingMailboxStatus {
-    SUCCESS, FIRST_ERROR, ERROR, TIMEOUT, CANCELLED, EARLY_TERMINATED
+    /// The block was successfully added to the mailbox.
+    ///
+    /// More blocks can be sent.
+    SUCCESS,
+    /// The block is rejected because downstream has early terminated and now is only waiting for EOS in order to
+    /// get the stats.
+    WAITING_EOS,
+    /// The received message is the last block the mailbox will ever read.
+    ///
+    /// This happens for example when an EOS block is added to the mailbox.
+    ///
+    /// No more blocks can be sent.
+    LAST_BLOCK,
+    /// The mailbox has been closed for write. Only EOS blocks are expected.

Review Comment:
   No, the Javadoc is incorrect here. I've just fixed it.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -71,31 +79,42 @@ public void send(MseBlock.Eos block, List<DataBuffer> 
serializedStats)
     sendPrivate(block, serializedStats);
   }
 
-  private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats)
-      throws TimeoutException {
+  private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats) {
     if (isTerminated() || (isEarlyTerminated() && block.isData())) {
+      LOGGER.debug("Mailbox {} already terminated, ignoring block {}", _id, 
block);
       return;
     }
     if (_receivingMailbox == null) {
       _receivingMailbox = _mailboxService.getReceivingMailbox(_id);
     }
     _statMap.merge(MailboxSendOperator.StatKey.IN_MEMORY_MESSAGES, 1);
     long timeoutMs = _deadlineMs - System.currentTimeMillis();
-    ReceivingMailbox.ReceivingMailboxStatus status = 
_receivingMailbox.offer(block, serializedStats, timeoutMs);
-
+    ReceivingMailbox.ReceivingMailboxStatus status;
+    try {
+      status = _receivingMailbox.offer(block, serializedStats, timeoutMs);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new QueryException(QueryErrorCode.INTERNAL, "Interrupted while 
sending data to mailbox: " + _id, e);
+    } catch (TimeoutException e) {
+      throw new QueryException(QueryErrorCode.EXECUTION_TIMEOUT,
+          String.format("Timed out adding block into mailbox: %s with timeout: 
%dms", _id, timeoutMs));
+    }
+    _isEarlyTerminated = status != 
ReceivingMailbox.ReceivingMailboxStatus.SUCCESS;

Review Comment:
   That is the same discussion we had in 
https://github.com/apache/pinot/pull/16919#discussion_r2393173235. I think it 
is safe to mark the mailbox as early terminated in all cases except SUCCESS, 
but let's keep it as you suggest.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -315,4 +279,377 @@ public List<DataBuffer> getSerializedStats() {
       return _serializedStats;
     }
   }
+
+  /// The state of the queue.
+  ///
+  /// ```
+  /// +-------------------+   offerEos    +-------------------+
+  /// |    FULL_OPEN      | ----------->  |  UPSTREAM_FINISHED|
+  /// +-------------------+               +-------------------+
+  ///       |                                 |
+  ///       | earlyTerminate                  | poll -- when all pending data 
is read
+  ///       v                                 v
+  /// +-------------------+   offerEos   +-------------------+
+  /// |   WAITING_EOS     | -----------> |   FULL_CLOSED     |
+  /// +-------------------+              +-------------------+
+  /// ```
+  private enum State {
+    /// The queue is open for both read and write.
+    ///
+    /// - [#poll()] returns the pending blocks in the queue, or null if the 
queue is empty.
+    /// - [#offer] accepts both data and EOS blocks.
+    ///
+    /// Transitions to [State#UPSTREAM_FINISHED] when an EOS block is offered 
or to [State#WAITING_EOS] when
+    /// [#earlyTerminate()] is called.
+    FULL_OPEN,
+    /// The downstream is not interested in reading more data but is waiting 
for an EOS block to get the stats.
+    ///
+    /// - [#poll()] returns null.
+    /// - [#offer] rejects all data blocks.
+    ///
+    /// Transitions to [State#FULL_CLOSED] when an EOS block is offered.
+    WAITING_EOS,
+    /// The upstream has indicated that no more data will be sent.
+    ///
+    /// - [#poll()] returns the pending blocks in the queue and then the EOS 
block.
+    /// - [#offer] rejects all blocks.
+    ///
+    /// Transitions to [State#FULL_CLOSED] when the EOS block is read by 
[#poll()].
+    UPSTREAM_FINISHED,
+    /// The queue is closed for both read and write.
+    ///
+    /// - [#poll()] always returns the EOS block, which is always not null.
+    /// - [#offer] rejects all blocks.
+    ///
+    /// No transitions out of this state.
+    FULL_CLOSED
+  }
+
+  /// This is a special bounded blocking queue implementation similar to 
ArrayBlockingQueue, but:
+  /// - Only accepts a single reader (aka downstream).
+  /// - Only accepts a multiple concurrent writers (aka upstream)
+  /// - Can be [closed for write][#closeForWrite(MseBlock.Eos, List)].
+  /// - Can be [#earlyTerminate()]d.
+  ///
+  /// Read the [State] enum to understand the different states and their 
transitions.
+  ///
+  /// All methods of this class are thread-safe and may block, although only 
[#offer] should block for a long time.
+  @ThreadSafe
+  private static class CancellableBlockingQueue {
+    private final String _id;
+    @Nullable
+    private volatile Reader _reader;
+    /// This is set when the queue is in [State#FULL_CLOSED] or 
[State#UPSTREAM_FINISHED].
+    @Nullable
+    @GuardedBy("_lock")
+    private MseBlockWithStats _eos;
+    /// The current state of the queue.
+    ///
+    /// All changes to this field must be done by calling [#changeState(State, 
String)] in order to log the state
+    /// transitions.
+    @GuardedBy("_lock")
+    private State _state = State.FULL_OPEN;
+    /// The items in the queue.
+    ///
+    /// This is a circular array where [#_putIndex] is the index to add the 
next item and [#_takeIndex] is the index to
+    /// take the next item from. Only data blocks are stored in this array, 
the EOS block is stored in [#_eos].
+    ///
+    /// Like in normal blocking queues, elements are added when upstream 
threads call [#offer] and removed when the
+    /// downstream thread calls [#poll]. Unlike normal blocking queues, 
elements will be [removed][#drainDataBlocks()]
+    /// when transitioning to [State#WAITING_EOS] or [State#FULL_CLOSED].
+    @GuardedBy("_lock")
+    private final MseBlock.Data[] _dataBlocks;
+    @GuardedBy("_lock")
+    private int _takeIndex;
+    @GuardedBy("_lock")
+    private int _putIndex;
+    @GuardedBy("_lock")
+    private int _count;
+    /// Threads waiting to add more data to the queue.
+    ///
+    /// This is used to prevent the following situation:
+    /// 1. The queue is full.
+    /// 2. Thread A tries to add data. Thread A will be blocked waiting for 
space in the queue.
+    /// 3. Thread B adds an EOS block, which will transition the queue to 
[State#UPSTREAM_FINISHED].
+    /// 4. Thread C reads data from the queue in a loop, the scheduler doesn't 
give time to Thread A.
+    /// 5. Thread C consumes all data from the queue and then reads the EOS 
block.
+    /// 6. Finally Thread A is unblocked and adds data to the queue, even 
though the queue is already closed for write
+    ///
+    /// As a result the block from A will be lost. Instead, we use this 
counter to return null in [#poll] when the
+    /// queue is empty but there are still threads trying to add data to the 
queue.
+    @GuardedBy("_lock")
+    private int _pendingData;
+    private final ReentrantLock _lock = new ReentrantLock();
+    private final Condition _notFull = _lock.newCondition();
+
+    public CancellableBlockingQueue(String id, int capacity) {
+      _id = id;
+      _dataBlocks = new MseBlock.Data[capacity];
+    }
+
+    /// Notifies the downstream that there is data to read.
+    private void notifyReader() {
+      Reader reader = _reader;
+      if (reader != null) {
+        LOGGER.debug("Notifying reader");
+        reader.blockReadyToRead();
+      } else {
+        LOGGER.debug("No reader to notify");
+      }
+    }
+
+    /// Offers a successful or erroneous EOS block into the queue, returning 
the status of the operation.
+    ///
+    /// This method never blocks for long, as it doesn't need to wait for 
space in the queue.
+    public ReceivingMailboxStatus offerEos(MseBlock.Eos block, 
List<DataBuffer> stats) {
+      ReentrantLock lock = _lock;
+      lock.lock();
+      try {
+        switch (_state) {
+          case FULL_CLOSED:
+          case UPSTREAM_FINISHED:
+            // The queue is closed for write. Always reject the block.
+            LOGGER.debug("Mailbox: {} is already closed for write, ignoring 
the late {} block", _id, block);
+            return ReceivingMailboxStatus.ALREADY_TERMINATED;
+          case WAITING_EOS:
+            // We got the EOS block we expected. Close the queue for both read 
and write.
+            changeState(State.FULL_CLOSED, "received EOS block");
+            _eos = new MseBlockWithStats(block, stats);
+            notifyReader();
+            return ReceivingMailboxStatus.LAST_BLOCK;
+          case FULL_OPEN:
+            changeState(State.UPSTREAM_FINISHED, "received EOS block");
+            _eos = new MseBlockWithStats(block, stats);
+            notifyReader();
+            if (block.isError()) {
+              drainDataBlocks();
+              _notFull.signal();
+            }
+            return ReceivingMailboxStatus.LAST_BLOCK;
+          default:
+            throw new IllegalStateException("Unexpected state: " + _state);
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /// Offers a data block into the queue within the timeout specified, 
returning the status of the operation.
+    public ReceivingMailboxStatus offerData(MseBlock.Data block, long timeout, 
TimeUnit timeUnit)
+        throws InterruptedException, TimeoutException {
+      ReentrantLock lock = _lock;
+      lock.lockInterruptibly();
+      try {
+        while (true) {
+          switch (_state) {
+            case FULL_CLOSED:
+            case UPSTREAM_FINISHED:
+              // The queue is closed for write. Always reject the block.
+              LOGGER.debug("Mailbox: {} is already closed for write, ignoring 
the late data block", _id);
+              return ReceivingMailboxStatus.ALREADY_TERMINATED;
+            case WAITING_EOS:
+              // The downstream is not interested in reading more data.
+              LOGGER.debug("Mailbox: {} is not interesting in late data 
block", _id);
+              return ReceivingMailboxStatus.WAITING_EOS;
+            case FULL_OPEN:
+              if (offerDataToBuffer(block, timeout, timeUnit)) {
+                notifyReader();
+                return ReceivingMailboxStatus.SUCCESS;
+              }
+              // otherwise transitioned to FULL_CLOSED or WAITING_EOS while 
waiting for space in the queue
+              // and we need to re-evaluate the state
+              break;
+            default:
+              throw new IllegalStateException("Unexpected state: " + _state);
+          }
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /// Offers a data block into the queue within the timeout specified, 
returning true if the block was added
+    /// successfully.
+    ///
+    /// This method can only be called while the queue is in the FULL_OPEN 
state and the lock is held.
+    ///
+    /// This method can time out, in which case we automatically transition to 
the [State#FULL_CLOSED] state.
+    /// But instead of returning false, we throw a [TimeoutException]. This is 
because the caller may want to
+    /// distinguish between a timeout and other reasons for not being able to 
add the block to the queue in order to
+    /// report different error messages.
+    ///
+    /// @return true if the block was added successfully, false if the state 
changed while waiting.
+    /// @throws InterruptedException if the thread is interrupted while 
waiting for space in the queue.
+    /// @throws TimeoutException if the timeout specified elapsed before space 
was available in the queue.
+    @GuardedBy("_lock")
+    private boolean offerDataToBuffer(MseBlock.Data block, long timeout, 
TimeUnit timeUnit)
+        throws InterruptedException, TimeoutException {
+
+      assert _state == State.FULL_OPEN;
+
+      long nanos = timeUnit.toNanos(timeout);
+      MseBlock.Data[] items = _dataBlocks;
+      _pendingData++;
+      try {
+        while (_count == items.length && nanos > 0L) {
+          nanos = _notFull.awaitNanos(nanos);
+
+          switch (_state) {
+            case FULL_OPEN: // we are in the same state, continue waiting for 
space
+              break;
+            case FULL_CLOSED:
+            case WAITING_EOS:
+              // The queue is closed and the reader is not interested in 
reading more data.
+              return false;
+            case UPSTREAM_FINISHED:
+              // Another thread offered the EOS while we were waiting for 
space.
+              assert _eos != null;
+              if (_eos._block.isSuccess()) { // If closed with EOS, the reader 
is still interested in reading our block
+                continue;
+              }
+              // if closed with an error, the reader is not interested in 
reading our block
+              return false;
+            default:
+              throw new IllegalStateException("Unexpected state: " + _state);
+          }
+        }
+        if (nanos <= 0L) { // timed out
+          String errorMessage = "Timed out while waiting for receive operator 
to consume data from mailbox: " + _id;
+          ErrorMseBlock timeoutBlock = 
ErrorMseBlock.fromError(QueryErrorCode.EXECUTION_TIMEOUT, errorMessage);
+          changeState(State.FULL_CLOSED, "timed out while waiting to offer 
data block");
+          drainDataBlocks();
+          _eos = new MseBlockWithStats(timeoutBlock, List.of());
+          notifyReader();
+          throw new TimeoutException(errorMessage);
+        }
+        items[_putIndex] = block;
+        if (++_putIndex == items.length) {
+          _putIndex = 0;
+        }
+        _count++;
+        return true;
+      } finally {
+        _pendingData--;
+      }
+    }
+
+    /// Returns the first block from the queue, or `null` if there is no block 
in the queue. The returned block will be
+    /// an error block if the queue has been cancelled or has encountered an 
error.
+    ///
+    /// This method may block briefly while acquiring the lock, but it doesn't 
actually require waiting for data in the
+    /// queue.
+    @Nullable
+    public MseBlockWithStats poll() {
+      Preconditions.checkState(_reader != null, "A reader must be registered");
+      ReentrantLock lock = _lock;
+      lock.lock();
+      try {
+        switch (_state) {
+          case FULL_CLOSED:
+            // The queue is closed for both read and write. Always return the 
error block.
+            assert _eos != null;
+            return _eos;
+          case WAITING_EOS:
+            // The downstream is not interested in reading more data but is 
waiting for an EOS block to get the stats.
+            // Polls returns null and only EOS blocks are accepted by offer.
+            assert _eos == null;
+            return null;
+          case UPSTREAM_FINISHED:
+            // The upstream has indicated that no more data will be sent. Poll 
returns pending blocks and then the EOS
+            // block.
+            if (_count == 0) {
+              if (_pendingData > 0) {
+                // There are still threads trying to add data to the queue. We 
should wait for them to finish.
+                LOGGER.debug("Mailbox: {} has pending {} data blocks, waiting 
for them to finish", _id, _pendingData);
+                return null;
+              } else {
+                changeState(State.FULL_CLOSED, "read all data blocks");
+                return _eos;
+              }
+            }
+            break;
+          case FULL_OPEN:
+            if (_count == 0) {
+              assert _eos == null;
+              return null;
+            }
+            break;
+          default:
+            throw new IllegalStateException("Unexpected state: " + _state);
+        }
+        assert _count > 0 : "if we reach here, there must be data in the 
queue";
+        MseBlock.Data[] items = _dataBlocks;
+        MseBlock.Data block = items[_takeIndex];
+        assert block != null : "data block in the queue must not be null";
+        items[_takeIndex] = null;
+        if (++_takeIndex == items.length) {
+          _takeIndex = 0;
+        }
+        _count--;
+        _notFull.signal();
+
+        return new MseBlockWithStats(block, List.of());
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @GuardedBy("_lock")
+    private void changeState(State newState, String desc) {
+      LOGGER.debug("Mailbox: {} {}, transitioning from {} to {}", _id, desc, 
_state, newState);
+      _state = newState;
+    }
+
+    @GuardedBy("_lock")
+    private void drainDataBlocks() {
+      Arrays.fill(_dataBlocks, null);
+      _count = 0;

Review Comment:
   Yes, you are right. I'm moving the signal from the callers of this method 
into the method itself. Some callers (incorrectly) didn't signal.
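   
   As a simplified sketch of the idea (not the actual `CancellableBlockingQueue`): 
`DrainSignalSketch` is a hypothetical class, and using `signalAll()` rather 
than `signal()` is my assumption for the sketch. Once the drain itself signals 
`notFull`, no caller can forget to wake blocked writers.

```java
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class DrainSignalSketch {
  private final ReentrantLock _lock = new ReentrantLock();
  private final Condition _notFull = _lock.newCondition();
  private final ArrayDeque<String> _items = new ArrayDeque<>();
  private final int _capacity;

  public DrainSignalSketch(int capacity) {
    _capacity = capacity;
  }

  /** Adds an item, waiting up to timeoutMs for space. Returns false on timeout. */
  public boolean offer(String item, long timeoutMs) throws InterruptedException {
    long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    _lock.lockInterruptibly();
    try {
      while (_items.size() == _capacity) {
        if (nanos <= 0L) {
          return false;
        }
        nanos = _notFull.awaitNanos(nanos);
      }
      _items.add(item);
      return true;
    } finally {
      _lock.unlock();
    }
  }

  /** Drops all pending items and wakes the writers waiting for space. */
  public void drain() {
    _lock.lock();
    try {
      _items.clear();
      _notFull.signalAll(); // signalling here means callers cannot forget it
    } finally {
      _lock.unlock();
    }
  }

  /** Returns true if a writer blocked on a full queue completes after a drain. */
  public static boolean blockedWriterWakesAfterDrain() {
    DrainSignalSketch queue = new DrainSignalSketch(1);
    boolean[] accepted = new boolean[1];
    try {
      queue.offer("first", 0); // fills the queue
      Thread writer = new Thread(() -> {
        try {
          accepted[0] = queue.offer("second", 10_000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      writer.start();
      Thread.sleep(100); // give the writer a chance to block; the result does not depend on it
      queue.drain();
      writer.join(5_000);
      return !writer.isAlive() && accepted[0];
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

   If `drain()` did not signal, the blocked writer would stay parked until its 
timeout expired.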



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

