This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f5fe2b7be Fix the potential deadlock for partial-upsert segment loading check (#10198)
9f5fe2b7be is described below

commit 9f5fe2b7be241f778890483de05a1b45fed4d2d5
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Sun Jan 29 23:38:20 2023 -0800

    Fix the potential deadlock for partial-upsert segment loading check (#10198)
---
 .../realtime/LLRealtimeSegmentDataManager.java     | 19 ++++-
 .../manager/realtime/RealtimeTableDataManager.java | 64 ++++++++++-----
 .../realtime/LLRealtimeSegmentDataManagerTest.java |  2 +-
 .../local/utils/tablestate/TableStateUtils.java    | 90 +++++++++++-----------
 4 files changed, 104 insertions(+), 71 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index cffebf90db..b3607e6a06 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BooleanSupplier;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.Utils;
@@ -232,6 +233,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private final AtomicBoolean _acquiredConsumerSemaphore;
   private final String _metricKeyName;
   private final ServerMetrics _serverMetrics;
+  private final BooleanSupplier _isReadyToConsumeData;
   private final MutableSegmentImpl _realtimeSegment;
   private volatile StreamPartitionMsgOffset _currentOffset;
   private volatile State _state;
@@ -395,6 +397,17 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
   protected boolean consumeLoop()
       throws Exception {
+    // At this point, we know that we can potentially move the offset, so the old saved segment file is not valid
+    // anymore. Remove the file if it exists.
+    removeSegmentFile();
+
+    if (!_isReadyToConsumeData.getAsBoolean()) {
+      do {
+        //noinspection BusyWait
+        Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+      } while (!_shouldStop && !endCriteriaReached() && !_isReadyToConsumeData.getAsBoolean());
+    }
+
     _numRowsErrored = 0;
     final long idlePipeSleepTimeMillis = 100;
     final long idleTimeoutMillis = _partitionLevelStreamConfig.getIdleTimeoutMillis();
@@ -403,9 +416,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
     StreamPartitionMsgOffset lastUpdatedOffset = _streamPartitionMsgOffsetFactory
         .create(_currentOffset);  // so that we always update the metric when we enter this method.
-    // At this point, we know that we can potentially move the offset, so the old saved segment file is not valid
-    // anymore. Remove the file if it exists.
-    removeSegmentFile();
 
     _segmentLogger.info("Starting consumption loop start offset {}, finalOffset {}", _currentOffset, _finalOffset);
     while (!_shouldStop && !endCriteriaReached()) {
@@ -1263,7 +1273,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig,
       Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionGroupConsumerSemaphore,
       ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager,
-      @Nullable PartitionDedupMetadataManager partitionDedupMetadataManager) {
+      @Nullable PartitionDedupMetadataManager partitionDedupMetadataManager, BooleanSupplier isReadyToConsumeData) {
     _segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
     _segmentZKMetadata = segmentZKMetadata;
     _tableConfig = tableConfig;
@@ -1273,6 +1283,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     _indexLoadingConfig = indexLoadingConfig;
     _schema = schema;
     _serverMetrics = serverMetrics;
+    _isReadyToConsumeData = isReadyToConsumeData;
     _segmentVersion = indexLoadingConfig.getSegmentVersion();
     _instanceId = _realtimeTableDataManager.getServerInstance();
     _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_tableNameWithType);
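
The hunks above move the readiness check into consumeLoop() as a cheap poll instead of a blocking wait taken while table-level resources are held. Below is a minimal, self-contained sketch of that gating pattern; the class and member names (GatedConsumer, CHECK_INTERVAL_MS, stop()) are illustrative assumptions, not Pinot APIs.

import java.util.function.BooleanSupplier;

// Sketch (not Pinot code) of the consume-loop gate added above: the consumer polls a
// cheap BooleanSupplier and sleeps between checks instead of blocking while segments
// load, so segment loading and consumer creation can no longer deadlock each other.
public class GatedConsumer {
  // Assumed poll interval, mirroring READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS above.
  private static final long CHECK_INTERVAL_MS = 5_000L;

  private final BooleanSupplier _isReadyToConsumeData;
  private volatile boolean _shouldStop;

  public GatedConsumer(BooleanSupplier isReadyToConsumeData) {
    _isReadyToConsumeData = isReadyToConsumeData;
  }

  public void consumeLoop()
      throws InterruptedException {
    // Wait until the table is ready or we are asked to stop; no locks are held here.
    while (!_shouldStop && !_isReadyToConsumeData.getAsBoolean()) {
      //noinspection BusyWait
      Thread.sleep(CHECK_INTERVAL_MS);
    }
    if (_shouldStop) {
      return;
    }
    // ... actual stream consumption would start here ...
  }

  public void stop() {
    _shouldStop = true;
  }
}

A caller would construct it with the table-level readiness supplier and invoke consumeLoop() from the consumer thread, matching how _isReadyToConsumeData is wired into LLRealtimeSegmentDataManager above.
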
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 48b849834f..9315770943 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -31,7 +31,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
 import java.util.function.Supplier;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections.CollectionUtils;
@@ -114,13 +114,17 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   // likely that we get fresh data each time instead of multiple copies of roughly same data.
   private static final int MIN_INTERVAL_BETWEEN_STATS_UPDATES_MINUTES = 30;
 
-  private final AtomicBoolean _allSegmentsLoaded = new AtomicBoolean();
+  public static final long READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5);
+
+  // TODO: Change it to BooleanSupplier
+  private final Supplier<Boolean> _isServerReadyToServeQueries;
 
-  private TableDedupMetadataManager _tableDedupMetadataManager;
-  private TableUpsertMetadataManager _tableUpsertMetadataManager;
   // Object to track ingestion delay for all partitions
   private IngestionDelayTracker _ingestionDelayTracker;
-  private final Supplier<Boolean> _isServerReadyToServeQueries;
+
+  private TableDedupMetadataManager _tableDedupMetadataManager;
+  private TableUpsertMetadataManager _tableUpsertMetadataManager;
+  private BooleanSupplier _isTableReadyToConsumeData;
 
   public RealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
     this(segmentBuildSemaphore, () -> true);
@@ -135,8 +139,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   protected void doInit() {
     _leaseExtender = SegmentBuildTimeLeaseExtender.getOrCreate(_instanceId, _serverMetrics, _tableNameWithType);
     // Tracks ingestion delay of all partitions being served for this table
-    _ingestionDelayTracker = new IngestionDelayTracker(_serverMetrics, _tableNameWithType, this,
-        _isServerReadyToServeQueries);
+    _ingestionDelayTracker =
+        new IngestionDelayTracker(_serverMetrics, _tableNameWithType, this, _isServerReadyToServeQueries);
     File statsFile = new File(_tableDataDir, STATS_FILE_NAME);
     try {
       _statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsFile);
@@ -203,6 +207,36 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
       Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType);
      _tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(tableConfig, schema, this, _serverMetrics);
     }
+
+    // For dedup and partial-upsert, need to wait for all segments loaded before starting consuming data
+    if (isDedupEnabled() || isPartialUpsertEnabled()) {
+      _isTableReadyToConsumeData = new BooleanSupplier() {
+        volatile boolean _allSegmentsLoaded;
+        long _lastCheckTimeMs;
+
+        @Override
+        public boolean getAsBoolean() {
+          if (_allSegmentsLoaded) {
+            return true;
+          } else {
+            synchronized (this) {
+              if (_allSegmentsLoaded) {
+                return true;
+              }
+              long currentTimeMs = System.currentTimeMillis();
+              if (currentTimeMs - _lastCheckTimeMs <= READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS) {
+                return false;
+              }
+              _lastCheckTimeMs = currentTimeMs;
+              _allSegmentsLoaded = TableStateUtils.isAllSegmentsLoaded(_helixManager, _tableNameWithType);
+              return _allSegmentsLoaded;
+            }
+          }
+        }
+      };
+    } else {
+      _isTableReadyToConsumeData = () -> true;
+    }
   }
 
   @Override
@@ -265,7 +299,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
 
   /**
    * Returns all partitionGroupIds for the partitions hosted by this server for current table.
-   * @apiNote  this involves Zookeeper read and should not be used frequently due to efficiency concerns.
+   * @apiNote this involves Zookeeper read and should not be used frequently due to efficiency concerns.
    */
   public Set<Integer> getHostedPartitionsGroupIds() {
     Set<Integer> partitionsHostedByThisServer = new HashSet<>();
@@ -401,22 +435,10 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
       PartitionDedupMetadataManager partitionDedupMetadataManager =
           _tableDedupMetadataManager != null ? _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
               : null;
-      // For dedup and partial-upsert, wait for all segments loaded before creating the consuming segment
-      if (isDedupEnabled() || isPartialUpsertEnabled()) {
-        if (!_allSegmentsLoaded.get()) {
-          synchronized (_allSegmentsLoaded) {
-            if (!_allSegmentsLoaded.get()) {
-              TableStateUtils.waitForAllSegmentsLoaded(_helixManager, _tableNameWithType);
-              _allSegmentsLoaded.set(true);
-            }
-          }
-        }
-      }
-
       segmentDataManager =
           new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(),
               indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics, partitionUpsertMetadataManager,
-              partitionDedupMetadataManager);
+              partitionDedupMetadataManager, _isTableReadyToConsumeData);
     } else {
       InstanceZKMetadata instanceZKMetadata = ZKMetadataProvider.getInstanceZKMetadata(_propertyStore, _instanceId);
       segmentDataManager = new HLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, instanceZKMetadata, this,
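
The doInit() hunk above builds the readiness check as a lazily evaluated, throttled, double-checked BooleanSupplier so that many consuming segments can share one inexpensive gate. A standalone sketch of that pattern follows, assuming a generic expensiveCheck in place of TableStateUtils.isAllSegmentsLoaded(...); the class name and constructor are hypothetical.

import java.util.function.BooleanSupplier;

// Hypothetical sketch (not Pinot code) of the throttled, double-checked readiness
// supplier built in doInit() above. `expensiveCheck` stands in for
// TableStateUtils.isAllSegmentsLoaded(...), which reads from ZooKeeper and therefore
// should not run on every call.
public final class ThrottledReadinessSupplier implements BooleanSupplier {
  private final BooleanSupplier _expensiveCheck;
  private final long _minCheckIntervalMs;

  private volatile boolean _ready;
  private long _lastCheckTimeMs;

  public ThrottledReadinessSupplier(BooleanSupplier expensiveCheck, long minCheckIntervalMs) {
    _expensiveCheck = expensiveCheck;
    _minCheckIntervalMs = minCheckIntervalMs;
  }

  @Override
  public boolean getAsBoolean() {
    // Fast path: once ready, stay ready without taking the lock.
    if (_ready) {
      return true;
    }
    synchronized (this) {
      if (_ready) {
        return true;
      }
      long nowMs = System.currentTimeMillis();
      // Throttle the expensive check so concurrent callers do not hammer ZooKeeper.
      if (nowMs - _lastCheckTimeMs <= _minCheckIntervalMs) {
        return false;
      }
      _lastCheckTimeMs = nowMs;
      _ready = _expensiveCheck.getAsBoolean();
      return _ready;
    }
  }
}

Once the check succeeds, the volatile flag short-circuits all later calls; until then, the interval guard bounds how often ZooKeeper is consulted, the same trade-off made by the anonymous BooleanSupplier above.
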
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index b6c290cde6..fd75a9a0f0 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -1001,7 +1001,7 @@ public class LLRealtimeSegmentDataManagerTest {
         throws Exception {
       super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir,
           new IndexLoadingConfig(makeInstanceDataManagerConfig(), tableConfig), schema, llcSegmentName,
-          semaphoreMap.get(llcSegmentName.getPartitionGroupId()), serverMetrics, null, null);
+          semaphoreMap.get(llcSegmentName.getPartitionGroupId()), serverMetrics, null, null, () -> true);
       _state = LLRealtimeSegmentDataManager.class.getDeclaredField("_state");
       _state.setAccessible(true);
       _shouldStop = LLRealtimeSegmentDataManager.class.getDeclaredField("_shouldStop");
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
index c11886f613..6de4536de1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
@@ -27,13 +27,14 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
-import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 public class TableStateUtils {
   private static final Logger LOGGER = LoggerFactory.getLogger(TableStateUtils.class);
+  private static final int MAX_NUM_SEGMENTS_TO_LOG = 10;
 
   private TableStateUtils() {
   }
@@ -83,58 +84,57 @@ public class TableStateUtils {
    * @return true if all segments for the given table are successfully loaded. False otherwise
    */
   public static boolean isAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType) {
+    List<String> onlineSegments =
+        getSegmentsInGivenStateForThisInstance(helixManager, tableNameWithType, SegmentStateModel.ONLINE);
+    if (onlineSegments.isEmpty()) {
+      LOGGER.info("No ONLINE segment found for table: {}", tableNameWithType);
+      return true;
+    }
+
+    // Check if ideal state and current state matches for all segments assigned to the current instance
     HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
     String instanceName = helixManager.getInstanceName();
-
-    List<String> onlineSegments = getSegmentsInGivenStateForThisInstance(helixManager, tableNameWithType,
-        CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
-    if (onlineSegments.size() > 0) {
-      LiveInstance liveInstance = dataAccessor.getProperty(keyBuilder.liveInstance(instanceName));
-      if (liveInstance == null) {
-        LOGGER.warn("Failed to find live instance for instance: {}", instanceName);
-        return false;
-      }
-      String sessionId = liveInstance.getEphemeralOwner();
-      CurrentState currentState =
-          dataAccessor.getProperty(keyBuilder.currentState(instanceName, sessionId, tableNameWithType));
-      if (currentState == null) {
-        LOGGER.warn("Failed to find current state for instance: {}, sessionId: {}, table: {}", instanceName, sessionId,
-            tableNameWithType);
-        return false;
-      }
-      // Check if ideal state and current state matches for all segments assigned to the current instance
-      Map<String, String> currentStateMap = currentState.getPartitionStateMap();
-
-      for (String segmentName : onlineSegments) {
-        String actualState = currentStateMap.get(segmentName);
-        if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(actualState)) {
-          if (CommonConstants.Helix.StateModel.SegmentStateModel.ERROR.equals(actualState)) {
-            LOGGER.error("Find ERROR segment: {}, table: {}, expected: {}", segmentName, tableNameWithType,
-                CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
-          } else {
-            LOGGER.info("Find unloaded segment: {}, table: {}, expected: {}, actual: {}", segmentName,
-                tableNameWithType, CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE, actualState);
-          }
+    LiveInstance liveInstance = dataAccessor.getProperty(keyBuilder.liveInstance(instanceName));
+    if (liveInstance == null) {
+      LOGGER.warn("Failed to find live instance for instance: {}", instanceName);
+      return false;
+    }
+    String sessionId = liveInstance.getEphemeralOwner();
+    CurrentState currentState =
+        dataAccessor.getProperty(keyBuilder.currentState(instanceName, sessionId, tableNameWithType));
+    if (currentState == null) {
+      LOGGER.warn("Failed to find current state for instance: {}, sessionId: {}, table: {}", instanceName, sessionId,
+          tableNameWithType);
+      return false;
+    }
+    List<String> unloadedSegments = new ArrayList<>();
+    Map<String, String> currentStateMap = currentState.getPartitionStateMap();
+    for (String segmentName : onlineSegments) {
+      String actualState = currentStateMap.get(segmentName);
+      if (!SegmentStateModel.ONLINE.equals(actualState)) {
+        if (SegmentStateModel.ERROR.equals(actualState)) {
+          LOGGER.error("Found segment: {}, table: {} in ERROR state, expected: {}", segmentName, tableNameWithType,
+              SegmentStateModel.ONLINE);
           return false;
+        } else {
+          unloadedSegments.add(segmentName);
         }
       }
     }
-
-    LOGGER.info("All segments loaded for table: {}", tableNameWithType);
-    return true;
-  }
-
-  public static void waitForAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType) {
-    try {
-      while (!TableStateUtils.isAllSegmentsLoaded(helixManager, tableNameWithType)) {
-        LOGGER.info("Sleeping 1 second waiting for all segments loaded for table: {}", tableNameWithType);
-        //noinspection BusyWait
-        Thread.sleep(1000L);
+    if (unloadedSegments.isEmpty()) {
+      LOGGER.info("All segments loaded for table: {}", tableNameWithType);
+      return true;
+    } else {
+      int numUnloadedSegments = unloadedSegments.size();
+      if (numUnloadedSegments <= MAX_NUM_SEGMENTS_TO_LOG) {
+        LOGGER.info("Found {} unloaded segments: {} for table: {}", numUnloadedSegments, unloadedSegments,
+            tableNameWithType);
+      } else {
+        LOGGER.info("Found {} unloaded segments: {}... for table: {}", numUnloadedSegments,
+            unloadedSegments.subList(0, MAX_NUM_SEGMENTS_TO_LOG), tableNameWithType);
       }
-    } catch (Exception e) {
-      throw new RuntimeException(
-          "Caught exception while waiting for all segments loaded for table: " + tableNameWithType, e);
+      return false;
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
