Jackie-Jiang commented on code in PR #13695: URL: https://github.com/apache/pinot/pull/13695#discussion_r1719178241
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ########## @@ -679,6 +699,249 @@ private boolean processStreamEvents(MessageBatch messageBatch, long idlePipeSlee return prematureExit; } + /** + * TODO: move all processStreamEvents to a separate interface + * prcoessStreamEventsAsync is a method that processes a batch of messages asynchronously. It will asynchronously + * doing the (decode + transform) and indexing of the messages. + * It will keep the order of the messages and do same strict _currentOffset update as serial processing. + * @param messageBatch batch of messages to process + * @param idlePipeSleepTimeMillis wait time in case no messages were read + * @return returns <code>true</code> if the process loop ended before processing the batch, <code>false</code> + * otherwise + */ + private boolean processStreamEventsAsync(MessageBatch messageBatch, long idlePipeSleepTimeMillis) { + int messageCount = messageBatch.getMessageCount(); + _partitionRateLimiter.throttle(messageCount); + _serverRateLimiter.throttle(messageCount); + + ThreadLocal<PinotMeter> realtimeRowsConsumedMeterThreadLocal = ThreadLocal.withInitial(() -> null); + ThreadLocal<PinotMeter> realtimeBytesIngestedMeterThreadLocal = ThreadLocal.withInitial(() -> null); + ThreadLocal<PinotMeter> realtimeRowsDroppedMeterThreadLocal = ThreadLocal.withInitial(() -> null); + ThreadLocal<PinotMeter> realtimeIncompleteRowsConsumedMeterThreadLocal = ThreadLocal.withInitial(() -> null); + + AtomicInteger indexedMessageCount = new AtomicInteger(0); + AtomicInteger streamMessageCount = new AtomicInteger(0); + AtomicBoolean canTakeMore = new AtomicBoolean(true); + boolean hasTransformedRows = false; + + AtomicBoolean prematureExit = new AtomicBoolean(false); + RowMetadata msgMetadata = null; + + BlockingQueue<Pair<List<GenericRow>, Integer>> transformedQueue = new LinkedBlockingQueue<>(); + AtomicInteger submittedMsgCount = new AtomicInteger(0); + 
// TODO: tune the number of threads + ExecutorService decodeAndTransformExecutor = Executors.newFixedThreadPool(1); + Thread indexingThread = null; + + for (int index = 0; index < messageCount; index++) { + prematureExit.set(prematureExit.get() || _shouldStop); + if (prematureExit.get()) { + if (_segmentLogger.isDebugEnabled()) { + _segmentLogger.debug("stop processing message batch early shouldStop: {}", _shouldStop); + } + break; + } + if (!canTakeMore.get()) { + // The RealtimeSegmentImpl that we are pushing rows into has indicated that it cannot accept any more + // rows. This can happen in one of two conditions: + // 1. We are in INITIAL_CONSUMING state, and we somehow exceeded the max number of rows we are allowed to + // consume + // for this row. Something is seriously wrong, because endCriteriaReached() should have returned true when + // we hit the row limit. + // Throw an exception. + // + // 2. We are in CATCHING_UP state, and we legally hit this error due to unclean leader election where + // offsets get changed with higher generation numbers for some pinot servers but not others. So, if another + // server (who got a larger stream offset) asked us to catch up to that offset, but we are connected to a + // broker who has smaller offsets, then we may try to push more rows into the buffer than maximum. This + // is a rare case, and we really don't know how to handle this at this time. + // Throw an exception. 
+ // + _segmentLogger + .error("Buffer full with {} rows consumed (row limit {}, indexed {})", _numRowsConsumed, _numRowsIndexed, + _segmentMaxRowCount); + throw new RuntimeException("Realtime segment full"); + } + + final int idx = index; // To bypass the final check in lambda + submittedMsgCount.incrementAndGet(); + decodeAndTransformExecutor.submit(() -> { + try { + if (prematureExit.get()) { + return; + } + // TODO: ReusedResult cannot be easily used in parallel processing, find a better way to reduce mem usage + Pair<List<GenericRow>, Integer> transformedResults = + decodeAndTransformEvent(messageBatch, idx, + realtimeRowsDroppedMeterThreadLocal.get(), realtimeIncompleteRowsConsumedMeterThreadLocal.get()); + if (transformedResults != null) { + transformedQueue.put(transformedResults); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Handle interruption + _segmentLogger.error("Caught InterruptedException while decoding and transforming event", e); + } finally { + submittedMsgCount.decrementAndGet(); + } + }); + } + + indexingThread = new Thread(() -> { + try { + while (true) { + Pair<List<GenericRow>, Integer> transformedResults = + transformedQueue.poll(1, TimeUnit.SECONDS); // Poll with timeout + if (transformedResults != null) { + indexRows(transformedResults, messageBatch, + canTakeMore, prematureExit, indexedMessageCount, streamMessageCount, + realtimeRowsConsumedMeterThreadLocal.get(), realtimeBytesIngestedMeterThreadLocal.get()); + } + if (prematureExit.get() || submittedMsgCount.get() == 0 && transformedQueue.isEmpty()) { + break; // Exit if all transformation tasks are done and queue is empty + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Handle interruption + } + }); + + indexingThread.start(); Review Comment: Who is executing this thread? 
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ########## @@ -254,9 +261,10 @@ public void deleteSegmentFile() { private final MutableSegmentImpl _realtimeSegment; private volatile StreamPartitionMsgOffset _currentOffset; // Next offset to be consumed private volatile State _state; - private volatile int _numRowsConsumed = 0; - private volatile int _numRowsIndexed = 0; // Can be different from _numRowsConsumed when metrics update is enabled. - private volatile int _numRowsErrored = 0; + private AtomicInteger _numRowsConsumed = new AtomicInteger(0); + // Can be different from _numRowsConsumed when metrics update is enabled. + private AtomicInteger _numRowsIndexed = new AtomicInteger(0); + private AtomicInteger _numRowsErrored = new AtomicInteger(0); Review Comment: These can be final? ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ########## @@ -679,6 +699,249 @@ private boolean processStreamEvents(MessageBatch messageBatch, long idlePipeSlee return prematureExit; } + /** + * TODO: move all processStreamEvents to a separate interface + * prcoessStreamEventsAsync is a method that processes a batch of messages asynchronously. It will asynchronously + * doing the (decode + transform) and indexing of the messages. + * It will keep the order of the messages and do same strict _currentOffset update as serial processing. 
+ * @param messageBatch batch of messages to process + * @param idlePipeSleepTimeMillis wait time in case no messages were read + * @return returns <code>true</code> if the process loop ended before processing the batch, <code>false</code> + * otherwise + */ + private boolean processStreamEventsAsync(MessageBatch messageBatch, long idlePipeSleepTimeMillis) { + int messageCount = messageBatch.getMessageCount(); + _partitionRateLimiter.throttle(messageCount); + _serverRateLimiter.throttle(messageCount); + + ThreadLocal<PinotMeter> realtimeRowsConsumedMeterThreadLocal = ThreadLocal.withInitial(() -> null); + ThreadLocal<PinotMeter> realtimeBytesIngestedMeterThreadLocal = ThreadLocal.withInitial(() -> null); + ThreadLocal<PinotMeter> realtimeRowsDroppedMeterThreadLocal = ThreadLocal.withInitial(() -> null); + ThreadLocal<PinotMeter> realtimeIncompleteRowsConsumedMeterThreadLocal = ThreadLocal.withInitial(() -> null); + + AtomicInteger indexedMessageCount = new AtomicInteger(0); + AtomicInteger streamMessageCount = new AtomicInteger(0); + AtomicBoolean canTakeMore = new AtomicBoolean(true); + boolean hasTransformedRows = false; + + AtomicBoolean prematureExit = new AtomicBoolean(false); + RowMetadata msgMetadata = null; + + BlockingQueue<Pair<List<GenericRow>, Integer>> transformedQueue = new LinkedBlockingQueue<>(); + AtomicInteger submittedMsgCount = new AtomicInteger(0); + // TODO: tune the number of threads + ExecutorService decodeAndTransformExecutor = Executors.newFixedThreadPool(1); Review Comment: Creating a new executor for every message batch can add significant overhead. Consider creating a single executor that is shared across batches. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org