Jackie-Jiang commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1255271973


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -173,13 +173,82 @@ protected void doAddSegment(ImmutableSegmentImpl segment) 
{
         System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
 
+  @Override
+  public void preloadSegment(ImmutableSegment segment) {
+    String segmentName = segment.getSegmentName();
+    if (_stopped) {
+      _logger.info("Skip preloading segment: {} because metadata manager is 
already stopped", segmentName);
+      return;
+    }
+    if (!_enableSnapshot) {
+      _logger.info("Skip preloading segment: {} because snapshot is not 
enabled", segmentName);
+      return;
+    }
+    if (segment instanceof EmptyIndexSegment) {
+      _logger.info("Skip preloading empty segment: {}", segmentName);
+      return;
+    }

Review Comment:
   Similarly here, this should never be hit



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -75,6 +107,127 @@ public void init(TableConfig tableConfig, Schema schema, 
TableDataManager tableD
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _serverMetrics = serverMetrics;
+    _helixManager = helixManager;
+    _segmentPreloadExecutor = segmentPreloadExecutor;
+    if (_enableSnapshot && upsertConfig.isEnablePreload()) {
+      // Preloading the segments with snapshots for fast upsert metadata 
recovery.
+      // Note that there is an implicit waiting logic between the thread doing 
the segment preloading here and the
+      // other helix threads about to process segment state transitions (e.g. 
taking segments from OFFLINE to ONLINE).
+      // The thread doing the segment preloading here must complete before the 
other helix threads start to handle
+      // segment state transitions. This is ensured implicitly because segment 
preloading happens here when
+      // initializing this TableUpsertMetadataManager, which happens when 
initializing the TableDataManager, which
+      // happens as the lambda of ConcurrentHashMap.computeIfAbsent() method, 
which ensures the waiting logic.
+      try {
+        _isPreloading = true;
+        preloadSegments();
+      } catch (Exception e) {
+        // Even if preloading fails, we should continue to complete the 
initialization, so that TableDataManager can be

Review Comment:
   This can potentially cause unexpected behavior. Let's not swallow this 
global catch. If we know certain preload failure can be skipped, we can catch 
it in a per-segment level so that we can continue preloading the remaining 
segments



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -75,6 +107,127 @@ public void init(TableConfig tableConfig, Schema schema, 
TableDataManager tableD
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _serverMetrics = serverMetrics;
+    _helixManager = helixManager;
+    _segmentPreloadExecutor = segmentPreloadExecutor;
+    if (_enableSnapshot && upsertConfig.isEnablePreload()) {
+      // Preloading the segments with snapshots for fast upsert metadata 
recovery.
+      // Note that there is an implicit waiting logic between the thread doing 
the segment preloading here and the
+      // other helix threads about to process segment state transitions (e.g. 
taking segments from OFFLINE to ONLINE).
+      // The thread doing the segment preloading here must complete before the 
other helix threads start to handle
+      // segment state transitions. This is ensured implicitly because segment 
preloading happens here when
+      // initializing this TableUpsertMetadataManager, which happens when 
initializing the TableDataManager, which
+      // happens as the lambda of ConcurrentHashMap.computeIfAbsent() method, 
which ensures the waiting logic.
+      try {
+        _isPreloading = true;
+        preloadSegments();
+      } catch (Exception e) {
+        // Even if preloading fails, we should continue to complete the 
initialization, so that TableDataManager can be
+        // created. Once TableDataManager is created, no more segment 
preloading would happen, and the normal segment
+        // loading logic would be used. The segments not being preloaded 
successfully here would be loaded via the
+        // normal segment loading logic, the one doing more costly checks on 
the upsert metadata.
+        LOGGER.warn("Failed to preload segments for table: {}", 
_tableNameWithType, e);
+        if (e instanceof InterruptedException) {
+          // Restore the interrupted status in case the upper callers want to 
check.
+          Thread.currentThread().interrupt();
+        }
+      } finally {
+        _isPreloading = false;
+      }
+    }
+  }
+
+  /**
+   * Get the ideal state and find segments assigned to current instance, then 
preload those with validDocIds snapshot.
+   * Skip those without the snapshots and those whose crc has changed, as they 
will be handled by normal Helix state
+   * transitions, which will proceed after the preloading phase fully 
completes.
+   */
+  private void preloadSegments()
+      throws Exception {
+    LOGGER.info("Preload segments in table: {} for fast upsert metadata 
recovery", _tableNameWithType);
+    IdealState idealState = HelixHelper.getTableIdealState(_helixManager, 
_tableNameWithType);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
_helixManager.getHelixPropertyStore();
+    String instanceId = getInstanceId();
+    IndexLoadingConfig indexLoadingConfig = createIndexLoadingConfig();
+    List<Future<?>> futures = new ArrayList<>();
+    for (String segmentName : idealState.getPartitionSet()) {
+      Map<String, String> instanceStateMap = 
idealState.getInstanceStateMap(segmentName);
+      String state = instanceStateMap.get(instanceId);
+      if 
(!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) {
+        LOGGER.info("Skip segment: {} as its ideal state: {} is not ONLINE", 
segmentName, state);
+        continue;
+      }
+      if (_segmentPreloadExecutor == null) {

Review Comment:
   (optional) Should we consider not allowing preloading when the executor is 
not configured? This will make it single threaded can potentially cause 
performance degradation



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -173,13 +173,82 @@ protected void doAddSegment(ImmutableSegmentImpl segment) 
{
         System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
 
+  @Override
+  public void preloadSegment(ImmutableSegment segment) {
+    String segmentName = segment.getSegmentName();
+    if (_stopped) {
+      _logger.info("Skip preloading segment: {} because metadata manager is 
already stopped", segmentName);
+      return;
+    }
+    if (!_enableSnapshot) {
+      _logger.info("Skip preloading segment: {} because snapshot is not 
enabled", segmentName);
+      return;
+    }

Review Comment:
   We should throw exception here. If we are preloading the segment, snapshot 
must exist



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java:
##########
@@ -35,8 +32,7 @@ private TableUpsertMetadataManagerFactory() {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TableUpsertMetadataManagerFactory.class);
 
-  public static TableUpsertMetadataManager create(TableConfig tableConfig, 
Schema schema,
-      TableDataManager tableDataManager, ServerMetrics serverMetrics) {
+  public static TableUpsertMetadataManager create(TableConfig tableConfig) {

Review Comment:
   Suggest not making this change. For plugin factories, `init()` is usually 
called within the factory



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to