This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 33074e1e7e Work around the problem of Helix sending 2 transitions for CONSUMING -> DROPPED (#12351) 33074e1e7e is described below commit 33074e1e7ec807251c8163d1807c61f5f80e8853 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Thu Feb 1 14:13:39 2024 -0800 Work around the problem of Helix sending 2 transitions for CONSUMING -> DROPPED (#12351) --- .../SegmentOnlineOfflineStateModelFactory.java | 40 ++++++++++++++++++++-- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java index 42d1642c7b..b0625c879b 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java @@ -19,6 +19,10 @@ package org.apache.pinot.server.starter.helix; import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.NotificationContext; import org.apache.helix.model.Message; import org.apache.helix.participant.statemachine.StateModel; @@ -46,6 +50,13 @@ import org.slf4j.LoggerFactory; * 3. Delete an existed segment. */ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> { + // NOTE: Helix might process CONSUMING -> DROPPED transition as 2 separate transitions: CONSUMING -> OFFLINE followed + // by OFFLINE -> DROPPED. Use this cache to track the segments that just went through CONSUMING -> OFFLINE transition + // to detect CONSUMING -> DROPPED transition. 
+ // TODO: Check how Helix handles CONSUMING -> DROPPED transition and remove this cache if it's not needed. + private final Cache<Pair<String, String>, Boolean> _recentlyOffloadedConsumingSegments = + CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build(); + private final String _instanceId; private final InstanceDataManager _instanceDataManager; @@ -139,6 +150,7 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta String segmentName = message.getPartitionName(); try { _instanceDataManager.offloadSegment(realtimeTableName, segmentName); + _recentlyOffloadedConsumingSegments.put(Pair.of(realtimeTableName, segmentName), true); } catch (Exception e) { _logger.error("Caught exception in state transition CONSUMING -> OFFLINE for table: {}, segment: {}", realtimeTableName, segmentName, e); @@ -151,12 +163,10 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta _logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() : " + message); String realtimeTableName = message.getResourceName(); String segmentName = message.getPartitionName(); - TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName); - Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName); - tableDataManager.onConsumingToDropped(segmentName); try { _instanceDataManager.offloadSegment(realtimeTableName, segmentName); _instanceDataManager.deleteSegment(realtimeTableName, segmentName); + onConsumingToDropped(realtimeTableName, segmentName); } catch (Exception e) { _logger.error("Caught exception in state transition CONSUMING -> DROPPED for table: {}, segment: {}", realtimeTableName, segmentName, e); @@ -164,6 +174,21 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta } } + /** + * Should be invoked after the segment is offloaded and deleted so that it can safely release the resources from the table + * data
manager. + */ + private void onConsumingToDropped(String realtimeTableName, String segmentName) { + TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName); + if (tableDataManager == null) { + _logger.warn( + "Failed to find data manager for table: {}, skip invoking consuming to dropped callback for segment: {}", + realtimeTableName, segmentName); + return; + } + tableDataManager.onConsumingToDropped(segmentName); + } + @Transition(from = "OFFLINE", to = "ONLINE") public void onBecomeOnlineFromOffline(Message message, NotificationContext context) { _logger.info("SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() : " + message); @@ -215,6 +240,15 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta String segmentName = message.getPartitionName(); try { _instanceDataManager.deleteSegment(tableNameWithType, segmentName); + + // Check if the segment was recently offloaded from CONSUMING to OFFLINE + if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) { + Pair<String, String> tableSegmentPair = Pair.of(tableNameWithType, segmentName); + if (_recentlyOffloadedConsumingSegments.getIfPresent(tableSegmentPair) != null) { + _recentlyOffloadedConsumingSegments.invalidate(tableSegmentPair); + onConsumingToDropped(tableNameWithType, segmentName); + } + } } catch (Exception e) { _logger.error("Caught exception in state transition OFFLINE -> DROPPED for table: {}, segment: {}", tableNameWithType, segmentName, e); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org