This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new e5d9a04650 Unblocks helix thread for online segment (#15246) e5d9a04650 is described below commit e5d9a046506270124b8c3d90b8db4e301cc7701d Author: NOOB <43700604+noob-se...@users.noreply.github.com> AuthorDate: Fri Mar 28 15:00:02 2025 +0530 Unblocks helix thread for online segment (#15246) * Fixes helix threads getting blocked edge case * Increase wait time * addresses PR comment * nit * Releases partitionGroupConsumerSemaphore incase of Errors * keeps condition consistent with RealtimeTableDataManager * nit * Adds dedup and upsert condition * handle npe * refactoring * nmit * nit * nit * decrease time wait * fix condition * updates allowConsumptionDuringCommit * fixes lint * nit * nit * unblocks helix threads * reverts prev condition * fixes lint * Adds test * Adds comment * nit * fixes test * nit --- .../data/manager/realtime/ConsumerCoordinator.java | 47 ++++++++++++++++++++-- .../manager/realtime/RealtimeTableDataManager.java | 24 ++++++++--- .../realtime/SegmentAlreadyConsumedException.java | 26 ++++++++++++ .../manager/realtime/ConsumerCoordinatorTest.java | 29 +++++++++++++ 4 files changed, 117 insertions(+), 9 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java index 75ba5f73d4..35dac8b2ec 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java @@ -29,6 +29,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Nullable; import org.apache.helix.model.IdealState; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.metrics.ServerTimer; import org.apache.pinot.common.utils.LLCSegmentName; @@ -87,8 +88,13 @@ public class ConsumerCoordinator { long startTimeMs = System.currentTimeMillis(); while (!_semaphore.tryAcquire(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) { - LOGGER.warn("Failed to acquire consumer semaphore for segment: {} in: {}ms. Retrying.", llcSegmentName, + String currSegmentName = llcSegmentName.getSegmentName(); + LOGGER.warn("Failed to acquire consumer semaphore for segment: {} in: {}ms. Retrying.", currSegmentName, System.currentTimeMillis() - startTimeMs); + + if (isSegmentAlreadyConsumed(currSegmentName)) { + throw new SegmentAlreadyConsumedException(currSegmentName); + } } } @@ -159,6 +165,15 @@ public class ConsumerCoordinator { if (!_condition.await(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) { LOGGER.warn("Semaphore access denied to segment: {}. Waited on previous segment: {} for: {}ms.", currSegment.getSegmentName(), previousSegment, System.currentTimeMillis() - startTimeMs); + + if (isSegmentAlreadyConsumed(currSegment.getSegmentName())) { + // if segment is already consumed, just return from here. + // NOTE: if segment is deleted, this segment will never be registered and helix thread waiting on + // watermark for prev segment won't be notified. All such helix threads will fallback to rely on ideal + // state for previous segment. + throw new SegmentAlreadyConsumedException(currSegment.getSegmentName()); + } + // waited until timeout, fetch previous segment again from ideal state as previous segment might be // changed in ideal state. previousSegment = getPreviousSegmentFromIdealState(currSegment); @@ -197,6 +212,15 @@ public class ConsumerCoordinator { LOGGER.warn( "Semaphore access denied to segment: {}. Waited on previous segment with sequence number: {} for: {}ms.", currSegment.getSegmentName(), prevSeqNum, System.currentTimeMillis() - startTimeMs); + + if (isSegmentAlreadyConsumed(currSegment.getSegmentName())) { + // if segment is already consumed, just return from here. + // NOTE: if segment is deleted, this segment will never be registered and helix thread waiting on + // watermark for prev segment won't be notified. All such helix threads will fallback to rely on ideal + // state for previous segment. + throw new SegmentAlreadyConsumedException(currSegment.getSegmentName()); + } + // waited until the timeout. Rely on ideal state now. return _maxSegmentSeqNumRegistered >= prevSeqNum; } @@ -256,8 +280,8 @@ public class ConsumerCoordinator { } long timeSpentMs = System.currentTimeMillis() - startTimeMs; - LOGGER.info("Fetched previous segment: {} to current segment: {} in: {}ms.", previousSegment, currSegment, - timeSpentMs); + LOGGER.info("Fetched previous segment: {} to current segment: {} in: {}ms.", previousSegment, + currSegment.getSegmentName(), timeSpentMs); _serverMetrics.addTimedTableValue(_realtimeTableDataManager.getTableName(), ServerTimer.PREV_SEGMENT_FETCH_IDEAL_STATE_TIME_MS, timeSpentMs, TimeUnit.MILLISECONDS); @@ -288,4 +312,21 @@ public class ConsumerCoordinator { int getMaxSegmentSeqNumLoaded() { return _maxSegmentSeqNumRegistered; } + + @VisibleForTesting + boolean isSegmentAlreadyConsumed(String currSegmentName) { + SegmentZKMetadata segmentZKMetadata = _realtimeTableDataManager.fetchZKMetadata(currSegmentName); + if (segmentZKMetadata == null) { + // segment is deleted. no need to consume. + LOGGER.warn("Skipping consumption for segment: {} because ZK metadata does not exists.", currSegmentName); + return true; + } + if (segmentZKMetadata.getStatus().isCompleted()) { + // if segment is done or uploaded, no need to consume. + LOGGER.warn("Skipping consumption for segment: {} because ZK status is already marked as completed.", + currSegmentName); + return true; + } + return false; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 7bffd4fdd8..668da1b748 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -516,11 +516,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager { private void doAddConsumingSegment(String segmentName) throws AttemptsExceededException, RetriableOperationException { SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName); - if ((zkMetadata.getStatus() != Status.IN_PROGRESS) && (!_enforceConsumptionInOrder)) { + if ((zkMetadata == null) || (zkMetadata.getStatus().isCompleted())) { // NOTE: We do not throw exception here because the segment might have just been committed before the state // transition is processed. We can skip adding this segment, and the segment will enter CONSUMING state in // Helix, then we can rely on the following CONSUMING -> ONLINE state transition to add it. - _logger.warn("Segment: {} is already committed, skipping adding it as CONSUMING segment", segmentName); + _logger.warn("Segment: {} is already consumed, skipping adding it as CONSUMING segment", segmentName); return; } IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig(); @@ -557,10 +557,22 @@ public class RealtimeTableDataManager extends BaseTableDataManager { PartitionDedupMetadataManager partitionDedupMetadataManager = _tableDedupMetadataManager != null ? _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId) : null; - RealtimeSegmentDataManager realtimeSegmentDataManager = - createRealtimeSegmentDataManager(zkMetadata, tableConfig, indexLoadingConfig, schema, llcSegmentName, - consumerCoordinator, partitionUpsertMetadataManager, partitionDedupMetadataManager, - _isTableReadyToConsumeData); + RealtimeSegmentDataManager realtimeSegmentDataManager; + try { + realtimeSegmentDataManager = + createRealtimeSegmentDataManager(zkMetadata, tableConfig, indexLoadingConfig, schema, llcSegmentName, + consumerCoordinator, partitionUpsertMetadataManager, partitionDedupMetadataManager, + _isTableReadyToConsumeData); + } catch (SegmentAlreadyConsumedException e) { + // Don't register segment. + // If segment is not deleted, Eventually this server should receive a CONSUMING -> ONLINE helix state transition. + // If consumption in order is enforced: + // 1. If segment was deleted: Helix thread waiting on this deleted segment will fallback to fetch prev segment + // from ideal state. + // 2. If segment is not deleted, Helix thread waiting on this segment will be notified and unblocked during + // consuming -> online transition of this segment. + return; + } registerSegment(segmentName, realtimeSegmentDataManager, partitionUpsertMetadataManager); if (partitionUpsertMetadataManager != null) { partitionUpsertMetadataManager.trackNewlyAddedSegment(segmentName); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentAlreadyConsumedException.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentAlreadyConsumedException.java new file mode 100644 index 0000000000..6b04920dff --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentAlreadyConsumedException.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.manager.realtime; + +public class SegmentAlreadyConsumedException extends RuntimeException { + + public SegmentAlreadyConsumedException(String currSegmentName) { + super("Skipping consumption for segment: " + currSegmentName); + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java index a2b01a44ea..5041bfade7 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java @@ -25,8 +25,10 @@ import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.util.TestUtils; import org.mockito.Mockito; import org.testng.Assert; @@ -455,6 +457,33 @@ public class ConsumerCoordinatorTest { Assert.assertNull(previousSegment); } + @Test + public void testIfSegmentIsConsumed() { + RealtimeTableDataManager realtimeTableDataManager = Mockito.mock(RealtimeTableDataManager.class); + Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(null); + + FakeConsumerCoordinator consumerCoordinator = new FakeConsumerCoordinator(true, realtimeTableDataManager); + Assert.assertTrue(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101))); + + SegmentZKMetadata mockSegmentZKMetadata = Mockito.mock(SegmentZKMetadata.class); + + Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.IN_PROGRESS); + Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata); + Assert.assertFalse(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101))); + + Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.COMMITTING); + Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata); + Assert.assertFalse(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101))); + + Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.DONE); + Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata); + Assert.assertTrue(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101))); + + Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.UPLOADED); + Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata); + Assert.assertTrue(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101))); + } + private Thread getNewThread(FakeConsumerCoordinator consumerCoordinator, LLCSegmentName llcSegmentName) { return new Thread(() -> { try { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org