This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 9f5fe2b7be  Fix the potential deadlock for partial-upsert segment loading check (#10198)
9f5fe2b7be is described below

commit 9f5fe2b7be241f778890483de05a1b45fed4d2d5
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Sun Jan 29 23:38:20 2023 -0800

    Fix the potential deadlock for partial-upsert segment loading check (#10198)
---
 .../realtime/LLRealtimeSegmentDataManager.java      | 19 ++++-
 .../manager/realtime/RealtimeTableDataManager.java  | 64 ++++++++++-----
 .../realtime/LLRealtimeSegmentDataManagerTest.java  |  2 +-
 .../local/utils/tablestate/TableStateUtils.java     | 90 +++++++++++-----------
 4 files changed, 104 insertions(+), 71 deletions(-)
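In short: the old code blocked while creating the consuming segment (under a lock on _allSegmentsLoaded) until every segment of a dedup/partial-upsert table was loaded, and the commit title suggests that blocking wait is what could deadlock against the very segment loading it was waiting on. The patch instead hands the consuming segment a lazily evaluated BooleanSupplier and lets the consumer thread itself sleep and re-check. Below is a minimal, self-contained sketch of that supplier pattern for readers who want it outside diff form; the class name and the allSegmentsLoaded() stand-in are illustrative only, while the real implementation (TableStateUtils.isAllSegmentsLoaded plus a 5-second check interval) is in the RealtimeTableDataManager hunk further down.

    import java.util.function.BooleanSupplier;

    // Illustrative sketch only; names here are placeholders, not the actual Pinot classes.
    public final class ReadyToConsumeCheckSketch {
      private static final long CHECK_INTERVAL_MS = 5_000L; // mirrors READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS

      // Hypothetical stand-in for TableStateUtils.isAllSegmentsLoaded(...), which reads Helix state.
      private static boolean allSegmentsLoaded() {
        return true;
      }

      // Double-checked, rate-limited readiness supplier: once the check has passed it is a plain
      // volatile read; until then, at most one (potentially expensive) check runs per interval.
      static final BooleanSupplier IS_READY_TO_CONSUME = new BooleanSupplier() {
        private volatile boolean _ready;
        private long _lastCheckTimeMs;

        @Override
        public boolean getAsBoolean() {
          if (_ready) {
            return true;
          }
          synchronized (this) {
            if (_ready) {
              return true;
            }
            long now = System.currentTimeMillis();
            if (now - _lastCheckTimeMs <= CHECK_INTERVAL_MS) {
              return false; // checked recently; do not hammer the metadata store
            }
            _lastCheckTimeMs = now;
            _ready = allSegmentsLoaded();
            return _ready;
          }
        }
      };

      // Caller side: the consumer thread polls and sleeps instead of blocking while holding a lock,
      // which is roughly what the new wait at the top of consumeLoop() does in the patch below.
      public static void main(String[] args) throws InterruptedException {
        while (!IS_READY_TO_CONSUME.getAsBoolean()) {
          Thread.sleep(CHECK_INTERVAL_MS);
        }
        System.out.println("ready to consume");
      }
    }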
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index cffebf90db..b3607e6a06 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BooleanSupplier;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.Utils;
@@ -232,6 +233,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private final AtomicBoolean _acquiredConsumerSemaphore;
   private final String _metricKeyName;
   private final ServerMetrics _serverMetrics;
+  private final BooleanSupplier _isReadyToConsumeData;
   private final MutableSegmentImpl _realtimeSegment;
   private volatile StreamPartitionMsgOffset _currentOffset;
   private volatile State _state;
@@ -395,6 +397,17 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
   protected boolean consumeLoop()
       throws Exception {
+    // At this point, we know that we can potentially move the offset, so the old saved segment file is not valid
+    // anymore. Remove the file if it exists.
+    removeSegmentFile();
+
+    if (!_isReadyToConsumeData.getAsBoolean()) {
+      do {
+        //noinspection BusyWait
+        Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+      } while (!_shouldStop && !endCriteriaReached() && !_isReadyToConsumeData.getAsBoolean());
+    }
+
     _numRowsErrored = 0;
     final long idlePipeSleepTimeMillis = 100;
     final long idleTimeoutMillis = _partitionLevelStreamConfig.getIdleTimeoutMillis();
@@ -403,9 +416,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     StreamPartitionMsgOffset lastUpdatedOffset = _streamPartitionMsgOffsetFactory
         .create(_currentOffset); // so that we always update the metric when we enter this method.
 
-    // At this point, we know that we can potentially move the offset, so the old saved segment file is not valid
-    // anymore. Remove the file if it exists.
-    removeSegmentFile();
     _segmentLogger.info("Starting consumption loop start offset {}, finalOffset {}", _currentOffset, _finalOffset);
     while (!_shouldStop && !endCriteriaReached()) {
@@ -1263,7 +1273,7 @@
       RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig,
       Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionGroupConsumerSemaphore,
       ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager,
-      @Nullable PartitionDedupMetadataManager partitionDedupMetadataManager) {
+      @Nullable PartitionDedupMetadataManager partitionDedupMetadataManager, BooleanSupplier isReadyToConsumeData) {
     _segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
     _segmentZKMetadata = segmentZKMetadata;
     _tableConfig = tableConfig;
@@ -1273,6 +1283,7 @@
     _indexLoadingConfig = indexLoadingConfig;
     _schema = schema;
     _serverMetrics = serverMetrics;
+    _isReadyToConsumeData = isReadyToConsumeData;
     _segmentVersion = indexLoadingConfig.getSegmentVersion();
     _instanceId = _realtimeTableDataManager.getServerInstance();
     _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_tableNameWithType);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 48b849834f..9315770943 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -31,7 +31,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
 import java.util.function.Supplier;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections.CollectionUtils;
@@ -114,13 +114,17 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   // likely that we get fresh data each time instead of multiple copies of roughly same data.
   private static final int MIN_INTERVAL_BETWEEN_STATS_UPDATES_MINUTES = 30;
 
-  private final AtomicBoolean _allSegmentsLoaded = new AtomicBoolean();
+  public static final long READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5);
+
+  // TODO: Change it to BooleanSupplier
+  private final Supplier<Boolean> _isServerReadyToServeQueries;
 
-  private TableDedupMetadataManager _tableDedupMetadataManager;
-  private TableUpsertMetadataManager _tableUpsertMetadataManager;
   // Object to track ingestion delay for all partitions
   private IngestionDelayTracker _ingestionDelayTracker;
-  private final Supplier<Boolean> _isServerReadyToServeQueries;
+
+  private TableDedupMetadataManager _tableDedupMetadataManager;
+  private TableUpsertMetadataManager _tableUpsertMetadataManager;
+  private BooleanSupplier _isTableReadyToConsumeData;
 
   public RealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
     this(segmentBuildSemaphore, () -> true);
@@ -135,8 +139,8 @@
   protected void doInit() {
     _leaseExtender = SegmentBuildTimeLeaseExtender.getOrCreate(_instanceId, _serverMetrics, _tableNameWithType);
     // Tracks ingestion delay of all partitions being served for this table
-    _ingestionDelayTracker = new IngestionDelayTracker(_serverMetrics, _tableNameWithType, this,
-        _isServerReadyToServeQueries);
+    _ingestionDelayTracker =
+        new IngestionDelayTracker(_serverMetrics, _tableNameWithType, this, _isServerReadyToServeQueries);
     File statsFile = new File(_tableDataDir, STATS_FILE_NAME);
     try {
       _statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsFile);
@@ -203,6 +207,36 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
       Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType);
       _tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(tableConfig, schema, this, _serverMetrics);
     }
+
+    // For dedup and partial-upsert, need to wait for all segments loaded before starting consuming data
+    if (isDedupEnabled() || isPartialUpsertEnabled()) {
+      _isTableReadyToConsumeData = new BooleanSupplier() {
+        volatile boolean _allSegmentsLoaded;
+        long _lastCheckTimeMs;
+
+        @Override
+        public boolean getAsBoolean() {
+          if (_allSegmentsLoaded) {
+            return true;
+          } else {
+            synchronized (this) {
+              if (_allSegmentsLoaded) {
+                return true;
+              }
+              long currentTimeMs = System.currentTimeMillis();
+              if (currentTimeMs - _lastCheckTimeMs <= READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS) {
+                return false;
+              }
+              _lastCheckTimeMs = currentTimeMs;
+              _allSegmentsLoaded = TableStateUtils.isAllSegmentsLoaded(_helixManager, _tableNameWithType);
+              return _allSegmentsLoaded;
+            }
+          }
+        }
+      };
+    } else {
+      _isTableReadyToConsumeData = () -> true;
+    }
   }
 
   @Override
@@ -265,7 +299,7 @@
 
   /**
    * Returns all partitionGroupIds for the partitions hosted by this server for current table.
-   * @apiNote this involves Zookeeper read and should not be used frequently due to efficiency concerns.
+   * @apiNote this involves Zookeeper read and should not be used frequently due to efficiency concerns.
    */
   public Set<Integer> getHostedPartitionsGroupIds() {
     Set<Integer> partitionsHostedByThisServer = new HashSet<>();
@@ -401,22 +435,10 @@
       PartitionDedupMetadataManager partitionDedupMetadataManager = _tableDedupMetadataManager != null ?
           _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId) : null;
-      // For dedup and partial-upsert, wait for all segments loaded before creating the consuming segment
-      if (isDedupEnabled() || isPartialUpsertEnabled()) {
-        if (!_allSegmentsLoaded.get()) {
-          synchronized (_allSegmentsLoaded) {
-            if (!_allSegmentsLoaded.get()) {
-              TableStateUtils.waitForAllSegmentsLoaded(_helixManager, _tableNameWithType);
-              _allSegmentsLoaded.set(true);
-            }
-          }
-        }
-      }
-
       segmentDataManager =
           new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(),
               indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics, partitionUpsertMetadataManager,
-              partitionDedupMetadataManager);
+              partitionDedupMetadataManager, _isTableReadyToConsumeData);
     } else {
       InstanceZKMetadata instanceZKMetadata = ZKMetadataProvider.getInstanceZKMetadata(_propertyStore, _instanceId);
       segmentDataManager = new HLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, instanceZKMetadata, this,
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index b6c290cde6..fd75a9a0f0 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -1001,7 +1001,7 @@ public class LLRealtimeSegmentDataManagerTest {
         throws Exception {
       super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir,
           new IndexLoadingConfig(makeInstanceDataManagerConfig(), tableConfig), schema, llcSegmentName,
-          semaphoreMap.get(llcSegmentName.getPartitionGroupId()), serverMetrics, null, null);
+          semaphoreMap.get(llcSegmentName.getPartitionGroupId()), serverMetrics, null, null, () -> true);
       _state = LLRealtimeSegmentDataManager.class.getDeclaredField("_state");
       _state.setAccessible(true);
       _shouldStop = LLRealtimeSegmentDataManager.class.getDeclaredField("_shouldStop");
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
index c11886f613..6de4536de1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
@@ -27,13 +27,14 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
-import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TableStateUtils {
   private static final Logger LOGGER = LoggerFactory.getLogger(TableStateUtils.class);
+  private static final int MAX_NUM_SEGMENTS_TO_LOG = 10;
 
   private TableStateUtils() {
   }
@@ -83,58 +84,57 @@
    * @return true if all segments for the given table are succesfully loaded. False otherwise
    */
   public static boolean isAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType) {
+    List<String> onlineSegments =
+        getSegmentsInGivenStateForThisInstance(helixManager, tableNameWithType, SegmentStateModel.ONLINE);
+    if (onlineSegments.isEmpty()) {
+      LOGGER.info("No ONLINE segment found for table: {}", tableNameWithType);
+      return true;
+    }
+
+    // Check if ideal state and current state matches for all segments assigned to the current instance
     HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
     String instanceName = helixManager.getInstanceName();
-
-    List<String> onlineSegments = getSegmentsInGivenStateForThisInstance(helixManager, tableNameWithType,
-        CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
-    if (onlineSegments.size() > 0) {
-      LiveInstance liveInstance = dataAccessor.getProperty(keyBuilder.liveInstance(instanceName));
-      if (liveInstance == null) {
-        LOGGER.warn("Failed to find live instance for instance: {}", instanceName);
-        return false;
-      }
-      String sessionId = liveInstance.getEphemeralOwner();
-      CurrentState currentState =
-          dataAccessor.getProperty(keyBuilder.currentState(instanceName, sessionId, tableNameWithType));
-      if (currentState == null) {
-        LOGGER.warn("Failed to find current state for instance: {}, sessionId: {}, table: {}", instanceName, sessionId,
-            tableNameWithType);
-        return false;
-      }
-      // Check if ideal state and current state matches for all segments assigned to the current instance
-      Map<String, String> currentStateMap = currentState.getPartitionStateMap();
-
-      for (String segmentName : onlineSegments) {
-        String actualState = currentStateMap.get(segmentName);
-        if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(actualState)) {
-          if (CommonConstants.Helix.StateModel.SegmentStateModel.ERROR.equals(actualState)) {
-            LOGGER.error("Find ERROR segment: {}, table: {}, expected: {}", segmentName, tableNameWithType,
-                CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
-          } else {
-            LOGGER.info("Find unloaded segment: {}, table: {}, expected: {}, actual: {}", segmentName,
-                tableNameWithType, CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE, actualState);
-          }
+    LiveInstance liveInstance = dataAccessor.getProperty(keyBuilder.liveInstance(instanceName));
+    if (liveInstance == null) {
+      LOGGER.warn("Failed to find live instance for instance: {}", instanceName);
+      return false;
+    }
+    String sessionId = liveInstance.getEphemeralOwner();
+    CurrentState currentState =
+        dataAccessor.getProperty(keyBuilder.currentState(instanceName, sessionId, tableNameWithType));
+    if (currentState == null) {
+      LOGGER.warn("Failed to find current state for instance: {}, sessionId: {}, table: {}", instanceName, sessionId,
+          tableNameWithType);
+      return false;
+    }
+    List<String> unloadedSegments = new ArrayList<>();
+    Map<String, String> currentStateMap = currentState.getPartitionStateMap();
+    for (String segmentName : onlineSegments) {
+      String actualState = currentStateMap.get(segmentName);
+      if (!SegmentStateModel.ONLINE.equals(actualState)) {
+        if (SegmentStateModel.ERROR.equals(actualState)) {
+          LOGGER.error("Found segment: {}, table: {} in ERROR state, expected: {}", segmentName, tableNameWithType,
+              SegmentStateModel.ONLINE);
           return false;
+        } else {
+          unloadedSegments.add(segmentName);
         }
       }
     }
-
-    LOGGER.info("All segments loaded for table: {}", tableNameWithType);
-    return true;
-  }
-
-  public static void waitForAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType) {
-    try {
-      while (!TableStateUtils.isAllSegmentsLoaded(helixManager, tableNameWithType)) {
-        LOGGER.info("Sleeping 1 second waiting for all segments loaded for table: {}", tableNameWithType);
-        //noinspection BusyWait
-        Thread.sleep(1000L);
+    if (unloadedSegments.isEmpty()) {
+      LOGGER.info("All segments loaded for table: {}", tableNameWithType);
+      return true;
+    } else {
+      int numUnloadedSegments = unloadedSegments.size();
+      if (numUnloadedSegments <= MAX_NUM_SEGMENTS_TO_LOG) {
+        LOGGER.info("Found {} unloaded segments: {} for table: {}", numUnloadedSegments, unloadedSegments,
+            tableNameWithType);
+      } else {
+        LOGGER.info("Found {} unloaded segments: {}... for table: {}", numUnloadedSegments,
+            unloadedSegments.subList(0, MAX_NUM_SEGMENTS_TO_LOG), tableNameWithType);
       }
-    } catch (Exception e) {
-      throw new RuntimeException(
-          "Caught exception while waiting for all segments loaded for table: " + tableNameWithType, e);
+      return false;
     }
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org