This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e10320595 handle absent segments so that catchup checker doesn't get stuck on them (#12883)
8e10320595 is described below

commit 8e103205955e8af4fe286ebd6e97b30605724be2
Author: Xiaobing <61892277+klsi...@users.noreply.github.com>
AuthorDate: Mon Apr 22 16:41:46 2024 -0700

    handle absent segments so that catchup checker doesn't get stuck on them (#12883)
    
    * skip missing segments while checking freshness during server startup
    
    * re-fetch the table's consuming segments if the current ones have been committed by other servers
---
 .../server/starter/helix/BaseServerStarter.java    |  71 +++++++-----
 .../FreshnessBasedConsumptionStatusChecker.java    |   7 +-
 .../IngestionBasedConsumptionStatusChecker.java    | 128 ++++++++++++++-------
 .../helix/OffsetBasedConsumptionStatusChecker.java |   7 +-
 .../helix/ConsumptionStatusCheckerTestUtils.java   |  38 ++++++
 ...FreshnessBasedConsumptionStatusCheckerTest.java | 103 ++++++++++++++---
 .../OffsetBasedConsumptionStatusCheckerTest.java   |  32 ++++--
 7 files changed, 288 insertions(+), 98 deletions(-)

diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 02c7b81ea5..78cd1a14e7 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -153,8 +154,8 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     _helixClusterName = 
_serverConf.getProperty(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME);
     ServiceStartableUtils.applyClusterConfig(_serverConf, _zkAddress, 
_helixClusterName, ServiceRole.SERVER);
 
-    PinotInsecureMode.setPinotInInsecureMode(
-        
Boolean.valueOf(_serverConf.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE,
+    PinotInsecureMode.setPinotInInsecureMode(Boolean.parseBoolean(
+        _serverConf.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE,
             CommonConstants.DEFAULT_PINOT_INSECURE_MODE)));
 
     setupHelixSystemProperties();
@@ -275,8 +276,7 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
 
     // collect all resources which have this instance in the ideal state
     List<String> resourcesToMonitor = new ArrayList<>();
-
-    Set<String> consumingSegments = new HashSet<>();
+    Map<String, Set<String>> consumingSegments = new HashMap<>();
     boolean checkRealtime = realtimeConsumptionCatchupWaitMs > 0;
     if (isFreshnessStatusCheckerEnabled && realtimeMinFreshnessMs <= 0) {
       LOGGER.warn("Realtime min freshness {} must be > 0. Setting relatime min 
freshness to default {}.",
@@ -289,23 +289,22 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
       if (!TableNameBuilder.isTableResource(resourceName)) {
         continue;
       }
-
       // Only monitor enabled resources
       IdealState idealState = 
_helixAdmin.getResourceIdealState(_helixClusterName, resourceName);
-      if (idealState.isEnabled()) {
-
-        for (String partitionName : idealState.getPartitionSet()) {
-          if (idealState.getInstanceSet(partitionName).contains(_instanceId)) {
-            resourcesToMonitor.add(resourceName);
-            break;
-          }
+      if (idealState == null || !idealState.isEnabled()) {
+        continue;
+      }
+      for (String partitionName : idealState.getPartitionSet()) {
+        if (idealState.getInstanceSet(partitionName).contains(_instanceId)) {
+          resourcesToMonitor.add(resourceName);
+          break;
         }
-        if (checkRealtime && 
TableNameBuilder.isRealtimeTableResource(resourceName)) {
-          for (String partitionName : idealState.getPartitionSet()) {
-            if (StateModel.SegmentStateModel.CONSUMING.equals(
-                
idealState.getInstanceStateMap(partitionName).get(_instanceId))) {
-              consumingSegments.add(partitionName);
-            }
+      }
+      if (checkRealtime && 
TableNameBuilder.isRealtimeTableResource(resourceName)) {
+        for (String partitionName : idealState.getPartitionSet()) {
+          if (StateModel.SegmentStateModel.CONSUMING.equals(
+              idealState.getInstanceStateMap(partitionName).get(_instanceId))) 
{
+            consumingSegments.computeIfAbsent(resourceName, k -> new 
HashSet<>()).add(partitionName);
           }
         }
       }
@@ -332,7 +331,7 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
             realtimeMinFreshnessMs, idleTimeoutMs);
         FreshnessBasedConsumptionStatusChecker freshnessStatusChecker =
             new 
FreshnessBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(),
 consumingSegments,
-                realtimeMinFreshnessMs, idleTimeoutMs);
+                this::getConsumingSegments, realtimeMinFreshnessMs, 
idleTimeoutMs);
         Supplier<Integer> getNumConsumingSegmentsNotReachedMinFreshness =
             
freshnessStatusChecker::getNumConsumingSegmentsNotReachedIngestionCriteria;
         serviceStatusCallbackListBuilder.add(
@@ -341,7 +340,8 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
       } else if (isOffsetBasedConsumptionStatusCheckerEnabled) {
         LOGGER.info("Setting up offset based status checker");
         OffsetBasedConsumptionStatusChecker consumptionStatusChecker =
-            new 
OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), 
consumingSegments);
+            new 
OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), 
consumingSegments,
+                this::getConsumingSegments);
         Supplier<Integer> getNumConsumingSegmentsNotReachedTheirLatestOffset =
             
consumptionStatusChecker::getNumConsumingSegmentsNotReachedIngestionCriteria;
         serviceStatusCallbackListBuilder.add(
@@ -359,6 +359,22 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
         new 
ServiceStatus.MultipleCallbackServiceStatusCallback(serviceStatusCallbackListBuilder.build()));
   }
 
