This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 20ec20da88 Fix starvation in consumer lock (#15404) 20ec20da88 is described below commit 20ec20da88cf822705043f38213e435626a33de6 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Fri Mar 28 22:25:40 2025 -0600 Fix starvation in consumer lock (#15404) --- .../core/data/manager/BaseTableDataManager.java | 5 + .../data/manager/realtime/ConsumerCoordinator.java | 290 +++++++++------------ .../realtime/RealtimeSegmentDataManager.java | 34 +-- .../manager/realtime/RealtimeTableDataManager.java | 97 +++---- .../realtime/SegmentAlreadyConsumedException.java | 26 -- .../manager/realtime/ConsumerCoordinatorTest.java | 148 ++++------- .../realtime/RealtimeSegmentDataManagerTest.java | 55 ---- 7 files changed, 229 insertions(+), 426 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 13edaf56cf..0070c8562f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -344,6 +344,11 @@ public abstract class BaseTableDataManager implements TableDataManager { protected abstract void doAddOnlineSegment(String segmentName) throws Exception; + @Nullable + public SegmentZKMetadata fetchZKMetadataNullable(String segmentName) { + return ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, _tableNameWithType, segmentName); + } + @Override public SegmentZKMetadata fetchZKMetadata(String segmentName) { SegmentZKMetadata zkMetadata = diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java index 8d82a17e9a..6510418598 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java @@ -34,9 +34,8 @@ import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.metrics.ServerTimer; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.helix.HelixHelper; -import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; -import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,61 +47,63 @@ public class ConsumerCoordinator { private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerCoordinator.class); private static final long WAIT_INTERVAL_MS = TimeUnit.MINUTES.toMillis(3); - private final Semaphore _semaphore; private final boolean _enforceConsumptionInOrder; - private final Condition _condition; - private final Lock _lock; - private final ServerMetrics _serverMetrics; - private final boolean _alwaysRelyOnIdealState; private final RealtimeTableDataManager _realtimeTableDataManager; - private final AtomicBoolean _firstTransitionProcessed; + private final boolean _useIdealStateToCalculatePreviousSegment; + private final ServerMetrics _serverMetrics; + + // We use semaphore of 1 permit instead of lock because the semaphore is shared across multiple threads, and it can be + // released by a different thread than the one that acquired it. There is no out-of-box Lock implementation that + // allows releasing the lock from a different thread. + private final Semaphore _semaphore = new Semaphore(1); + private final Lock _lock = new ReentrantLock(); + private final Condition _condition = _lock.newCondition(); + private final AtomicBoolean _firstTransitionProcessed = new AtomicBoolean(false); - private volatile int _maxSegmentSeqNumRegistered = -1; + private volatile int _maxSequenceNumberRegistered = -1; public ConsumerCoordinator(boolean enforceConsumptionInOrder, RealtimeTableDataManager realtimeTableDataManager) { - _semaphore = new Semaphore(1); - _lock = new ReentrantLock(); - _condition = _lock.newCondition(); _enforceConsumptionInOrder = enforceConsumptionInOrder; _realtimeTableDataManager = realtimeTableDataManager; StreamIngestionConfig streamIngestionConfig = realtimeTableDataManager.getStreamIngestionConfig(); - if (streamIngestionConfig != null) { - // if isUseIdealStateToCalculatePreviousSegment is true, server relies on ideal state to fetch previous segment - // to a segment for all helix transitions. - _alwaysRelyOnIdealState = streamIngestionConfig.isUseIdealStateToCalculatePreviousSegment(); - } else { - _alwaysRelyOnIdealState = false; - } - _firstTransitionProcessed = new AtomicBoolean(false); + _useIdealStateToCalculatePreviousSegment = + streamIngestionConfig != null && streamIngestionConfig.isUseIdealStateToCalculatePreviousSegment(); _serverMetrics = ServerMetrics.get(); } public void acquire(LLCSegmentName llcSegmentName) - throws InterruptedException { + throws InterruptedException, ShouldNotConsumeException { + String segmentName = llcSegmentName.getSegmentName(); if (_enforceConsumptionInOrder) { long startTimeMs = System.currentTimeMillis(); - waitForPrevSegment(llcSegmentName); + SegmentZKMetadata segmentZKMetadata = waitForPreviousSegment(llcSegmentName); _serverMetrics.addTimedTableValue(_realtimeTableDataManager.getTableName(), ServerTimer.PREV_SEGMENT_WAIT_TIME_MS, System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS); - if (isSegmentAlreadyConsumed(llcSegmentName.getSegmentName())) { - // if segment is already consumed, just return from here. - // NOTE: if segment is deleted, this segment will never be registered and helix thread waiting on - // watermark for prev segment won't be notified. All such helix threads will fallback to rely on ideal - // state for previous segment. - throw new SegmentAlreadyConsumedException(llcSegmentName.getSegmentName()); + // When consumption order is enforced, unless the segment is deleted, we wait until the previous segment is + // registered regardless of whether ZK metadata status has changed to guarantee the consumption ordering. + // + // Prevent the following scenario: + // - Seg 100 (OFFLINE -> CONSUMING pending) + // + // - Seg 101 (OFFLINE -> CONSUMING returned because of status change) + // - Seg 101 (CONSUMING -> ONLINE processed) + // + // - Seg 102 (OFFLINE -> CONSUMING started consuming while 100 is not registered) + if (segmentZKMetadata != null) { + checkSegmentStatus(segmentZKMetadata); } } long startTimeMs = System.currentTimeMillis(); while (!_semaphore.tryAcquire(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) { - String currSegmentName = llcSegmentName.getSegmentName(); - LOGGER.warn("Failed to acquire consumer semaphore for segment: {} in: {}ms. Retrying.", currSegmentName, + LOGGER.warn("Failed to acquire consumer semaphore for segment: {} in: {}ms. Retrying.", segmentName, System.currentTimeMillis() - startTimeMs); - - if (isSegmentAlreadyConsumed(currSegmentName)) { - throw new SegmentAlreadyConsumedException(currSegmentName); + SegmentZKMetadata segmentZKMetadata = _realtimeTableDataManager.fetchZKMetadataNullable(segmentName); + if (segmentZKMetadata == null) { + throw new ShouldNotConsumeException("Segment: " + segmentName + " is deleted"); } + checkSegmentStatus(segmentZKMetadata); } } @@ -115,177 +116,126 @@ public class ConsumerCoordinator { return _semaphore; } - public void trackSegment(LLCSegmentName llcSegmentName) { + public void register(LLCSegmentName llcSegmentName) { _lock.lock(); try { - if (!_alwaysRelyOnIdealState) { - _maxSegmentSeqNumRegistered = Math.max(_maxSegmentSeqNumRegistered, llcSegmentName.getSequenceNumber()); + int sequenceNumber = llcSegmentName.getSequenceNumber(); + if (sequenceNumber > _maxSequenceNumberRegistered) { + _maxSequenceNumberRegistered = sequenceNumber; + // notify all helix threads waiting for their offline -> consuming segment's prev segment to be loaded + _condition.signalAll(); } - // notify all helix threads waiting for their offline -> consuming segment's prev segment to be loaded - _condition.signalAll(); } finally { _lock.unlock(); } } - private void waitForPrevSegment(LLCSegmentName currSegment) - throws InterruptedException { - - if (_alwaysRelyOnIdealState || !_firstTransitionProcessed.get()) { - // if _alwaysRelyOnIdealState or no offline -> consuming transition has been processed, it means rely on - // ideal state to fetch previous segment. - awaitForPreviousSegmentFromIdealState(currSegment); - - // the first transition will always be prone to error, consider edge case where segment previous to current - // helix transition's segment was deleted and this server came alive after successful deletion. the prev - // segment will not exist, hence first transition is handled using isFirstTransitionSuccessful. - _firstTransitionProcessed.set(true); - return; - } - - // rely on _maxSegmentSeqNumRegistered watermark for previous segment. - if (awaitForPreviousSegmentSequenceNumber(currSegment, WAIT_INTERVAL_MS)) { - return; - } - - // tried using prevSegSeqNumber watermark, but could not acquire the previous segment. - // fallback to acquire prev segment from ideal state. - awaitForPreviousSegmentFromIdealState(currSegment); - } - - private void awaitForPreviousSegmentFromIdealState(LLCSegmentName currSegment) - throws InterruptedException { - String previousSegment = getPreviousSegmentFromIdealState(currSegment); - if (previousSegment == null) { - // previous segment can only be null if either all the previous segments are deleted or this is the starting - // sequence segment of the partition Group. - return; - } - - SegmentDataManager segmentDataManager = _realtimeTableDataManager.acquireSegment(previousSegment); - try { - long startTimeMs = System.currentTimeMillis(); - _lock.lock(); - try { - while (segmentDataManager == null) { - // if segmentDataManager == null, it means segment is not loaded in the server. - // wait until it's loaded. - if (!_condition.await(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) { - LOGGER.warn("Semaphore access denied to segment: {}. Waited on previous segment: {} for: {}ms.", - currSegment.getSegmentName(), previousSegment, System.currentTimeMillis() - startTimeMs); - - // waited until timeout, fetch previous segment again from ideal state as previous segment might be - // changed in ideal state. - previousSegment = getPreviousSegmentFromIdealState(currSegment); - if (previousSegment == null) { - return; - } - } - segmentDataManager = _realtimeTableDataManager.acquireSegment(previousSegment); - } - } finally { - _lock.unlock(); - } - } finally { - if (segmentDataManager != null) { - _realtimeTableDataManager.releaseSegment(segmentDataManager); + /** + * Waits for the previous segment to be registered to the server. Returns the segment ZK metadata fetched during the + * wait to reduce unnecessary ZK read. + */ + @Nullable + private SegmentZKMetadata waitForPreviousSegment(LLCSegmentName currentSegment) + throws InterruptedException, ShouldNotConsumeException { + if (!_firstTransitionProcessed.get() || _useIdealStateToCalculatePreviousSegment) { + SegmentZKMetadata segmentZKMetadata = null; + if (_maxSequenceNumberRegistered < currentSegment.getSequenceNumber() - 1) { + int previousSegmentSequenceNumber = getPreviousSegmentSequenceNumberFromIdealState(currentSegment); + segmentZKMetadata = waitForPreviousSegment(currentSegment, previousSegmentSequenceNumber); } + _firstTransitionProcessed.set(true); + return segmentZKMetadata; + } else { + return waitForPreviousSegment(currentSegment, currentSegment.getSequenceNumber() - 1); } } - /*** - * @param currSegment is the segment of current helix transition. - * @param timeoutMs is max time to wait in millis - * @return true if previous Segment was registered to the server, else false. - * @throws InterruptedException + /** + * Waits for the previous segment with the sequence number to be registered to the server. Returns the segment ZK + * metadata fetched during the wait to reduce unnecessary ZK read.. */ + @Nullable @VisibleForTesting - boolean awaitForPreviousSegmentSequenceNumber(LLCSegmentName currSegment, long timeoutMs) - throws InterruptedException { + SegmentZKMetadata waitForPreviousSegment(LLCSegmentName currentSegment, int previousSegmentSequenceNumber) + throws InterruptedException, ShouldNotConsumeException { + if (previousSegmentSequenceNumber <= _maxSequenceNumberRegistered) { + return null; + } + SegmentZKMetadata segmentZKMetadata = null; long startTimeMs = System.currentTimeMillis(); - int prevSeqNum = currSegment.getSequenceNumber() - 1; _lock.lock(); try { - while (_maxSegmentSeqNumRegistered < prevSeqNum) { + while (previousSegmentSequenceNumber > _maxSequenceNumberRegistered) { // it means the previous segment is not loaded in the server. Wait until it's loaded. - if (!_condition.await(timeoutMs, TimeUnit.MILLISECONDS)) { - LOGGER.warn( - "Semaphore access denied to segment: {}. Waited on previous segment with sequence number: {} for: {}ms.", - currSegment.getSegmentName(), prevSeqNum, System.currentTimeMillis() - startTimeMs); - - // waited until the timeout. Rely on ideal state now. - return _maxSegmentSeqNumRegistered >= prevSeqNum; + if (!_condition.await(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) { + String segmentName = currentSegment.getSegmentName(); + LOGGER.warn("Waited on previous segment with sequence number: {} for: {}ms. " + + "Refreshing the previous segment sequence number for current segment: {}", + previousSegmentSequenceNumber, System.currentTimeMillis() - startTimeMs, segmentName); + segmentZKMetadata = _realtimeTableDataManager.fetchZKMetadataNullable(segmentName); + if (segmentZKMetadata == null) { + throw new ShouldNotConsumeException("Segment: " + segmentName + " is deleted"); + } + previousSegmentSequenceNumber = getPreviousSegmentSequenceNumberFromIdealState(currentSegment); } } - return true; + return segmentZKMetadata; } finally { _lock.unlock(); } } @VisibleForTesting - @Nullable - String getPreviousSegmentFromIdealState(LLCSegmentName currSegment) { + int getPreviousSegmentSequenceNumberFromIdealState(LLCSegmentName currentSegment) { long startTimeMs = System.currentTimeMillis(); - // if seq num of current segment is 102, maxSequenceNumBelowCurrentSegment must be highest seq num of any segment - // created before current segment - int maxSequenceNumBelowCurrentSegment = -1; - String previousSegment = null; - int currPartitionGroupId = currSegment.getPartitionGroupId(); - int currSequenceNum = currSegment.getSequenceNumber(); - Map<String, Map<String, String>> segmentAssignment = getSegmentAssignment(); - String currentServerInstanceId = _realtimeTableDataManager.getServerInstance(); - - for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { - String segmentName = entry.getKey(); - Map<String, String> instanceStateMap = entry.getValue(); - String state = instanceStateMap.get(currentServerInstanceId); - - if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) { + // Track the highest sequence number of any segment created before the current segment. If there is none, return -1 + // so that it can always pass the check. + int maxSequenceNumberBelowCurrentSegment = -1; + String instanceId = _realtimeTableDataManager.getServerInstance(); + int partitionId = currentSegment.getPartitionGroupId(); + int currentSequenceNumber = currentSegment.getSequenceNumber(); + + for (Map.Entry<String, Map<String, String>> entry : getSegmentAssignment().entrySet()) { + String state = entry.getValue().get(instanceId); + if (!SegmentStateModel.ONLINE.equals(state)) { // if server is looking for previous segment to current transition's segment, it means the previous segment // has to be online in the instance. If all previous segments are not online, we just allow the current helix // transition to go ahead. continue; } - LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); + LLCSegmentName llcSegmentName = LLCSegmentName.of(entry.getKey()); if (llcSegmentName == null) { // ignore uploaded segments continue; } - if (llcSegmentName.getPartitionGroupId() != currPartitionGroupId) { + if (llcSegmentName.getPartitionGroupId() != partitionId) { // ignore segments of different partitions. continue; } - if (llcSegmentName.getSequenceNumber() >= currSequenceNum) { - // ignore segments with higher sequence number than existing helix transition segment. - continue; - } - - if (llcSegmentName.getSequenceNumber() > maxSequenceNumBelowCurrentSegment) { - maxSequenceNumBelowCurrentSegment = llcSegmentName.getSequenceNumber(); - // also track the name of segment - previousSegment = segmentName; + int sequenceNumber = llcSegmentName.getSequenceNumber(); + if (sequenceNumber > maxSequenceNumberBelowCurrentSegment && sequenceNumber < currentSequenceNumber) { + maxSequenceNumberBelowCurrentSegment = sequenceNumber; } } long timeSpentMs = System.currentTimeMillis() - startTimeMs; - LOGGER.info("Fetched previous segment: {} to current segment: {} in: {}ms.", previousSegment, - currSegment.getSegmentName(), timeSpentMs); + LOGGER.info("Fetched previous segment sequence number: {} to current segment: {} in: {}ms.", + maxSequenceNumberBelowCurrentSegment, currentSegment.getSegmentName(), timeSpentMs); _serverMetrics.addTimedTableValue(_realtimeTableDataManager.getTableName(), ServerTimer.PREV_SEGMENT_FETCH_IDEAL_STATE_TIME_MS, timeSpentMs, TimeUnit.MILLISECONDS); - return previousSegment; + return maxSequenceNumberBelowCurrentSegment; } @VisibleForTesting Map<String, Map<String, String>> getSegmentAssignment() { - IdealState idealState = HelixHelper.getTableIdealState(_realtimeTableDataManager.getHelixManager(), - _realtimeTableDataManager.getTableName()); - Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", - _realtimeTableDataManager.getTableName()); + String realtimeTableName = _realtimeTableDataManager.getTableName(); + IdealState idealState = + HelixHelper.getTableIdealState(_realtimeTableDataManager.getHelixManager(), realtimeTableName); + Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", realtimeTableName); return idealState.getRecord().getMapFields(); } @@ -299,26 +249,32 @@ public class ConsumerCoordinator { return _firstTransitionProcessed; } - // this should not be used outside of tests. @VisibleForTesting - int getMaxSegmentSeqNumLoaded() { - return _maxSegmentSeqNumRegistered; + int getMaxSequenceNumberRegistered() { + return _maxSequenceNumberRegistered; } - @VisibleForTesting - boolean isSegmentAlreadyConsumed(String currSegmentName) { - SegmentZKMetadata segmentZKMetadata = _realtimeTableDataManager.fetchZKMetadata(currSegmentName); - if (segmentZKMetadata == null) { - // segment is deleted. no need to consume. - LOGGER.warn("Skipping consumption for segment: {} because ZK metadata does not exists.", currSegmentName); - return true; - } + private static void checkSegmentStatus(SegmentZKMetadata segmentZKMetadata) + throws ShouldNotConsumeException { if (segmentZKMetadata.getStatus().isCompleted()) { - // if segment is done or uploaded, no need to consume. - LOGGER.warn("Skipping consumption for segment: {} because ZK status is already marked as completed.", - currSegmentName); - return true; + throw new ShouldNotConsumeException( + "Segment: " + segmentZKMetadata.getSegmentName() + " is already completed with status: " + + segmentZKMetadata.getStatus()); + } + } + + /** + * This exception is thrown when attempting to acquire the consumer semaphore for a segment that should not be + * consumed anymore: + * - Segment is in completed status (DONE/UPLOADED) + * - Segment is deleted + * + * We allow consumption when segment is COMMITTING (for pauseless consumption) because there is no guarantee that the + * segment will be committed soon. This way the slow server can still catch up. + */ + public static class ShouldNotConsumeException extends Exception { + public ShouldNotConsumeException(String message) { + super(message); } - return false; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index a5ac3c2e95..0979782428 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -244,6 +244,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { private final int _segmentMaxRowCount; private final String _resourceDataDir; private final Schema _schema; + private final LLCSegmentName _llcSegmentName; private final AtomicBoolean _streamConsumerClosed = new AtomicBoolean(false); // Semaphore for each partitionGroupId only, which is to prevent two different stream consumers // from consuming with the same partitionGroupId in parallel in the same host. @@ -734,6 +735,16 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { } while (!_shouldStop && !_isReadyToConsumeData.getAsBoolean()); } + // Acquire semaphore before consuming data + try { + _consumerCoordinator.acquire(_llcSegmentName); + } catch (ConsumerCoordinator.ShouldNotConsumeException e) { + _segmentLogger.info("Skipping consumption because: {}", e.getMessage()); + return; + } + _consumerSemaphoreAcquired.set(true); + _consumerCoordinator.register(_llcSegmentName); + // TODO: // When reaching here, the current consuming segment has already acquired the consumer semaphore, but there is // no guarantee that the previous consuming segment is already persisted (replaced with immutable segment). It @@ -1068,16 +1079,6 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { return _segmentBuildDescriptor; } - @VisibleForTesting - Semaphore getPartitionGroupConsumerSemaphore() { - return _consumerCoordinator.getSemaphore(); - } - - @VisibleForTesting - AtomicBoolean getConsumerSemaphoreAcquired() { - return _consumerSemaphoreAcquired; - } - @VisibleForTesting protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) { if (_parallelSegmentConsumptionPolicy.isAllowedDuringBuild()) { @@ -1569,6 +1570,8 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { _realtimeTableDataManager = realtimeTableDataManager; _resourceDataDir = resourceDataDir; _schema = schema; + _llcSegmentName = llcSegmentName; + _consumerCoordinator = consumerCoordinator; _serverMetrics = serverMetrics; _partitionUpsertMetadataManager = partitionUpsertMetadataManager; _partitionDedupMetadataManager = partitionDedupMetadataManager; @@ -1603,7 +1606,6 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { _segmentZKMetadata.getEndOffset() == null ? null : _streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getEndOffset()), _segmentZKMetadata.getStatus().toString()); - _consumerCoordinator = consumerCoordinator; InstanceDataManagerConfig instanceDataManagerConfig = indexLoadingConfig.getInstanceDataManagerConfig(); String clientIdSuffix = instanceDataManagerConfig != null ? instanceDataManagerConfig.getConsumerClientIdSuffix() : null; @@ -1694,16 +1696,6 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { throw e; } - // Acquire semaphore to create stream consumers - try { - _consumerCoordinator.acquire(llcSegmentName); - _consumerSemaphoreAcquired.set(true); - } catch (InterruptedException e) { - String errorMsg = "InterruptedException when acquiring the partitionConsumerSemaphore"; - _segmentLogger.error(errorMsg); - throw new RuntimeException(errorMsg + " for segment: " + _segmentNameStr); - } - try { _startOffset = _partitionGroupConsumptionStatus.getStartOffset(); _currentOffset = _streamPartitionMsgOffsetFactory.create(_startOffset); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index fdc74f3918..53ced97032 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -99,17 +99,16 @@ public class RealtimeTableDataManager extends BaseTableDataManager { private RealtimeSegmentStatsHistory _statsHistory; private final Semaphore _segmentBuildSemaphore; - // Maintains a map of partition id to semaphore. + // Maintains a map from partition id to consumer coordinator. The consumer coordinator uses a semaphore to ensure that + // exactly one PartitionConsumer instance consumes from any stream partition. + // In some streams, it's possible that having multiple consumers (with the same consumer name on the same host) + // consuming from the same stream partition can lead to bugs. // We use semaphore of 1 permit instead of lock because the semaphore is shared across multiple threads, and it can be // released by a different thread than the one that acquired it. There is no out-of-box Lock implementation that // allows releasing the lock from a different thread. - // The semaphore ensures that exactly one PartitionConsumer instance consumes from any stream partition. - // In some streams, it's possible that having multiple consumers (with the same consumer name on the same host) - // consuming from the same stream partition can lead to bugs. - // The semaphores will stay in the hash map even if the consuming partitions move to a different host. - // We expect that there will be a small number of semaphores, but that may be ok. - private final Map<Integer, ConsumerCoordinator> _partitionGroupIdToConsumerCoordinatorMap = - new ConcurrentHashMap<>(); + // The consumer coordinators will stay in the map even if the consuming partitions moved to a different server. We + // expect a small number of consumer coordinators, so it should be fine to not remove them. + private final Map<Integer, ConsumerCoordinator> _partitionIdToConsumerCoordinatorMap = new ConcurrentHashMap<>(); // The old name of the stats file used to be stats.ser which we changed when we moved all packages // from com.linkedin to org.apache because of not being able to deserialize the old files using the newer classes private static final String STATS_FILE_NAME = "segment-stats.ser"; @@ -474,23 +473,26 @@ public class RealtimeTableDataManager extends BaseTableDataManager { SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName); if (segmentDataManager == null) { addNewOnlineSegment(zkMetadata, indexLoadingConfig); - return; - } - if (segmentDataManager instanceof RealtimeSegmentDataManager) { + } else if (segmentDataManager instanceof RealtimeSegmentDataManager) { _logger.info("Changing segment: {} from CONSUMING to ONLINE", segmentName); ((RealtimeSegmentDataManager) segmentDataManager).goOnlineFromConsuming(zkMetadata); onConsumingToOnline(segmentName); - return; - } - // For pauseless ingestion, the segment is marked ONLINE before it's built and before the COMMIT_END_METADATA - // call completes. - // The server should replace the segment only after the CRC is set by COMMIT_END_METADATA and the segment is - // marked DONE. - // This ensures the segment's download URL is available before discarding the locally built copy, preventing - // data loss if COMMIT_END_METADATA fails. - if (zkMetadata.getStatus() == Status.DONE) { + } else if (zkMetadata.getStatus() == Status.DONE) { + // For pauseless ingestion, the segment is marked ONLINE before it's built and before the COMMIT_END_METADATA + // call completes. + // The server should replace the segment only after the CRC is set by COMMIT_END_METADATA and the segment is + // marked DONE. + // This ensures the segment's download URL is available before discarding the locally built copy, preventing + // data loss if COMMIT_END_METADATA fails. replaceSegmentIfCrcMismatch(segmentDataManager, zkMetadata, indexLoadingConfig); } + // Register the segment into the consumer coordinator if consumption order is enforced. + if (_enforceConsumptionInOrder) { + LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); + if (llcSegmentName != null) { + getConsumerCoordinator(llcSegmentName.getPartitionGroupId()).register(llcSegmentName); + } + } } @Override @@ -516,11 +518,17 @@ public class RealtimeTableDataManager extends BaseTableDataManager { private void doAddConsumingSegment(String segmentName) throws AttemptsExceededException, RetriableOperationException { SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName); - if ((!_enforceConsumptionInOrder) && ((zkMetadata == null) || (zkMetadata.getStatus().isCompleted()))) { - // NOTE: We do not throw exception here because the segment might have just been committed before the state - // transition is processed. We can skip adding this segment, and the segment will enter CONSUMING state in - // Helix, then we can rely on the following CONSUMING -> ONLINE state transition to add it. - _logger.warn("Segment: {} is already consumed, skipping adding it as CONSUMING segment", segmentName); + if (!_enforceConsumptionInOrder && zkMetadata.getStatus().isCompleted()) { + // NOTE: + // 1. When consumption order is enforced, we always create the RealtimeSegmentDataManager to coordinate the + // consumption. + // 2. When segment is COMMITTING (for pauseless consumption), we still create the RealtimeSegmentDataManager + // because there is no guarantee that the segment will be committed soon. This way the slow server can still + // catch up. + // 3. We do not throw exception here because the segment might have just been committed before the state + // transition is processed. We can skip adding this segment, and the segment will enter CONSUMING state in + // Helix, then we can rely on the following CONSUMING -> ONLINE state transition to add it. + _logger.warn("Segment: {} is already completed, skipping adding it as CONSUMING segment", segmentName); return; } IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig(); @@ -557,22 +565,10 @@ public class RealtimeTableDataManager extends BaseTableDataManager { PartitionDedupMetadataManager partitionDedupMetadataManager = _tableDedupMetadataManager != null ? _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId) : null; - RealtimeSegmentDataManager realtimeSegmentDataManager; - try { - realtimeSegmentDataManager = - createRealtimeSegmentDataManager(zkMetadata, tableConfig, indexLoadingConfig, schema, llcSegmentName, - consumerCoordinator, partitionUpsertMetadataManager, partitionDedupMetadataManager, - _isTableReadyToConsumeData); - } catch (SegmentAlreadyConsumedException e) { - // Don't register segment. - // If segment is not deleted, Eventually this server should receive a CONSUMING -> ONLINE helix state transition. - // If consumption in order is enforced: - // 1. If segment was deleted: Helix thread waiting on this deleted segment will fallback to fetch prev segment - // from ideal state. - // 2. If segment is not deleted, Helix thread waiting on this segment will be notified and unblocked during - // consuming -> online transition of this segment. - return; - } + RealtimeSegmentDataManager realtimeSegmentDataManager = + createRealtimeSegmentDataManager(zkMetadata, tableConfig, indexLoadingConfig, schema, llcSegmentName, + consumerCoordinator, partitionUpsertMetadataManager, partitionDedupMetadataManager, + _isTableReadyToConsumeData); registerSegment(segmentName, realtimeSegmentDataManager, partitionUpsertMetadataManager); if (partitionUpsertMetadataManager != null) { partitionUpsertMetadataManager.trackNewlyAddedSegment(segmentName); @@ -821,21 +817,6 @@ public class RealtimeTableDataManager extends BaseTableDataManager { registerSegment(segmentName, segmentDataManager); } - @Override - protected SegmentDataManager registerSegment(String segmentName, SegmentDataManager segmentDataManager) { - SegmentDataManager oldSegmentDataManager = super.registerSegment(segmentName, segmentDataManager); - if (_enforceConsumptionInOrder) { - // helix threads might be waiting for their respective previous segments to be loaded. - // they need to be notified here. - LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); - if (llcSegmentName != null) { - ConsumerCoordinator consumerCoordinator = getConsumerCoordinator(llcSegmentName.getPartitionGroupId()); - consumerCoordinator.trackSegment(llcSegmentName); - } - } - return oldSegmentDataManager; - } - /** * Replaces the CONSUMING segment with a downloaded committed one. */ @@ -892,8 +873,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager { } @VisibleForTesting - ConsumerCoordinator getConsumerCoordinator(int partitionGroupId) { - return _partitionGroupIdToConsumerCoordinatorMap.computeIfAbsent(partitionGroupId, + ConsumerCoordinator getConsumerCoordinator(int partitionId) { + return _partitionIdToConsumerCoordinatorMap.computeIfAbsent(partitionId, k -> new ConsumerCoordinator(_enforceConsumptionInOrder, this)); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentAlreadyConsumedException.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentAlreadyConsumedException.java deleted file mode 100644 index 6b04920dff..0000000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentAlreadyConsumedException.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.data.manager.realtime; - -public class SegmentAlreadyConsumedException extends RuntimeException { - - public SegmentAlreadyConsumedException(String currSegmentName) { - super("Skipping consumption for segment: " + currSegmentName); - } -} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java index 1d8ec5bc0a..775ecffc3e 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java @@ -25,16 +25,15 @@ import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; -import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; -import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.util.TestUtils; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; +// TODO: Replace the sleep in this test to condition wait public class ConsumerCoordinatorTest { private static class FakeRealtimeTableDataManager extends RealtimeTableDataManager { @@ -98,15 +97,10 @@ public class ConsumerCoordinatorTest { public Map<String, Map<String, String>> getSegmentAssignment() { return _segmentAssignmentMap; } - - @Override - public boolean isSegmentAlreadyConsumed(String currSegmentName) { - return false; - } } @Test - public void testAwaitForPreviousSegmentSequenceNumber() + public void testWaitForPreviousSegment() throws InterruptedException { // 1. enable tracking segment seq num. FakeRealtimeTableDataManager realtimeTableDataManager = new FakeRealtimeTableDataManager(null, false); @@ -117,40 +111,24 @@ public class ConsumerCoordinatorTest { // 2. check if thread waits on prev segment seq AtomicBoolean atomicBoolean = new AtomicBoolean(false); Thread thread1 = new Thread(() -> { - LLCSegmentName llcSegmentName = getLLCSegment(101); + LLCSegmentName currentSegment = getLLCSegment(101); try { - boolean b = consumerCoordinator.awaitForPreviousSegmentSequenceNumber(llcSegmentName, 5000); - atomicBoolean.set(b); - } catch (InterruptedException e) { - throw new RuntimeException(e); + consumerCoordinator.waitForPreviousSegment(currentSegment, 100); + atomicBoolean.set(true); + } catch (Exception e) { + Assert.fail(); } }); thread1.start(); + Thread.sleep(1000); + Assert.assertFalse(atomicBoolean.get()); // 3. add prev segment and check if thread is unblocked. - consumerCoordinator.trackSegment(getLLCSegment(100)); + consumerCoordinator.register(getLLCSegment(100)); - TestUtils.waitForCondition(aVoid -> atomicBoolean.get(), 4000, + TestUtils.waitForCondition(aVoid -> atomicBoolean.get(), 5000, "Thread waiting on previous segment should have been unblocked."); - Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 100); - - // 4. check if second thread waits on prev segment seq until timeout and returns false - AtomicBoolean atomicBoolean2 = new AtomicBoolean(false); - Thread thread2 = new Thread(() -> { - LLCSegmentName llcSegmentName = getLLCSegment(102); - try { - boolean b = consumerCoordinator.awaitForPreviousSegmentSequenceNumber(llcSegmentName, 500); - atomicBoolean2.set(b); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }); - thread2.start(); - - Thread.sleep(1500); - - Assert.assertFalse(atomicBoolean2.get()); - Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 100); + Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 100); } @Test @@ -185,15 +163,15 @@ public class ConsumerCoordinatorTest { Thread.sleep(1000); // 3. load segment 100, 101, 102 - realtimeTableDataManager.registerSegment(getSegmentName(100), mockedRealtimeSegmentDataManager); - realtimeTableDataManager.registerSegment(getSegmentName(101), mockedRealtimeSegmentDataManager); - realtimeTableDataManager.registerSegment(getSegmentName(102), mockedRealtimeSegmentDataManager); + consumerCoordinator.register(getLLCSegment(100)); + consumerCoordinator.register(getLLCSegment(101)); + consumerCoordinator.register(getLLCSegment(102)); Thread.sleep(1000); // 4. check all of the above threads wait Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1); Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); - Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102); + Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 102); Assert.assertFalse(consumerCoordinator.getFirstTransitionProcessed().get()); thread2.start(); @@ -205,27 +183,27 @@ public class ConsumerCoordinatorTest { // 5. check that first thread acquiring semaphore is of segment 104 Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); - Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102); + Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 102); Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get()); - realtimeTableDataManager.registerSegment(getSegmentName(104), mockedRealtimeSegmentDataManager); + consumerCoordinator.register(getLLCSegment(104)); Thread.sleep(1000); Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); - Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 104); + Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 104); Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get()); Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 1); // 6. check the next threads acquiring semaphore is 106 consumerCoordinator.getSemaphore().release(); - realtimeTableDataManager.registerSegment(getSegmentName(106), mockedRealtimeSegmentDataManager); + consumerCoordinator.register(getLLCSegment(106)); Thread.sleep(1000); Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); - Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 106); + Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 106); Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get()); Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 1); } @@ -249,26 +227,24 @@ public class ConsumerCoordinatorTest { Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1); Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); - Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), -1); + Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), -1); Assert.assertFalse(consumerCoordinator.getFirstTransitionProcessed().get()); - RealtimeSegmentDataManager mockedRealtimeSegmentDataManager = getMockedRealtimeSegmentDataManager(); - // 3. register older segment and check seq num watermark and semaphore. - realtimeTableDataManager.registerSegment(getSegmentName(90), mockedRealtimeSegmentDataManager); + consumerCoordinator.register(getLLCSegment(90)); Thread.sleep(1000); Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1); Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); - Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 90); + Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 90); Assert.assertFalse(consumerCoordinator.getFirstTransitionProcessed().get()); // 4. register prev segment and check watermark and if thread was unblocked - realtimeTableDataManager.registerSegment(getSegmentName(91), mockedRealtimeSegmentDataManager); + consumerCoordinator.register(getLLCSegment(91)); Thread.sleep(1000); Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); - Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 91); + Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 91); Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get()); // 5. check that all the following transitions rely on seq num watermark and gets blocked. @@ -283,7 +259,7 @@ public class ConsumerCoordinatorTest { Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); - Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 91); + Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 91); Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get()); // 6. check that all above threads are still blocked even if semaphore is released. @@ -293,25 +269,25 @@ public class ConsumerCoordinatorTest { Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1); Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); - Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 91); + Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 91); Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get()); // 6. mark 101 seg as complete. Check 102 acquired the semaphore. - realtimeTableDataManager.registerSegment(getSegmentName(101), mockedRealtimeSegmentDataManager); + consumerCoordinator.register(getLLCSegment(101)); Thread.sleep(1000); Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); - Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 101); + Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 101); Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get()); Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 0); // 7. register 102 seg, check if seg 103 is waiting on semaphore. - realtimeTableDataManager.registerSegment(getSegmentName(102), mockedRealtimeSegmentDataManager); + consumerCoordinator.register(getLLCSegment(102)); Thread.sleep(1000); Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); - Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102); + Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 102); Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get()); Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 1); @@ -320,16 +296,16 @@ public class ConsumerCoordinatorTest { consumerCoordinator.getSemaphore().release(); Thread.sleep(1000); Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); - Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102); + Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 102); Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get()); Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 0); // 8. register 103 seg and check if seg 104 is now queued on semaphore - realtimeTableDataManager.registerSegment(getSegmentName(103), mockedRealtimeSegmentDataManager); + consumerCoordinator.register(getLLCSegment(103)); Thread.sleep(1000); Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); - Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 103); + Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 103); Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 1); } @@ -354,14 +330,14 @@ public class ConsumerCoordinatorTest { Assert.assertNotNull(realtimeTableDataManager); // prev segment has seq 91, so registering seq 90 won't do anything. - realtimeTableDataManager.registerSegment(getSegmentName(90), mockedRealtimeSegmentDataManager); + consumerCoordinator.register(getLLCSegment(90)); Thread.sleep(2000); Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1); // 2. test that registering prev segment will unblock thread. - realtimeTableDataManager.registerSegment(getSegmentName(91), mockedRealtimeSegmentDataManager); + consumerCoordinator.register(getLLCSegment(91)); TestUtils.waitForCondition(aVoid -> (consumerCoordinator.getSemaphore().availablePermits() == 0), 5000, "Semaphore must be acquired after registering previous segment"); @@ -372,8 +348,8 @@ public class ConsumerCoordinatorTest { Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1); Assert.assertFalse(consumerCoordinator.getSemaphore().hasQueuedThreads()); Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); - Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), -1); - realtimeTableDataManager.registerSegment(getSegmentName(101), mockedRealtimeSegmentDataManager); + Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 91); + consumerCoordinator.register(getLLCSegment(101)); // 3. test that segment 103 will be blocked. Map<String, String> serverSegmentStatusMap = new HashMap<>() {{ @@ -401,7 +377,7 @@ public class ConsumerCoordinatorTest { Assert.assertFalse(consumerCoordinator.getSemaphore().hasQueuedThreads()); // 4. registering seg 102 should unblock seg 103 - realtimeTableDataManager.registerSegment(getSegmentName(102), mockedRealtimeSegmentDataManager); + consumerCoordinator.register(getLLCSegment(102)); Thread.sleep(1000); @@ -416,12 +392,12 @@ public class ConsumerCoordinatorTest { Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 0); Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); - Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), -1); + Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 102); } @Test public void testRandomOrder() - throws InterruptedException { + throws Exception { RealtimeTableDataManager realtimeTableDataManager = Mockito.mock(RealtimeTableDataManager.class); Mockito.when(realtimeTableDataManager.getTableName()).thenReturn("tableTest_REALTIME"); @@ -450,51 +426,25 @@ public class ConsumerCoordinatorTest { String segmentName = "tableTest_REALTIME__1__101__20250304T0035Z"; LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); Assert.assertNotNull(llcSegmentName); - String previousSegment = consumerCoordinator.getPreviousSegmentFromIdealState(llcSegmentName); - Assert.assertEquals(previousSegment, "tableTest_REALTIME__1__91__20250304T0035Z"); + int previousSegmentSequenceNumber = + consumerCoordinator.getPreviousSegmentSequenceNumberFromIdealState(llcSegmentName); + Assert.assertEquals(previousSegmentSequenceNumber, 91); consumerCoordinator.getSegmentAssignment().clear(); Map<String, String> serverSegmentStatusMap = new HashMap<>() {{ put("server_3", "ONLINE"); }}; consumerCoordinator.getSegmentAssignment().put(getSegmentName(100), serverSegmentStatusMap); - previousSegment = consumerCoordinator.getPreviousSegmentFromIdealState(llcSegmentName); - Assert.assertNull(previousSegment); - } - - @Test - public void testIfSegmentIsConsumed() { - RealtimeTableDataManager realtimeTableDataManager = Mockito.mock(RealtimeTableDataManager.class); - Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(null); - - ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(true, realtimeTableDataManager); - Assert.assertTrue(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101))); - - SegmentZKMetadata mockSegmentZKMetadata = Mockito.mock(SegmentZKMetadata.class); - - Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.IN_PROGRESS); - Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata); - Assert.assertFalse(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101))); - - Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.COMMITTING); - Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata); - Assert.assertFalse(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101))); - - Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.DONE); - Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata); - Assert.assertTrue(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101))); - - Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.UPLOADED); - Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata); - Assert.assertTrue(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101))); + previousSegmentSequenceNumber = consumerCoordinator.getPreviousSegmentSequenceNumberFromIdealState(llcSegmentName); + Assert.assertEquals(previousSegmentSequenceNumber, -1); } private Thread getNewThread(FakeConsumerCoordinator consumerCoordinator, LLCSegmentName llcSegmentName) { return new Thread(() -> { try { consumerCoordinator.acquire(llcSegmentName); - } catch (InterruptedException e) { - throw new RuntimeException(e); + } catch (Exception e) { + Assert.fail(); } }, String.valueOf(llcSegmentName.getSequenceNumber())); } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java index a332818164..8bf32c576b 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java @@ -28,10 +28,8 @@ import java.time.Instant; import java.util.LinkedList; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.function.Supplier; import javax.annotation.Nullable; @@ -70,7 +68,6 @@ import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.apache.pinot.util.TestUtils; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -750,58 +747,6 @@ public class RealtimeSegmentDataManagerTest { segmentDataManager.close(); } - @Test - public void testOnlyOneSegmentHoldingTheSemaphoreForParticularPartition() - throws Exception { - long timeout = 10_000L; - FakeRealtimeSegmentDataManager firstSegmentDataManager = createFakeSegmentManager(); - Assert.assertTrue(firstSegmentDataManager.getConsumerSemaphoreAcquired().get()); - Semaphore firstSemaphore = firstSegmentDataManager.getPartitionGroupConsumerSemaphore(); - Assert.assertEquals(firstSemaphore.availablePermits(), 0); - Assert.assertFalse(firstSemaphore.hasQueuedThreads()); - - AtomicReference<FakeRealtimeSegmentDataManager> secondSegmentDataManager = new AtomicReference<>(null); - - // Construct the second segment manager, which will be blocked on the semaphore. - Thread constructSecondSegmentManager = new Thread(() -> { - try { - secondSegmentDataManager.set(createFakeSegmentManager()); - } catch (Exception e) { - throw new RuntimeException("Exception when sleeping for " + timeout + "ms", e); - } - }); - constructSecondSegmentManager.start(); - - // Wait until the second segment manager gets blocked on the semaphore. - TestUtils.waitForCondition(aVoid -> { - if (firstSemaphore.hasQueuedThreads()) { - // Once verified the second segment gets blocked, release the semaphore. - firstSegmentDataManager.close(); - return true; - } else { - return false; - } - }, timeout, "Failed to wait for the second segment blocked on semaphore"); - - // Wait for the second segment manager finished the construction. - TestUtils.waitForCondition(aVoid -> secondSegmentDataManager.get() != null, timeout, - "Failed to acquire the semaphore for the second segment manager in " + timeout + "ms"); - - Assert.assertTrue(secondSegmentDataManager.get().getConsumerSemaphoreAcquired().get()); - Semaphore secondSemaphore = secondSegmentDataManager.get().getPartitionGroupConsumerSemaphore(); - Assert.assertEquals(firstSemaphore, secondSemaphore); - Assert.assertEquals(secondSemaphore.availablePermits(), 0); - Assert.assertFalse(secondSemaphore.hasQueuedThreads()); - - // Call offload method the 2nd time on the first segment manager, the permits in semaphore won't increase. - firstSegmentDataManager.close(); - Assert.assertEquals(firstSegmentDataManager.getPartitionGroupConsumerSemaphore().availablePermits(), 0); - - // The permit finally gets released in the Semaphore. - secondSegmentDataManager.get().close(); - Assert.assertEquals(secondSegmentDataManager.get().getPartitionGroupConsumerSemaphore().availablePermits(), 1); - } - @Test public void testShutdownTableDataManagerWillNotShutdownLeaseExtenderExecutor() throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org