This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e5d9a04650 Unblocks helix thread for online segment (#15246)
e5d9a04650 is described below

commit e5d9a046506270124b8c3d90b8db4e301cc7701d
Author: NOOB <43700604+noob-se...@users.noreply.github.com>
AuthorDate: Fri Mar 28 15:00:02 2025 +0530

    Unblocks helix thread for online segment (#15246)
    
    * Fixes edge case where Helix threads get blocked
    
    * Increase wait time
    
    * addresses PR comment
    
    * nit
    
    * Releases partitionGroupConsumerSemaphore in case of errors
    
    * keeps condition consistent with RealtimeTableDataManager
    
    * nit
    
    * Adds dedup and upsert condition
    
    * Handles NPE
    
    * refactoring
    
    * nit
    
    * nit
    
    * nit
    
    * decrease time wait
    
    * fix condition
    
    * updates allowConsumptionDuringCommit
    
    * fixes lint
    
    * nit
    
    * nit
    
    * unblocks helix threads
    
    * reverts prev condition
    
    * fixes lint
    
    * Adds test
    
    * Adds comment
    
    * nit
    
    * fixes test
    
    * nit
---
 .../data/manager/realtime/ConsumerCoordinator.java | 47 ++++++++++++++++++++--
 .../manager/realtime/RealtimeTableDataManager.java | 24 ++++++++---
 .../realtime/SegmentAlreadyConsumedException.java  | 26 ++++++++++++
 .../manager/realtime/ConsumerCoordinatorTest.java  | 29 +++++++++++++
 4 files changed, 117 insertions(+), 9 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
index 75ba5f73d4..35dac8b2ec 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.Nullable;
 import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.utils.LLCSegmentName;
@@ -87,8 +88,13 @@ public class ConsumerCoordinator {
 
     long startTimeMs = System.currentTimeMillis();
     while (!_semaphore.tryAcquire(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) {
-      LOGGER.warn("Failed to acquire consumer semaphore for segment: {} in: 
{}ms. Retrying.", llcSegmentName,
+      String currSegmentName = llcSegmentName.getSegmentName();
+      LOGGER.warn("Failed to acquire consumer semaphore for segment: {} in: 
{}ms. Retrying.", currSegmentName,
           System.currentTimeMillis() - startTimeMs);
+
+      if (isSegmentAlreadyConsumed(currSegmentName)) {
+        throw new SegmentAlreadyConsumedException(currSegmentName);
+      }
     }
   }
 
@@ -159,6 +165,15 @@ public class ConsumerCoordinator {
           if (!_condition.await(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) {
             LOGGER.warn("Semaphore access denied to segment: {}. Waited on 
previous segment: {} for: {}ms.",
                 currSegment.getSegmentName(), previousSegment, 
System.currentTimeMillis() - startTimeMs);
+
+            if (isSegmentAlreadyConsumed(currSegment.getSegmentName())) {
+              // if segment is already consumed, just return from here.
+              // NOTE: if segment is deleted, this segment will never be 
registered and helix thread waiting on
+              // watermark for prev segment won't be notified. All such helix 
threads will fallback to rely on ideal
+              // state for previous segment.
+              throw new 
SegmentAlreadyConsumedException(currSegment.getSegmentName());
+            }
+
             // waited until timeout, fetch previous segment again from ideal 
state as previous segment might be
             // changed in ideal state.
             previousSegment = getPreviousSegmentFromIdealState(currSegment);
@@ -197,6 +212,15 @@ public class ConsumerCoordinator {
           LOGGER.warn(
               "Semaphore access denied to segment: {}. Waited on previous 
segment with sequence number: {} for: {}ms.",
               currSegment.getSegmentName(), prevSeqNum, 
System.currentTimeMillis() - startTimeMs);
+
+          if (isSegmentAlreadyConsumed(currSegment.getSegmentName())) {
+            // if segment is already consumed, just return from here.
+            // NOTE: if segment is deleted, this segment will never be 
registered and helix thread waiting on
+            // watermark for prev segment won't be notified. All such helix 
threads will fallback to rely on ideal
+            // state for previous segment.
+            throw new 
SegmentAlreadyConsumedException(currSegment.getSegmentName());
+          }
+
           // waited until the timeout. Rely on ideal state now.
           return _maxSegmentSeqNumRegistered >= prevSeqNum;
         }
@@ -256,8 +280,8 @@ public class ConsumerCoordinator {
     }
 
     long timeSpentMs = System.currentTimeMillis() - startTimeMs;
-    LOGGER.info("Fetched previous segment: {} to current segment: {} in: 
{}ms.", previousSegment, currSegment,
-        timeSpentMs);
+    LOGGER.info("Fetched previous segment: {} to current segment: {} in: 
{}ms.", previousSegment,
+        currSegment.getSegmentName(), timeSpentMs);
     _serverMetrics.addTimedTableValue(_realtimeTableDataManager.getTableName(),
         ServerTimer.PREV_SEGMENT_FETCH_IDEAL_STATE_TIME_MS, timeSpentMs, 
TimeUnit.MILLISECONDS);
 
@@ -288,4 +312,21 @@ public class ConsumerCoordinator {
   int getMaxSegmentSeqNumLoaded() {
     return _maxSegmentSeqNumRegistered;
   }
+
+  @VisibleForTesting
+  boolean isSegmentAlreadyConsumed(String currSegmentName) {
+    SegmentZKMetadata segmentZKMetadata = 
_realtimeTableDataManager.fetchZKMetadata(currSegmentName);
+    if (segmentZKMetadata == null) {
+      // segment is deleted. no need to consume.
+      LOGGER.warn("Skipping consumption for segment: {} because ZK metadata 
does not exists.", currSegmentName);
+      return true;
+    }
+    if (segmentZKMetadata.getStatus().isCompleted()) {
+      // if segment is done or uploaded, no need to consume.
+      LOGGER.warn("Skipping consumption for segment: {} because ZK status is 
already marked as completed.",
+          currSegmentName);
+      return true;
+    }
+    return false;
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 7bffd4fdd8..668da1b748 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -516,11 +516,11 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   private void doAddConsumingSegment(String segmentName)
       throws AttemptsExceededException, RetriableOperationException {
     SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
-    if ((zkMetadata.getStatus() != Status.IN_PROGRESS) && 
(!_enforceConsumptionInOrder)) {
+    if ((zkMetadata == null) || (zkMetadata.getStatus().isCompleted())) {
       // NOTE: We do not throw exception here because the segment might have 
just been committed before the state
       //       transition is processed. We can skip adding this segment, and 
the segment will enter CONSUMING state in
       //       Helix, then we can rely on the following CONSUMING -> ONLINE 
state transition to add it.
-      _logger.warn("Segment: {} is already committed, skipping adding it as 
CONSUMING segment", segmentName);
+      _logger.warn("Segment: {} is already consumed, skipping adding it as 
CONSUMING segment", segmentName);
       return;
     }
     IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
@@ -557,10 +557,22 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     PartitionDedupMetadataManager partitionDedupMetadataManager =
         _tableDedupMetadataManager != null ? 
_tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
             : null;
-    RealtimeSegmentDataManager realtimeSegmentDataManager =
-        createRealtimeSegmentDataManager(zkMetadata, tableConfig, 
indexLoadingConfig, schema, llcSegmentName,
-            consumerCoordinator, partitionUpsertMetadataManager, 
partitionDedupMetadataManager,
-            _isTableReadyToConsumeData);
+    RealtimeSegmentDataManager realtimeSegmentDataManager;
+    try {
+      realtimeSegmentDataManager =
+          createRealtimeSegmentDataManager(zkMetadata, tableConfig, 
indexLoadingConfig, schema, llcSegmentName,
+              consumerCoordinator, partitionUpsertMetadataManager, 
partitionDedupMetadataManager,
+              _isTableReadyToConsumeData);
+    } catch (SegmentAlreadyConsumedException e) {
+      // Don't register segment.
+      // If segment is not deleted, Eventually this server should receive a 
CONSUMING -> ONLINE helix state transition.
+      // If consumption in order is enforced:
+      // 1. If segment was deleted: Helix thread waiting on this deleted 
segment will fallback to fetch prev segment
+      //    from ideal state.
+      // 2. If segment is not deleted, Helix thread waiting on this segment 
will be notified and unblocked during
+      //    consuming -> online transition of this segment.
+      return;
+    }
     registerSegment(segmentName, realtimeSegmentDataManager, 
partitionUpsertMetadataManager);
     if (partitionUpsertMetadataManager != null) {
       partitionUpsertMetadataManager.trackNewlyAddedSegment(segmentName);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentAlreadyConsumedException.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentAlreadyConsumedException.java
new file mode 100644
index 0000000000..6b04920dff
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentAlreadyConsumedException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+public class SegmentAlreadyConsumedException extends RuntimeException {
+
+  public SegmentAlreadyConsumedException(String currSegmentName) {
+    super("Skipping consumption for segment: " + currSegmentName);
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java
index a2b01a44ea..5041bfade7 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java
@@ -25,8 +25,10 @@ import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.util.TestUtils;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -455,6 +457,33 @@ public class ConsumerCoordinatorTest {
     Assert.assertNull(previousSegment);
   }
 
+  @Test
+  public void testIfSegmentIsConsumed() {
+    RealtimeTableDataManager realtimeTableDataManager = 
Mockito.mock(RealtimeTableDataManager.class);
+    
Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(null);
+
+    FakeConsumerCoordinator consumerCoordinator = new 
FakeConsumerCoordinator(true, realtimeTableDataManager);
+    
Assert.assertTrue(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101)));
+
+    SegmentZKMetadata mockSegmentZKMetadata = 
Mockito.mock(SegmentZKMetadata.class);
+
+    
Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
+    
Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata);
+    
Assert.assertFalse(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101)));
+
+    
Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.COMMITTING);
+    
Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata);
+    
Assert.assertFalse(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101)));
+
+    
Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.DONE);
+    
Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata);
+    
Assert.assertTrue(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101)));
+
+    
Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.UPLOADED);
+    
Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata);
+    
Assert.assertTrue(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101)));
+  }
+
   private Thread getNewThread(FakeConsumerCoordinator consumerCoordinator, 
LLCSegmentName llcSegmentName) {
     return new Thread(() -> {
       try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to