snleee commented on code in PR #10350:
URL: https://github.com/apache/pinot/pull/10350#discussion_r1133267301


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,194 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import 
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.CommonConstants;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Base implementation of instance selector which maintains a map from segment 
to enabled ONLINE/CONSUMING server
  * instances that serves the segment and a set of unavailable segments (no 
enabled instance or all enabled instances are
  * in ERROR state).
+ *
+ * New segment won't be counted as unavailable.
+ * This is because it is common for new segment to be partially available, and 
we don't want to have hot spot or low
+ * query availability problem caused by new segment.
+ * We also don't report new segment as unavailable segments.
+ *
+ * New segment is defined as segment that is created more than 5 minutes ago.
+ * - For initialization, we look up segment creation time from zookeeper.
+ * - After initialization, we use the system clock when we receive the first 
update of ideal state for that segment as
+ *   approximation of segment creation time.
+ *
+ * We retire new segment as old when:
+ * - The creation time is more than 5 mins ago
+ * - We receive error state for new segment
+ * - External view for segment converges with ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistency across requests for new segments (some may be available, 
some may be not)
+ * 2) When there is no assignment/instance change for long time, some of the 
new segments that expire with the clock
+ * are still considered as old.
  */
 abstract class BaseInstanceSelector implements InstanceSelector {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseInstanceSelector.class);
+  public static class SegmentState {
+    // List of instance for this segment in ideal state.
+    // Mapping from candidate to index in _candidateInstance.
+    private HashMap<String, Boolean> _candidates;

Review Comment:
   Can we add a comment on what the key and value are supposed to be?
   
   I guess that it is the following:
   key = instance
   value = true (`online`) / false (`offline`)



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,194 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import 
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.CommonConstants;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Base implementation of instance selector which maintains a map from segment 
to enabled ONLINE/CONSUMING server
  * instances that serves the segment and a set of unavailable segments (no 
enabled instance or all enabled instances are
  * in ERROR state).
+ *
+ * New segment won't be counted as unavailable.
+ * This is because it is common for new segment to be partially available, and 
we don't want to have hot spot or low
+ * query availability problem caused by new segment.
+ * We also don't report new segment as unavailable segments.
+ *
+ * New segment is defined as segment that is created more than 5 minutes ago.
+ * - For initialization, we look up segment creation time from zookeeper.
+ * - After initialization, we use the system clock when we receive the first 
update of ideal state for that segment as
+ *   approximation of segment creation time.
+ *
+ * We retire new segment as old when:
+ * - The creation time is more than 5 mins ago
+ * - We receive error state for new segment
+ * - External view for segment converges with ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistency across requests for new segments (some may be available, 
some may be not)
+ * 2) When there is no assignment/instance change for long time, some of the 
new segments that expire with the clock
+ * are still considered as old.
  */
 abstract class BaseInstanceSelector implements InstanceSelector {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseInstanceSelector.class);
+  public static class SegmentState {
+    // List of instance for this segment in ideal state.
+    // Mapping from candidate to index in _candidateInstance.
+    private HashMap<String, Boolean> _candidates;
+
+    private long _creationMillis;
+
+    public SegmentState(long creationMillis) {
+      _creationMillis = creationMillis;
+      _candidates = new HashMap<>();
+    }
+
+    public boolean isNew(long nowMillis) {
+      return CommonConstants.Helix.StateModel.isNewSegment(_creationMillis, 
nowMillis);
+    }
+
+    public void resetCandidates() {
+      _candidates.clear();
+    }
+
+    public void addCandidate(String candidate, boolean online) {
+      _candidates.put(candidate, online);
+    }
+
+    public HashMap<String, Boolean> getCandidates() {
+      return _candidates;
+    }
+  }
 
   // To prevent int overflow, reset the request id once it reaches this value
   private static final long MAX_REQUEST_ID = 1_000_000_000;
 
-  private final String _tableNameWithType;
   private final BrokerMetrics _brokerMetrics;
+  private final String _tableNameWithType;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   protected final AdaptiveServerSelector _adaptiveServerSelector;
 
   // These 4 variables are the cached states to help accelerate the change 
processing
   private Set<String> _enabledInstances;
   private Map<String, List<String>> _segmentToOnlineInstancesMap;
