Jackie-Jiang commented on code in PR #13695:
URL: https://github.com/apache/pinot/pull/13695#discussion_r1719178241


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -679,6 +699,249 @@ private boolean processStreamEvents(MessageBatch 
messageBatch, long idlePipeSlee
     return prematureExit;
   }
 
+  /**
+   * TODO: move all processStreamEvents to a separate interface
+   * prcoessStreamEventsAsync is a method that processes a batch of messages 
asynchronously. It will asynchronously
+   * doing the (decode + transform) and indexing of the messages.
+   * It will keep the order of the messages and do same strict _currentOffset 
update as serial processing.
+   * @param messageBatch batch of messages to process
+   * @param idlePipeSleepTimeMillis wait time in case no messages were read
+   * @return returns <code>true</code> if the process loop ended before 
processing the batch, <code>false</code>
+   * otherwise
+   */
+  private boolean processStreamEventsAsync(MessageBatch messageBatch, long 
idlePipeSleepTimeMillis) {
+    int messageCount = messageBatch.getMessageCount();
+    _partitionRateLimiter.throttle(messageCount);
+    _serverRateLimiter.throttle(messageCount);
+
+    ThreadLocal<PinotMeter> realtimeRowsConsumedMeterThreadLocal = 
ThreadLocal.withInitial(() -> null);
+    ThreadLocal<PinotMeter> realtimeBytesIngestedMeterThreadLocal = 
ThreadLocal.withInitial(() -> null);
+    ThreadLocal<PinotMeter> realtimeRowsDroppedMeterThreadLocal = 
ThreadLocal.withInitial(() -> null);
+    ThreadLocal<PinotMeter> realtimeIncompleteRowsConsumedMeterThreadLocal = 
ThreadLocal.withInitial(() -> null);
+
+    AtomicInteger indexedMessageCount = new AtomicInteger(0);
+    AtomicInteger streamMessageCount = new AtomicInteger(0);
+    AtomicBoolean canTakeMore = new AtomicBoolean(true);
+    boolean hasTransformedRows = false;
+
+    AtomicBoolean prematureExit = new AtomicBoolean(false);
+    RowMetadata msgMetadata = null;
+
+    BlockingQueue<Pair<List<GenericRow>, Integer>> transformedQueue = new 
LinkedBlockingQueue<>();
+    AtomicInteger submittedMsgCount = new AtomicInteger(0);
+    // TODO: tune the number of threads
+    ExecutorService decodeAndTransformExecutor = 
Executors.newFixedThreadPool(1);
+    Thread indexingThread = null;
+
+    for (int index = 0; index < messageCount; index++) {
+      prematureExit.set(prematureExit.get() || _shouldStop);
+      if (prematureExit.get()) {
+        if (_segmentLogger.isDebugEnabled()) {
+          _segmentLogger.debug("stop processing message batch early 
shouldStop: {}", _shouldStop);
+        }
+        break;
+      }
+      if (!canTakeMore.get()) {
+        // The RealtimeSegmentImpl that we are pushing rows into has indicated 
that it cannot accept any more
+        // rows. This can happen in one of two conditions:
+        // 1. We are in INITIAL_CONSUMING state, and we somehow exceeded the 
max number of rows we are allowed to
+        // consume
+        //    for this row. Something is seriously wrong, because 
endCriteriaReached() should have returned true when
+        //    we hit the row limit.
+        //    Throw an exception.
+        //
+        // 2. We are in CATCHING_UP state, and we legally hit this error due 
to unclean leader election where
+        //    offsets get changed with higher generation numbers for some 
pinot servers but not others. So, if another
+        //    server (who got a larger stream offset) asked us to catch up to 
that offset, but we are connected to a
+        //    broker who has smaller offsets, then we may try to push more 
rows into the buffer than maximum. This
+        //    is a rare case, and we really don't know how to handle this at 
this time.
+        //    Throw an exception.
+        //
+        _segmentLogger
+            .error("Buffer full with {} rows consumed (row limit {}, indexed 
{})", _numRowsConsumed, _numRowsIndexed,
+                _segmentMaxRowCount);
+        throw new RuntimeException("Realtime segment full");
+      }
+
+      final int idx = index; // To bypass the final check in lambda
+      submittedMsgCount.incrementAndGet();
+      decodeAndTransformExecutor.submit(() -> {
+        try {
+          if (prematureExit.get()) {
+            return;
+          }
+          // TODO: ReusedResult cannot be easily used in parallel processing, 
find a better way to reduce mem usage
+          Pair<List<GenericRow>, Integer> transformedResults =
+              decodeAndTransformEvent(messageBatch, idx,
+                  realtimeRowsDroppedMeterThreadLocal.get(), 
realtimeIncompleteRowsConsumedMeterThreadLocal.get());
+          if (transformedResults != null) {
+            transformedQueue.put(transformedResults);
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt(); // Handle interruption
+          _segmentLogger.error("Caught InterruptedException while decoding and 
transforming event", e);
+        } finally {
+          submittedMsgCount.decrementAndGet();
+        }
+      });
+    }
+
+    indexingThread = new Thread(() -> {
+      try {
+        while (true) {
+          Pair<List<GenericRow>, Integer> transformedResults =
+              transformedQueue.poll(1, TimeUnit.SECONDS); // Poll with timeout
+          if (transformedResults != null) {
+            indexRows(transformedResults, messageBatch,
+                canTakeMore, prematureExit, indexedMessageCount, 
streamMessageCount,
+                realtimeRowsConsumedMeterThreadLocal.get(), 
realtimeBytesIngestedMeterThreadLocal.get());
+          }
+          if (prematureExit.get() || submittedMsgCount.get() == 0 && 
transformedQueue.isEmpty()) {
+            break; // Exit if all transformation tasks are done and queue is 
empty
+          }
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // Handle interruption
+      }
+    });
+
+    indexingThread.start();

Review Comment:
   Who is executing this thread?



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -254,9 +261,10 @@ public void deleteSegmentFile() {
   private final MutableSegmentImpl _realtimeSegment;
   private volatile StreamPartitionMsgOffset _currentOffset; // Next offset to 
be consumed
   private volatile State _state;
-  private volatile int _numRowsConsumed = 0;
-  private volatile int _numRowsIndexed = 0; // Can be different from 
_numRowsConsumed when metrics update is enabled.
-  private volatile int _numRowsErrored = 0;
+  private AtomicInteger _numRowsConsumed = new AtomicInteger(0);
+  // Can be different from _numRowsConsumed when metrics update is enabled.
+  private AtomicInteger _numRowsIndexed = new AtomicInteger(0);
+  private AtomicInteger _numRowsErrored = new AtomicInteger(0);

Review Comment:
   These can be final?



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -679,6 +699,249 @@ private boolean processStreamEvents(MessageBatch 
messageBatch, long idlePipeSlee
     return prematureExit;
   }
 
+  /**
+   * TODO: move all processStreamEvents to a separate interface
+   * prcoessStreamEventsAsync is a method that processes a batch of messages 
asynchronously. It will asynchronously
+   * doing the (decode + transform) and indexing of the messages.
+   * It will keep the order of the messages and do same strict _currentOffset 
update as serial processing.
+   * @param messageBatch batch of messages to process
+   * @param idlePipeSleepTimeMillis wait time in case no messages were read
+   * @return returns <code>true</code> if the process loop ended before 
processing the batch, <code>false</code>
+   * otherwise
+   */
+  private boolean processStreamEventsAsync(MessageBatch messageBatch, long 
idlePipeSleepTimeMillis) {
+    int messageCount = messageBatch.getMessageCount();
+    _partitionRateLimiter.throttle(messageCount);
+    _serverRateLimiter.throttle(messageCount);
+
+    ThreadLocal<PinotMeter> realtimeRowsConsumedMeterThreadLocal = 
ThreadLocal.withInitial(() -> null);
+    ThreadLocal<PinotMeter> realtimeBytesIngestedMeterThreadLocal = 
ThreadLocal.withInitial(() -> null);
+    ThreadLocal<PinotMeter> realtimeRowsDroppedMeterThreadLocal = 
ThreadLocal.withInitial(() -> null);
+    ThreadLocal<PinotMeter> realtimeIncompleteRowsConsumedMeterThreadLocal = 
ThreadLocal.withInitial(() -> null);
+
+    AtomicInteger indexedMessageCount = new AtomicInteger(0);
+    AtomicInteger streamMessageCount = new AtomicInteger(0);
+    AtomicBoolean canTakeMore = new AtomicBoolean(true);
+    boolean hasTransformedRows = false;
+
+    AtomicBoolean prematureExit = new AtomicBoolean(false);
+    RowMetadata msgMetadata = null;
+
+    BlockingQueue<Pair<List<GenericRow>, Integer>> transformedQueue = new 
LinkedBlockingQueue<>();
+    AtomicInteger submittedMsgCount = new AtomicInteger(0);
+    // TODO: tune the number of threads
+    ExecutorService decodeAndTransformExecutor = 
Executors.newFixedThreadPool(1);

Review Comment:
   Starting a new executor per message batch can create big overhead. Consider 
creating an executor to be shared for different batches



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to