This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fe72ae1d3 Minor Refactoring and fixes (#15419)
1fe72ae1d3 is described below

commit 1fe72ae1d3ecb0dfcd9df39edf26cd95abff58fd
Author: NOOB <43700604+noob-se...@users.noreply.github.com>
AuthorDate: Tue Apr 15 23:46:55 2025 +0530

    Minor Refactoring and fixes (#15419)
---
 .../manager/realtime/RealtimeSegmentDataManager.java | 16 +++++++++-------
 .../manager/realtime/RealtimeTableDataManager.java   |  8 +++-----
 .../realtime/RealtimeSegmentDataManagerTest.java     | 20 +++++++++++---------
 3 files changed, 23 insertions(+), 21 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index c25ce54f72..1293f3345d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -311,13 +311,12 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
   private final AtomicLong _lastUpdatedRowsIndexed = new AtomicLong(0);
   private final String _instanceId;
   private final ServerSegmentCompletionProtocolHandler _protocolHandler;
-  private final long _consumeStartTime;
   private final StreamPartitionMsgOffset _startOffset;
   private final StreamConfig _streamConfig;
 
   private RowMetadata _lastRowMetadata;
   private long _lastConsumedTimestampMs = -1;
-
+  private long _consumeStartTime = -1;
   private long _lastLogTime = 0;
   private int _lastConsumedCount = 0;
   private String _stopReason = null;
@@ -747,6 +746,10 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
 
         _segmentLogger.info("Acquired consumer semaphore.");
 
+        _consumeStartTime = now();
+        _segmentLogger.info("Starting consumption on segment: {}, maxRowCount: {}, maxEndTime: {}.", _llcSegmentName,
+            _segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC));
+
         // TODO:
         //   When reaching here, the current consuming segment has already acquired the consumer semaphore, but there is
         //   no guarantee that the previous consuming segment is already persisted (replaced with immutable segment). It
@@ -1503,6 +1506,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
 
   private boolean catchupToFinalOffset(StreamPartitionMsgOffset endOffset, long timeoutMs) {
     _finalOffset = endOffset;
+    if (_consumeStartTime == -1) {
+      _consumeStartTime = now();
+    }
     _consumeEndTime = now() + timeoutMs;
     _state = State.CONSUMING_TO_ONLINE;
     _shouldStop = false;
@@ -1721,17 +1727,13 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
       }
       _state = State.INITIAL_CONSUMING;
       _latestStreamOffsetAtStartupTime = fetchLatestStreamOffset(5000);
-      _consumeStartTime = now();
-      setConsumeEndTime(segmentZKMetadata, _consumeStartTime);
+      setConsumeEndTime(segmentZKMetadata, now());
       _segmentCommitterFactory =
           new SegmentCommitterFactory(_segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics);
-      _segmentLogger.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}",
-          llcSegmentName, _segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC));
     } catch (Throwable t) {
       // In case of exception thrown here, segment goes to ERROR state. Then any attempt to reset the segment from
       // ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the semaphore is acquired, but not released.
       // Hence releasing the semaphore here to unblock reset operation via Helix Admin.
-      releaseConsumerSemaphore();
       _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(),
           "Failed to initialize segment data manager", t));
       _segmentLogger.warn(
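
A minimal, self-contained sketch of the consume-start-time handling after the hunks above, assuming the simplified class below (the field and method names mirror the diff, but the class and everything else are hypothetical stand-ins, not the actual Pinot types):

// Sketch only: _consumeStartTime is no longer a final field set in the constructor;
// it defaults to -1 and is recorded when consumption actually starts, with a
// backfill on the catch-up path.
public class ConsumeStartTimeSketch {
  private long _consumeStartTime = -1;  // -1 means consumption has not started yet
  private long _consumeEndTime;

  private long now() {
    return System.currentTimeMillis();
  }

  // Recorded after the consumer semaphore is acquired, instead of in the constructor.
  void onConsumerSemaphoreAcquired() {
    _consumeStartTime = now();
  }

  // Catch-up path: if the normal start path never ran, backfill the start time here.
  void catchupToFinalOffset(long timeoutMs) {
    if (_consumeStartTime == -1) {
      _consumeStartTime = now();
    }
    _consumeEndTime = now() + timeoutMs;
  }

  public static void main(String[] args) {
    ConsumeStartTimeSketch sketch = new ConsumeStartTimeSketch();
    sketch.catchupToFinalOffset(5_000L);  // start time gets backfilled on this path
    System.out.println("consumeStartTime=" + sketch._consumeStartTime);
  }
}
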
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 0db35b57a4..b59e3a58e6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -519,14 +519,12 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   private void doAddConsumingSegment(String segmentName)
       throws AttemptsExceededException, RetriableOperationException {
     SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
-    if (!_enforceConsumptionInOrder && zkMetadata.getStatus().isCompleted()) {
+    if (zkMetadata.getStatus().isCompleted()) {
       // NOTE:
-      // 1. When consumption order is enforced, we always create the RealtimeSegmentDataManager to coordinate the
-      //    consumption.
-      // 2. When segment is COMMITTING (for pauseless consumption), we still create the RealtimeSegmentDataManager
+      // 1. When segment is COMMITTING (for pauseless consumption), we still create the RealtimeSegmentDataManager
       //    because there is no guarantee that the segment will be committed soon. This way the slow server can still
       //    catch up.
-      // 3. We do not throw exception here because the segment might have just been committed before the state
+      // 2. We do not throw exception here because the segment might have just been committed before the state
       //    transition is processed. We can skip adding this segment, and the segment will enter CONSUMING state in
       //    Helix, then we can rely on the following CONSUMING -> ONLINE state transition to add it.
       _logger.warn("Segment: {} is already completed, skipping adding it as CONSUMING segment", segmentName);
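
A small sketch of the simplified guard in doAddConsumingSegment after this change; the enum and helper below are hypothetical stand-ins, and only the control flow mirrors the diff:

// Sketch only: the completed check now applies unconditionally, while a
// COMMITTING segment still gets a RealtimeSegmentDataManager so a slow
// server can catch up.
public class AddConsumingSegmentSketch {
  enum Status {
    IN_PROGRESS, COMMITTING, DONE;

    boolean isCompleted() {
      return this == DONE;
    }
  }

  static boolean shouldSkipAddingAsConsuming(Status status) {
    // Previously this skip was also gated on !_enforceConsumptionInOrder.
    return status.isCompleted();
  }

  public static void main(String[] args) {
    System.out.println(shouldSkipAddingAsConsuming(Status.COMMITTING));  // false
    System.out.println(shouldSkipAddingAsConsuming(Status.DONE));        // true
  }
}
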
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 429f77ec53..a3ac5c906c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -785,8 +785,9 @@ public class RealtimeSegmentDataManagerTest {
       @Override
       public Long get() {
         long now = System.currentTimeMillis();
-        // now() is called once in the run() method, once before each batch reading and once for every row indexation
-        if (_timeCheckCounter.incrementAndGet() <= FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 4) {
+        // now() is called once in the run() method, then once on setting consumeStartTime, once before each batch
+        // reading and once for every row indexation
+        if (_timeCheckCounter.incrementAndGet() <= FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 5) {
           return now;
         }
         // Exceed segment time threshold
@@ -810,10 +811,10 @@ public class RealtimeSegmentDataManagerTest {
 
       consumer.run();
 
-      // millis() is called first in run before consumption, then once for each batch and once for each message in
-      // the batch, then once more when metrics are updated after each batch is processed and then 4 more times in
-      // run() after consume loop
-      Assert.assertEquals(timeSupplier._timeCheckCounter.get(), FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 8);
+      // millis() is called first in run before consumption, then once on setting consumeStartTime, then once for
+      // each batch and once for each message in the batch, then once more when metrics are updated after each batch
+      // is processed and then 4 more times in run() after consume loop
+      Assert.assertEquals(timeSupplier._timeCheckCounter.get(), FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 9);
       Assert.assertEquals(((LongMsgOffset) segmentDataManager.getCurrentOffset()).getOffset(),
           START_OFFSET_VALUE + FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
       Assert.assertEquals(segmentDataManager.getSegment().getNumDocsIndexed(),
@@ -845,9 +846,10 @@ public class RealtimeSegmentDataManagerTest {
 
       consumer.run();
 
-      // millis() is called first in run before consumption, then once for each batch and once for each message in
-      // the batch, then once for metrics updates and then 4 more times in run() after consume loop
-      Assert.assertEquals(timeSupplier._timeCheckCounter.get(), FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 6);
+      // millis() is called first in run before consumption, then once on setting consumeStartTime, then once for
+      // each batch and once for each message in the batch, then once for metrics updates and then 4 more times in
+      // run() after consume loop
+      Assert.assertEquals(timeSupplier._timeCheckCounter.get(), FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 7);
       Assert.assertEquals(((LongMsgOffset) segmentDataManager.getCurrentOffset()).getOffset(),
           START_OFFSET_VALUE + FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
       Assert.assertEquals(segmentDataManager.getSegment().getNumDocsIndexed(),
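
As a hedged arithmetic note on the updated test expectations: every expected now()/millis() count grows by exactly one, for the extra call made when _consumeStartTime is set. The threshold below is a toy stand-in for FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS:

public class TimeCheckCounterDelta {
  public static void main(String[] args) {
    int flushThresholdRows = 10;  // stand-in value, not the real constant
    int extraCallForConsumeStartTime = 1;

    // Bound inside the fake time supplier: threshold + 4 -> threshold + 5.
    System.out.println(flushThresholdRows + 4 + extraCallForConsumeStartTime);
    // Totals asserted after consumer.run(): threshold + 8 -> threshold + 9,
    // and threshold + 6 -> threshold + 7 in the second test.
    System.out.println(flushThresholdRows + 8 + extraCallForConsumeStartTime);
    System.out.println(flushThresholdRows + 6 + extraCallForConsumeStartTime);
  }
}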


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
