Copilot commented on code in PR #16903:
URL: https://github.com/apache/pinot/pull/16903#discussion_r2402194481


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -315,4 +298,377 @@ public List<DataBuffer> getSerializedStats() {
       return _serializedStats;
     }
   }
+
+  /// The state of the queue.
+  ///
+  /// ```
+  /// +-------------------+   offerEos    +-------------------+
+  /// |    FULL_OPEN      | ----------->  |  UPSTREAM_FINISHED|
+  /// +-------------------+               +-------------------+
+  ///       |                                 |
+  ///       | earlyTerminate                  | poll -- when all pending data is read
+  ///       v                                 v
+  /// +-------------------+   offerEos   +-------------------+
+  /// |   WAITING_EOS     | -----------> |   FULL_CLOSED     |
+  /// +-------------------+              +-------------------+
+  /// ```

Review Comment:
   The documentation uses /// for Javadoc comments instead of the standard /** */ format. According to the project's coding standards, Javadoc comments may use either /** */ or /// (as specified in JEP 467), but the style should be consistent within a file. The rest of this file uses /** */.
   ```suggestion
     /**
      * The state of the queue.
      *
      * <pre>
      * +-------------------+   offerEos    +-------------------+
      * |    FULL_OPEN      | ----------->  |  UPSTREAM_FINISHED|
      * +-------------------+               +-------------------+
      *       |                                 |
      *       | earlyTerminate                  | poll -- when all pending data is read
      *       v                                 v
      * +-------------------+   offerEos   +-------------------+
      * |   WAITING_EOS     | -----------> |   FULL_CLOSED     |
      * +-------------------+              +-------------------+
      * </pre>
      */
   ```
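The state diagram above can be read as a transition function. The following is a minimal sketch that assumes the three triggers drawn in the diagram (`offerEos`, `earlyTerminate`, and the `poll` that reads the last pending data block). `QueueState`, `QueueEvent` and `QueueStateMachine` are hypothetical names, not code from the PR. Triggers the diagram does not show simply leave the state unchanged here.

```java
// Hypothetical mirror of the four states in the diagram (not the PR's State enum).
enum QueueState { FULL_OPEN, WAITING_EOS, UPSTREAM_FINISHED, FULL_CLOSED }

// The three triggers drawn in the diagram.
enum QueueEvent { OFFER_EOS, EARLY_TERMINATE, DRAINED_ALL_DATA }

final class QueueStateMachine {
  private QueueStateMachine() {
  }

  // Returns the state reached from `state` when `event` happens.
  // Events that the diagram does not draw for a state are treated as no-ops (an assumption).
  static QueueState next(QueueState state, QueueEvent event) {
    switch (state) {
      case FULL_OPEN:
        if (event == QueueEvent.OFFER_EOS) {
          return QueueState.UPSTREAM_FINISHED;
        }
        return event == QueueEvent.EARLY_TERMINATE ? QueueState.WAITING_EOS : state;
      case UPSTREAM_FINISHED:
        // poll() has returned every pending data block and now hands out the EOS.
        return event == QueueEvent.DRAINED_ALL_DATA ? QueueState.FULL_CLOSED : state;
      case WAITING_EOS:
        return event == QueueEvent.OFFER_EOS ? QueueState.FULL_CLOSED : state;
      default:
        // FULL_CLOSED is terminal.
        return state;
    }
  }
}
```

On the left-hand path, `FULL_OPEN` moves to `WAITING_EOS` on `EARLY_TERMINATE` and then to `FULL_CLOSED` on `OFFER_EOS`. After `FULL_CLOSED` is reached, every event maps back to `FULL_CLOSED`.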



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -315,4 +298,377 @@ public List<DataBuffer> getSerializedStats() {
       return _serializedStats;
     }
   }
+
+  /// The state of the queue.
+  ///
+  /// ```
+  /// +-------------------+   offerEos    +-------------------+
+  /// |    FULL_OPEN      | ----------->  |  UPSTREAM_FINISHED|
+  /// +-------------------+               +-------------------+
+  ///       |                                 |
+  ///       | earlyTerminate                  | poll -- when all pending data is read
+  ///       v                                 v
+  /// +-------------------+   offerEos   +-------------------+
+  /// |   WAITING_EOS     | -----------> |   FULL_CLOSED     |
+  /// +-------------------+              +-------------------+
+  /// ```
+  private enum State {
+    /// The queue is open for both read and write.
+    ///
+    /// - [#poll()] returns the pending blocks in the queue, or null if the queue is empty.
+    /// - [#offer] accepts both data and EOS blocks.
+    ///
+    /// Transitions to [State#UPSTREAM_FINISHED] when an EOS block is offered or to [State#WAITING_EOS] when
+    /// [#earlyTerminate()] is called.
+    FULL_OPEN,
+    /// The downstream is not interested in reading more data but is waiting for an EOS block to get the stats.
+    ///
+    /// - [#poll()] returns null.
+    /// - [#offer] rejects all data blocks.
+    ///
+    /// Transitions to [State#FULL_CLOSED] when an EOS block is offered.
+    WAITING_EOS,
+    /// The upstream has indicated that no more data will be sent.
+    ///
+    /// - [#poll()] returns the pending blocks in the queue and then the EOS block.
+    /// - [#offer] rejects all blocks.
+    ///
+    /// Transitions to [State#FULL_CLOSED] when the EOS block is read by [#poll()].
+    UPSTREAM_FINISHED,
+    /// The queue is closed for both read and write.
+    ///
+    /// - [#poll()] always returns the EOS block, which is always not null.
+    /// - [#offer] rejects all blocks.
+    ///
+    /// No transitions out of this state.
+    FULL_CLOSED
+  }
+
+  /// This is a special bounded blocking queue implementation similar to ArrayBlockingQueue, but:
+  /// - Only accepts a single reader (aka downstream).
+  /// - Only accepts a multiple concurrent writers (aka upstream)
+  /// - Can be [closed for write][#closeForWrite(MseBlock.Eos, List)].
+  /// - Can be [#earlyTerminate()]d.
+  ///
+  /// Read the [State] enum to understand the different states and their transitions.
+  ///
+  /// All methods of this class are thread-safe and may block, although only [#offer] should block for a long time.

Review Comment:
   Similar documentation style inconsistency. The comment uses /// format while the rest of the class uses /** */ format for documentation.
   ```suggestion
     /**
      * The state of the queue.
      *
      * <pre>
      * +-------------------+   offerEos    +-------------------+
      * |    FULL_OPEN      | ----------->  |  UPSTREAM_FINISHED|
      * +-------------------+               +-------------------+
      *       |                                 |
      *       | earlyTerminate                  | poll -- when all pending data is read
      *       v                                 v
      * +-------------------+   offerEos   +-------------------+
      * |   WAITING_EOS     | -----------> |   FULL_CLOSED     |
      * +-------------------+              +-------------------+
      * </pre>
      */
     private enum State {
       /**
        * The queue is open for both read and write.
        *
        * <ul>
        *   <li>{@link #poll()} returns the pending blocks in the queue, or {@code null} if the queue is empty.</li>
        *   <li>{@link #offer} accepts both data and EOS blocks.</li>
        * </ul>
        *
        * <p>Transitions to {@link State#UPSTREAM_FINISHED} when an EOS block is offered or to
        * {@link State#WAITING_EOS} when {@link #earlyTerminate()} is called.
        */
       FULL_OPEN,
       /**
        * The downstream is not interested in reading more data but is waiting for an EOS block to get the stats.
        *
        * <ul>
        *   <li>{@link #poll()} returns {@code null}.</li>
        *   <li>{@link #offer} rejects all data blocks.</li>
        * </ul>
        *
        * <p>Transitions to {@link State#FULL_CLOSED} when an EOS block is offered.
        */
       WAITING_EOS,
       /**
        * The upstream has indicated that no more data will be sent.
        *
        * <ul>
        *   <li>{@link #poll()} returns the pending blocks in the queue and then the EOS block.</li>
        *   <li>{@link #offer} rejects all blocks.</li>
        * </ul>
        *
        * <p>Transitions to {@link State#FULL_CLOSED} when the EOS block is read by {@link #poll()}.
        */
       UPSTREAM_FINISHED,
       /**
        * The queue is closed for both read and write.
        *
        * <ul>
        *   <li>{@link #poll()} always returns the EOS block, which is never {@code null}.</li>
        *   <li>{@link #offer} rejects all blocks.</li>
        * </ul>
        *
        * <p>No transitions out of this state.
        */
       FULL_CLOSED
     }
   
     /**
      * This is a special bounded blocking queue implementation similar to ArrayBlockingQueue, but:
      * <ul>
      *   <li>Only accepts a single reader (aka downstream).</li>
      *   <li>Accepts multiple concurrent writers (aka upstream).</li>
      *   <li>Can be {@linkplain #closeForWrite(MseBlock.Eos, List) closed for write}.</li>
      *   <li>Can be {@linkplain #earlyTerminate() early-terminated}.</li>
      * </ul>
      *
      * <p>Read the {@link State} enum to understand the different states and their transitions.
      *
      * <p>All methods of this class are thread-safe and may block, although only {@link #offer} should block for a
      * long time.
      */
   ```
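The two comment styles are hard to mix partly because they use different markup. Since JDK 23 (JEP 467), a `///` comment is parsed as Markdown, so `[#poll()]` is a link and a leading `- ` starts a list item. In a traditional `/** */` comment the body is HTML, so the same link has to be written `{@link #poll()}` and lists need `<ul>`/`<li>`. Otherwise the brackets and dashes are rendered as literal text. Older javadoc versions treat `///` lines as ordinary comments. The hypothetical `DocStyleExample` class below documents one method in each style:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical class used only to contrast the two doc-comment syntaxes.
class DocStyleExample {
  private final Deque<String> _items = new ArrayDeque<>();

  /// Adds an item that a later [#poll()] will return.
  ///
  /// - Markdown list items and `[...]` links are only understood by javadoc
  ///   in JDK 23+ (JEP 467); older tools treat these lines as a plain comment.
  void offer(String item) {
    _items.addLast(item);
  }

  /**
   * Returns the oldest item added by {@link #offer(String)}, or {@code null} if there is none.
   *
   * <ul>
   *   <li>Traditional comments are HTML, so lists need {@code <ul>} and {@code <li>} tags.</li>
   * </ul>
   */
  String poll() {
    return _items.pollFirst();
  }
}
```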



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -315,4 +298,377 @@ public List<DataBuffer> getSerializedStats() {
       return _serializedStats;
     }
   }
+
+  /// The state of the queue.
+  ///
+  /// ```
+  /// +-------------------+   offerEos    +-------------------+
+  /// |    FULL_OPEN      | ----------->  |  UPSTREAM_FINISHED|
+  /// +-------------------+               +-------------------+
+  ///       |                                 |
+  ///       | earlyTerminate                  | poll -- when all pending data is read
+  ///       v                                 v
+  /// +-------------------+   offerEos   +-------------------+
+  /// |   WAITING_EOS     | -----------> |   FULL_CLOSED     |
+  /// +-------------------+              +-------------------+
+  /// ```
+  private enum State {
+    /// The queue is open for both read and write.
+    ///
+    /// - [#poll()] returns the pending blocks in the queue, or null if the queue is empty.
+    /// - [#offer] accepts both data and EOS blocks.
+    ///
+    /// Transitions to [State#UPSTREAM_FINISHED] when an EOS block is offered or to [State#WAITING_EOS] when
+    /// [#earlyTerminate()] is called.
+    FULL_OPEN,
+    /// The downstream is not interested in reading more data but is waiting for an EOS block to get the stats.
+    ///
+    /// - [#poll()] returns null.
+    /// - [#offer] rejects all data blocks.
+    ///
+    /// Transitions to [State#FULL_CLOSED] when an EOS block is offered.
+    WAITING_EOS,
+    /// The upstream has indicated that no more data will be sent.
+    ///
+    /// - [#poll()] returns the pending blocks in the queue and then the EOS block.
+    /// - [#offer] rejects all blocks.
+    ///
+    /// Transitions to [State#FULL_CLOSED] when the EOS block is read by [#poll()].
+    UPSTREAM_FINISHED,
+    /// The queue is closed for both read and write.
+    ///
+    /// - [#poll()] always returns the EOS block, which is always not null.
+    /// - [#offer] rejects all blocks.
+    ///
+    /// No transitions out of this state.
+    FULL_CLOSED
+  }
+
+  /// This is a special bounded blocking queue implementation similar to ArrayBlockingQueue, but:
+  /// - Only accepts a single reader (aka downstream).
+  /// - Only accepts a multiple concurrent writers (aka upstream)
+  /// - Can be [closed for write][#closeForWrite(MseBlock.Eos, List)].
+  /// - Can be [#earlyTerminate()]d.
+  ///
+  /// Read the [State] enum to understand the different states and their transitions.
+  ///
+  /// All methods of this class are thread-safe and may block, although only [#offer] should block for a long time.
+  @ThreadSafe
+  private static class CancellableBlockingQueue {
+    private final String _id;
+    @Nullable
+    private volatile Reader _reader;
+    /// This is set when the queue is in [State#FULL_CLOSED] or [State#UPSTREAM_FINISHED].
+    @Nullable
+    @GuardedBy("_lock")
+    private MseBlockWithStats _eos;
+    /// The current state of the queue.
+    ///
+    /// All changes to this field must be done by calling [#changeState(State, String)] in order to log the state
+    /// transitions.
+    @GuardedBy("_lock")
+    private State _state = State.FULL_OPEN;
+    /// The items in the queue.
+    ///
+    /// This is a circular array where [#_putIndex] is the index to add the next item and [#_takeIndex] is the index to
+    /// take the next item from. Only data blocks are stored in this array, the EOS block is stored in [#_eos].
+    ///
+    /// Like in normal blocking queues, elements are added when upstream threads call [#offer] and removed when the
+    /// downstream thread calls [#poll]. Unlike normal blocking queues, elements will be [removed][#drainDataBlocks()]
+    /// when transitioning to [State#WAITING_EOS] or [State#FULL_CLOSED].
+    @GuardedBy("_lock")
+    private final MseBlock.Data[] _dataBlocks;
+    @GuardedBy("_lock")
+    private int _takeIndex;
+    @GuardedBy("_lock")
+    private int _putIndex;
+    @GuardedBy("_lock")
+    private int _count;
+    /// Threads waiting to add more data to the queue.
+    ///
+    /// This is used to prevent the following situation:
+    /// 1. The queue is full.
+    /// 2. Thread A tries to add data. Thread A will be blocked waiting for space in the queue.
+    /// 3. Thread B adds an EOS block, which will transition the queue to [State#UPSTREAM_FINISHED].
+    /// 4. Thread C reads data from the queue in a loop, the scheduler doesn't give time to Thread A.
+    /// 5. Thread C consumes all data from the queue and then reads the EOS block.
+    /// 6. Finally Thread A is unblocked and adds data to the queue, even though the queue is already closed for write
+    ///
+    /// As a result the block from A will be lost. Instead, we use this counter to return null in [#poll] when the
+    /// queue is empty but there are still threads trying to add data to the queue.
+    @GuardedBy("_lock")
+    private int _pendingData;
+    private final ReentrantLock _lock = new ReentrantLock();
+    private final Condition _notFull = _lock.newCondition();
+
+    public CancellableBlockingQueue(String id, int capacity) {
+      _id = id;
+      _dataBlocks = new MseBlock.Data[capacity];
+    }
+
+    /// Notifies the downstream that there is data to read.
+    private void notifyReader() {
+      Reader reader = _reader;
+      if (reader != null) {
+        LOGGER.debug("Notifying reader");
+        reader.blockReadyToRead();
+      } else {
+        LOGGER.debug("No reader to notify");
+      }
+    }
+
+    /// Offers a successful or erroneous EOS block into the queue, returning the status of the operation.
+    ///
+    /// This method never blocks for long, as it doesn't need to wait for space in the queue.

Review Comment:
   More documentation using /// format instead of the consistent /** */ format used in the rest of the class.
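The `_pendingData` counter documented in the hunk above is easiest to see in a stripped-down queue. The following is a minimal sketch built on a few assumptions: `String` payloads stand in for `MseBlock.Data`/`MseBlockWithStats`, `offer` is untimed, and the class is a hypothetical `EosAwareQueue`. It leaves out the reader notification, `earlyTerminate()` and the `State` enum, so it only illustrates the idea and is not the PR's implementation. A writer that entered `offer` before the EOS may still add its block, and `poll()` returns `null` instead of the EOS while any such writer is in flight:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified queue: data blocks and the EOS are plain Strings.
class EosAwareQueue {
  private final ReentrantLock _lock = new ReentrantLock();
  private final Condition _notFull = _lock.newCondition();
  private final String[] _items; // circular buffer holding data blocks only
  private int _takeIndex;
  private int _putIndex;
  private int _count;
  private int _pendingData; // writers currently inside offer()
  private String _eos; // null until the upstream offers its EOS

  EosAwareQueue(int capacity) {
    _items = new String[capacity];
  }

  // Blocks while the buffer is full; returns false if the EOS had already been offered.
  boolean offer(String data) throws InterruptedException {
    _lock.lockInterruptibly();
    try {
      if (_eos != null) {
        return false;
      }
      _pendingData++;
      try {
        while (_count == _items.length) {
          _notFull.await();
        }
      } finally {
        _pendingData--;
      }
      // Accepted even if the EOS arrived while waiting: poll() withholds the EOS while _pendingData > 0.
      _items[_putIndex] = data;
      _putIndex = (_putIndex + 1) % _items.length;
      _count++;
      return true;
    } finally {
      _lock.unlock();
    }
  }

  // Records the upstream EOS; only the first one is kept.
  void offerEos(String eos) {
    _lock.lock();
    try {
      if (_eos == null) {
        _eos = eos;
      }
    } finally {
      _lock.unlock();
    }
  }

  // Returns the next data block, then the EOS on every later call, or null if nothing is ready yet.
  String poll() {
    _lock.lock();
    try {
      if (_count > 0) {
        String data = _items[_takeIndex];
        _items[_takeIndex] = null;
        _takeIndex = (_takeIndex + 1) % _items.length;
        _count--;
        _notFull.signal();
        return data;
      }
      // A writer still holds a block it is about to add: returning the EOS now would lose it.
      return _pendingData > 0 ? null : _eos;
    } finally {
      _lock.unlock();
    }
  }

  // Number of writers currently inside offer().
  int pendingWriters() {
    _lock.lock();
    try {
      return _pendingData;
    } finally {
      _lock.unlock();
    }
  }
}
```

With a capacity of 1, suppose a writer is parked on the full buffer when `offerEos` is called. That writer still gets its block delivered. `poll()` returns the first block, then `null` until the writer reacquires the lock, then the writer's block, and only after that the EOS, which is returned again on every later call. The transient `null` results match the mailbox's existing contract of `poll()` returning `null` when no block has been received yet.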



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -315,4 +298,377 @@ public List<DataBuffer> getSerializedStats() {
       return _serializedStats;
     }
   }
+
+  /// The state of the queue.
+  ///
+  /// ```
+  /// +-------------------+   offerEos    +-------------------+
+  /// |    FULL_OPEN      | ----------->  |  UPSTREAM_FINISHED|
+  /// +-------------------+               +-------------------+
+  ///       |                                 |
+  ///       | earlyTerminate                  | poll -- when all pending data is read
+  ///       v                                 v
+  /// +-------------------+   offerEos   +-------------------+
+  /// |   WAITING_EOS     | -----------> |   FULL_CLOSED     |
+  /// +-------------------+              +-------------------+
+  /// ```
+  private enum State {
+    /// The queue is open for both read and write.
+    ///
+    /// - [#poll()] returns the pending blocks in the queue, or null if the queue is empty.
+    /// - [#offer] accepts both data and EOS blocks.
+    ///
+    /// Transitions to [State#UPSTREAM_FINISHED] when an EOS block is offered or to [State#WAITING_EOS] when
+    /// [#earlyTerminate()] is called.
+    FULL_OPEN,
+    /// The downstream is not interested in reading more data but is waiting for an EOS block to get the stats.
+    ///
+    /// - [#poll()] returns null.
+    /// - [#offer] rejects all data blocks.
+    ///
+    /// Transitions to [State#FULL_CLOSED] when an EOS block is offered.
+    WAITING_EOS,
+    /// The upstream has indicated that no more data will be sent.
+    ///
+    /// - [#poll()] returns the pending blocks in the queue and then the EOS block.
+    /// - [#offer] rejects all blocks.
+    ///
+    /// Transitions to [State#FULL_CLOSED] when the EOS block is read by [#poll()].
+    UPSTREAM_FINISHED,
+    /// The queue is closed for both read and write.
+    ///
+    /// - [#poll()] always returns the EOS block, which is always not null.
+    /// - [#offer] rejects all blocks.
+    ///
+    /// No transitions out of this state.
+    FULL_CLOSED
+  }
+
+  /// This is a special bounded blocking queue implementation similar to ArrayBlockingQueue, but:
+  /// - Only accepts a single reader (aka downstream).
+  /// - Only accepts a multiple concurrent writers (aka upstream)
+  /// - Can be [closed for write][#closeForWrite(MseBlock.Eos, List)].
+  /// - Can be [#earlyTerminate()]d.
+  ///
+  /// Read the [State] enum to understand the different states and their transitions.
+  ///
+  /// All methods of this class are thread-safe and may block, although only [#offer] should block for a long time.
+  @ThreadSafe
+  private static class CancellableBlockingQueue {
+    private final String _id;
+    @Nullable
+    private volatile Reader _reader;
+    /// This is set when the queue is in [State#FULL_CLOSED] or [State#UPSTREAM_FINISHED].
+    @Nullable
+    @GuardedBy("_lock")
+    private MseBlockWithStats _eos;
+    /// The current state of the queue.
+    ///
+    /// All changes to this field must be done by calling [#changeState(State, String)] in order to log the state
+    /// transitions.
+    @GuardedBy("_lock")
+    private State _state = State.FULL_OPEN;
+    /// The items in the queue.
+    ///
+    /// This is a circular array where [#_putIndex] is the index to add the next item and [#_takeIndex] is the index to
+    /// take the next item from. Only data blocks are stored in this array, the EOS block is stored in [#_eos].
+    ///
+    /// Like in normal blocking queues, elements are added when upstream threads call [#offer] and removed when the
+    /// downstream thread calls [#poll]. Unlike normal blocking queues, elements will be [removed][#drainDataBlocks()]
+    /// when transitioning to [State#WAITING_EOS] or [State#FULL_CLOSED].
+    @GuardedBy("_lock")
+    private final MseBlock.Data[] _dataBlocks;
+    @GuardedBy("_lock")
+    private int _takeIndex;
+    @GuardedBy("_lock")
+    private int _putIndex;
+    @GuardedBy("_lock")
+    private int _count;
+    /// Threads waiting to add more data to the queue.
+    ///
+    /// This is used to prevent the following situation:
+    /// 1. The queue is full.
+    /// 2. Thread A tries to add data. Thread A will be blocked waiting for space in the queue.
+    /// 3. Thread B adds an EOS block, which will transition the queue to [State#UPSTREAM_FINISHED].
+    /// 4. Thread C reads data from the queue in a loop, the scheduler doesn't give time to Thread A.
+    /// 5. Thread C consumes all data from the queue and then reads the EOS block.
+    /// 6. Finally Thread A is unblocked and adds data to the queue, even though the queue is already closed for write
+    ///
+    /// As a result the block from A will be lost. Instead, we use this counter to return null in [#poll] when the
+    /// queue is empty but there are still threads trying to add data to the queue.
+    @GuardedBy("_lock")
+    private int _pendingData;
+    private final ReentrantLock _lock = new ReentrantLock();
+    private final Condition _notFull = _lock.newCondition();
+
+    public CancellableBlockingQueue(String id, int capacity) {
+      _id = id;
+      _dataBlocks = new MseBlock.Data[capacity];
+    }
+
+    /// Notifies the downstream that there is data to read.
+    private void notifyReader() {

Review Comment:
   Documentation style inconsistency continues with /// format instead of the standard /** */ format.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -204,59 +191,39 @@ private synchronized void updateWaitCpuTime() {
    * Sets an error block into the mailbox. No more blocks are accepted after calling this method.
    */
   public void setErrorBlock(ErrorMseBlock errorBlock, List<DataBuffer> serializedStats) {
-    if (_errorBlock.compareAndSet(null, new MseBlockWithStats(errorBlock, serializedStats))) {
-      _blocks.clear();
-      notifyReader();
-    }
+    _blocks.offerEos(errorBlock, serializedStats);
   }
 
   /**
-   * Returns the first block from the mailbox, or {@code null} if there is no block received yet. Error block is
-   * returned if exists.
+   * Returns the first block from the mailbox, or {@code null} if there is no block received yet.
    */
   @Nullable
   public MseBlockWithStats poll() {
-    Preconditions.checkState(_reader != null, "A reader must be registered");
-    MseBlockWithStats errorBlock = _errorBlock.get();
-    return errorBlock != null ? errorBlock : _blocks.poll();
+    return _blocks.poll();
   }
 
   /**
-   * Early terminate the mailbox, called when upstream doesn't expect any more data block.
+   * Early terminate the mailbox, called when upstream doesn't expect any more <em>data</em> block.
    */
   public void earlyTerminate() {
-    _isEarlyTerminated = true;
+    _blocks.earlyTerminate();
   }
 
   /**
-   * Cancels the mailbox. No more blocks are accepted after calling this method. Should only be called by the receive
-   * operator to clean up the remaining blocks.
+   * Cancels the mailbox. No more blocks are accepted after calling this method and {@link #poll()} will always return
+   * an error block.
    */
   public void cancel() {
     LOGGER.debug("Cancelling mailbox: {}", _id);
-    if (_errorBlock.get() == null) {
-      MseBlockWithStats errorBlock = new MseBlockWithStats(
-          ErrorMseBlock.fromError(QueryErrorCode.EXECUTION_TIMEOUT, "Cancelled by receiver"),
-          Collections.emptyList());
-      if (_errorBlock.compareAndSet(null, errorBlock)) {
-        _cancelledErrorBlock = errorBlock;
-        _blocks.clear();
-      }
-    }
+    _blocks.offerEos(ErrorMseBlock.fromException(null), List.of());
   }
 
+  /// Returns the number of pending **data** blocks in the mailbox.
+  ///
+  /// EOS blocks are not counted because they will be stored separately and once returned, following calls to [poll]
+  /// will always return the same EOS block.

Review Comment:
   More documentation style inconsistency with /// format instead of /** */ format used elsewhere in the class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

