sajjad-moradi commented on code in PR #13695: URL: https://github.com/apache/pinot/pull/13695#discussion_r1725762115

##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -679,6 +699,249 @@ private boolean processStreamEvents(MessageBatch messageBatch, long idlePipeSlee
     return prematureExit;
   }
 
+  /**
+   * TODO: move all processStreamEvents to a separate interface
+   * processStreamEventsAsync processes a batch of messages asynchronously. It performs the
+   * (decode + transform) and the indexing of the messages asynchronously.
+   * It preserves the order of the messages and does the same strict _currentOffset update as serial processing.
+   * @param messageBatch batch of messages to process
+   * @param idlePipeSleepTimeMillis wait time in case no messages were read
+   * @return <code>true</code> if the process loop ended before processing the batch, <code>false</code>
+   * otherwise
+   */
+  private boolean processStreamEventsAsync(MessageBatch messageBatch, long idlePipeSleepTimeMillis) {
+    int messageCount = messageBatch.getMessageCount();
+    _partitionRateLimiter.throttle(messageCount);
+    _serverRateLimiter.throttle(messageCount);
+
+    ThreadLocal<PinotMeter> realtimeRowsConsumedMeterThreadLocal = ThreadLocal.withInitial(() -> null);
+    ThreadLocal<PinotMeter> realtimeBytesIngestedMeterThreadLocal = ThreadLocal.withInitial(() -> null);
+    ThreadLocal<PinotMeter> realtimeRowsDroppedMeterThreadLocal = ThreadLocal.withInitial(() -> null);
+    ThreadLocal<PinotMeter> realtimeIncompleteRowsConsumedMeterThreadLocal = ThreadLocal.withInitial(() -> null);
+
+    AtomicInteger indexedMessageCount = new AtomicInteger(0);
+    AtomicInteger streamMessageCount = new AtomicInteger(0);
+    AtomicBoolean canTakeMore = new AtomicBoolean(true);
+    boolean hasTransformedRows = false;
+
+    AtomicBoolean prematureExit = new AtomicBoolean(false);
+    RowMetadata msgMetadata = null;
+
+    BlockingQueue<Pair<List<GenericRow>, Integer>> transformedQueue = new LinkedBlockingQueue<>();
+    AtomicInteger submittedMsgCount = new AtomicInteger(0);
+    // TODO: tune the number of threads
+    ExecutorService decodeAndTransformExecutor = Executors.newFixedThreadPool(1);
+    Thread indexingThread = null;
+
+    for (int index = 0; index < messageCount; index++) {
+      prematureExit.set(prematureExit.get() || _shouldStop);
+      if (prematureExit.get()) {
+        if (_segmentLogger.isDebugEnabled()) {
+          _segmentLogger.debug("stop processing message batch early shouldStop: {}", _shouldStop);
+        }
+        break;
+      }
+      if (!canTakeMore.get()) {
+        // The RealtimeSegmentImpl that we are pushing rows into has indicated that it cannot accept any more
+        // rows. This can happen in one of two conditions:
+        // 1. We are in INITIAL_CONSUMING state, and we somehow exceeded the max number of rows we are allowed to
+        // consume for this segment. Something is seriously wrong, because endCriteriaReached() should have
+        // returned true when we hit the row limit.
+        // Throw an exception.
+        //
+        // 2. We are in CATCHING_UP state, and we legally hit this error due to unclean leader election where
+        // offsets get changed with higher generation numbers for some pinot servers but not others. So, if another
+        // server (who got a larger stream offset) asked us to catch up to that offset, but we are connected to a
+        // broker who has smaller offsets, then we may try to push more rows into the buffer than maximum. This
+        // is a rare case, and we really don't know how to handle this at this time.
+        // Throw an exception.
+        //
+        _segmentLogger
+            .error("Buffer full with {} rows consumed (row limit {}, indexed {})", _numRowsConsumed,
+                _segmentMaxRowCount, _numRowsIndexed);
+        throw new RuntimeException("Realtime segment full");
+      }
+
+      final int idx = index; // To bypass the final check in lambda
+      submittedMsgCount.incrementAndGet();
+      decodeAndTransformExecutor.submit(() -> {
+        try {
+          if (prematureExit.get()) {
+            return;
+          }
+          // TODO: ReusedResult cannot be easily used in parallel processing, find a better way to reduce mem usage
+          Pair<List<GenericRow>, Integer> transformedResults =
+              decodeAndTransformEvent(messageBatch, idx,
+                  realtimeRowsDroppedMeterThreadLocal.get(), realtimeIncompleteRowsConsumedMeterThreadLocal.get());
+          if (transformedResults != null) {
+            transformedQueue.put(transformedResults);
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt(); // Handle interruption
+          _segmentLogger.error("Caught InterruptedException while decoding and transforming event", e);
+        } finally {
+          submittedMsgCount.decrementAndGet();
+        }
+      });
+    }
+
+    indexingThread = new Thread(() -> {
+      try {
+        while (true) {
+          Pair<List<GenericRow>, Integer> transformedResults =
+              transformedQueue.poll(1, TimeUnit.SECONDS); // Poll with timeout
+          if (transformedResults != null) {
+            indexRows(transformedResults, messageBatch,
+                canTakeMore, prematureExit, indexedMessageCount, streamMessageCount,
+                realtimeRowsConsumedMeterThreadLocal.get(), realtimeBytesIngestedMeterThreadLocal.get());
+          }
+          if (prematureExit.get() || (submittedMsgCount.get() == 0 && transformedQueue.isEmpty())) {
+            break; // Exit if all transformation tasks are done and queue is empty
+          }
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // Handle interruption
+      }
+    });
+
+    indexingThread.start();

Review Comment:
   The consumer thread for the partition is the one kicking off this "indexingThread". I don't understand why we kick off a separate thread and then, in the next line, wait for it to finish. What's the difference if we don't spin off a new thread and instead use the main (consuming) thread to do the indexing?
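
   To make the question concrete, here is a minimal, standalone sketch of the alternative being suggested: the same single decode/transform worker feeding a queue, but the consuming thread drains the queue and "indexes" inline, so no extra thread is started and immediately joined. This is not the PR's code; the class and names (InlineIndexingSketch, pending) are invented, and plain Strings stand in for the decoded GenericRow batches and for indexRows(...).

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class InlineIndexingSketch {

  public static void main(String[] args) throws InterruptedException {
    List<String> batch = List.of("m0", "m1", "m2", "m3", "m4");

    BlockingQueue<String> transformedQueue = new LinkedBlockingQueue<>();
    AtomicInteger pending = new AtomicInteger(0);
    // Exactly one worker, so transformed results reach the queue in batch order.
    ExecutorService decodeAndTransform = Executors.newSingleThreadExecutor();

    for (String msg : batch) {
      pending.incrementAndGet();
      decodeAndTransform.submit(() -> {
        try {
          transformedQueue.put(msg.toUpperCase()); // stand-in for decode + transform
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          pending.decrementAndGet();
        }
      });
    }

    // Indexing happens right here on the consuming thread; there is nothing to join.
    while (true) {
      String transformed = transformedQueue.poll(1, TimeUnit.SECONDS);
      if (transformed != null) {
        System.out.println("indexed " + transformed); // stand-in for indexRows(...)
      }
      if (pending.get() == 0 && transformedQueue.isEmpty()) {
        break;
      }
    }
    decodeAndTransform.shutdown();
  }
}
```

   With a single decode worker and the consuming thread as the only indexer, ordering and the strict _currentOffset update are unaffected; the only difference from the diff above is that no indexing thread is created and then waited on.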

##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -679,6 +699,249 @@ private boolean processStreamEvents(MessageBatch messageBatch, long idlePipeSlee
     return prematureExit;
   }
 
+  /**
+   * TODO: move all processStreamEvents to a separate interface
+   * processStreamEventsAsync processes a batch of messages asynchronously. It performs the
+   * (decode + transform) and the indexing of the messages asynchronously.
+   * It preserves the order of the messages and does the same strict _currentOffset update as serial processing.
+   * @param messageBatch batch of messages to process
+   * @param idlePipeSleepTimeMillis wait time in case no messages were read
+   * @return <code>true</code> if the process loop ended before processing the batch, <code>false</code>
+   * otherwise
+   */
+  private boolean processStreamEventsAsync(MessageBatch messageBatch, long idlePipeSleepTimeMillis) {
+    int messageCount = messageBatch.getMessageCount();
+    _partitionRateLimiter.throttle(messageCount);
+    _serverRateLimiter.throttle(messageCount);
+
+    ThreadLocal<PinotMeter> realtimeRowsConsumedMeterThreadLocal = ThreadLocal.withInitial(() -> null);
+    ThreadLocal<PinotMeter> realtimeBytesIngestedMeterThreadLocal = ThreadLocal.withInitial(() -> null);
+    ThreadLocal<PinotMeter> realtimeRowsDroppedMeterThreadLocal = ThreadLocal.withInitial(() -> null);
+    ThreadLocal<PinotMeter> realtimeIncompleteRowsConsumedMeterThreadLocal = ThreadLocal.withInitial(() -> null);
+
+    AtomicInteger indexedMessageCount = new AtomicInteger(0);
+    AtomicInteger streamMessageCount = new AtomicInteger(0);
+    AtomicBoolean canTakeMore = new AtomicBoolean(true);
+    boolean hasTransformedRows = false;
+
+    AtomicBoolean prematureExit = new AtomicBoolean(false);
+    RowMetadata msgMetadata = null;
+
+    BlockingQueue<Pair<List<GenericRow>, Integer>> transformedQueue = new LinkedBlockingQueue<>();
+    AtomicInteger submittedMsgCount = new AtomicInteger(0);
+    // TODO: tune the number of threads

Review Comment:
   We can't have more than one thread here; otherwise the order of indexed rows will be different, and that's something we can't tolerate.
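
   A tiny, self-contained demo of why that pool size matters (illustrative only; DecodeOrderingDemo, the sleeps, and the Integer offsets are invented and are not Pinot code): once more than one worker feeds the shared queue, arrival order in the queue no longer matches submission order, which is the stream order the indexing step must preserve.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DecodeOrderingDemo {

  public static void main(String[] args) throws InterruptedException {
    runWithPool(1);  // arrival order matches submission order
    runWithPool(4);  // arrival order is typically scrambled
  }

  private static void runWithPool(int threads) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
    for (int i = 0; i < 8; i++) {
      final int offset = i;
      pool.submit(() -> {
        try {
          // Simulate decode/transform work whose duration depends on the message.
          Thread.sleep((8 - offset) * 10L);
          queue.put(offset);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    System.out.println(threads + " thread(s): " + queue);
  }
}
```

   If decode parallelism ever becomes necessary, order would have to be restored explicitly (for example, indexing from per-message futures in submission order) rather than relying on arrival order in a shared queue.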