This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 1fe72ae1d3 Minor Refactoring and fixes (#15419) 1fe72ae1d3 is described below commit 1fe72ae1d3ecb0dfcd9df39edf26cd95abff58fd Author: NOOB <43700604+noob-se...@users.noreply.github.com> AuthorDate: Tue Apr 15 23:46:55 2025 +0530 Minor Refactoring and fixes (#15419) --- .../manager/realtime/RealtimeSegmentDataManager.java | 16 +++++++++------- .../manager/realtime/RealtimeTableDataManager.java | 8 +++----- .../realtime/RealtimeSegmentDataManagerTest.java | 20 +++++++++++--------- 3 files changed, 23 insertions(+), 21 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index c25ce54f72..1293f3345d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -311,13 +311,12 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { private final AtomicLong _lastUpdatedRowsIndexed = new AtomicLong(0); private final String _instanceId; private final ServerSegmentCompletionProtocolHandler _protocolHandler; - private final long _consumeStartTime; private final StreamPartitionMsgOffset _startOffset; private final StreamConfig _streamConfig; private RowMetadata _lastRowMetadata; private long _lastConsumedTimestampMs = -1; - + private long _consumeStartTime = -1; private long _lastLogTime = 0; private int _lastConsumedCount = 0; private String _stopReason = null; @@ -747,6 +746,10 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { _segmentLogger.info("Acquired consumer semaphore."); + _consumeStartTime = now(); + _segmentLogger.info("Starting consumption on segment: {}, maxRowCount: {}, maxEndTime: {}.", _llcSegmentName, + _segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC)); + // TODO: // When reaching here, the current consuming segment has already acquired the consumer semaphore, but there is // no guarantee that the previous consuming segment is already persisted (replaced with immutable segment). It @@ -1503,6 +1506,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { private boolean catchupToFinalOffset(StreamPartitionMsgOffset endOffset, long timeoutMs) { _finalOffset = endOffset; + if (_consumeStartTime == -1) { + _consumeStartTime = now(); + } _consumeEndTime = now() + timeoutMs; _state = State.CONSUMING_TO_ONLINE; _shouldStop = false; @@ -1721,17 +1727,13 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { } _state = State.INITIAL_CONSUMING; _latestStreamOffsetAtStartupTime = fetchLatestStreamOffset(5000); - _consumeStartTime = now(); - setConsumeEndTime(segmentZKMetadata, _consumeStartTime); + setConsumeEndTime(segmentZKMetadata, now()); _segmentCommitterFactory = new SegmentCommitterFactory(_segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics); - _segmentLogger.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", - llcSegmentName, _segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC)); } catch (Throwable t) { // In case of exception thrown here, segment goes to ERROR state. Then any attempt to reset the segment from // ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the semaphore is acquired, but not released. // Hence releasing the semaphore here to unblock reset operation via Helix Admin. - releaseConsumerSemaphore(); _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), "Failed to initialize segment data manager", t)); _segmentLogger.warn( diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 0db35b57a4..b59e3a58e6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -519,14 +519,12 @@ public class RealtimeTableDataManager extends BaseTableDataManager { private void doAddConsumingSegment(String segmentName) throws AttemptsExceededException, RetriableOperationException { SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName); - if (!_enforceConsumptionInOrder && zkMetadata.getStatus().isCompleted()) { + if (zkMetadata.getStatus().isCompleted()) { // NOTE: - // 1. When consumption order is enforced, we always create the RealtimeSegmentDataManager to coordinate the - // consumption. - // 2. When segment is COMMITTING (for pauseless consumption), we still create the RealtimeSegmentDataManager + // 1. When segment is COMMITTING (for pauseless consumption), we still create the RealtimeSegmentDataManager // because there is no guarantee that the segment will be committed soon. This way the slow server can still // catch up. - // 3. We do not throw exception here because the segment might have just been committed before the state + // 2. We do not throw exception here because the segment might have just been committed before the state // transition is processed. We can skip adding this segment, and the segment will enter CONSUMING state in // Helix, then we can rely on the following CONSUMING -> ONLINE state transition to add it. _logger.warn("Segment: {} is already completed, skipping adding it as CONSUMING segment", segmentName); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java index 429f77ec53..a3ac5c906c 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java @@ -785,8 +785,9 @@ public class RealtimeSegmentDataManagerTest { @Override public Long get() { long now = System.currentTimeMillis(); - // now() is called once in the run() method, once before each batch reading and once for every row indexation - if (_timeCheckCounter.incrementAndGet() <= FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 4) { + // now() is called once in the run() method, then once on setting consumeStartTime, once before each batch + // reading and once for every row indexation + if (_timeCheckCounter.incrementAndGet() <= FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 5) { return now; } // Exceed segment time threshold @@ -810,10 +811,10 @@ public class RealtimeSegmentDataManagerTest { consumer.run(); - // millis() is called first in run before consumption, then once for each batch and once for each message in - // the batch, then once more when metrics are updated after each batch is processed and then 4 more times in - // run() after consume loop - Assert.assertEquals(timeSupplier._timeCheckCounter.get(), FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 8); + // millis() is called first in run before consumption, then once on setting consumeStartTime, then once for + // each batch and once for each message in the batch, then once more when metrics are updated after each batch + // is processed and then 4 more times in run() after consume loop + Assert.assertEquals(timeSupplier._timeCheckCounter.get(), FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 9); Assert.assertEquals(((LongMsgOffset) segmentDataManager.getCurrentOffset()).getOffset(), START_OFFSET_VALUE + FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS); Assert.assertEquals(segmentDataManager.getSegment().getNumDocsIndexed(), @@ -845,9 +846,10 @@ public class RealtimeSegmentDataManagerTest { consumer.run(); - // millis() is called first in run before consumption, then once for each batch and once for each message in - // the batch, then once for metrics updates and then 4 more times in run() after consume loop - Assert.assertEquals(timeSupplier._timeCheckCounter.get(), FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 6); + // millis() is called first in run before consumption, then once on setting consumeStartTime, then once for + // each batch and once for each message in the batch, then once for metrics updates and then 4 more times in + // run() after consume loop + Assert.assertEquals(timeSupplier._timeCheckCounter.get(), FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 7); Assert.assertEquals(((LongMsgOffset) segmentDataManager.getCurrentOffset()).getOffset(), START_OFFSET_VALUE + FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS); Assert.assertEquals(segmentDataManager.getSegment().getNumDocsIndexed(), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org