+  @Nullable
+  private Set<String> getConsumingSegments(String realtimeTableName) {
+    IdealState idealState = 
_helixAdmin.getResourceIdealState(_helixClusterName, realtimeTableName);
+    if (idealState == null || !idealState.isEnabled()) {
+      return null;
+    }
+    Set<String> consumingSegments = new HashSet<>();
+    for (String partitionName : idealState.getPartitionSet()) {
+      if (StateModel.SegmentStateModel.CONSUMING.equals(
+          idealState.getInstanceStateMap(partitionName).get(_instanceId))) {
+        consumingSegments.add(partitionName);
+      }
+    }
+    return consumingSegments;
+  }
+
   private void updateInstanceConfigIfNeeded(ServerConf serverConf) {
     InstanceConfig instanceConfig = 
HelixHelper.getInstanceConfig(_helixManager, _instanceId);
 
@@ -518,12 +534,13 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
       }
     }
 
-    boolean exitServerOnIncompleteStartup = _serverConf.getProperty(
-        Server.CONFIG_OF_EXIT_ON_SERVICE_STATUS_CHECK_FAILURE,
-        Server.DEFAULT_EXIT_ON_SERVICE_STATUS_CHECK_FAILURE);
+    boolean exitServerOnIncompleteStartup =
+        
_serverConf.getProperty(Server.CONFIG_OF_EXIT_ON_SERVICE_STATUS_CHECK_FAILURE,
+            Server.DEFAULT_EXIT_ON_SERVICE_STATUS_CHECK_FAILURE);
     if (exitServerOnIncompleteStartup) {
-      String errorMessage = String.format("Service status %s has not turned 
GOOD within %dms: %s. Exiting server.",
-          serviceStatus, System.currentTimeMillis() - startTimeMs, 
ServiceStatus.getStatusDescription());
+      String errorMessage =
+          String.format("Service status %s has not turned GOOD within %dms: 
%s. Exiting server.", serviceStatus,
+              System.currentTimeMillis() - startTimeMs, 
ServiceStatus.getStatusDescription());
       throw new IllegalStateException(errorMessage);
     }
     LOGGER.warn("Service status has not turned GOOD within {}ms: {}", 
System.currentTimeMillis() - startTimeMs,
@@ -581,8 +598,8 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     InstanceDataManager instanceDataManager = 
_serverInstance.getInstanceDataManager();
     instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() -> 
_isServerReadyToServeQueries);
     // initialize the thread accountant for query killing
-    Tracing.ThreadAccountantOps
-        
.initializeThreadAccountant(_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX),
 _instanceId);
