sajjad-moradi commented on code in PR #13695:
URL: https://github.com/apache/pinot/pull/13695#discussion_r1725762115


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -679,6 +699,249 @@ private boolean processStreamEvents(MessageBatch messageBatch, long idlePipeSlee
     return prematureExit;
   }
 
+  /**
+   * TODO: move all processStreamEvents to a separate interface
+   * processStreamEventsAsync processes a batch of messages asynchronously: the (decode + transform) step and the
+   * indexing of the messages run on separate threads.
+   * It preserves the order of the messages and performs the same strict _currentOffset update as serial processing.
+   * @param messageBatch batch of messages to process
+   * @param idlePipeSleepTimeMillis wait time in case no messages were read
+   * @return <code>true</code> if the process loop ended before processing the batch, <code>false</code> otherwise
+   */
+  private boolean processStreamEventsAsync(MessageBatch messageBatch, long idlePipeSleepTimeMillis) {
+    int messageCount = messageBatch.getMessageCount();
+    _partitionRateLimiter.throttle(messageCount);
+    _serverRateLimiter.throttle(messageCount);
+
+    ThreadLocal<PinotMeter> realtimeRowsConsumedMeterThreadLocal = ThreadLocal.withInitial(() -> null);
+    ThreadLocal<PinotMeter> realtimeBytesIngestedMeterThreadLocal = ThreadLocal.withInitial(() -> null);
+    ThreadLocal<PinotMeter> realtimeRowsDroppedMeterThreadLocal = ThreadLocal.withInitial(() -> null);
+    ThreadLocal<PinotMeter> realtimeIncompleteRowsConsumedMeterThreadLocal = ThreadLocal.withInitial(() -> null);
+
+    AtomicInteger indexedMessageCount = new AtomicInteger(0);
+    AtomicInteger streamMessageCount = new AtomicInteger(0);
+    AtomicBoolean canTakeMore = new AtomicBoolean(true);
+    boolean hasTransformedRows = false;
+
+    AtomicBoolean prematureExit = new AtomicBoolean(false);
+    RowMetadata msgMetadata = null;
+
+    BlockingQueue<Pair<List<GenericRow>, Integer>> transformedQueue = new LinkedBlockingQueue<>();
+    AtomicInteger submittedMsgCount = new AtomicInteger(0);
+    // TODO: tune the number of threads
+    ExecutorService decodeAndTransformExecutor = Executors.newFixedThreadPool(1);
+    Thread indexingThread = null;
+
+    for (int index = 0; index < messageCount; index++) {
+      prematureExit.set(prematureExit.get() || _shouldStop);
+      if (prematureExit.get()) {
+        if (_segmentLogger.isDebugEnabled()) {
+          _segmentLogger.debug("stop processing message batch early shouldStop: {}", _shouldStop);
+        }
+        break;
+      }
+      if (!canTakeMore.get()) {
+        // The RealtimeSegmentImpl that we are pushing rows into has indicated that it cannot accept any more
+        // rows. This can happen in one of two conditions:
+        // 1. We are in INITIAL_CONSUMING state, and we somehow exceeded the max number of rows we are allowed to
+        //    consume for this segment. Something is seriously wrong, because endCriteriaReached() should have
+        //    returned true when we hit the row limit.
+        //    Throw an exception.
+        //
+        // 2. We are in CATCHING_UP state, and we legally hit this error due to unclean leader election where
+        //    offsets get changed with higher generation numbers for some pinot servers but not others. So, if another
+        //    server (who got a larger stream offset) asked us to catch up to that offset, but we are connected to a
+        //    broker who has smaller offsets, then we may try to push more rows into the buffer than maximum. This
+        //    is a rare case, and we really don't know how to handle this at this time.
+        //    Throw an exception.
+        //
+        _segmentLogger
+            .error("Buffer full with {} rows consumed (row limit {}, indexed {})", _numRowsConsumed,
+                _segmentMaxRowCount, _numRowsIndexed);
+        throw new RuntimeException("Realtime segment full");
+      }
+
+      final int idx = index; // To bypass the effectively-final check in the lambda
+      submittedMsgCount.incrementAndGet();
+      decodeAndTransformExecutor.submit(() -> {
+        try {
+          if (prematureExit.get()) {
+            return;
+          }
+          // TODO: ReusedResult cannot be easily used in parallel processing, find a better way to reduce mem usage
+          Pair<List<GenericRow>, Integer> transformedResults =
+              decodeAndTransformEvent(messageBatch, idx, realtimeRowsDroppedMeterThreadLocal.get(),
+                  realtimeIncompleteRowsConsumedMeterThreadLocal.get());
+          if (transformedResults != null) {
+            transformedQueue.put(transformedResults);
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt(); // Restore the interrupt flag
+          _segmentLogger.error("Caught InterruptedException while decoding and transforming event", e);
+        } finally {
+          submittedMsgCount.decrementAndGet();
+        }
+      });
+    }
+
+    indexingThread = new Thread(() -> {
+      try {
+        while (true) {
+          Pair<List<GenericRow>, Integer> transformedResults =
+              transformedQueue.poll(1, TimeUnit.SECONDS); // Poll with timeout so the loop can re-check exit conditions
+          if (transformedResults != null) {
+            indexRows(transformedResults, messageBatch, canTakeMore, prematureExit, indexedMessageCount,
+                streamMessageCount, realtimeRowsConsumedMeterThreadLocal.get(),
+                realtimeBytesIngestedMeterThreadLocal.get());
+          }
+          if (prematureExit.get() || (submittedMsgCount.get() == 0 && transformedQueue.isEmpty())) {
+            break; // Exit on premature exit, or once all transformation tasks are done and the queue is empty
+          }
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // Restore the interrupt flag
+      }
+    });
+
+    indexingThread.start();

Review Comment:
   The consumer thread for the partition is the one kicking off this "indexingThread". I don't understand why we kick off a separate thread and then, on the very next line, wait for it to finish. What's the difference if we don't spin off a new thread and instead use the main (consuming) thread to do the indexing?
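   For illustration, a minimal, self-contained sketch of what that suggestion amounts to (all names below are hypothetical stand-ins, not the PR's code): the decode/transform work still runs on a single-threaded executor, but the calling thread drains the queue itself instead of starting a second thread only to join it on the next line.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CallerThreadIndexingSketch {
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> transformedQueue = new LinkedBlockingQueue<>();
    AtomicInteger pending = new AtomicInteger(0); // plays the role of submittedMsgCount
    ExecutorService decodeAndTransformExecutor = Executors.newFixedThreadPool(1);

    // Producer side: decode/transform runs on the executor, as in the PR.
    for (int i = 0; i < 10; i++) {
      final int idx = i;
      pending.incrementAndGet();
      decodeAndTransformExecutor.submit(() -> {
        try {
          transformedQueue.put("row-" + idx); // stand-in for a transformed row batch
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          pending.decrementAndGet();
        }
      });
    }

    // Consumer side: the calling (consuming) thread drains the queue directly,
    // so there is no separate indexing thread to start and immediately join.
    while (true) {
      String row = transformedQueue.poll(1, TimeUnit.SECONDS);
      if (row != null) {
        System.out.println("indexed " + row); // stand-in for indexRows(...)
      }
      if (pending.get() == 0 && transformedQueue.isEmpty()) {
        break;
      }
    }
    decodeAndTransformExecutor.shutdown();
  }
}
```

   The behavior is the same as start-then-join, but with one fewer thread and no extra join/interrupt handling on the consuming path.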



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -679,6 +699,249 @@ private boolean processStreamEvents(MessageBatch messageBatch, long idlePipeSlee
     return prematureExit;
   }
 
+  /**
+   * TODO: move all processStreamEvents to a separate interface
+   * processStreamEventsAsync processes a batch of messages asynchronously: the (decode + transform) step and the
+   * indexing of the messages run on separate threads.
+   * It preserves the order of the messages and performs the same strict _currentOffset update as serial processing.
+   * @param messageBatch batch of messages to process
+   * @param idlePipeSleepTimeMillis wait time in case no messages were read
+   * @return <code>true</code> if the process loop ended before processing the batch, <code>false</code> otherwise
+   */
+  private boolean processStreamEventsAsync(MessageBatch messageBatch, long idlePipeSleepTimeMillis) {
+    int messageCount = messageBatch.getMessageCount();
+    _partitionRateLimiter.throttle(messageCount);
+    _serverRateLimiter.throttle(messageCount);
+
+    ThreadLocal<PinotMeter> realtimeRowsConsumedMeterThreadLocal = ThreadLocal.withInitial(() -> null);
+    ThreadLocal<PinotMeter> realtimeBytesIngestedMeterThreadLocal = ThreadLocal.withInitial(() -> null);
+    ThreadLocal<PinotMeter> realtimeRowsDroppedMeterThreadLocal = ThreadLocal.withInitial(() -> null);
+    ThreadLocal<PinotMeter> realtimeIncompleteRowsConsumedMeterThreadLocal = ThreadLocal.withInitial(() -> null);
+
+    AtomicInteger indexedMessageCount = new AtomicInteger(0);
+    AtomicInteger streamMessageCount = new AtomicInteger(0);
+    AtomicBoolean canTakeMore = new AtomicBoolean(true);
+    boolean hasTransformedRows = false;
+
+    AtomicBoolean prematureExit = new AtomicBoolean(false);
+    RowMetadata msgMetadata = null;
+
+    BlockingQueue<Pair<List<GenericRow>, Integer>> transformedQueue = new LinkedBlockingQueue<>();
+    AtomicInteger submittedMsgCount = new AtomicInteger(0);
+    // TODO: tune the number of threads

Review Comment:
   We can't have more than one thread here, otherwise the order of indexed rows will be different, and that's something we can't tolerate.
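   A quick way to see the ordering hazard (a hypothetical demo, not code from the PR): a pool of one thread completes tasks in submission order, while a larger pool completes them in whatever order the per-message transform cost dictates.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class IndexingOrderDemo {
  public static void main(String[] args) throws InterruptedException {
    // A single worker drains its task queue sequentially: prints [0, 1, ..., 9].
    System.out.println("pool of 1: " + completionOrder(1));
    // Several workers finish out of order whenever task cost varies.
    System.out.println("pool of 4: " + completionOrder(4));
  }

  private static List<Integer> completionOrder(int numThreads) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    List<Integer> order = new CopyOnWriteArrayList<>();
    for (int i = 0; i < 10; i++) {
      final int idx = i;
      pool.submit(() -> {
        try {
          Thread.sleep((10 - idx) * 5L); // simulate varying decode/transform cost
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        order.add(idx); // record completion order
      });
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    return order;
  }
}
```

   So as long as rows must land in the segment in stream order, this pool has to stay single-threaded, or the pipeline needs an explicit reordering stage.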



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