-  private Map<String, List<String>> _segmentToOfflineInstancesMap;
   private Map<String, List<String>> _instanceToSegmentsMap;
+  private Map<String, SegmentState> _newSegmentStates;
 
-  // These 2 variables are needed for instance selection (multi-threaded), so 
make them volatile
-  private volatile Map<String, List<String>> _segmentToEnabledInstancesMap;
-  private volatile Set<String> _unavailableSegments;
+  // _segmentStateSnapshot is needed for instance selection (multi-threaded), 
so make them volatile
+  private volatile SegmentStateSnapshot _segmentStateSnapshot;
+  private Clock _clock;
 
   BaseInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics,
-      @Nullable AdaptiveServerSelector adaptiveServerSelector) {
+      @Nullable AdaptiveServerSelector adaptiveServerSelector, 
ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    this(tableNameWithType, brokerMetrics, adaptiveServerSelector, 
propertyStore, Clock.systemUTC());
+  }
+
+  // Test only for clock injection.
+  BaseInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics,
+      @Nullable AdaptiveServerSelector adaptiveServerSelector, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
+      Clock clock) {
     _tableNameWithType = tableNameWithType;
     _brokerMetrics = brokerMetrics;
     _adaptiveServerSelector = adaptiveServerSelector;
+    _newSegmentStates = new HashMap<>();
+    _propertyStore = propertyStore;
+    _clock = clock;
+  }
+
+  // Get the segment where the ideal state hasn't converged with external view 
and which doesn't have error instance.
+  private static List<String> getPotentialNewSegments(IdealState idealState, 
ExternalView externalView,
+      Set<String> onlineSegments) {
+    List<String> potentialNewSegments = new ArrayList<>();
+    Map<String, Map<String, String>> externalViewAssignment = 
externalView.getRecord().getMapFields();
+    for (Map.Entry<String, Map<String, String>> entry : 
idealState.getRecord().getMapFields().entrySet()) {
+      String segment = entry.getKey();
+      // Only track online segments
+      if (!onlineSegments.contains(segment)) {
+        continue;
+      }
+      Map<String, String> idealStateInstanceStateMap = entry.getValue();
+      // Segments with missing external view are considered as new.
+      Map<String, String> externalViewInstanceStateMap =
+          externalViewAssignment.getOrDefault(segment, Collections.emptyMap());
+      List<String> onlineInstance = new ArrayList<>();
+      boolean couldBeNewSegment = true;

Review Comment:
   Do we need this flag? Since we break on L163, we can probably remove the 
check on `couldBeNewSegment` and still get the same logic.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -141,139 +233,116 @@ public void onInstancesChange(Set<String> 
enabledInstances, List<String> changed
    */
   @Override
   public void onAssignmentChange(IdealState idealState, ExternalView 
externalView, Set<String> onlineSegments) {
+    long nowMillis = _clock.millis();
+    onAssignmentChange(idealState, externalView, onlineSegments, nowMillis, 
true);
+  }
+
+  private void onAssignmentChange(IdealState idealState, ExternalView 
externalView, Set<String> onlineSegments,
+      long nowMillis, boolean refreshNewSegments) {
+    if (refreshNewSegments) {
+      // If this call is not from init, we use existing state to check whether 
a segment is new.
+      // And we use system clock time as an approximation of segment creation 
time.
+      for (String segment : onlineSegments) {
+        if (!_segmentToOnlineInstancesMap.containsKey(segment) && 
!_newSegmentStates.containsKey(segment)) {
+          _newSegmentStates.put(segment, new SegmentState(nowMillis));
+        }
+      }
+    }
     _segmentToOnlineInstancesMap.clear();
-    _segmentToOfflineInstancesMap.clear();
-    _instanceToSegmentsMap.clear();
 
     // Update the cached maps
-    updateSegmentMaps(idealState, externalView, onlineSegments, 
_segmentToOnlineInstancesMap,
-        _segmentToOfflineInstancesMap, _instanceToSegmentsMap);
+    updateSegmentMaps(idealState, externalView, onlineSegments, 
_segmentToOnlineInstancesMap, _newSegmentStates,
+        nowMillis);
 
-    // Generate a new map from segment to enabled ONLINE/CONSUMING instances 
and a new set of unavailable segments (no
-    // enabled instance or all enabled instances are in ERROR state)
-    Map<String, List<String>> segmentToEnabledInstancesMap =
-        new 
HashMap<>(HashUtil.getHashMapCapacity(_segmentToOnlineInstancesMap.size()));
-    Set<String> unavailableSegments = new HashSet<>();
-    // NOTE: Put null as the value when there is no enabled instances for a 
segment so that segmentToEnabledInstancesMap
-    // always contains all segments. With this, in onInstancesChange() we can 
directly iterate over
-    // segmentToEnabledInstancesMap.entrySet() and modify the value without 
changing the map entries.
-    for (Map.Entry<String, List<String>> entry : 
_segmentToOnlineInstancesMap.entrySet()) {
-      String segment = entry.getKey();
-      List<String> enabledInstancesForSegment =
-          calculateEnabledInstancesForSegment(segment, entry.getValue(), 
unavailableSegments);
-      segmentToEnabledInstancesMap.put(segment, enabledInstancesForSegment);
-    }
-
-    _segmentToEnabledInstancesMap = segmentToEnabledInstancesMap;
-    _unavailableSegments = unavailableSegments;
+    _segmentStateSnapshot =
+        SegmentStateSnapshot.createSnapshot(_tableNameWithType, 
_segmentToOnlineInstancesMap, _newSegmentStates,
+            _enabledInstances, _brokerMetrics);
   }
 
   /**
    * Updates the segment maps based on the given ideal state, external view 
and online segments (segments with
    * ONLINE/CONSUMING instances in the ideal state and pre-selected by the 
{@link SegmentPreSelector}).
    */
-  void updateSegmentMaps(IdealState idealState, ExternalView externalView, 
Set<String> onlineSegments,
-      Map<String, List<String>> segmentToOnlineInstancesMap, Map<String, 
List<String>> segmentToOfflineInstancesMap,
-      Map<String, List<String>> instanceToSegmentsMap) {
-    // Iterate over the external view instead of the online segments so that 
the map lookups are performed on the
-    // HashSet instead of the TreeSet for performance
-    // NOTE: Do not track segments not in the external view because it is a 
valid state when the segment is new added
-    Map<String, Map<String, String>> idealStateAssignment = 
idealState.getRecord().getMapFields();
-    for (Map.Entry<String, Map<String, String>> entry : 
externalView.getRecord().getMapFields().entrySet()) {
+  protected void updateSegmentMaps(IdealState idealState, ExternalView 
externalView, Set<String> onlineSegments,
+      Map<String, List<String>> segmentToOnlineInstancesMap, Map<String, 
SegmentState> newSegmentStateMap,
+      long nowMillis) {
+    // NOTE: Segments with missing external view are considered as new.
+    Map<String, Map<String, String>> externalViewAssignment = 
externalView.getRecord().getMapFields();
+    // Iterate over the ideal state instead of the external view since this 
will cover segment with missing external
+    // view.
+    for (Map.Entry<String, Map<String, String>> entry : 
idealState.getRecord().getMapFields().entrySet()) {
       String segment = entry.getKey();
-
       // Only track online segments
       if (!onlineSegments.contains(segment)) {
         continue;
       }
-
-      Map<String, String> externalViewInstanceStateMap = entry.getValue();
-      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
-      List<String> onlineInstances = new 
ArrayList<>(externalViewInstanceStateMap.size());
-      List<String> offlineInstances = new ArrayList<>();
-      segmentToOnlineInstancesMap.put(segment, onlineInstances);
-      segmentToOfflineInstancesMap.put(segment, offlineInstances);
+      Map<String, String> idealStateInstanceStateMap = entry.getValue();
+      Map<String, String> externalViewInstanceStateMap =
+          externalViewAssignment.getOrDefault(segment, Collections.emptyMap());
+      // Sort the online instances for replica-group routing to work. For 
multiple segments with the same online
+      // instances, if the list is sorted, the same index in the list will 
always point to the same instance.
+      Set<String> onlineInstances = new TreeSet<>();
       for (Map.Entry<String, String> instanceStateEntry : 
externalViewInstanceStateMap.entrySet()) {
         String instance = instanceStateEntry.getKey();
-
         // Only track instances within the ideal state
         // NOTE: When an instance is not in the ideal state, the instance will 
drop the segment soon, and it is not safe
         // to query this instance for the segment. This could happen when a 
segment is moved from one instance to
         // another instance.
         if (!idealStateInstanceStateMap.containsKey(instance)) {
           continue;
         }
-
         String externalViewState = instanceStateEntry.getValue();
         // Do not track instances in ERROR state
         if (!externalViewState.equals(SegmentStateModel.ERROR)) {
-          instanceToSegmentsMap.computeIfAbsent(instance, k -> new 
ArrayList<>()).add(segment);
-          if (externalViewState.equals(SegmentStateModel.OFFLINE)) {
-            offlineInstances.add(instance);
-          } else {
+          if (SegmentStateModel.isOnline(externalViewState)) {
             onlineInstances.add(instance);
           }
+        } else {
+          // Segment with error state instance should be considered old.
+          newSegmentStateMap.remove(segment);
         }
       }
-
-      // Sort the online instances for replica-group routing to work. For 
multiple segments with the same online
-      // instances, if the list is sorted, the same index in the list will 
always point to the same instance.
-      if (!(externalViewInstanceStateMap instanceof SortedMap)) {
-        onlineInstances.sort(null);
-        offlineInstances.sort(null);
-      }
-    }
-  }
-
-  /**
-   * Calculates the enabled ONLINE/CONSUMING instances for the given segment, 
and updates the unavailable segments (no
-   * enabled instance or all enabled instances are in ERROR state).
-   */
-  @Nullable
-  private List<String> calculateEnabledInstancesForSegment(String segment, 
List<String> onlineInstancesForSegment,
-      Set<String> unavailableSegments) {
-    List<String> enabledInstancesForSegment = new 
ArrayList<>(onlineInstancesForSegment.size());
-    for (String onlineInstance : onlineInstancesForSegment) {
-      if (_enabledInstances.contains(onlineInstance)) {
-        enabledInstancesForSegment.add(onlineInstance);
+      SegmentState state = newSegmentStateMap.get(segment);

Review Comment:
   Let's rename this to `newSegmentState`



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,194 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import 
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.CommonConstants;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Base implementation of instance selector which maintains a map from segment 
to enabled ONLINE/CONSUMING server
  * instances that serves the segment and a set of unavailable segments (no 
enabled instance or all enabled instances are
  * in ERROR state).
+ *
+ * New segment won't be counted as unavailable.
+ * This is because it is common for new segment to be partially available, and 
we don't want to have hot spot or low
+ * query availability problem caused by new segment.
+ * We also don't report new segment as unavailable segments.
+ *
+ * New segment is defined as segment that is created more than 5 minutes ago.
+ * - For initialization, we look up segment creation time from zookeeper.
+ * - After initialization, we use the system clock when we receive the first 
update of ideal state for that segment as
+ *   approximation of segment creation time.
+ *
+ * We retire new segment as old when:
+ * - The creation time is more than 5 mins ago
+ * - We receive error state for new segment
+ * - External view for segment converges with ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistency across requests for new segments (some may be available, 
some may be not)
+ * 2) When there is no assignment/instance change for long time, some of the 
new segments that expire with the clock
+ * are still considered as old.
  */
 abstract class BaseInstanceSelector implements InstanceSelector {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseInstanceSelector.class);
+  public static class SegmentState {
+    // List of instance for this segment in ideal state.
+    // Mapping from candidate to index in _candidateInstance.
+    private HashMap<String, Boolean> _candidates;
+
+    private long _creationMillis;
+
+    public SegmentState(long creationMillis) {
+      _creationMillis = creationMillis;
+      _candidates = new HashMap<>();
+    }
+
+    public boolean isNew(long nowMillis) {
+      return CommonConstants.Helix.StateModel.isNewSegment(_creationMillis, 
nowMillis);
+    }
+
+    public void resetCandidates() {
+      _candidates.clear();
+    }
+
+    public void addCandidate(String candidate, boolean online) {
+      _candidates.put(candidate, online);
+    }
+
+    public HashMap<String, Boolean> getCandidates() {
+      return _candidates;
+    }
+  }
 
   // To prevent int overflow, reset the request id once it reaches this value
   private static final long MAX_REQUEST_ID = 1_000_000_000;
 
-  private final String _tableNameWithType;
   private final BrokerMetrics _brokerMetrics;
+  private final String _tableNameWithType;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   protected final AdaptiveServerSelector _adaptiveServerSelector;
 
   // These 4 variables are the cached states to help accelerate the change 
processing
   private Set<String> _enabledInstances;
   private Map<String, List<String>> _segmentToOnlineInstancesMap;
-  private Map<String, List<String>> _segmentToOfflineInstancesMap;
   private Map<String, List<String>> _instanceToSegmentsMap;

Review Comment:
   Can we remove `_instanceToSegmentsMap`?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -18,69 +18,194 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import 
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.utils.CommonConstants;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Base implementation of instance selector which maintains a map from segment 
to enabled ONLINE/CONSUMING server
  * instances that serves the segment and a set of unavailable segments (no 
enabled instance or all enabled instances are
  * in ERROR state).
+ *
+ * New segment won't be counted as unavailable.
+ * This is because it is common for new segment to be partially available, and 
we don't want to have hot spot or low
+ * query availability problem caused by new segment.
+ * We also don't report new segment as unavailable segments.
+ *
+ * New segment is defined as segment that is created more than 5 minutes ago.
+ * - For initialization, we look up segment creation time from zookeeper.
+ * - After initialization, we use the system clock when we receive the first 
update of ideal state for that segment as
+ *   approximation of segment creation time.
+ *
+ * We retire new segment as old when:
+ * - The creation time is more than 5 mins ago
+ * - We receive error state for new segment
+ * - External view for segment converges with ideal state.
+ *
+ * Note that this implementation means:
+ * 1) Inconsistency across requests for new segments (some may be available, 
some may be not)
+ * 2) When there is no assignment/instance change for long time, some of the 
new segments that expire with the clock
+ * are still considered as old.
  */
 abstract class BaseInstanceSelector implements InstanceSelector {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseInstanceSelector.class);
+  public static class SegmentState {
+    // List of instance for this segment in ideal state.
+    // Mapping from candidate to index in _candidateInstance.
+    private HashMap<String, Boolean> _candidates;
+
+    private long _creationMillis;
+
+    public SegmentState(long creationMillis) {
+      _creationMillis = creationMillis;
+      _candidates = new HashMap<>();
+    }
+
+    public boolean isNew(long nowMillis) {
+      return CommonConstants.Helix.StateModel.isNewSegment(_creationMillis, 
nowMillis);
+    }
+
+    public void resetCandidates() {
+      _candidates.clear();
+    }
+
+    public void addCandidate(String candidate, boolean online) {
+      _candidates.put(candidate, online);
+    }
+
+    public HashMap<String, Boolean> getCandidates() {
+      return _candidates;
+    }
+  }
 
   // To prevent int overflow, reset the request id once it reaches this value
   private static final long MAX_REQUEST_ID = 1_000_000_000;
 
-  private final String _tableNameWithType;
   private final BrokerMetrics _brokerMetrics;
+  private final String _tableNameWithType;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   protected final AdaptiveServerSelector _adaptiveServerSelector;
 
   // These 4 variables are the cached states to help accelerate the change 
processing
   private Set<String> _enabledInstances;
   private Map<String, List<String>> _segmentToOnlineInstancesMap;
-  private Map<String, List<String>> _segmentToOfflineInstancesMap;
   private Map<String, List<String>> _instanceToSegmentsMap;
+  private Map<String, SegmentState> _newSegmentStates;
 
-  // These 2 variables are needed for instance selection (multi-threaded), so 
make them volatile
-  private volatile Map<String, List<String>> _segmentToEnabledInstancesMap;
-  private volatile Set<String> _unavailableSegments;
+  // _segmentStateSnapshot is needed for instance selection (multi-threaded), 
so make them volatile
+  private volatile SegmentStateSnapshot _segmentStateSnapshot;

Review Comment:
   Currently, `_segmentStateSnapshot` is updated in `onInstancesChange()` 
and `onAssignmentChange()`. This means that we only update the `newSegment` list 
when there's a change in instances or ideal state/external view.
   
   If there's no update to instances/ideal state/external view for a long time 
(a valid case), I think that we currently do not move newSegments to oldSegments. 
Please correct me if I'm wrong.
   
   If this is the case, should we consider adding a background thread that 
periodically checks the `SegmentStateSnapshot` and promotes newSegments to 
oldSegments after 5 minutes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to