+    Tracing.ThreadAccountantOps.initializeThreadAccountant(
+        _serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), 
_instanceId);
     initSegmentFetcher(_serverConf);
     StateModelFactory<?> stateModelFactory =
         new SegmentOnlineOfflineStateModelFactory(_instanceId, 
instanceDataManager);
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
index 6f3610e596..77eac3832e 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
@@ -19,7 +19,9 @@
 
 package org.apache.pinot.server.starter.helix;
 
+import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -37,9 +39,10 @@ public class FreshnessBasedConsumptionStatusChecker extends 
IngestionBasedConsum
   private final long _minFreshnessMs;
   private final long _idleTimeoutMs;
 
-  public FreshnessBasedConsumptionStatusChecker(InstanceDataManager 
instanceDataManager, Set<String> consumingSegments,
+  public FreshnessBasedConsumptionStatusChecker(InstanceDataManager 
instanceDataManager,
+      Map<String, Set<String>> consumingSegments, Function<String, 
Set<String>> consumingSegmentsSupplier,
       long minFreshnessMs, long idleTimeoutMs) {
-    super(instanceDataManager, consumingSegments);
+    super(instanceDataManager, consumingSegments, consumingSegmentsSupplier);
     _minFreshnessMs = minFreshnessMs;
     _idleTimeoutMs = idleTimeoutMs;
   }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
index 83de35a63c..c6fe0d16d6 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
@@ -19,15 +19,16 @@
 
 package org.apache.pinot.server.starter.helix;
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
-import org.apache.pinot.common.utils.LLCSegmentName;
+import java.util.function.Function;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,64 +36,103 @@ import org.slf4j.LoggerFactory;
 public abstract class IngestionBasedConsumptionStatusChecker {
   protected final Logger _logger = LoggerFactory.getLogger(getClass());
 
-  // constructor parameters
-  protected final InstanceDataManager _instanceDataManager;
-  protected final Set<String> _consumingSegments;
-
-  // helper variable
-  private final Set<String> _caughtUpSegments = new HashSet<>();
+  private final InstanceDataManager _instanceDataManager;
+  private final Map<String, Set<String>> _consumingSegmentsByTable;
+  private final Map<String, Set<String>> _caughtUpSegmentsByTable = new 
HashMap<>();
+  private final Function<String, Set<String>> _consumingSegmentsSupplier;
 
+  /**
+   * Both consumingSegmentsByTable and consumingSegmentsSupplier are provided 
as it can be costly to get
+   * consumingSegmentsByTable via the supplier, so only use it when any 
missing segment is detected.
+   */
   public IngestionBasedConsumptionStatusChecker(InstanceDataManager 
instanceDataManager,
-      Set<String> consumingSegments) {
+      Map<String, Set<String>> consumingSegmentsByTable, Function<String, 
Set<String>> consumingSegmentsSupplier) {
     _instanceDataManager = instanceDataManager;
-    _consumingSegments = consumingSegments;
+    _consumingSegmentsByTable = consumingSegmentsByTable;
+    _consumingSegmentsSupplier = consumingSegmentsSupplier;
   }
 
-  public int getNumConsumingSegmentsNotReachedIngestionCriteria() {
-    for (String segName : _consumingSegments) {
-      if (_caughtUpSegments.contains(segName)) {
-        continue;
-      }
-      TableDataManager tableDataManager = getTableDataManager(segName);
+  // This might be called by multiple threads, thus synchronized to be correct.
+  public synchronized int getNumConsumingSegmentsNotReachedIngestionCriteria() 
{
+    // If the checker found any consuming segments are missing or committed 
for a table, it should reset the set of
+    // consuming segments for the table to continue to monitor the freshness, 
otherwise the checker might get stuck
+    // on deleted segments or tables, or miss new consuming segments created 
in the table and get ready prematurely.
+    Set<String> tablesToRefresh = new HashSet<>();
+    Iterator<Map.Entry<String, Set<String>>> itr = 
_consumingSegmentsByTable.entrySet().iterator();
+    while (itr.hasNext()) {
+      Map.Entry<String, Set<String>> tableSegments = itr.next();
+      String tableNameWithType = tableSegments.getKey();
+      TableDataManager tableDataManager = 
_instanceDataManager.getTableDataManager(tableNameWithType);
       if (tableDataManager == null) {
-        _logger.info("TableDataManager is not yet setup for segment {}. Will 
check consumption status later", segName);
+        _logger.info("No tableDataManager for table: {}. Refresh table's 
consuming segments", tableNameWithType);
+        tablesToRefresh.add(tableNameWithType);
         continue;
       }
-      SegmentDataManager segmentDataManager = null;
-      try {
-        segmentDataManager = tableDataManager.acquireSegment(segName);
-        if (segmentDataManager == null) {
-          _logger.info("SegmentDataManager is not yet setup for segment {}. 
Will check consumption status later",
-              segName);
+      Set<String> consumingSegments = tableSegments.getValue();
+      Set<String> caughtUpSegments = 
_caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new 
HashSet<>());
+      for (String segName : consumingSegments) {
+        if (caughtUpSegments.contains(segName)) {
           continue;
         }
-        if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
-          // There's a possibility that a consuming segment has converted to a 
committed segment. If that's the case,
-          // segment data manager will not be of type 
RealtimeSegmentDataManager.
-          _logger.info("Segment {} is already committed and is considered 
caught up.", segName);
-          _caughtUpSegments.add(segName);
+        SegmentDataManager segmentDataManager = 
tableDataManager.acquireSegment(segName);
+        if (segmentDataManager == null) {
+          _logger.info("No segmentDataManager for segment: {} from table: {}. 
Refresh table's consuming segments",
+              segName, tableNameWithType);
+          tablesToRefresh.add(tableNameWithType);
           continue;
         }
-
-        RealtimeSegmentDataManager rtSegmentDataManager = 
(RealtimeSegmentDataManager) segmentDataManager;
-        if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
-          _caughtUpSegments.add(segName);
-        }
-      } finally {
-        if (segmentDataManager != null) {
+        try {
+          if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
+            // It's possible that the consuming segment has been committed by 
another server. In this case, we should
+            // get the new consuming segments for the table and continue to 
monitor their consumption status, until the
+            // current server catches up the consuming segments.
+            _logger.info("Segment: {} from table: {} is already committed. 
Refresh table's consuming segments.",
+                segName, tableNameWithType);
+            tablesToRefresh.add(tableNameWithType);
+            continue;
+          }
+          RealtimeSegmentDataManager rtSegmentDataManager = 
(RealtimeSegmentDataManager) segmentDataManager;
+          if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
+            caughtUpSegments.add(segName);
+          }
+        } finally {
           tableDataManager.releaseSegment(segmentDataManager);
         }
       }
+      int numLaggingSegments = consumingSegments.size() - 
caughtUpSegments.size();
+      if (numLaggingSegments == 0) {
+        _logger.info("Consuming segments from table: {} have all caught up", 
tableNameWithType);
+        itr.remove();
+        _caughtUpSegmentsByTable.remove(tableNameWithType);
+      }
+    }
+    if (!tablesToRefresh.isEmpty()) {
+      for (String tableNameWithType : tablesToRefresh) {
+        Set<String> updatedConsumingSegments = 
_consumingSegmentsSupplier.apply(tableNameWithType);
+        if (updatedConsumingSegments == null || 
updatedConsumingSegments.isEmpty()) {
+          _consumingSegmentsByTable.remove(tableNameWithType);
+          _caughtUpSegmentsByTable.remove(tableNameWithType);
+          _logger.info("Found no consuming segments from table: {}, which is 
probably removed", tableNameWithType);
+        } else {
+          _consumingSegmentsByTable.put(tableNameWithType, 
updatedConsumingSegments);
+          _caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new 
HashSet<>())
+              .retainAll(updatedConsumingSegments);
+          _logger.info(
+              "Updated consumingSegments: {} and caughtUpSegments: {} for 
table: {}, as consuming segments were "
+                  + "missing or committed", updatedConsumingSegments, 
_caughtUpSegmentsByTable.get(tableNameWithType),
+              tableNameWithType);
+        }
+      }
     }
-    return _consumingSegments.size() - _caughtUpSegments.size();
+    int numLaggingSegments = 0;
+    for (Map.Entry<String, Set<String>> tableSegments : 
_consumingSegmentsByTable.entrySet()) {
+      String tableNameWithType = tableSegments.getKey();
+      Set<String> consumingSegments = tableSegments.getValue();
+      Set<String> caughtUpSegments = 
_caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new 
HashSet<>());
+      numLaggingSegments += consumingSegments.size() - caughtUpSegments.size();
+    }
+    return numLaggingSegments;
   }
 
   protected abstract boolean isSegmentCaughtUp(String segmentName, 
RealtimeSegmentDataManager rtSegmentDataManager);
-
-  private TableDataManager getTableDataManager(String segmentName) {
-    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
-    String tableName = llcSegmentName.getTableName();
-    String tableNameWithType = 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
-    return _instanceDataManager.getTableDataManager(tableNameWithType);
-  }
 }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
index 6b597e3fa2..ad7d2905ba 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
@@ -19,7 +19,9 @@
 
 package org.apache.pinot.server.starter.helix;
 
+import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -34,8 +36,9 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
  */
 public class OffsetBasedConsumptionStatusChecker extends 
IngestionBasedConsumptionStatusChecker {
 
-  public OffsetBasedConsumptionStatusChecker(InstanceDataManager 
instanceDataManager, Set<String> consumingSegments) {
-    super(instanceDataManager, consumingSegments);
+  public OffsetBasedConsumptionStatusChecker(InstanceDataManager 
instanceDataManager,
+      Map<String, Set<String>> consumingSegments, Function<String, 
Set<String>> consumingSegmentsSupplier) {
+    super(instanceDataManager, consumingSegments, consumingSegmentsSupplier);
   }
 
   @Override
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/ConsumptionStatusCheckerTestUtils.java
 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/ConsumptionStatusCheckerTestUtils.java
new file mode 100644
index 0000000000..ccd8f6f855
--- /dev/null
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/ConsumptionStatusCheckerTestUtils.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+
+class ConsumptionStatusCheckerTestUtils {
+  private ConsumptionStatusCheckerTestUtils() {
+  }
+
+  public static Function<String, Set<String>> getConsumingSegments(Map<String, 
Set<String>> consumingSegments) {
+    // Create a new Set instance to keep updates separated from the 
consumingSegments.
+    return (tableName) -> {
+      Set<String> updated = consumingSegments.get(tableName);
+      return updated == null ? null : new HashSet<>(updated);
+    };
+  }
+}
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
index 6301b54d04..e619ba7d70 100644
--- 
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
@@ -20,7 +20,11 @@
 package org.apache.pinot.server.starter.helix;
 
 import com.google.common.collect.ImmutableSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
@@ -42,8 +46,9 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     private final long _now;
 
     public FakeFreshnessBasedConsumptionStatusChecker(InstanceDataManager 
instanceDataManager,
-        Set<String> consumingSegments, long minFreshnessMs, long 
idleTimeoutMs, long now) {
-      super(instanceDataManager, consumingSegments, minFreshnessMs, 
idleTimeoutMs);
+        Map<String, Set<String>> consumingSegments, Function<String, 
Set<String>> consumingSegmentsSupplier,
+        long minFreshnessMs, long idleTimeoutMs, long now) {
+      super(instanceDataManager, consumingSegments, consumingSegmentsSupplier, 
minFreshnessMs, idleTimeoutMs);
       _now = now;
     }
 
@@ -58,10 +63,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     String segA0 = "tableA__0__0__123Z";
     String segA1 = "tableA__1__0__123Z";
     String segB0 = "tableB__0__0__123Z";
-    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    Map<String, Set<String>> consumingSegments = new HashMap<>();
+    consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+    consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
     FreshnessBasedConsumptionStatusChecker statusChecker =
-        new FreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments, 10000L, 0L);
+        new FreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
+            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 
10000L, 0L);
 
     // TableDataManager is not set up yet
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
@@ -119,6 +127,55 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 0);
   }
 
+  @Test
+  public void testWithDroppedTableAndSegment()
+      throws InterruptedException {
+    String segA0 = "tableA__0__0__123Z";
+    String segA1 = "tableA__1__0__123Z";
+    String segB0 = "tableB__0__0__123Z";
+    Map<String, Set<String>> consumingSegments = new HashMap<>();
+    consumingSegments.computeIfAbsent("tableA_REALTIME", k -> new 
HashSet<>()).add(segA0);
+    consumingSegments.computeIfAbsent("tableA_REALTIME", k -> new 
HashSet<>()).add(segA1);
+    consumingSegments.computeIfAbsent("tableB_REALTIME", k -> new 
HashSet<>()).add(segB0);
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    FreshnessBasedConsumptionStatusChecker statusChecker =
+        new FreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
+            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 
0L);
+
+    // TableDataManager is not set up yet
+    
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
+
+    // setup TableDataMangers
+    TableDataManager tableDataManagerA = mock(TableDataManager.class);
+    
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+    
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(null);
+
+    // setup SegmentDataManagers
+    RealtimeSegmentDataManager segMngrA0 = 
mock(RealtimeSegmentDataManager.class);
+    when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+    when(tableDataManagerA.acquireSegment(segA1)).thenReturn(null);
+
+    when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(20));
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
+    // ensure negative values are ignored
+    setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
+
+    //              current offset          latest stream offset    current 
time    last ingestion time
+    // segA0              0                       20                     100   
            Long.MIN_VALUE
+    // segA1 (segment is absent)
+    // segB0 (table is absent)
+    
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
+
+    // updatedConsumingSegments still provide 3 segments to checker but one 
has caught up.
+    when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
+    
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 2);
+    // Remove the missing segments and check again.
+    consumingSegments.get("tableA_REALTIME").remove(segA1);
+    
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 1);
+    consumingSegments.remove("tableB_REALTIME");
+    
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 0);
+  }
+
   private void setupLatestIngestionTimestamp(RealtimeSegmentDataManager 
segmentDataManager,
       long latestIngestionTimestamp) {
     MutableSegment mockSegment = mock(MutableSegment.class);
@@ -133,10 +190,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     String segA0 = "tableA__0__0__123Z";
     String segA1 = "tableA__1__0__123Z";
     String segB0 = "tableB__0__0__123Z";
-    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    Map<String, Set<String>> consumingSegments = new HashMap<>();
+    consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+    consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
     FreshnessBasedConsumptionStatusChecker statusChecker =
-        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments, 10L, 0L, 100L);
+        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
+            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 
0L, 100L);
 
     // TableDataManager is not set up yet
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
@@ -195,12 +255,14 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     String segA0 = "tableA__0__0__123Z";
     String segA1 = "tableA__1__0__123Z";
     String segB0 = "tableB__0__0__123Z";
