This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 3aeae40e01 Ensure stream consumer is only closed once (#15372) 3aeae40e01 is described below commit 3aeae40e01e4a5492cf905f3400c8d04174da000 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Thu Mar 27 07:26:17 2025 -0600 Ensure stream consumer is only closed once (#15372) --- .../realtime/RealtimeSegmentDataManager.java | 38 ++++++++++++---------- .../realtime/RealtimeSegmentDataManagerTest.java | 4 +-- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 9011318891..730055a89c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -244,6 +244,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { private final int _segmentMaxRowCount; private final String _resourceDataDir; private final Schema _schema; + private final AtomicBoolean _streamConsumerClosed = new AtomicBoolean(false); // Semaphore for each partitionGroupId only, which is to prevent two different stream consumers // from consuming with the same partitionGroupId in parallel in the same host. // See the comments in {@link RealtimeTableDataManager}. @@ -252,7 +253,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { // This boolean is needed because the semaphore is shared by threads; every thread holding this semaphore can // modify the permit. This boolean make sure the semaphore gets released only once when the partition group stops // consuming. - private final AtomicBoolean _acquiredConsumerSemaphore; + private final AtomicBoolean _consumerSemaphoreAcquired = new AtomicBoolean(false); private final ServerMetrics _serverMetrics; private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager; private final PartitionDedupMetadataManager _partitionDedupMetadataManager; @@ -1070,14 +1071,14 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { } @VisibleForTesting - AtomicBoolean getAcquiredConsumerSemaphore() { - return _acquiredConsumerSemaphore; + AtomicBoolean getConsumerSemaphoreAcquired() { + return _consumerSemaphoreAcquired; } @VisibleForTesting protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) { if (_parallelSegmentConsumptionPolicy.isAllowedDuringBuild()) { - closeStreamConsumers(); + closeStreamConsumer(); } // Do not allow building segment when table data manager is already shut down if (_realtimeTableDataManager.isShutDown()) { @@ -1276,12 +1277,11 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { return true; } - private void closeStreamConsumers() { - closePartitionGroupConsumer(); - closePartitionMetadataProvider(); - if (_acquiredConsumerSemaphore.compareAndSet(true, false)) { - _segmentLogger.info("Releasing the consumer semaphore"); - _consumerCoordinator.release(); + private void closeStreamConsumer() { + if (_streamConsumerClosed.compareAndSet(false, true)) { + closePartitionGroupConsumer(); + closePartitionMetadataProvider(); + releaseConsumerSemaphore(); } } @@ -1303,6 +1303,13 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { } } + private void releaseConsumerSemaphore() { + if (_consumerSemaphoreAcquired.compareAndSet(true, false)) { + _segmentLogger.info("Releasing consumer semaphore"); + _consumerCoordinator.release(); + } + } + /** * Cleans up the metrics that reflects the state of the realtime segment. * This step is essential as the instance may not be the target location for some of the partitions. @@ -1461,7 +1468,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { protected void downloadSegmentAndReplace(SegmentZKMetadata segmentZKMetadata) throws Exception { if (_parallelSegmentConsumptionPolicy.isAllowedDuringDownload()) { - closeStreamConsumers(); + closeStreamConsumer(); } _realtimeTableDataManager.downloadAndReplaceConsumingSegment(segmentZKMetadata); } @@ -1500,7 +1507,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { } catch (Exception e) { _segmentLogger.error("Caught exception while stopping the consumer thread", e); } - closeStreamConsumers(); + closeStreamConsumer(); cleanupMetrics(); _realtimeSegment.offload(); } @@ -1586,7 +1593,6 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { : _streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getEndOffset()), _segmentZKMetadata.getStatus().toString()); _consumerCoordinator = consumerCoordinator; - _acquiredConsumerSemaphore = new AtomicBoolean(false); InstanceDataManagerConfig instanceDataManagerConfig = indexLoadingConfig.getInstanceDataManagerConfig(); String clientIdSuffix = instanceDataManagerConfig != null ? instanceDataManagerConfig.getConsumerClientIdSuffix() : null; @@ -1680,7 +1686,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { // Acquire semaphore to create stream consumers try { _consumerCoordinator.acquire(llcSegmentName); - _acquiredConsumerSemaphore.set(true); + _consumerSemaphoreAcquired.set(true); } catch (InterruptedException e) { String errorMsg = "InterruptedException when acquiring the partitionConsumerSemaphore"; _segmentLogger.error(errorMsg); @@ -1710,9 +1716,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { // In case of exception thrown here, segment goes to ERROR state. Then any attempt to reset the segment from // ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the semaphore is acquired, but not released. // Hence releasing the semaphore here to unblock reset operation via Helix Admin. - _segmentLogger.info("Releasing the consumer semaphore"); - _consumerCoordinator.release(); - _acquiredConsumerSemaphore.set(false); + releaseConsumerSemaphore(); _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), "Failed to initialize segment data manager", t)); _segmentLogger.warn( diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java index 72dbf26acb..a332818164 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java @@ -755,7 +755,7 @@ public class RealtimeSegmentDataManagerTest { throws Exception { long timeout = 10_000L; FakeRealtimeSegmentDataManager firstSegmentDataManager = createFakeSegmentManager(); - Assert.assertTrue(firstSegmentDataManager.getAcquiredConsumerSemaphore().get()); + Assert.assertTrue(firstSegmentDataManager.getConsumerSemaphoreAcquired().get()); Semaphore firstSemaphore = firstSegmentDataManager.getPartitionGroupConsumerSemaphore(); Assert.assertEquals(firstSemaphore.availablePermits(), 0); Assert.assertFalse(firstSemaphore.hasQueuedThreads()); @@ -787,7 +787,7 @@ public class RealtimeSegmentDataManagerTest { TestUtils.waitForCondition(aVoid -> secondSegmentDataManager.get() != null, timeout, "Failed to acquire the semaphore for the second segment manager in " + timeout + "ms"); - Assert.assertTrue(secondSegmentDataManager.get().getAcquiredConsumerSemaphore().get()); + Assert.assertTrue(secondSegmentDataManager.get().getConsumerSemaphoreAcquired().get()); Semaphore secondSemaphore = secondSegmentDataManager.get().getPartitionGroupConsumerSemaphore(); Assert.assertEquals(firstSemaphore, secondSemaphore); Assert.assertEquals(secondSemaphore.availablePermits(), 0); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org