This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3aeae40e01 Ensure stream consumer is only closed once (#15372)
3aeae40e01 is described below

commit 3aeae40e01e4a5492cf905f3400c8d04174da000
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Thu Mar 27 07:26:17 2025 -0600

    Ensure stream consumer is only closed once (#15372)
---
 .../realtime/RealtimeSegmentDataManager.java       | 38 ++++++++++++----------
 .../realtime/RealtimeSegmentDataManagerTest.java   |  4 +--
 2 files changed, 23 insertions(+), 19 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 9011318891..730055a89c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -244,6 +244,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
   private final int _segmentMaxRowCount;
   private final String _resourceDataDir;
   private final Schema _schema;
+  private final AtomicBoolean _streamConsumerClosed = new AtomicBoolean(false);
   // Semaphore for each partitionGroupId only, which is to prevent two different stream consumers
   // from consuming with the same partitionGroupId in parallel in the same host.
   // See the comments in {@link RealtimeTableDataManager}.
@@ -252,7 +253,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
   // This boolean is needed because the semaphore is shared by threads; every thread holding this semaphore can
   // modify the permit. This boolean make sure the semaphore gets released only once when the partition group stops
   // consuming.
-  private final AtomicBoolean _acquiredConsumerSemaphore;
+  private final AtomicBoolean _consumerSemaphoreAcquired = new AtomicBoolean(false);
   private final ServerMetrics _serverMetrics;
   private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
   private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
@@ -1070,14 +1071,14 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
   }
 
   @VisibleForTesting
-  AtomicBoolean getAcquiredConsumerSemaphore() {
-    return _acquiredConsumerSemaphore;
+  AtomicBoolean getConsumerSemaphoreAcquired() {
+    return _consumerSemaphoreAcquired;
   }
 
   @VisibleForTesting
   protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
     if (_parallelSegmentConsumptionPolicy.isAllowedDuringBuild()) {
-      closeStreamConsumers();
+      closeStreamConsumer();
     }
     // Do not allow building segment when table data manager is already shut down
     if (_realtimeTableDataManager.isShutDown()) {
@@ -1276,12 +1277,11 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
     return true;
   }
 
-  private void closeStreamConsumers() {
-    closePartitionGroupConsumer();
-    closePartitionMetadataProvider();
-    if (_acquiredConsumerSemaphore.compareAndSet(true, false)) {
-      _segmentLogger.info("Releasing the consumer semaphore");
-      _consumerCoordinator.release();
+  private void closeStreamConsumer() {
+    if (_streamConsumerClosed.compareAndSet(false, true)) {
+      closePartitionGroupConsumer();
+      closePartitionMetadataProvider();
+      releaseConsumerSemaphore();
     }
   }
 
@@ -1303,6 +1303,13 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
     }
   }
 
+  private void releaseConsumerSemaphore() {
+    if (_consumerSemaphoreAcquired.compareAndSet(true, false)) {
+      _segmentLogger.info("Releasing consumer semaphore");
+      _consumerCoordinator.release();
+    }
+  }
+
   /**
    * Cleans up the metrics that reflects the state of the realtime segment.
    * This step is essential as the instance may not be the target location for some of the partitions.
@@ -1461,7 +1468,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
   protected void downloadSegmentAndReplace(SegmentZKMetadata segmentZKMetadata)
       throws Exception {
     if (_parallelSegmentConsumptionPolicy.isAllowedDuringDownload()) {
-      closeStreamConsumers();
+      closeStreamConsumer();
     }
     _realtimeTableDataManager.downloadAndReplaceConsumingSegment(segmentZKMetadata);
   }
@@ -1500,7 +1507,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
     } catch (Exception e) {
       _segmentLogger.error("Caught exception while stopping the consumer thread", e);
     }
-    closeStreamConsumers();
+    closeStreamConsumer();
     cleanupMetrics();
     _realtimeSegment.offload();
   }
@@ -1586,7 +1593,6 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
                 : _streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getEndOffset()),
             _segmentZKMetadata.getStatus().toString());
     _consumerCoordinator = consumerCoordinator;
-    _acquiredConsumerSemaphore = new AtomicBoolean(false);
     InstanceDataManagerConfig instanceDataManagerConfig = indexLoadingConfig.getInstanceDataManagerConfig();
     String clientIdSuffix =
         instanceDataManagerConfig != null ? instanceDataManagerConfig.getConsumerClientIdSuffix() : null;
@@ -1680,7 +1686,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
     // Acquire semaphore to create stream consumers
     try {
       _consumerCoordinator.acquire(llcSegmentName);
-      _acquiredConsumerSemaphore.set(true);
+      _consumerSemaphoreAcquired.set(true);
     } catch (InterruptedException e) {
       String errorMsg = "InterruptedException when acquiring the partitionConsumerSemaphore";
       _segmentLogger.error(errorMsg);
@@ -1710,9 +1716,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
       // In case of exception thrown here, segment goes to ERROR state. Then any attempt to reset the segment from
       // ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the semaphore is acquired, but not released.
       // Hence releasing the semaphore here to unblock reset operation via Helix Admin.
-      _segmentLogger.info("Releasing the consumer semaphore");
-      _consumerCoordinator.release();
-      _acquiredConsumerSemaphore.set(false);
+      releaseConsumerSemaphore();
       _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(),
           "Failed to initialize segment data manager", t));
       _segmentLogger.warn(
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 72dbf26acb..a332818164 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -755,7 +755,7 @@ public class RealtimeSegmentDataManagerTest {
       throws Exception {
     long timeout = 10_000L;
     FakeRealtimeSegmentDataManager firstSegmentDataManager = createFakeSegmentManager();
-    Assert.assertTrue(firstSegmentDataManager.getAcquiredConsumerSemaphore().get());
+    Assert.assertTrue(firstSegmentDataManager.getConsumerSemaphoreAcquired().get());
     Semaphore firstSemaphore = firstSegmentDataManager.getPartitionGroupConsumerSemaphore();
     Assert.assertEquals(firstSemaphore.availablePermits(), 0);
     Assert.assertFalse(firstSemaphore.hasQueuedThreads());
@@ -787,7 +787,7 @@ public class RealtimeSegmentDataManagerTest {
     TestUtils.waitForCondition(aVoid -> secondSegmentDataManager.get() != null, timeout,
         "Failed to acquire the semaphore for the second segment manager in " + timeout + "ms");
 
-    Assert.assertTrue(secondSegmentDataManager.get().getAcquiredConsumerSemaphore().get());
+    Assert.assertTrue(secondSegmentDataManager.get().getConsumerSemaphoreAcquired().get());
     Semaphore secondSemaphore = secondSegmentDataManager.get().getPartitionGroupConsumerSemaphore();
     Assert.assertEquals(firstSemaphore, secondSemaphore);
     Assert.assertEquals(secondSemaphore.availablePermits(), 0);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to