-    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    Map<String, Set<String>> consumingSegments = new HashMap<>();
+    consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+    consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
     long idleTimeoutMs = 10L;
     FreshnessBasedConsumptionStatusChecker statusChecker =
-        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments, 10L, idleTimeoutMs,
-            100L);
+        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
+            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 
idleTimeoutMs, 100L);
 
     // TableDataManager is not set up yet
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
@@ -270,10 +332,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     String segA0 = "tableA__0__0__123Z";
     String segA1 = "tableA__1__0__123Z";
     String segB0 = "tableB__0__0__123Z";
-    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    Map<String, Set<String>> consumingSegments = new HashMap<>();
+    consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+    consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
     FreshnessBasedConsumptionStatusChecker statusChecker =
-        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments, 10L, 0L, 100L);
+        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
+            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 
0L, 100L);
 
     // TableDataManager is not set up yet
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
@@ -319,10 +384,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     String segA0 = "tableA__0__0__123Z";
     String segA1 = "tableA__1__0__123Z";
     String segB0 = "tableB__0__0__123Z";
-    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    Map<String, Set<String>> consumingSegments = new HashMap<>();
+    consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+    consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
     FreshnessBasedConsumptionStatusChecker statusChecker =
-        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments, 10L, 0L, 100L);
+        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
+            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 
0L, 100L);
 
     // TableDataManager is not set up yet
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
@@ -369,6 +437,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     setupLatestIngestionTimestamp(segMngrA0, 90L);
     // Unexpected case where latest ingested is somehow after current time
     setupLatestIngestionTimestamp(segMngrA1, 101L);
