gortiz commented on code in PR #11205:
URL: https://github.com/apache/pinot/pull/11205#discussion_r1289857744


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java:
##########
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.utils;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class BlockingMultiStreamConsumer<E> implements AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BlockingMultiStreamConsumer.class);
+  private final Object _id;
+  protected final List<? extends AsyncStream<E>> _mailboxes;
+  protected final ArrayBlockingQueue<Boolean> _newDataReady = new ArrayBlockingQueue<>(1);
+  private final long _deadlineMs;
+  /**
+   * An index that used to calculate where do we are going to start reading.
+   * The invariant is that we are always going to start reading from {@code _lastRead + 1}.
+   * Therefore {@link #_lastRead} must be in the range {@code [-1, mailbox.size() - 1]}
+   */
+  protected int _lastRead;
+  private E _errorBlock = null;
+
+  public BlockingMultiStreamConsumer(Object id, long deadlineMs, List<? extends AsyncStream<E>> asyncProducers) {
+    _id = id;
+    _deadlineMs = deadlineMs;
+    AsyncStream.OnNewData onNewData = this::onData;
+    _mailboxes = asyncProducers;
+    _mailboxes.forEach(blockProducer -> blockProducer.addOnNewDataListener(onNewData));
+    _lastRead = _mailboxes.size() - 1;
+  }
+
+  protected abstract boolean isError(E element);
+
+  protected abstract boolean isEos(E element);
+
+  protected abstract E onTimeout();
+
+  protected abstract E onException(Exception e);
+
+  protected abstract E onEos();
+
+  @Override
+  public void close() {
+    cancelRemainingMailboxes();
+  }
+
+  public void cancel(Throwable t) {
+    cancelRemainingMailboxes();
+  }
+
+  protected void cancelRemainingMailboxes() {
+    for (AsyncStream<E> mailbox : _mailboxes) {
+      mailbox.cancel();
+    }
+  }
+
+  public void onData() {
+    if (_newDataReady.offer(Boolean.TRUE)) {
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("New data notification delivered on " + _id + ". " + System.identityHashCode(_newDataReady));
+      }
+    } else if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("New data notification ignored on " + _id + ". " + System.identityHashCode(_newDataReady));
+    }
+  }
+
+  /**
+   * Reads the next block for any ready mailbox or blocks until some of them is ready.
+   *
+   * The method implements a sequential read semantic. Meaning that:
+   * <ol>
+   *   <li>EOS is only returned when all mailboxes already emitted EOS or there are no mailboxes</li>
+   *   <li>If an error is read from a mailbox, the error is returned</li>
+   *   <li>If data is read from a mailbox, that data block is returned</li>
+   *   <li>If no mailbox is ready, the calling thread is blocked</li>
+   * </ol>
+   *
+   * Right now the implementation tries to be fair. If one call returned the block from mailbox {@code i}, then next
+   * call will look for mailbox {@code i+1}, {@code i+2}... in a circular manner.
+   *
+   * In order to unblock a thread blocked here, {@link #onData()} should be 
called.   *
+   */
+  public E readBlockBlocking() {
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("==[RECEIVE]== Enter getNextBlock from: " + _id + " mailboxSize: " + _mailboxes.size());
+    }
+    // Standard optimistic execution. First we try to read without acquiring the lock.
+    E block = readDroppingSuccessEos();
+    long timeoutMs = _deadlineMs - System.currentTimeMillis();
+    boolean timeout = false;
+    try {
+      while (block == null) { // we didn't find a mailbox ready to read, so we need to be pessimistic

Review Comment:
   > Once we reach the timeout we should immediately return timeout error.
   
   We do, although the code may not be 100% clear. We can time out either in `timeout = _newDataReady.poll(timeoutMs, TimeUnit.MILLISECONDS) == null;` a couple of lines above or in `readDroppingSuccessEos`, in which case it returns a non-null block and the while loop finishes.
   
   We can change the while predicate to `while (!timeout && block == null)` if you find that easier to read.
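
To make the suggested predicate concrete, here is a minimal sketch of such a loop. It is not the PR's code: `SignalLoopSketch`, `publish`, `readBlocking`, and the `"TIMEOUT"`/`"INTERRUPTED"` strings are hypothetical stand-ins for the mailboxes, `onData()`, `readBlockBlocking()`, and the error blocks.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified consumer: one data queue plus a capacity-1 signal queue.
class SignalLoopSketch {
  private final Queue<String> data = new ConcurrentLinkedQueue<>();
  private final ArrayBlockingQueue<Boolean> newDataReady = new ArrayBlockingQueue<>(1);

  // Producer side: add the data first, then signal. offer() returns false
  // (and the signal is dropped) when a notification is already pending.
  void publish(String block) {
    data.add(block);
    newDataReady.offer(Boolean.TRUE);
  }

  // Consumer side: returns a block, or a sentinel once the deadline has passed.
  String readBlocking(long deadlineMs) {
    String block = data.poll(); // optimistic read, no waiting
    boolean timeout = false;
    try {
      while (!timeout && block == null) {
        long timeoutMs = deadlineMs - System.currentTimeMillis();
        // A non-positive timeout makes poll() return immediately.
        timeout = newDataReady.poll(timeoutMs, TimeUnit.MILLISECONDS) == null;
        if (!timeout) {
          // The signal may be stale, so the read can still come back empty.
          block = data.poll();
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "INTERRUPTED";
    }
    return timeout ? "TIMEOUT" : block;
  }

  public static void main(String[] args) {
    SignalLoopSketch consumer = new SignalLoopSketch();
    consumer.publish("block-1");
    System.out.println(consumer.readBlocking(System.currentTimeMillis() + 1000));
    // The optimistic read above left a stale signal behind; the loop absorbs it.
    System.out.println(consumer.readBlocking(System.currentTimeMillis() + 50));
  }
}
```

In this sketch the second call first consumes the stale signal, finds no data, and only then waits until the deadline, which is the case the loop is there to handle.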
   
   > Also, why do we have a while loop here? We should wait on _newDataReady 
once?
   > I somehow see why you want to have this while check
   
   Also, we protect ourselves in case the sender adds a null block. When this code was also used in the pipeline breaker, that was an important detail.
   
   But in general, when dealing with concurrency it is better to re-check the condition after the synchronization. As you say, it is difficult to reason about concurrency. In this case `_newDataReady` does not guarantee that there is data on the queue. It is only used to unblock the reader thread if it is waiting for more data. You can see in the ArrayBlockingQueue implementation that the `notEmpty` condition is used in the same way. In general, you should always check the precondition in a while loop when waiting on a signal.
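
As an illustration of that general rule, here is a minimal sketch of a guarded wait built on `ReentrantLock` and `Condition`. `GuardedQueueSketch` and its methods are hypothetical names for this example; it mirrors the idea behind `notEmpty` but is not a copy of the JDK implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical unbounded queue that shows the "wait in a while loop" idiom.
class GuardedQueueSketch<T> {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition notEmpty = lock.newCondition();
  private final Deque<T> items = new ArrayDeque<>();

  void put(T item) {
    lock.lock();
    try {
      items.addLast(item);
      notEmpty.signal(); // only a hint: "the state may have changed"
    } finally {
      lock.unlock();
    }
  }

  T take() {
    lock.lock();
    try {
      // Re-check the precondition after every wake-up. The signal does not
      // guarantee that an item is still there: another consumer may have
      // taken it, or the wake-up may be spurious.
      while (items.isEmpty()) {
        notEmpty.awaitUninterruptibly();
      }
      return items.removeFirst();
    } finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) {
    GuardedQueueSketch<String> queue = new GuardedQueueSketch<>();
    new Thread(() -> queue.put("late block")).start();
    System.out.println(queue.take()); // waits until the producer thread has put the item
  }
}
```

Replacing the `while` with an `if` would compile and usually appear to work, which is why this kind of bug is hard to catch without a stress test.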
   
   I would like to add a jcstress test here, but those are not easy to include in a Maven pipeline.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

