This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new cf74f5ad6d Consumes segments in strict order of sequence number (#15261) cf74f5ad6d is described below commit cf74f5ad6dde98157f534e7781cbb177889cddac Author: NOOB <43700604+noob-se...@users.noreply.github.com> AuthorDate: Thu Mar 27 07:12:08 2025 +0530 Consumes segments in strict order of sequence number (#15261) --- .../apache/pinot/common/metrics/ServerTimer.java | 6 + .../data/manager/realtime/ConsumerCoordinator.java | 291 +++++++++++++ .../realtime/RealtimeSegmentDataManager.java | 22 +- .../manager/realtime/RealtimeTableDataManager.java | 61 ++- .../manager/realtime/ConsumerCoordinatorTest.java | 485 +++++++++++++++++++++ .../realtime/RealtimeSegmentDataManagerTest.java | 19 +- ...FailureInjectingRealtimeSegmentDataManager.java | 19 +- .../FailureInjectingRealtimeTableDataManager.java | 5 +- .../table/ingestion/StreamIngestionConfig.java | 27 +- 9 files changed, 891 insertions(+), 44 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java index 1618f73047..917a187946 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java @@ -63,6 +63,12 @@ public enum ServerTimer implements AbstractMetrics.Timer { SECONDARY_Q_WAIT_TIME_MS("milliseconds", false, "Time spent waiting in the secondary queue when BinaryWorkloadScheduler is used."), + PREV_SEGMENT_FETCH_IDEAL_STATE_TIME_MS("milliseconds", false, + "Time spent while fetching previous segment from ideal state for any segment."), + + PREV_SEGMENT_WAIT_TIME_MS("milliseconds", false, + "Time spent while waiting on previous segment to be registered."), + // Multi-stage /** * Time spent building the hash table for the join. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java new file mode 100644 index 0000000000..75ba5f73d4 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.manager.realtime; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import javax.annotation.Nullable; +import org.apache.helix.model.IdealState; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.metrics.ServerTimer; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; +import org.apache.pinot.spi.utils.CommonConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The ConsumerCoordinator coordinates the offline->consuming helix transitions. + */ +public class ConsumerCoordinator { + private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerCoordinator.class); + private static final long WAIT_INTERVAL_MS = TimeUnit.MINUTES.toMillis(3); + + private final Semaphore _semaphore; + private final boolean _enforceConsumptionInOrder; + private final Condition _condition; + private final Lock _lock; + private final ServerMetrics _serverMetrics; + private final boolean _alwaysRelyOnIdealState; + private final RealtimeTableDataManager _realtimeTableDataManager; + private final AtomicBoolean _firstTransitionProcessed; + + private volatile int _maxSegmentSeqNumRegistered = -1; + + public ConsumerCoordinator(boolean enforceConsumptionInOrder, RealtimeTableDataManager realtimeTableDataManager) { + _semaphore = new Semaphore(1); + _lock = new ReentrantLock(); + _condition = _lock.newCondition(); + _enforceConsumptionInOrder = enforceConsumptionInOrder; + _realtimeTableDataManager = realtimeTableDataManager; + StreamIngestionConfig streamIngestionConfig = realtimeTableDataManager.getStreamIngestionConfig(); + if (streamIngestionConfig != null) { + // if isUseIdealStateToCalculatePreviousSegment is true, server relies on ideal state to fetch previous segment + // to a segment for all helix transitions. + _alwaysRelyOnIdealState = streamIngestionConfig.isUseIdealStateToCalculatePreviousSegment(); + } else { + _alwaysRelyOnIdealState = false; + } + _firstTransitionProcessed = new AtomicBoolean(false); + _serverMetrics = ServerMetrics.get(); + } + + public void acquire(LLCSegmentName llcSegmentName) + throws InterruptedException { + if (_enforceConsumptionInOrder) { + long startTimeMs = System.currentTimeMillis(); + waitForPrevSegment(llcSegmentName); + _serverMetrics.addTimedTableValue(_realtimeTableDataManager.getTableName(), ServerTimer.PREV_SEGMENT_WAIT_TIME_MS, + System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS); + } + + long startTimeMs = System.currentTimeMillis(); + while (!_semaphore.tryAcquire(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) { + LOGGER.warn("Failed to acquire consumer semaphore for segment: {} in: {}ms. Retrying.", llcSegmentName, + System.currentTimeMillis() - startTimeMs); + } + } + + public void release() { + _semaphore.release(); + } + + @VisibleForTesting + Semaphore getSemaphore() { + return _semaphore; + } + + public void trackSegment(LLCSegmentName llcSegmentName) { + _lock.lock(); + try { + if (!_alwaysRelyOnIdealState) { + _maxSegmentSeqNumRegistered = Math.max(_maxSegmentSeqNumRegistered, llcSegmentName.getSequenceNumber()); + } + // notify all helix threads waiting for their offline -> consuming segment's prev segment to be loaded + _condition.signalAll(); + } finally { + _lock.unlock(); + } + } + + private void waitForPrevSegment(LLCSegmentName currSegment) + throws InterruptedException { + + if (_alwaysRelyOnIdealState || !_firstTransitionProcessed.get()) { + // if _alwaysRelyOnIdealState or no offline -> consuming transition has been processed, it means rely on + // ideal state to fetch previous segment. + awaitForPreviousSegmentFromIdealState(currSegment); + + // the first transition will always be prone to error, consider edge case where segment previous to current + // helix transition's segment was deleted and this server came alive after successful deletion. the prev + // segment will not exist, hence first transition is handled using isFirstTransitionSuccessful. + _firstTransitionProcessed.set(true); + return; + } + + // rely on _maxSegmentSeqNumRegistered watermark for previous segment. + if (awaitForPreviousSegmentSequenceNumber(currSegment, WAIT_INTERVAL_MS)) { + return; + } + + // tried using prevSegSeqNumber watermark, but could not acquire the previous segment. + // fallback to acquire prev segment from ideal state. + awaitForPreviousSegmentFromIdealState(currSegment); + } + + private void awaitForPreviousSegmentFromIdealState(LLCSegmentName currSegment) + throws InterruptedException { + String previousSegment = getPreviousSegmentFromIdealState(currSegment); + if (previousSegment == null) { + // previous segment can only be null if either all the previous segments are deleted or this is the starting + // sequence segment of the partition Group. + return; + } + + SegmentDataManager segmentDataManager = _realtimeTableDataManager.acquireSegment(previousSegment); + try { + long startTimeMs = System.currentTimeMillis(); + _lock.lock(); + try { + while (segmentDataManager == null) { + // if segmentDataManager == null, it means segment is not loaded in the server. + // wait until it's loaded. + if (!_condition.await(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) { + LOGGER.warn("Semaphore access denied to segment: {}. Waited on previous segment: {} for: {}ms.", + currSegment.getSegmentName(), previousSegment, System.currentTimeMillis() - startTimeMs); + // waited until timeout, fetch previous segment again from ideal state as previous segment might be + // changed in ideal state. + previousSegment = getPreviousSegmentFromIdealState(currSegment); + if (previousSegment == null) { + return; + } + } + segmentDataManager = _realtimeTableDataManager.acquireSegment(previousSegment); + } + } finally { + _lock.unlock(); + } + } finally { + if (segmentDataManager != null) { + _realtimeTableDataManager.releaseSegment(segmentDataManager); + } + } + } + + /*** + * @param currSegment is the segment of current helix transition. + * @param timeoutMs is max time to wait in millis + * @return true if previous Segment was registered to the server, else false. + * @throws InterruptedException + */ + @VisibleForTesting + boolean awaitForPreviousSegmentSequenceNumber(LLCSegmentName currSegment, long timeoutMs) + throws InterruptedException { + long startTimeMs = System.currentTimeMillis(); + int prevSeqNum = currSegment.getSequenceNumber() - 1; + _lock.lock(); + try { + while (_maxSegmentSeqNumRegistered < prevSeqNum) { + // it means the previous segment is not loaded in the server. Wait until it's loaded. + if (!_condition.await(timeoutMs, TimeUnit.MILLISECONDS)) { + LOGGER.warn( + "Semaphore access denied to segment: {}. Waited on previous segment with sequence number: {} for: {}ms.", + currSegment.getSegmentName(), prevSeqNum, System.currentTimeMillis() - startTimeMs); + // waited until the timeout. Rely on ideal state now. + return _maxSegmentSeqNumRegistered >= prevSeqNum; + } + } + return true; + } finally { + _lock.unlock(); + } + } + + @VisibleForTesting + @Nullable + String getPreviousSegmentFromIdealState(LLCSegmentName currSegment) { + long startTimeMs = System.currentTimeMillis(); + // if seq num of current segment is 102, maxSequenceNumBelowCurrentSegment must be highest seq num of any segment + // created before current segment + int maxSequenceNumBelowCurrentSegment = -1; + String previousSegment = null; + int currPartitionGroupId = currSegment.getPartitionGroupId(); + int currSequenceNum = currSegment.getSequenceNumber(); + Map<String, Map<String, String>> segmentAssignment = getSegmentAssignment(); + String currentServerInstanceId = _realtimeTableDataManager.getServerInstance(); + + for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> instanceStateMap = entry.getValue(); + String state = instanceStateMap.get(currentServerInstanceId); + + if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) { + // if server is looking for previous segment to current transition's segment, it means the previous segment + // has to be online in the instance. If all previous segments are not online, we just allow the current helix + // transition to go ahead. + continue; + } + + LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); + if (llcSegmentName == null) { + // ignore uploaded segments + continue; + } + + if (llcSegmentName.getPartitionGroupId() != currPartitionGroupId) { + // ignore segments of different partitions. + continue; + } + + if (llcSegmentName.getSequenceNumber() >= currSequenceNum) { + // ignore segments with higher sequence number than existing helix transition segment. + continue; + } + + if (llcSegmentName.getSequenceNumber() > maxSequenceNumBelowCurrentSegment) { + maxSequenceNumBelowCurrentSegment = llcSegmentName.getSequenceNumber(); + // also track the name of segment + previousSegment = segmentName; + } + } + + long timeSpentMs = System.currentTimeMillis() - startTimeMs; + LOGGER.info("Fetched previous segment: {} to current segment: {} in: {}ms.", previousSegment, currSegment, + timeSpentMs); + _serverMetrics.addTimedTableValue(_realtimeTableDataManager.getTableName(), + ServerTimer.PREV_SEGMENT_FETCH_IDEAL_STATE_TIME_MS, timeSpentMs, TimeUnit.MILLISECONDS); + + return previousSegment; + } + + @VisibleForTesting + Map<String, Map<String, String>> getSegmentAssignment() { + IdealState idealState = HelixHelper.getTableIdealState(_realtimeTableDataManager.getHelixManager(), + _realtimeTableDataManager.getTableName()); + Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", + _realtimeTableDataManager.getTableName()); + return idealState.getRecord().getMapFields(); + } + + @VisibleForTesting + Lock getLock() { + return _lock; + } + + @VisibleForTesting + AtomicBoolean getFirstTransitionProcessed() { + return _firstTransitionProcessed; + } + + // this should not be used outside of tests. + @VisibleForTesting + int getMaxSegmentSeqNumLoaded() { + return _maxSegmentSeqNumRegistered; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 229799e8d2..9011318891 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -247,7 +247,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { // Semaphore for each partitionGroupId only, which is to prevent two different stream consumers // from consuming with the same partitionGroupId in parallel in the same host. // See the comments in {@link RealtimeTableDataManager}. - private final Semaphore _partitionGroupConsumerSemaphore; + private final ConsumerCoordinator _consumerCoordinator; // A boolean flag to check whether the current thread has acquired the semaphore. // This boolean is needed because the semaphore is shared by threads; every thread holding this semaphore can // modify the permit. This boolean make sure the semaphore gets released only once when the partition group stops @@ -1066,7 +1066,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { @VisibleForTesting Semaphore getPartitionGroupConsumerSemaphore() { - return _partitionGroupConsumerSemaphore; + return _consumerCoordinator.getSemaphore(); } @VisibleForTesting @@ -1280,8 +1280,8 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { closePartitionGroupConsumer(); closePartitionMetadataProvider(); if (_acquiredConsumerSemaphore.compareAndSet(true, false)) { - _segmentLogger.info("Releasing the _partitionGroupConsumerSemaphore"); - _partitionGroupConsumerSemaphore.release(); + _segmentLogger.info("Releasing the consumer semaphore"); + _consumerCoordinator.release(); } } @@ -1540,7 +1540,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { // If the transition is OFFLINE to ONLINE, the caller should have downloaded the segment and we don't reach here. public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig, RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig, - Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionGroupConsumerSemaphore, + Schema schema, LLCSegmentName llcSegmentName, ConsumerCoordinator consumerCoordinator, ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager, @Nullable PartitionDedupMetadataManager partitionDedupMetadataManager, BooleanSupplier isReadyToConsumeData) throws AttemptsExceededException, RetriableOperationException { @@ -1585,7 +1585,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { _segmentZKMetadata.getEndOffset() == null ? null : _streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getEndOffset()), _segmentZKMetadata.getStatus().toString()); - _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore; + _consumerCoordinator = consumerCoordinator; _acquiredConsumerSemaphore = new AtomicBoolean(false); InstanceDataManagerConfig instanceDataManagerConfig = indexLoadingConfig.getInstanceDataManagerConfig(); String clientIdSuffix = @@ -1679,11 +1679,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { // Acquire semaphore to create stream consumers try { - long startTimeMs = System.currentTimeMillis(); - while (!_partitionGroupConsumerSemaphore.tryAcquire(5, TimeUnit.MINUTES)) { - _segmentLogger.warn("Failed to acquire partitionGroupConsumerSemaphore in: {} ms. Retrying.", - System.currentTimeMillis() - startTimeMs); - } + _consumerCoordinator.acquire(llcSegmentName); _acquiredConsumerSemaphore.set(true); } catch (InterruptedException e) { String errorMsg = "InterruptedException when acquiring the partitionConsumerSemaphore"; @@ -1714,8 +1710,8 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { // In case of exception thrown here, segment goes to ERROR state. Then any attempt to reset the segment from // ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the semaphore is acquired, but not released. // Hence releasing the semaphore here to unblock reset operation via Helix Admin. - _segmentLogger.info("Releasing the _partitionGroupConsumerSemaphore"); - _partitionGroupConsumerSemaphore.release(); + _segmentLogger.info("Releasing the consumer semaphore"); + _consumerCoordinator.release(); _acquiredConsumerSemaphore.set(false); _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), "Failed to initialize segment data manager", t)); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index b070b743e0..7bffd4fdd8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -75,6 +75,8 @@ import org.apache.pinot.spi.config.table.DedupConfig; import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DateTimeFormatSpec; import org.apache.pinot.spi.data.FieldSpec; @@ -106,8 +108,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager { // consuming from the same stream partition can lead to bugs. // The semaphores will stay in the hash map even if the consuming partitions move to a different host. // We expect that there will be a small number of semaphores, but that may be ok. - private final Map<Integer, Semaphore> _partitionGroupIdToSemaphoreMap = new ConcurrentHashMap<>(); - + private final Map<Integer, ConsumerCoordinator> _partitionGroupIdToConsumerCoordinatorMap = + new ConcurrentHashMap<>(); // The old name of the stats file used to be stats.ser which we changed when we moved all packages // from com.linkedin to org.apache because of not being able to deserialize the old files using the newer classes private static final String STATS_FILE_NAME = "segment-stats.ser"; @@ -142,6 +144,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { private TableDedupMetadataManager _tableDedupMetadataManager; private TableUpsertMetadataManager _tableUpsertMetadataManager; private BooleanSupplier _isTableReadyToConsumeData; + private boolean _enforceConsumptionInOrder = false; public RealtimeTableDataManager(Semaphore segmentBuildSemaphore) { this(segmentBuildSemaphore, () -> true); @@ -223,6 +226,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager { _tableUpsertMetadataManager.init(_tableConfig, schema, this); } + _enforceConsumptionInOrder = isEnforceConsumptionInOrder(); + // For dedup and partial-upsert, need to wait for all segments loaded before starting consuming data if (isDedupEnabled() || isPartialUpsertEnabled()) { _isTableReadyToConsumeData = new BooleanSupplier() { @@ -511,7 +516,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { private void doAddConsumingSegment(String segmentName) throws AttemptsExceededException, RetriableOperationException { SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName); - if (zkMetadata.getStatus() != Status.IN_PROGRESS) { + if ((zkMetadata.getStatus() != Status.IN_PROGRESS) && (!_enforceConsumptionInOrder)) { // NOTE: We do not throw exception here because the segment might have just been committed before the state // transition is processed. We can skip adding this segment, and the segment will enter CONSUMING state in // Helix, then we can rely on the following CONSUMING -> ONLINE state transition to add it. @@ -543,7 +548,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { // Generates only one semaphore for every partition LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); int partitionGroupId = llcSegmentName.getPartitionGroupId(); - Semaphore semaphore = _partitionGroupIdToSemaphoreMap.computeIfAbsent(partitionGroupId, k -> new Semaphore(1)); + ConsumerCoordinator consumerCoordinator = getConsumerCoordinator(partitionGroupId); // Create the segment data manager and register it PartitionUpsertMetadataManager partitionUpsertMetadataManager = @@ -553,8 +558,9 @@ public class RealtimeTableDataManager extends BaseTableDataManager { _tableDedupMetadataManager != null ? _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId) : null; RealtimeSegmentDataManager realtimeSegmentDataManager = - createRealtimeSegmentDataManager(zkMetadata, tableConfig, indexLoadingConfig, schema, llcSegmentName, semaphore, - partitionUpsertMetadataManager, partitionDedupMetadataManager, _isTableReadyToConsumeData); + createRealtimeSegmentDataManager(zkMetadata, tableConfig, indexLoadingConfig, schema, llcSegmentName, + consumerCoordinator, partitionUpsertMetadataManager, partitionDedupMetadataManager, + _isTableReadyToConsumeData); registerSegment(segmentName, realtimeSegmentDataManager, partitionUpsertMetadataManager); if (partitionUpsertMetadataManager != null) { partitionUpsertMetadataManager.trackNewlyAddedSegment(segmentName); @@ -641,12 +647,12 @@ public class RealtimeTableDataManager extends BaseTableDataManager { @VisibleForTesting protected RealtimeSegmentDataManager createRealtimeSegmentDataManager(SegmentZKMetadata zkMetadata, TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig, Schema schema, LLCSegmentName llcSegmentName, - Semaphore semaphore, PartitionUpsertMetadataManager partitionUpsertMetadataManager, + ConsumerCoordinator consumerCoordinator, PartitionUpsertMetadataManager partitionUpsertMetadataManager, PartitionDedupMetadataManager partitionDedupMetadataManager, BooleanSupplier isTableReadyToConsumeData) throws AttemptsExceededException, RetriableOperationException { return new RealtimeSegmentDataManager(zkMetadata, tableConfig, this, _indexDir.getAbsolutePath(), - indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics, partitionUpsertMetadataManager, - partitionDedupMetadataManager, isTableReadyToConsumeData); + indexLoadingConfig, schema, llcSegmentName, consumerCoordinator, _serverMetrics, + partitionUpsertMetadataManager, partitionDedupMetadataManager, isTableReadyToConsumeData); } /** @@ -803,6 +809,21 @@ public class RealtimeTableDataManager extends BaseTableDataManager { registerSegment(segmentName, segmentDataManager); } + @Override + protected SegmentDataManager registerSegment(String segmentName, SegmentDataManager segmentDataManager) { + SegmentDataManager oldSegmentDataManager = super.registerSegment(segmentName, segmentDataManager); + if (_enforceConsumptionInOrder) { + // helix threads might be waiting for their respective previous segments to be loaded. + // they need to be notified here. + LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); + if (llcSegmentName != null) { + ConsumerCoordinator consumerCoordinator = getConsumerCoordinator(llcSegmentName.getPartitionGroupId()); + consumerCoordinator.trackSegment(llcSegmentName); + } + } + return oldSegmentDataManager; + } + /** * Replaces the CONSUMING segment with a downloaded committed one. */ @@ -852,6 +873,23 @@ public class RealtimeTableDataManager extends BaseTableDataManager { return Collections.emptyMap(); } + @Nullable + public StreamIngestionConfig getStreamIngestionConfig() { + IngestionConfig ingestionConfig = _tableConfig.getIngestionConfig(); + return ingestionConfig != null ? ingestionConfig.getStreamIngestionConfig() : null; + } + + @VisibleForTesting + ConsumerCoordinator getConsumerCoordinator(int partitionGroupId) { + return _partitionGroupIdToConsumerCoordinatorMap.computeIfAbsent(partitionGroupId, + k -> new ConsumerCoordinator(_enforceConsumptionInOrder, this)); + } + + @VisibleForTesting + void setEnforceConsumptionInOrder(boolean enforceConsumptionInOrder) { + _enforceConsumptionInOrder = enforceConsumptionInOrder; + } + /** * Validate a schema against the table config for real-time record consumption. * Ideally, we should validate these things when schema is added or table is created, but either of these @@ -886,4 +924,9 @@ public class RealtimeTableDataManager extends BaseTableDataManager { // 2. Validate the schema itself SchemaUtils.validate(schema); } + + private boolean isEnforceConsumptionInOrder() { + StreamIngestionConfig streamIngestionConfig = getStreamIngestionConfig(); + return streamIngestionConfig != null && streamIngestionConfig.isEnforceConsumptionInOrder(); + } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java new file mode 100644 index 0000000000..a2b01a44ea --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java @@ -0,0 +1,485 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.manager.realtime; + +import com.google.common.cache.CacheBuilder; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; +import org.apache.pinot.util.TestUtils; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class ConsumerCoordinatorTest { + + private static class FakeRealtimeTableDataManager extends RealtimeTableDataManager { + private final StreamIngestionConfig _streamIngestionConfig; + private ConsumerCoordinator _consumerCoordinator; + + public FakeRealtimeTableDataManager(Semaphore segmentBuildSemaphore, + boolean useIdealStateToCalculatePreviousSegment) { + super(segmentBuildSemaphore); + super._recentlyDeletedSegments = CacheBuilder.newBuilder().build(); + StreamIngestionConfig streamIngestionConfig = new StreamIngestionConfig(List.of(new HashMap<>())); + streamIngestionConfig.setEnforceConsumptionInOrder(true); + if (useIdealStateToCalculatePreviousSegment) { + streamIngestionConfig.setUseIdealStateToCalculatePreviousSegment(true); + } + _streamIngestionConfig = streamIngestionConfig; + } + + @Override + ConsumerCoordinator getConsumerCoordinator(int partitionId) { + return _consumerCoordinator; + } + + public void setConsumerCoordinator(ConsumerCoordinator consumerCoordinator) { + _consumerCoordinator = consumerCoordinator; + } + + @Override + public StreamIngestionConfig getStreamIngestionConfig() { + return _streamIngestionConfig; + } + + @Override + public String getServerInstance() { + return "server_1"; + } + } + + private static class FakeConsumerCoordinator extends ConsumerCoordinator { + private final Map<String, Map<String, String>> _segmentAssignmentMap; + + public FakeConsumerCoordinator(boolean enforceConsumptionInOrder, + RealtimeTableDataManager realtimeTableDataManager) { + super(enforceConsumptionInOrder, realtimeTableDataManager); + Map<String, String> serverSegmentStatusMap = new HashMap<>() {{ + put("server_1", "ONLINE"); + put("server_3", "ONLINE"); + }}; + _segmentAssignmentMap = new HashMap<>() {{ + put("tableTest_REALTIME__1__101__20250304T0035Z", serverSegmentStatusMap); + put("tableTest_REALTIME__2__101__20250304T0035Z", serverSegmentStatusMap); + put("tableTest_REALTIME__2__100__20250304T0035Z", serverSegmentStatusMap); + put("tableTest_REALTIME__1__1__20250304T0035Z", serverSegmentStatusMap); + put("tableTest_REALTIME__1__14__20250304T0035Z", serverSegmentStatusMap); + put("tableTest_REALTIME__1__91__20250304T0035Z", serverSegmentStatusMap); + put("tableTest_REALTIME__1__90__20250304T0035Z", serverSegmentStatusMap); + }}; + } + + @Override + public Map<String, Map<String, String>> getSegmentAssignment() { + return _segmentAssignmentMap; + } + } + + @Test + public void testAwaitForPreviousSegmentSequenceNumber() + throws InterruptedException { + // 1. enable tracking segment seq num. + FakeRealtimeTableDataManager realtimeTableDataManager = new FakeRealtimeTableDataManager(null, false); + realtimeTableDataManager.setEnforceConsumptionInOrder(true); + FakeConsumerCoordinator consumerCoordinator = new FakeConsumerCoordinator(true, realtimeTableDataManager); + realtimeTableDataManager.setConsumerCoordinator(consumerCoordinator); + + // 2. check if thread waits on prev segment seq + AtomicBoolean atomicBoolean = new AtomicBoolean(false); + Thread thread1 = new Thread(() -> { + LLCSegmentName llcSegmentName = getLLCSegment(101); + try { + boolean b = consumerCoordinator.awaitForPreviousSegmentSequenceNumber(llcSegmentName, 5000); + atomicBoolean.set(b); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + thread1.start(); + + // 3. add prev segment and check if thread is unblocked. + consumerCoordinator.trackSegment(getLLCSegment(100)); + + TestUtils.waitForCondition(aVoid -> atomicBoolean.get(), 4000, + "Thread waiting on previous segment should have been unblocked."); + Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 100); + + // 4. check if second thread waits on prev segment seq until timeout and returns false + AtomicBoolean atomicBoolean2 = new AtomicBoolean(false); + Thread thread2 = new Thread(() -> { + LLCSegmentName llcSegmentName = getLLCSegment(102); + try { + boolean b = consumerCoordinator.awaitForPreviousSegmentSequenceNumber(llcSegmentName, 500); + atomicBoolean2.set(b); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + thread2.start(); + + Thread.sleep(1500); + + Assert.assertFalse(atomicBoolean2.get()); + Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 100); + } + + @Test + public void testFirstConsumer() + throws InterruptedException { + // 1. Enable tracking segment seq num. + FakeRealtimeTableDataManager realtimeTableDataManager = new FakeRealtimeTableDataManager(null, false); + realtimeTableDataManager.setEnforceConsumptionInOrder(true); + FakeConsumerCoordinator consumerCoordinator = new FakeConsumerCoordinator(true, realtimeTableDataManager); + realtimeTableDataManager.setConsumerCoordinator(consumerCoordinator); + ReentrantLock lock = (ReentrantLock) consumerCoordinator.getLock(); + RealtimeSegmentDataManager mockedRealtimeSegmentDataManager = getMockedRealtimeSegmentDataManager(); + Map<String, String> serverSegmentStatusMap = new HashMap<>() {{ + put("server_1", "ONLINE"); + put("server_3", "ONLINE"); + }}; + consumerCoordinator.getSegmentAssignment().put(getSegmentName(100), serverSegmentStatusMap); + consumerCoordinator.getSegmentAssignment().put(getSegmentName(102), serverSegmentStatusMap); + consumerCoordinator.getSegmentAssignment().put(getSegmentName(104), serverSegmentStatusMap); + consumerCoordinator.getSegmentAssignment().put(getSegmentName(106), serverSegmentStatusMap); + consumerCoordinator.getSegmentAssignment().put(getSegmentName(107), serverSegmentStatusMap); + consumerCoordinator.getSegmentAssignment().put(getSegmentName(109), serverSegmentStatusMap); + + // 2. create multiple helix transitions in this order: 106, 109, 104, 107 + Thread thread1 = getNewThread(consumerCoordinator, getLLCSegment(106)); + Thread thread2 = getNewThread(consumerCoordinator, getLLCSegment(109)); + Thread thread3 = getNewThread(consumerCoordinator, getLLCSegment(104)); + Thread thread4 = getNewThread(consumerCoordinator, getLLCSegment(107)); + + thread1.start(); + + Thread.sleep(1000); + + // 3. load segment 100, 101, 102 + realtimeTableDataManager.registerSegment(getSegmentName(100), mockedRealtimeSegmentDataManager); + realtimeTableDataManager.registerSegment(getSegmentName(101), mockedRealtimeSegmentDataManager); + realtimeTableDataManager.registerSegment(getSegmentName(102), mockedRealtimeSegmentDataManager); + Thread.sleep(1000); + + // 4. check all of the above threads wait + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1); + Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); + Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102); + Assert.assertFalse(consumerCoordinator.getFirstTransitionProcessed().get()); + + thread2.start(); + thread3.start(); + thread4.start(); + + Thread.sleep(1000); + + // 5. check that first thread acquiring semaphore is of segment 104 + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); + Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); + Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102); + Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get()); + + realtimeTableDataManager.registerSegment(getSegmentName(104), mockedRealtimeSegmentDataManager); + Thread.sleep(1000); + + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); + Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); + Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 104); + Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get()); + Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 1); + + // 6. check the next threads acquiring semaphore is 106 + consumerCoordinator.getSemaphore().release(); + realtimeTableDataManager.registerSegment(getSegmentName(106), mockedRealtimeSegmentDataManager); + + Thread.sleep(1000); + + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); + Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); + Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 106); + Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get()); + Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 1); + } + + @Test + public void testSequentialOrderNotRelyingOnIdealState() + throws InterruptedException { + // 1. Enable tracking segment seq num. + FakeRealtimeTableDataManager realtimeTableDataManager = new FakeRealtimeTableDataManager(null, false); + realtimeTableDataManager.setEnforceConsumptionInOrder(true); + + FakeConsumerCoordinator consumerCoordinator = new FakeConsumerCoordinator(true, realtimeTableDataManager); + realtimeTableDataManager.setConsumerCoordinator(consumerCoordinator); + ReentrantLock lock = (ReentrantLock) consumerCoordinator.getLock(); + + // 2. check first transition blocked on ideal state + Thread thread1 = getNewThread(consumerCoordinator, getLLCSegment(101)); + thread1.start(); + + Thread.sleep(2000); + + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1); + Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); + Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), -1); + Assert.assertFalse(consumerCoordinator.getFirstTransitionProcessed().get()); + + RealtimeSegmentDataManager mockedRealtimeSegmentDataManager = getMockedRealtimeSegmentDataManager(); + + // 3. register older segment and check seq num watermark and semaphore. + realtimeTableDataManager.registerSegment(getSegmentName(90), mockedRealtimeSegmentDataManager); + Thread.sleep(1000); + + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1); + Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); + Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 90); + Assert.assertFalse(consumerCoordinator.getFirstTransitionProcessed().get()); + + // 4. register prev segment and check watermark and if thread was unblocked + realtimeTableDataManager.registerSegment(getSegmentName(91), mockedRealtimeSegmentDataManager); + Thread.sleep(1000); + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); + Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); + Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 91); + Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get()); + + // 5. check that all the following transitions rely on seq num watermark and gets blocked. + Thread thread2 = getNewThread(consumerCoordinator, getLLCSegment(102)); + Thread thread3 = getNewThread(consumerCoordinator, getLLCSegment(103)); + Thread thread4 = getNewThread(consumerCoordinator, getLLCSegment(104)); + thread3.start(); + thread2.start(); + thread4.start(); + + Thread.sleep(2000); + + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); + Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); + Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 91); + Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get()); + + // 6. check that all above threads are still blocked even if semaphore is released. + consumerCoordinator.getSemaphore().release(); + + Thread.sleep(1000); + + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1); + Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); + Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 91); + Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get()); + + // 6. mark 101 seg as complete. Check 102 acquired the semaphore. + realtimeTableDataManager.registerSegment(getSegmentName(101), mockedRealtimeSegmentDataManager); + + Thread.sleep(1000); + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); + Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); + Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 101); + Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get()); + Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 0); + + // 7. register 102 seg, check if seg 103 is waiting on semaphore. + realtimeTableDataManager.registerSegment(getSegmentName(102), mockedRealtimeSegmentDataManager); + + Thread.sleep(1000); + Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); + Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102); + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); + Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get()); + Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 1); + + // 8. release the semaphore and check if semaphore is acquired by seg 103. + consumerCoordinator.getSemaphore().release(); + Thread.sleep(1000); + Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); + Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102); + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); + Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get()); + Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 0); + + // 8. register 103 seg and check if seg 104 is now queued on semaphore + realtimeTableDataManager.registerSegment(getSegmentName(103), mockedRealtimeSegmentDataManager); + Thread.sleep(1000); + Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); + Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 103); + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); + Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 1); + } + + @Test + public void testSequentialOrderRelyingOnIdealState() + throws InterruptedException { + FakeRealtimeTableDataManager realtimeTableDataManager = new FakeRealtimeTableDataManager(null, true); + realtimeTableDataManager.setEnforceConsumptionInOrder(true); + + FakeConsumerCoordinator consumerCoordinator = new FakeConsumerCoordinator(true, realtimeTableDataManager); + realtimeTableDataManager.setConsumerCoordinator(consumerCoordinator); + + // 1. test that acquire blocks when prev segment is not loaded. + Thread thread = getNewThread(consumerCoordinator, getLLCSegment(101)); + thread.start(); + + ReentrantLock lock = (ReentrantLock) consumerCoordinator.getLock(); + + RealtimeSegmentDataManager mockedRealtimeSegmentDataManager = Mockito.mock(RealtimeSegmentDataManager.class); + Mockito.when(mockedRealtimeSegmentDataManager.increaseReferenceCount()).thenReturn(true); + Assert.assertNotNull(realtimeTableDataManager); + + // prev segment has seq 91, so registering seq 90 won't do anything. + realtimeTableDataManager.registerSegment(getSegmentName(90), mockedRealtimeSegmentDataManager); + + Thread.sleep(2000); + + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1); + + // 2. test that registering prev segment will unblock thread. + realtimeTableDataManager.registerSegment(getSegmentName(91), mockedRealtimeSegmentDataManager); + + TestUtils.waitForCondition(aVoid -> (consumerCoordinator.getSemaphore().availablePermits() == 0), 5000, + "Semaphore must be acquired after registering previous segment"); + Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); + Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get()); + + consumerCoordinator.release(); + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1); + Assert.assertFalse(consumerCoordinator.getSemaphore().hasQueuedThreads()); + Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); + Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), -1); + realtimeTableDataManager.registerSegment(getSegmentName(101), mockedRealtimeSegmentDataManager); + + // 3. test that segment 103 will be blocked. + Map<String, String> serverSegmentStatusMap = new HashMap<>() {{ + put("server_1", "ONLINE"); + put("server_3", "ONLINE"); + }}; + consumerCoordinator.getSegmentAssignment().put(getSegmentName(102), serverSegmentStatusMap); + consumerCoordinator.getSegmentAssignment().put(getSegmentName(103), serverSegmentStatusMap); + consumerCoordinator.getSegmentAssignment().put(getSegmentName(104), serverSegmentStatusMap); + + Thread thread1 = getNewThread(consumerCoordinator, getLLCSegment(103)); + thread1.start(); + + Thread.sleep(1000); + + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1); + + // 3. test that segment 102 will acquire semaphore. + Thread thread2 = getNewThread(consumerCoordinator, getLLCSegment(102)); + thread2.start(); + + Thread.sleep(1000); + + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); + Assert.assertFalse(consumerCoordinator.getSemaphore().hasQueuedThreads()); + + // 4. registering seg 102 should unblock seg 103 + realtimeTableDataManager.registerSegment(getSegmentName(102), mockedRealtimeSegmentDataManager); + + Thread.sleep(1000); + + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); + Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 1); + + // 5. releasing semaphore should let seg 103 acquire it + consumerCoordinator.getSemaphore().release(); + + Thread.sleep(1000); + + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); + Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 0); + Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked()); + Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), -1); + } + + @Test + public void testRandomOrder() + throws InterruptedException { + RealtimeTableDataManager realtimeTableDataManager = Mockito.mock(RealtimeTableDataManager.class); + Mockito.when(realtimeTableDataManager.getTableName()).thenReturn("tableTest_REALTIME"); + + FakeConsumerCoordinator consumerCoordinator = new FakeConsumerCoordinator(false, realtimeTableDataManager); + + String segmentName = "tableTest_REALTIME__1__101__20250304T0035Z"; + LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); + Assert.assertNotNull(llcSegmentName); + consumerCoordinator.acquire(llcSegmentName); + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0); + Assert.assertFalse(consumerCoordinator.getSemaphore().hasQueuedThreads()); + + consumerCoordinator.release(); + Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1); + Assert.assertFalse(consumerCoordinator.getSemaphore().hasQueuedThreads()); + } + + @Test + public void testPreviousSegment() { + RealtimeTableDataManager realtimeTableDataManager = Mockito.mock(RealtimeTableDataManager.class); + Mockito.when(realtimeTableDataManager.getTableName()).thenReturn("tableTest_REALTIME"); + Mockito.when(realtimeTableDataManager.getServerInstance()).thenReturn("server_1"); + + FakeConsumerCoordinator consumerCoordinator = new FakeConsumerCoordinator(true, realtimeTableDataManager); + + String segmentName = "tableTest_REALTIME__1__101__20250304T0035Z"; + LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); + Assert.assertNotNull(llcSegmentName); + String previousSegment = consumerCoordinator.getPreviousSegmentFromIdealState(llcSegmentName); + Assert.assertEquals(previousSegment, "tableTest_REALTIME__1__91__20250304T0035Z"); + + consumerCoordinator.getSegmentAssignment().clear(); + Map<String, String> serverSegmentStatusMap = new HashMap<>() {{ + put("server_3", "ONLINE"); + }}; + consumerCoordinator.getSegmentAssignment().put(getSegmentName(100), serverSegmentStatusMap); + previousSegment = consumerCoordinator.getPreviousSegmentFromIdealState(llcSegmentName); + Assert.assertNull(previousSegment); + } + + private Thread getNewThread(FakeConsumerCoordinator consumerCoordinator, LLCSegmentName llcSegmentName) { + return new Thread(() -> { + try { + consumerCoordinator.acquire(llcSegmentName); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, String.valueOf(llcSegmentName.getSequenceNumber())); + } + + private RealtimeSegmentDataManager getMockedRealtimeSegmentDataManager() { + RealtimeSegmentDataManager mockedRealtimeSegmentDataManager = Mockito.mock(RealtimeSegmentDataManager.class); + Mockito.when(mockedRealtimeSegmentDataManager.increaseReferenceCount()).thenReturn(true); + Assert.assertNotNull(mockedRealtimeSegmentDataManager); + return mockedRealtimeSegmentDataManager; + } + + private LLCSegmentName getLLCSegment(int seqNum) { + String segmentName = getSegmentName(seqNum); + LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); + Assert.assertNotNull(llcSegmentName); + return llcSegmentName; + } + + private String getSegmentName(int seqNum) { + return "tableTest_REALTIME__1__" + seqNum + "__20250304T0035Z"; + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java index 623d0de37d..72dbf26acb 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java @@ -97,7 +97,8 @@ public class RealtimeSegmentDataManagerTest { private static final long START_OFFSET_VALUE = 198L; private static final LongMsgOffset START_OFFSET = new LongMsgOffset(START_OFFSET_VALUE); - private final Map<Integer, Semaphore> _partitionGroupIdToSemaphoreMap = new ConcurrentHashMap<>(); + private final Map<Integer, ConsumerCoordinator> _partitionGroupIdToConsumerCoordinatorMap = + new ConcurrentHashMap<>(); private static TableConfig createTableConfig() throws Exception { @@ -166,12 +167,13 @@ public class RealtimeSegmentDataManagerTest { tableConfig.getIngestionConfig().setRetryOnSegmentBuildPrecheckFailure(true); RealtimeTableDataManager tableDataManager = createTableDataManager(tableConfig); LLCSegmentName llcSegmentName = new LLCSegmentName(SEGMENT_NAME_STR); - _partitionGroupIdToSemaphoreMap.putIfAbsent(PARTITION_GROUP_ID, new Semaphore(1)); + _partitionGroupIdToConsumerCoordinatorMap.putIfAbsent(PARTITION_GROUP_ID, + new ConsumerCoordinator(false, tableDataManager)); Schema schema = Fixtures.createSchema(); ServerMetrics serverMetrics = new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()); return new FakeRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, tableDataManager, new File(TEMP_DIR, REALTIME_TABLE_NAME).getAbsolutePath(), schema, llcSegmentName, - _partitionGroupIdToSemaphoreMap, serverMetrics, timeSupplier); + _partitionGroupIdToConsumerCoordinatorMap, serverMetrics, timeSupplier); } @BeforeClass @@ -992,7 +994,7 @@ public class RealtimeSegmentDataManagerTest { private boolean _notifySegmentBuildFailedWithDeterministicErrorCalled = false; public boolean _throwExceptionFromConsume = false; public boolean _postConsumeStoppedCalled = false; - public Map<Integer, Semaphore> _semaphoreMap; + public Map<Integer, ConsumerCoordinator> _consumerCoordinatorMap; public boolean _stubConsumeLoop = true; private TimeSupplier _timeSupplier; private boolean _indexCapacityThresholdBreached; @@ -1009,12 +1011,13 @@ public class RealtimeSegmentDataManagerTest { public FakeRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig, RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, Schema schema, - LLCSegmentName llcSegmentName, Map<Integer, Semaphore> semaphoreMap, ServerMetrics serverMetrics, - TimeSupplier timeSupplier) + LLCSegmentName llcSegmentName, Map<Integer, ConsumerCoordinator> consumerCoordinatorMap, + ServerMetrics serverMetrics, TimeSupplier timeSupplier) throws Exception { super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir, new IndexLoadingConfig(makeInstanceDataManagerConfig(), tableConfig), schema, llcSegmentName, - semaphoreMap.get(llcSegmentName.getPartitionGroupId()), serverMetrics, null, null, () -> true); + consumerCoordinatorMap.get(llcSegmentName.getPartitionGroupId()), serverMetrics, null, null, + () -> true); _state = RealtimeSegmentDataManager.class.getDeclaredField("_state"); _state.setAccessible(true); _shouldStop = RealtimeSegmentDataManager.class.getDeclaredField("_shouldStop"); @@ -1024,7 +1027,7 @@ public class RealtimeSegmentDataManagerTest { _segmentBuildFailedWithDeterministicError = RealtimeSegmentDataManager.class.getDeclaredField("_segmentBuildFailedWithDeterministicError"); _segmentBuildFailedWithDeterministicError.setAccessible(true); - _semaphoreMap = semaphoreMap; + _consumerCoordinatorMap = consumerCoordinatorMap; _streamMsgOffsetFactory = RealtimeSegmentDataManager.class.getDeclaredField("_streamPartitionMsgOffsetFactory"); _streamMsgOffsetFactory.setAccessible(true); _streamMsgOffsetFactory.set(this, new LongMsgOffsetFactory()); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java index 960138e8a0..8899fbd723 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java @@ -18,10 +18,10 @@ */ package org.apache.pinot.integration.tests.realtime.utils; -import java.util.concurrent.Semaphore; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.core.data.manager.realtime.ConsumerCoordinator; import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; @@ -43,16 +43,15 @@ public class FailureInjectingRealtimeSegmentDataManager extends RealtimeSegmentD /** * Creates a manager that will forcibly fail the commit segment step. */ - public FailureInjectingRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, - TableConfig tableConfig, RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, - IndexLoadingConfig indexLoadingConfig, Schema schema, LLCSegmentName llcSegmentName, - Semaphore partitionGroupConsumerSemaphore, ServerMetrics serverMetrics, - boolean failCommit) throws AttemptsExceededException, RetriableOperationException { + public FailureInjectingRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig, + RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig, + Schema schema, LLCSegmentName llcSegmentName, ConsumerCoordinator consumerCoordinator, + ServerMetrics serverMetrics, boolean failCommit) + throws AttemptsExceededException, RetriableOperationException { // Pass through to the real parent constructor - super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir, - indexLoadingConfig, schema, llcSegmentName, partitionGroupConsumerSemaphore, serverMetrics, - null /* no PartitionUpsertMetadataManager */, null /* no PartitionDedupMetadataManager */, - () -> true /* isReadyToConsumeData always true for tests */); + super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir, indexLoadingConfig, schema, + llcSegmentName, consumerCoordinator, serverMetrics, null /* no PartitionUpsertMetadataManager */, + null /* no PartitionDedupMetadataManager */, () -> true /* isReadyToConsumeData always true for tests */); _failCommit = failCommit; } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java index 278e2bcb58..af65cd1a79 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java @@ -25,6 +25,7 @@ import java.util.function.Supplier; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.PauselessConsumptionUtils; +import org.apache.pinot.core.data.manager.realtime.ConsumerCoordinator; import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager; import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager; @@ -52,7 +53,7 @@ public class FailureInjectingRealtimeTableDataManager extends RealtimeTableDataM @Override protected RealtimeSegmentDataManager createRealtimeSegmentDataManager(SegmentZKMetadata zkMetadata, TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig, Schema schema, LLCSegmentName llcSegmentName, - Semaphore semaphore, PartitionUpsertMetadataManager partitionUpsertMetadataManager, + ConsumerCoordinator consumerCoordinator, PartitionUpsertMetadataManager partitionUpsertMetadataManager, PartitionDedupMetadataManager partitionDedupMetadataManager, BooleanSupplier isTableReadyToConsumeData) throws AttemptsExceededException, RetriableOperationException { @@ -61,6 +62,6 @@ public class FailureInjectingRealtimeTableDataManager extends RealtimeTableDataM addFailureToCommits = false; } return new FailureInjectingRealtimeSegmentDataManager(zkMetadata, tableConfig, this, _indexDir.getAbsolutePath(), - indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics, addFailureToCommits); + indexLoadingConfig, schema, llcSegmentName, consumerCoordinator, _serverMetrics, addFailureToCommits); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java index e1400620fb..da17a2c020 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java @@ -39,10 +39,17 @@ public class StreamIngestionConfig extends BaseJsonConfig { private boolean _columnMajorSegmentBuilderEnabled = true; @JsonPropertyDescription("Whether to track offsets of the filtered stream messages during consumption.") - private boolean _trackFilteredMessageOffsets = false; + private boolean _trackFilteredMessageOffsets; @JsonPropertyDescription("Whether pauseless consumption is enabled for the table") - private boolean _pauselessConsumptionEnabled = false; + private boolean _pauselessConsumptionEnabled; + + @JsonPropertyDescription("Enforce consumption of segments in order of segment creation by the controller") + private boolean _enforceConsumptionInOrder; + + @JsonPropertyDescription("If enabled, Server always relies on ideal state to get previous segment. If disabled, " + + "server uses sequence id - 1 for previous segment") + private boolean _useIdealStateToCalculatePreviousSegment; @JsonPropertyDescription("Policy to determine the behaviour of parallel consumption.") private ParallelSegmentConsumptionPolicy _parallelSegmentConsumptionPolicy; @@ -80,6 +87,22 @@ public class StreamIngestionConfig extends BaseJsonConfig { _pauselessConsumptionEnabled = pauselessConsumptionEnabled; } + public boolean isEnforceConsumptionInOrder() { + return _enforceConsumptionInOrder; + } + + public void setEnforceConsumptionInOrder(boolean enforceConsumptionInOrder) { + _enforceConsumptionInOrder = enforceConsumptionInOrder; + } + + public boolean isUseIdealStateToCalculatePreviousSegment() { + return _useIdealStateToCalculatePreviousSegment; + } + + public void setUseIdealStateToCalculatePreviousSegment(boolean useIdealStateToCalculatePreviousSegment) { + _useIdealStateToCalculatePreviousSegment = useIdealStateToCalculatePreviousSegment; + } + @Nullable public ParallelSegmentConsumptionPolicy getParallelSegmentConsumptionPolicy() { return _parallelSegmentConsumptionPolicy; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org