+    
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 1);
+    consumingSegments.get("tableB_REALTIME").remove(segB0);
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 0);
   }
 
@@ -377,10 +447,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     String segA0 = "tableA__0__0__123Z";
     String segA1 = "tableA__1__0__123Z";
     String segB0 = "tableB__0__0__123Z";
-    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    Map<String, Set<String>> consumingSegments = new HashMap<>();
+    consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+    consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
     FreshnessBasedConsumptionStatusChecker statusChecker =
-        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments, 10L, 0L, 100L);
+        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
+            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 
0L, 100L);
 
     // TableDataManager is not set up yet
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
index 88b05b8ff0..2248f731d2 100644
--- 
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
@@ -20,6 +20,8 @@
 package org.apache.pinot.server.starter.helix;
 
 import com.google.common.collect.ImmutableSet;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
@@ -41,10 +43,13 @@ public class OffsetBasedConsumptionStatusCheckerTest {
     String segA0 = "tableA__0__0__123Z";
     String segA1 = "tableA__1__0__123Z";
     String segB0 = "tableB__0__0__123Z";
-    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    Map<String, Set<String>> consumingSegments = new HashMap<>();
+    consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+    consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
     OffsetBasedConsumptionStatusChecker statusChecker =
-        new OffsetBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments);
+        new OffsetBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
+            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments));
 
     // setup TableDataMangers
     TableDataManager tableDataManagerA = mock(TableDataManager.class);
@@ -88,11 +93,14 @@ public class OffsetBasedConsumptionStatusCheckerTest {
     String segA0 = "tableA__0__0__123Z";
     String segA1 = "tableA__1__0__123Z";
     String segB0 = "tableB__0__0__123Z";
-    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    Map<String, Set<String>> consumingSegments = new HashMap<>();
+    consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+    consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
 
     OffsetBasedConsumptionStatusChecker statusChecker =
-        new OffsetBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments);
+        new OffsetBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
+            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments));
 
     // TableDataManager is not set up yet
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
@@ -149,10 +157,13 @@ public class OffsetBasedConsumptionStatusCheckerTest {
     String segA0 = "tableA__0__0__123Z";
     String segA1 = "tableA__1__0__123Z";
     String segB0 = "tableB__0__0__123Z";
-    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    Map<String, Set<String>> consumingSegments = new HashMap<>();
+    consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+    consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
     OffsetBasedConsumptionStatusChecker statusChecker =
-        new OffsetBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments);
+        new OffsetBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
+            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments));
 
     // setup TableDataMangers
     TableDataManager tableDataManagerA = mock(TableDataManager.class);
@@ -190,6 +201,8 @@ public class OffsetBasedConsumptionStatusCheckerTest {
     // segB0        committed at 1200               1500
     when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
     when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(200));
+    
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 1);
+    consumingSegments.get("tableB_REALTIME").remove(segB0);
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 0);
   }
 
@@ -199,10 +212,13 @@ public class OffsetBasedConsumptionStatusCheckerTest {
     String segA0 = "tableA__0__0__123Z";
     String segA1 = "tableA__1__0__123Z";
     String segB0 = "tableB__0__0__123Z";
-    Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+    Map<String, Set<String>> consumingSegments = new HashMap<>();
+    consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+    consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
     OffsetBasedConsumptionStatusChecker statusChecker =
-        new OffsetBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments);
+        new OffsetBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
+            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments));
 
     // setup TableDataMangers
     TableDataManager tableDataManagerA = mock(TableDataManager.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to