This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch full-auto-poc
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 37dc958fbc8cc33ab500167e43e78d764f239297
Author: jlli_LinkedIn <j...@linkedin.com>
AuthorDate: Mon Feb 12 23:36:47 2024 -0800

    Initial POC code for hybrid table
---
 .../pinot/broker/routing/BrokerRoutingManager.java |  11 +-
 .../instanceselector/BaseInstanceSelector.java     |  59 ++--
 .../StrictReplicaGroupInstanceSelector.java        |  34 ++-
 .../controller/helix/SegmentStatusChecker.java     |   6 +-
 ...imeSegmentOnlineOfflineStateModelGenerator.java |  72 +++++
 .../helix/core/PinotHelixResourceManager.java      |  30 +-
 .../helix/core/PinotTableIdealStateBuilder.java    |  17 +-
 .../realtime/MissingConsumingSegmentFinder.java    |   7 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  51 +++-
 .../helix/core/util/HelixSetupUtils.java           |  23 +-
 .../realtime/RealtimeSegmentDataManager.java       |   7 +
 .../tests/LLCRealtimeClusterIntegrationTest.java   |   8 +-
 .../server/starter/helix/BaseServerStarter.java    |  13 +-
 ...ltimeSegmentOnlineOfflineStateModelFactory.java | 325 +++++++++++++++++++++
 .../airlineStats_realtime_table_config.json        |   4 +-
 15 files changed, 574 insertions(+), 93 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index cc3a5354ef..b6be06653b 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -431,7 +431,7 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
       externalViewVersion = externalView.getRecord().getVersion();
     }
 
-    Set<String> onlineSegments = getOnlineSegments(idealState);
+    Set<String> onlineSegments = getOnlineSegments(idealState, externalView);
 
     SegmentPreSelector segmentPreSelector =
         SegmentPreSelectorFactory.getSegmentPreSelector(tableConfig, 
_propertyStore);
@@ -480,7 +480,7 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
         if (offlineTableExternalView == null) {
           offlineTableExternalView = new ExternalView(offlineTableName);
         }
-        Set<String> offlineTableOnlineSegments = 
getOnlineSegments(offlineTableIdealState);
+        Set<String> offlineTableOnlineSegments = 
getOnlineSegments(offlineTableIdealState, offlineTableExternalView);
         SegmentPreSelector offlineTableSegmentPreSelector =
             
SegmentPreSelectorFactory.getSegmentPreSelector(offlineTableConfig, 
_propertyStore);
         Set<String> offlineTablePreSelectedOnlineSegments =
@@ -538,8 +538,11 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
   /**
    * Returns the online segments (with ONLINE/CONSUMING instances) in the 
given ideal state.
    */
-  private static Set<String> getOnlineSegments(IdealState idealState) {
+  private static Set<String> getOnlineSegments(IdealState idealState, 
ExternalView externalView) {
     Map<String, Map<String, String>> segmentAssignment = 
idealState.getRecord().getMapFields();
+    if (segmentAssignment.isEmpty()) {
+      segmentAssignment = externalView.getRecord().getMapFields();
+    }
     Set<String> onlineSegments = new 
HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
     for (Map.Entry<String, Map<String, String>> entry : 
segmentAssignment.entrySet()) {
       Map<String, String> instanceStateMap = entry.getValue();
@@ -777,7 +780,7 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     // inconsistency between components, which is fine because the 
inconsistency only exists for the newly changed
     // segments and only lasts for a very short time.
     void onAssignmentChange(IdealState idealState, ExternalView externalView) {
-      Set<String> onlineSegments = getOnlineSegments(idealState);
+      Set<String> onlineSegments = getOnlineSegments(idealState, externalView);
       Set<String> preSelectedOnlineSegments = 
_segmentPreSelector.preSelect(onlineSegments);
       _segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
preSelectedOnlineSegments);
       _segmentSelector.onAssignmentChange(idealState, externalView, 
preSelectedOnlineSegments);
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index b2961eef94..aebafa7741 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
+import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
@@ -130,11 +131,11 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
   Map<String, Long> getNewSegmentCreationTimeMapFromZK(IdealState idealState, 
ExternalView externalView,
       Set<String> onlineSegments) {
     List<String> potentialNewSegments = new ArrayList<>();
-    Map<String, Map<String, String>> idealStateAssignment = 
idealState.getRecord().getMapFields();
+//    Map<String, Map<String, String>> idealStateAssignment = 
idealState.getRecord().getMapFields();
     Map<String, Map<String, String>> externalViewAssignment = 
externalView.getRecord().getMapFields();
     for (String segment : onlineSegments) {
-      assert idealStateAssignment.containsKey(segment);
-      if (isPotentialNewSegment(idealStateAssignment.get(segment), 
externalViewAssignment.get(segment))) {
+      assert externalViewAssignment.containsKey(segment);
+      if (isPotentialNewSegment(externalViewAssignment.get(segment))) {
         potentialNewSegments.add(segment);
       }
     }
@@ -169,14 +170,13 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
    * - Any instance for the segment is in ERROR state
    * - External view for the segment converges with ideal state
    */
-  static boolean isPotentialNewSegment(Map<String, String> 
idealStateInstanceStateMap,
-      @Nullable Map<String, String> externalViewInstanceStateMap) {
+  static boolean isPotentialNewSegment(@Nullable Map<String, String> 
externalViewInstanceStateMap) {
     if (externalViewInstanceStateMap == null) {
       return true;
     }
     boolean hasConverged = true;
     // Only track ONLINE/CONSUMING instances within the ideal state
-    for (Map.Entry<String, String> entry : 
idealStateInstanceStateMap.entrySet()) {
+    for (Map.Entry<String, String> entry : 
externalViewInstanceStateMap.entrySet()) {
       if (isOnlineForRouting(entry.getValue())) {
         String externalViewState = 
externalViewInstanceStateMap.get(entry.getKey());
         if (externalViewState == null || 
externalViewState.equals(SegmentStateModel.OFFLINE)) {
@@ -192,14 +192,13 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
   /**
    * Returns the online instances for routing purpose.
    */
-  static TreeSet<String> getOnlineInstances(Map<String, String> 
idealStateInstanceStateMap,
-      Map<String, String> externalViewInstanceStateMap) {
+  static TreeSet<String> getOnlineInstances(Map<String, String> 
externalViewInstanceStateMap) {
     TreeSet<String> onlineInstances = new TreeSet<>();
     // Only track ONLINE/CONSUMING instances within the ideal state
-    for (Map.Entry<String, String> entry : 
idealStateInstanceStateMap.entrySet()) {
+    for (Map.Entry<String, String> entry : 
externalViewInstanceStateMap.entrySet()) {
       String instance = entry.getKey();
       // NOTE: DO NOT check if EV matches IS because it is a valid state when 
EV is CONSUMING while IS is ONLINE
-      if (isOnlineForRouting(entry.getValue()) && 
isOnlineForRouting(externalViewInstanceStateMap.get(instance))) {
+      if (isOnlineForRouting(externalViewInstanceStateMap.get(instance))) {
         onlineInstances.add(instance);
       }
     }
@@ -217,6 +216,14 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
     }
   }
 
+  static SortedSet<String> convertToSortedSet(Set<String> set) {
+    if (set instanceof SortedSet) {
+      return (SortedSet<String>) set;
+    } else {
+      return new TreeSet<>(set);
+    }
+  }
+
   /**
    * Updates the segment maps based on the given ideal state, external view, 
online segments (segments with
    * ONLINE/CONSUMING instances in the ideal state and pre-selected by the 
{@link SegmentPreSelector}) and new segments.
@@ -229,20 +236,22 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
     _oldSegmentCandidatesMap.clear();
     _newSegmentStateMap = new 
HashMap<>(HashUtil.getHashMapCapacity(newSegmentCreationTimeMap.size()));
 
-    Map<String, Map<String, String>> idealStateAssignment = 
idealState.getRecord().getMapFields();
+//    Map<String, Map<String, String>> idealStateAssignment = 
idealState.getRecord().getMapFields();
+    Set<String> idealStateSegmentSet = idealState.getPartitionSet();
     Map<String, Map<String, String>> externalViewAssignment = 
externalView.getRecord().getMapFields();
     for (String segment : onlineSegments) {
-      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
+//      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
+
       Long newSegmentCreationTimeMs = newSegmentCreationTimeMap.get(segment);
       Map<String, String> externalViewInstanceStateMap = 
externalViewAssignment.get(segment);
       if (externalViewInstanceStateMap == null) {
         if (newSegmentCreationTimeMs != null) {
           // New segment
-          List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(idealStateInstanceStateMap.size());
-          for (Map.Entry<String, String> entry : 
convertToSortedMap(idealStateInstanceStateMap).entrySet()) {
-            if (isOnlineForRouting(entry.getValue())) {
-              candidates.add(new SegmentInstanceCandidate(entry.getKey(), 
false));
-            }
+          List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(Integer.parseInt(idealState.getReplicas()));
+          for (String segmentName : convertToSortedSet(idealStateSegmentSet)) {
+//            if (isOnlineForRouting(entry.getValue())) {
+            candidates.add(new SegmentInstanceCandidate(segmentName, false));
+//            }
           }
           _newSegmentStateMap.put(segment, new 
NewSegmentState(newSegmentCreationTimeMs, candidates));
         } else {
@@ -250,11 +259,11 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
           _oldSegmentCandidatesMap.put(segment, Collections.emptyList());
         }
       } else {
-        TreeSet<String> onlineInstances = 
getOnlineInstances(idealStateInstanceStateMap, externalViewInstanceStateMap);
+        TreeSet<String> onlineInstances = 
getOnlineInstances(externalViewInstanceStateMap);
         if (newSegmentCreationTimeMs != null) {
           // New segment
-          List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(idealStateInstanceStateMap.size());
-          for (Map.Entry<String, String> entry : 
convertToSortedMap(idealStateInstanceStateMap).entrySet()) {
+          List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(externalViewInstanceStateMap.size());
+          for (Map.Entry<String, String> entry : 
convertToSortedMap(externalViewInstanceStateMap).entrySet()) {
             if (isOnlineForRouting(entry.getValue())) {
               String instance = entry.getKey();
               candidates.add(new SegmentInstanceCandidate(instance, 
onlineInstances.contains(instance)));
@@ -365,7 +374,7 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
   @Override
   public void onAssignmentChange(IdealState idealState, ExternalView 
externalView, Set<String> onlineSegments) {
     Map<String, Long> newSegmentCreationTimeMap =
-        getNewSegmentCreationTimeMapFromExistingStates(idealState, 
externalView, onlineSegments);
+        getNewSegmentCreationTimeMapFromExistingStates(externalView, 
onlineSegments);
     updateSegmentMaps(idealState, externalView, onlineSegments, 
newSegmentCreationTimeMap);
     refreshSegmentStates();
   }
@@ -373,11 +382,11 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
   /**
    * Returns a map from new segment to their creation time based on the 
existing in-memory states.
    */
-  Map<String, Long> getNewSegmentCreationTimeMapFromExistingStates(IdealState 
idealState, ExternalView externalView,
+  Map<String, Long> 
getNewSegmentCreationTimeMapFromExistingStates(ExternalView externalView,
       Set<String> onlineSegments) {
     Map<String, Long> newSegmentCreationTimeMap = new HashMap<>();
     long currentTimeMs = _clock.millis();
-    Map<String, Map<String, String>> idealStateAssignment = 
idealState.getRecord().getMapFields();
+//    Map<String, Map<String, String>> idealStateAssignment = 
idealState.getRecord().getMapFields();
     Map<String, Map<String, String>> externalViewAssignment = 
externalView.getRecord().getMapFields();
     for (String segment : onlineSegments) {
       NewSegmentState newSegmentState = _newSegmentStateMap.get(segment);
@@ -393,8 +402,8 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
       }
       // For recently created segment, check if it is qualified as new segment
       if (creationTimeMs > 0) {
-        assert idealStateAssignment.containsKey(segment);
-        if (isPotentialNewSegment(idealStateAssignment.get(segment), 
externalViewAssignment.get(segment))) {
+        assert externalViewAssignment.containsKey(segment);
+        if (isPotentialNewSegment(externalViewAssignment.get(segment))) {
           newSegmentCreationTimeMap.put(segment, creationTimeMs);
         }
       }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
index 8c352bdbe6..5b34371a8d 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
@@ -96,7 +96,7 @@ public class StrictReplicaGroupInstanceSelector extends 
ReplicaGroupInstanceSele
     int newSegmentMapCapacity = 
HashUtil.getHashMapCapacity(newSegmentCreationTimeMap.size());
     _newSegmentStateMap = new HashMap<>(newSegmentMapCapacity);
 
-    Map<String, Map<String, String>> idealStateAssignment = 
idealState.getRecord().getMapFields();
+//    Map<String, Map<String, String>> idealStateAssignment = 
idealState.getRecord().getMapFields();
     Map<String, Map<String, String>> externalViewAssignment = 
externalView.getRecord().getMapFields();
 
     // Get the online instances for the segments
@@ -104,14 +104,14 @@ public class StrictReplicaGroupInstanceSelector extends 
ReplicaGroupInstanceSele
         new HashMap<>(HashUtil.getHashMapCapacity(onlineSegments.size()));
     Map<String, Set<String>> newSegmentToOnlineInstancesMap = new 
HashMap<>(newSegmentMapCapacity);
     for (String segment : onlineSegments) {
-      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
-      assert idealStateInstanceStateMap != null;
+//      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
+//      assert idealStateInstanceStateMap != null;
       Map<String, String> externalViewInstanceStateMap = 
externalViewAssignment.get(segment);
       Set<String> onlineInstances;
       if (externalViewInstanceStateMap == null) {
         onlineInstances = Collections.emptySet();
       } else {
-        onlineInstances = getOnlineInstances(idealStateInstanceStateMap, 
externalViewInstanceStateMap);
+        onlineInstances = getOnlineInstances(externalViewInstanceStateMap);
       }
       if (newSegmentCreationTimeMap.containsKey(segment)) {
         newSegmentToOnlineInstancesMap.put(segment, onlineInstances);
@@ -126,16 +126,18 @@ public class StrictReplicaGroupInstanceSelector extends 
ReplicaGroupInstanceSele
     for (Map.Entry<String, Set<String>> entry : 
oldSegmentToOnlineInstancesMap.entrySet()) {
       String segment = entry.getKey();
       Set<String> onlineInstances = entry.getValue();
-      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
-      Set<String> instancesInIdealState = idealStateInstanceStateMap.keySet();
+//      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
+//      Set<String> instancesInIdealState = 
idealStateInstanceStateMap.keySet();
+      Map<String, String> externalViewInstanceStateMap = 
externalViewAssignment.get(segment);
+      Set<String> instanceInExternalView = 
externalViewInstanceStateMap.keySet();
       Set<String> unavailableInstances =
-          unavailableInstancesMap.computeIfAbsent(instancesInIdealState, k -> 
new HashSet<>());
-      for (String instance : instancesInIdealState) {
+          unavailableInstancesMap.computeIfAbsent(instanceInExternalView, k -> 
new HashSet<>());
+      for (String instance : instanceInExternalView) {
         if (!onlineInstances.contains(instance)) {
           if (unavailableInstances.add(instance)) {
             LOGGER.warn(
                 "Found unavailable instance: {} in instance group: {} for 
segment: {}, table: {} (IS: {}, EV: {})",
-                instance, instancesInIdealState, segment, _tableNameWithType, 
idealStateInstanceStateMap,
+                instance, instanceInExternalView, segment, _tableNameWithType, 
externalViewInstanceStateMap,
                 externalViewAssignment.get(segment));
           }
         }
@@ -147,8 +149,9 @@ public class StrictReplicaGroupInstanceSelector extends 
ReplicaGroupInstanceSele
       String segment = entry.getKey();
       // NOTE: onlineInstances is either a TreeSet or an EmptySet (sorted)
       Set<String> onlineInstances = entry.getValue();
-      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
-      Set<String> unavailableInstances = 
unavailableInstancesMap.get(idealStateInstanceStateMap.keySet());
+      Map<String, String> externalViewInstanceStateMap = 
externalViewAssignment.get(segment);
+//      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
+      Set<String> unavailableInstances = 
unavailableInstancesMap.get(externalViewInstanceStateMap.keySet());
       List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(onlineInstances.size());
       for (String instance : onlineInstances) {
         if (!unavailableInstances.contains(instance)) {
@@ -161,11 +164,12 @@ public class StrictReplicaGroupInstanceSelector extends 
ReplicaGroupInstanceSele
     for (Map.Entry<String, Set<String>> entry : 
newSegmentToOnlineInstancesMap.entrySet()) {
       String segment = entry.getKey();
       Set<String> onlineInstances = entry.getValue();
-      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
+      Map<String, String> externalViewInstanceStateMap = 
externalViewAssignment.get(segment);
+//      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
       Set<String> unavailableInstances =
-          
unavailableInstancesMap.getOrDefault(idealStateInstanceStateMap.keySet(), 
Collections.emptySet());
-      List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(idealStateInstanceStateMap.size());
-      for (String instance : 
convertToSortedMap(idealStateInstanceStateMap).keySet()) {
+          
unavailableInstancesMap.getOrDefault(externalViewInstanceStateMap.keySet(), 
Collections.emptySet());
+      List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(externalViewInstanceStateMap.size());
+      for (String instance : 
convertToSortedMap(externalViewInstanceStateMap).keySet()) {
         if (!unavailableInstances.contains(instance)) {
           candidates.add(new SegmentInstanceCandidate(instance, 
onlineInstances.contains(instance)));
         }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index d0af31044f..2b9431b0f2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -256,8 +256,12 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask<SegmentStatusCh
       int nReplicas = 0;
       int nIdeal = 0;
       nSegments++;
+      Map<String, String> partitionMap = 
idealState.getInstanceStateMap(partitionName);
+      if (partitionMap == null) {
+        continue;
+      }
       // Skip segments not online in ideal state
-      for (Map.Entry<String, String> serverAndState : 
idealState.getInstanceStateMap(partitionName).entrySet()) {
+      for (Map.Entry<String, String> serverAndState : partitionMap.entrySet()) 
{
         if (serverAndState == null) {
           break;
         }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java
new file mode 100644
index 0000000000..ec640131cb
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import org.apache.helix.model.StateModelDefinition;
+
+
+/**
+ * Realtime segment state model generator describes the transitions for realtime segment states.
+ *
+ * Offline to Consuming, Consuming to Online
+ * Consuming to Offline, Online to Offline, Offline to Dropped
+ *
+ * Unlike the offline segment state model, this includes the CONSUMING state used by realtime segments.
+ */
+public class PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator {
+  private PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator() {
+  }
+
+  public static final String PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL 
=
+      "RealtimeSegmentOnlineOfflineStateModel";
+
+  public static final String ONLINE_STATE = "ONLINE";
+  public static final String CONSUMING_STATE = "CONSUMING";
+  public static final String OFFLINE_STATE = "OFFLINE";
+  public static final String DROPPED_STATE = "DROPPED";
+
+  public static StateModelDefinition generatePinotStateModelDefinition() {
+    StateModelDefinition.Builder builder =
+        new 
StateModelDefinition.Builder(PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL);
+    builder.initialState(OFFLINE_STATE);
+
+    builder.addState(ONLINE_STATE);
+    builder.addState(CONSUMING_STATE);
+    builder.addState(OFFLINE_STATE);
+    builder.addState(DROPPED_STATE);
+    // The initial state used when a node starts is OFFLINE (configured above via initialState()).
+
+    // Add transitions between the states.
+    builder.addTransition(CONSUMING_STATE, ONLINE_STATE);
+    builder.addTransition(OFFLINE_STATE, CONSUMING_STATE);
+//    builder.addTransition(OFFLINE_STATE, ONLINE_STATE);
+    builder.addTransition(CONSUMING_STATE, OFFLINE_STATE);
+    builder.addTransition(ONLINE_STATE, OFFLINE_STATE);
+    builder.addTransition(OFFLINE_STATE, DROPPED_STATE);
+
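+    // Set constraints on states.
+    // Dynamic constraint: "R" means the upper bound is derived from the replication factor.
+    builder.dynamicUpperBound(ONLINE_STATE, "R");
+    // NOTE(review): no upper bound is set for the CONSUMING state here; confirm whether it
+    // should also be bounded by the replication factor.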
+
+    StateModelDefinition statemodelDefinition = builder.build();
+    return statemodelDefinition;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ae208715fd..0a308fb514 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1583,17 +1583,19 @@ public class PinotHelixResourceManager {
     Preconditions.checkState(tableType == TableType.OFFLINE || tableType == 
TableType.REALTIME,
         "Invalid table type: %s", tableType);
 
-    IdealState idealState;
-    if (tableType == TableType.REALTIME) {
-      idealState =
-           
PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, 
tableConfig.getReplication(),
-               _enableBatchMessageMode);
-    } else {
-      // Creates a FULL-AUTO based ideal state, supported for OFFLINE tables 
only
-      idealState =
-          
PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, 
tableConfig.getReplication(),
-              _enableBatchMessageMode);
-    }
+    IdealState idealState =
+        
PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, 
tableConfig.getReplication(),
+            _enableBatchMessageMode);
+//    if (tableType == TableType.REALTIME) {
+//      idealState =
+//           
PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, 
tableConfig.getReplication(),
+//               _enableBatchMessageMode);
+//    } else {
+//      // Creates a FULL-AUTO based ideal state, supported for OFFLINE tables 
only
+//      idealState =
+//          
PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, 
tableConfig.getReplication(),
+//              _enableBatchMessageMode);
+//    }
    // IdealState idealState =
    //     
PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, 
tableConfig.getReplication(),
    //         _enableBatchMessageMode);
@@ -2270,8 +2272,10 @@ public class PinotHelixResourceManager {
             TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
             if (tableType == TableType.REALTIME) {
               // TODO: Once REALTIME uses FULL-AUTO only the listFields should 
be updated
-              currentAssignment.put(segmentName,
-                  
SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, 
SegmentStateModel.ONLINE));
+              currentAssignmentList.put(segmentName, Collections.emptyList()
+                  /* 
SegmentAssignmentUtils.getInstanceStateList(assignedInstances) */);
+//              currentAssignment.put(segmentName,
+//                  
SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, 
SegmentStateModel.ONLINE));
             } else {
               // TODO: Assess whether to pass in an empty instance list or to 
set the preferred list
               currentAssignmentList.put(segmentName, Collections.emptyList()
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 63222df7e3..f02ea86dc7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -23,10 +23,12 @@ import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.helix.model.builder.FullAutoModeISBuilder;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
 import org.apache.pinot.spi.utils.retry.RetryPolicy;
 import org.slf4j.Logger;
@@ -57,12 +59,23 @@ public class PinotTableIdealStateBuilder {
   public static IdealState buildEmptyFullAutoIdealStateFor(String 
tableNameWithType, int numReplicas,
       boolean enableBatchMessageMode) {
     LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: 
{}", tableNameWithType, numReplicas);
+    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    String stateModel;
+    if (tableType == null) {
+      throw new RuntimeException("Failed to get table type from table name: " 
+ tableNameWithType);
+    } else if (TableType.OFFLINE.equals(tableType)) {
+      stateModel =
+          
PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    } else {
+      stateModel =
+          
PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    }
+
     // FULL-AUTO Segment Online-Offline state model with a rebalance strategy, 
crushed auto-rebalance by default
     // TODO: The state model used only works for OFFLINE tables today. Add 
support for REALTIME state model too
     FullAutoModeISBuilder idealStateBuilder = new 
FullAutoModeISBuilder(tableNameWithType);
     idealStateBuilder
-        .setStateModel(
-            
PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
+        .setStateModel(stateModel)
         
.setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1)
         // TODO: Revisit the rebalance strategy to use (maybe we add a custom 
one)
         .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
index f4192a5a1a..03d72600c7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -25,6 +25,7 @@ import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.commons.lang.StringUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -141,8 +142,12 @@ public class MissingConsumingSegmentFinder {
             // Note that there is no problem in case the partition group has 
reached its end of life.
             SegmentZKMetadata segmentZKMetadata = _segmentMetadataFetcher
                 .fetchSegmentZkMetadata(_realtimeTableName, 
latestCompletedSegment.getSegmentName());
+            String endOffset = segmentZKMetadata.getEndOffset();
+            if (StringUtils.isEmpty(endOffset)) {
+              return;
+            }
             StreamPartitionMsgOffset completedSegmentEndOffset =
-                
_streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset());
+                _streamPartitionMsgOffsetFactory.create(endOffset);
             if (completedSegmentEndOffset.compareTo(largestStreamOffset) < 0) {
               // there are unconsumed messages available on the stream
               missingSegmentInfo._totalCount++;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 003f985e3f..88f3c1431c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -46,6 +46,7 @@ import org.apache.helix.Criteria;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -71,7 +72,6 @@ import 
org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
-import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
@@ -448,6 +448,18 @@ public class PinotLLCRealtimeSegmentManager {
     }
   }
 
+  @VisibleForTesting
+  ExternalView getExternalView(String realtimeTableName) {
+    try {
+      ExternalView view = HelixHelper.getExternalViewForResource(_helixAdmin, _clusterName, realtimeTableName);
+      Preconditions.checkState(view != null, "Failed to find ExternalView for table: %s", realtimeTableName);
+      return view;
+    } catch (Exception fetchException) {
+      _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
+      throw fetchException;
+    }
+  }
+
   /**
    * This method moves the segment file from another location to its permanent 
location.
    * When splitCommit is enabled, segment file is uploaded to the 
segmentLocation in the committingSegmentDescriptor,
@@ -521,9 +533,17 @@ public class PinotLLCRealtimeSegmentManager {
     TableConfig tableConfig = getTableConfig(realtimeTableName);
     InstancePartitions instancePartitions = 
getConsumingInstancePartitions(tableConfig);
     IdealState idealState = getIdealState(realtimeTableName);
-    Preconditions.checkState(
-        
idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
-        "Failed to find instance in CONSUMING state in IdealState for segment: 
%s", committingSegmentName);
+    ExternalView externalView = getExternalView(realtimeTableName);
+    // Check whether there is at least 1 replica in ONLINE state for full-auto 
mode.
+    if (idealState.getRebalanceMode() == IdealState.RebalanceMode.FULL_AUTO) {
+      Preconditions.checkState(
+          
externalView.getStateMap(committingSegmentName).containsValue(SegmentStateModel.ONLINE),
+          "Failed to find instance in ONLINE state in IdealState for segment: 
%s", committingSegmentName);
+    } else {
+      Preconditions.checkState(
+          
idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
+          "Failed to find instance in CONSUMING state in IdealState for 
segment: %s", committingSegmentName);
+    }
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
     /*
@@ -993,9 +1013,11 @@ public class PinotLLCRealtimeSegmentManager {
     // TODO: Need to figure out the best way to handle committed segments' 
state change
     if (committingSegmentName != null) {
       // Change committing segment state to ONLINE
-      Set<String> instances = 
instanceStatesMap.get(committingSegmentName).keySet();
-      instanceStatesMap.put(committingSegmentName,
-          SegmentAssignmentUtils.getInstanceStateMap(instances, 
SegmentStateModel.ONLINE));
+//      Set<String> instances = 
instanceStatesMap.get(committingSegmentName).keySet();
+//      instanceStatesMap.put(committingSegmentName,
+//          SegmentAssignmentUtils.getInstanceStateMap(instances, 
SegmentStateModel.ONLINE));
+      instanceStatesList.put(newSegmentName, Collections.emptyList()
+          /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
       LOGGER.info("Updating segment: {} to ONLINE state", 
committingSegmentName);
     }
 
@@ -1029,14 +1051,14 @@ public class PinotLLCRealtimeSegmentManager {
       List<String> instancesAssigned =
           segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, 
instancePartitionsMap);
       // No need to check for tableType as offline tables can never go to 
CONSUMING state. All callers are for REALTIME
-      instanceStatesMap.put(newSegmentName,
-          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.CONSUMING));
+//      instanceStatesMap.put(newSegmentName,
+//          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.CONSUMING));
       // TODO: Once REALTIME segments move to FULL-AUTO, we cannot update the 
map. Uncomment below lines to update list.
       //       Assess whether we should set am empty InstanceStateList for the 
segment or not. i.e. do we support
       //       this preferred list concept, and does Helix-Auto even allow 
preferred list concept (from code reading it
       //       looks like it does)
-      // instanceStatesList.put(newSegmentName, Collections.emptyList()
-      //    
/*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
+       instanceStatesList.put(newSegmentName, Collections.emptyList()
+          /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/);
       LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", 
newSegmentName, instancesAssigned);
     }
   }
@@ -1247,8 +1269,11 @@ public class PinotLLCRealtimeSegmentManager {
                     newPartitionGroupMetadataList, instancePartitions, 
instanceStatesMap, instanceStatesList,
                     segmentAssignment, instancePartitionsMap, startOffset);
               } else {
-                LOGGER.error("Got unexpected instance state map: {} for 
segment: {}", instanceStateMap,
-                    latestSegmentName);
+                LOGGER.error(
+                    "Got unexpected instance state map: {} for segment: {}. 
Segment status: {}. "
+                        + "recreateDeletedConsumingSegment: {}",
+                    instanceStateMap, latestSegmentName, 
latestSegmentZKMetadata.getStatus(),
+                    recreateDeletedConsumingSegment);
               }
             }
             // else, the partition group has reached end of life. This is an 
acceptable state
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index 61f6112365..033bf59405 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -47,7 +47,7 @@ import 
org.apache.pinot.common.utils.helix.LeadControllerUtils;
 import org.apache.pinot.controller.ControllerConf;
 import 
org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator;
 import 
org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator;
-import 
org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
+import 
org.apache.pinot.controller.helix.core.PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,7 +82,7 @@ public class HelixSetupUtils {
         Map<String, String> configMap = new HashMap<>();
         configMap.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, 
Boolean.toString(true));
         
configMap.put(ClusterConfig.ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.name(),
-            Boolean.toString(true));
+            Boolean.toString(false));
         configMap.put(ENABLE_CASE_INSENSITIVE_KEY, 
Boolean.toString(DEFAULT_ENABLE_CASE_INSENSITIVE));
         configMap.put(DEFAULT_HYPERLOGLOG_LOG2M_KEY, 
Integer.toString(DEFAULT_HYPERLOGLOG_LOG2M));
         
configMap.put(CommonConstants.Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, 
Boolean.toString(false));
@@ -139,23 +139,24 @@ public class HelixSetupUtils {
 
   private static void addSegmentStateModelDefinitionIfNeeded(String 
helixClusterName, HelixAdmin helixAdmin,
       HelixDataAccessor helixDataAccessor, boolean isUpdateStateModel) {
-    String segmentStateModelName =
-        
PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
-    StateModelDefinition stateModelDefinition = 
helixAdmin.getStateModelDef(helixClusterName, segmentStateModelName);
+    String realtimeSegmentStateModelName =
+        
PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    StateModelDefinition stateModelDefinition =
+        helixAdmin.getStateModelDef(helixClusterName, 
realtimeSegmentStateModelName);
     if (stateModelDefinition == null || isUpdateStateModel) {
       if (stateModelDefinition == null) {
-        LOGGER.info("Adding state model: {} with CONSUMING state", 
segmentStateModelName);
+        LOGGER.info("Adding state model: {} with CONSUMING state", 
realtimeSegmentStateModelName);
       } else {
-        LOGGER.info("Updating state model: {} to contain CONSUMING state", 
segmentStateModelName);
+        LOGGER.info("Updating state model: {} to contain CONSUMING state", 
realtimeSegmentStateModelName);
       }
-      helixDataAccessor
-          
.createStateModelDef(PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
+      helixDataAccessor.createStateModelDef(
+          
PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
     }
 
     String offlineSegmentStateModelName =
         
PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
-    StateModelDefinition offlineStateModelDefinition = 
helixAdmin.getStateModelDef(helixClusterName,
-        offlineSegmentStateModelName);
+    StateModelDefinition offlineStateModelDefinition =
+        helixAdmin.getStateModelDef(helixClusterName, 
offlineSegmentStateModelName);
     if (offlineStateModelDefinition == null || isUpdateStateModel) {
       if (stateModelDefinition == null) {
         LOGGER.info("Adding offline segment state model: {} with CONSUMING 
state", offlineSegmentStateModelName);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 282f3c72a1..5cdb3dda3a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -862,6 +862,13 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     return _state == State.ERROR ? ConsumerState.NOT_CONSUMING : 
ConsumerState.CONSUMING;
   }
 
+  /**
+   * Returns the current value of {@code _state}, the {@link State} of this realtime segment data manager.
+   */
+  public State getState() {
+    return _state;
+  }
+
   /**
    * Returns the timestamp of the last consumed message.
    */
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 2389fe8ba6..50693aaf73 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -500,8 +500,12 @@ public class LLCRealtimeClusterIntegrationTest extends 
BaseRealtimeClusterIntegr
           }
         }
       });
-      Assert.assertTrue("No consuming segment found in partition " + 
partition, seqNum.get() >= 0);
-      return seqNum.get();
+      if (seqNum.get() == -1) {
+        return 0;
+      } else {
+        Assert.assertTrue("No consuming segment found in partition " + 
partition, seqNum.get() >= 0);
+        return seqNum.get();
+      }
     }
 
     public class ExceptingKafkaConsumer extends KafkaPartitionLevelConsumer {
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 6e0d157075..9862ffb7dd 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -579,15 +579,20 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     Tracing.ThreadAccountantOps
         
.initializeThreadAccountant(_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX),
 _instanceId);
     initSegmentFetcher(_serverConf);
-    StateModelFactory<?> stateModelFactoryWithRealtime =
-        new SegmentOnlineOfflineStateModelFactory(_instanceId, 
instanceDataManager);
+//    StateModelFactory<?> stateModelFactoryWithRealtime =
+//        new SegmentOnlineOfflineStateModelFactory(_instanceId, 
instanceDataManager);
     StateModelFactory<?> stateModelFactory =
         new OfflineSegmentOnlineOfflineStateModelFactory(_instanceId, 
instanceDataManager);
+    StateModelFactory<?> realtimeSegmentStateModelFactory =
+        new RealtimeSegmentOnlineOfflineStateModelFactory(_instanceId, 
instanceDataManager);
     _helixManager.getStateMachineEngine()
         
.registerStateModelFactory(OfflineSegmentOnlineOfflineStateModelFactory.getStateModelName(),
 stateModelFactory);
     _helixManager.getStateMachineEngine()
-        
.registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
-            stateModelFactoryWithRealtime);
+        
.registerStateModelFactory(RealtimeSegmentOnlineOfflineStateModelFactory.getStateModelName(),
+            realtimeSegmentStateModelFactory);
+//    _helixManager.getStateMachineEngine()
+//        
.registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
+//            stateModelFactoryWithRealtime);
     // Start the data manager as a pre-connect callback so that it starts 
after connecting to the ZK in order to access
     // the property store, but before receiving state transitions
     _helixManager.addPreConnectCallback(_serverInstance::startDataManager);
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java
new file mode 100644
index 0000000000..d3a937eaef
--- /dev/null
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import com.google.common.base.Preconditions;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Helix state model factory for the segments of REALTIME tables. The state model it creates handles:
+ * 1. {@code OFFLINE -> CONSUMING}: add the segment as a realtime segment.
+ * 2. {@code CONSUMING -> ONLINE}: wait until the consuming segment reaches a final, non-consuming state, then move
+ *    it online using its segment ZK metadata.
+ * 3. {@code CONSUMING -> OFFLINE} and {@code ONLINE -> OFFLINE}: offload the segment (local data files are kept).
+ * 4. {@code CONSUMING -> DROPPED} and {@code ONLINE -> DROPPED}: offload and delete the segment.
+ * 5. {@code OFFLINE -> DROPPED} and {@code ERROR -> DROPPED}: delete the segment.
+ *
+ * Segments of OFFLINE tables are handled by a separate state model factory.
+ */
+public class RealtimeSegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
+  // How long CONSUMING -> ONLINE waits between checks of whether the consuming segment has finished.
+  private static final long CONSUMING_SEGMENT_POLL_INTERVAL_MS = 10_000L;
+
+  private final String _instanceId;
+  private final InstanceDataManager _instanceDataManager;
+
+  public RealtimeSegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager) {
+    _instanceId = instanceId;
+    _instanceDataManager = instanceDataManager;
+  }
+
+  /**
+   * Returns the name under which this factory is registered with the Helix state machine engine.
+   */
+  public static String getStateModelName() {
+    return "RealtimeSegmentOnlineOfflineStateModel";
+  }
+
+  @Override
+  public StateModel createNewStateModel(String resourceName, String partitionName) {
+    return new RealtimeSegmentOnlineOfflineStateModel();
+  }
+
+  // Helix seems to need StateModelInfo annotation for 'initialState'. It does not use the 'states' field.
+  // The transitions in the helix messages indicate the from/to states, and helix uses the
+  // Transition annotations (but only if StateModelInfo is defined).
+  @SuppressWarnings("unused")
+  @StateModelInfo(states = "{'OFFLINE', 'CONSUMING', 'ONLINE', 'DROPPED'}", initialState = "OFFLINE")
+  public class RealtimeSegmentOnlineOfflineStateModel extends StateModel {
+    private final Logger _logger = LoggerFactory.getLogger(_instanceId + " - RealtimeSegmentOnlineOfflineStateModel");
+
+    @Transition(from = "OFFLINE", to = "CONSUMING")
+    public void onBecomeConsumingFromOffline(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() : " + message);
+      String realtimeTableName = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.addRealtimeSegment(realtimeTableName, segmentName);
+      } catch (Exception e) {
+        String errorMessage =
+            String.format("Caught exception in state transition OFFLINE -> CONSUMING for table: %s, segment: %s",
+                realtimeTableName, segmentName);
+        _logger.error(errorMessage, e);
+        TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+        if (tableDataManager != null) {
+          tableDataManager.addSegmentError(segmentName,
+              new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+        }
+        Utils.rethrowException(e);
+      }
+    }
+
+    /**
+     * Waits until the consuming segment reaches a final, non-consuming state and then moves it online using its
+     * segment ZK metadata. Returns early when the acquired segment is no longer a {@link RealtimeSegmentDataManager}
+     * (it has already been swapped with a built segment).
+     *
+     * NOTE: The state is re-checked every {@code CONSUMING_SEGMENT_POLL_INTERVAL_MS} ms with no upper bound, so the
+     * thread executing this transition stays blocked until the segment is done consuming or an exception is thrown.
+     */
+    @Transition(from = "CONSUMING", to = "ONLINE")
+    public void onBecomeOnlineFromConsuming(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOnlineFromConsuming() : " + message);
+      String realtimeTableName = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      Preconditions.checkNotNull(TableNameBuilder.getTableTypeFromTableName(realtimeTableName),
+          "Failed to get table type from table name: %s", realtimeTableName);
+      TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+      Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName);
+      tableDataManager.onConsumingToOnline(segmentName);
+      boolean isConsumingDone = false;
+      while (!isConsumingDone) {
+        SegmentDataManager acquiredSegment = tableDataManager.acquireSegment(segmentName);
+        // For this transition to be correct in helix, we should already have a segment that is consuming
+        Preconditions.checkState(acquiredSegment != null, "Failed to find segment: %s in table: %s", segmentName,
+            realtimeTableName);
+
+        // TODO: https://github.com/apache/pinot/issues/10049
+        try {
+          if (!(acquiredSegment instanceof RealtimeSegmentDataManager)) {
+            // We found an LLC segment that is not consuming right now, must be that we already swapped it with a
+            // segment that has been built. Nothing to do for this state transition.
+            _logger.info(
+                "Segment {} not an instance of RealtimeSegmentDataManager. Reporting success for the transition",
+                acquiredSegment.getSegmentName());
+            return;
+          }
+          RealtimeSegmentDataManager segmentDataManager = (RealtimeSegmentDataManager) acquiredSegment;
+          RealtimeSegmentDataManager.State state = segmentDataManager.getState();
+          if (!state.isFinal() || state.shouldConsume()) {
+            // Not in a final state yet, or still consuming: check again after the poll interval.
+            Thread.sleep(CONSUMING_SEGMENT_POLL_INTERVAL_MS);
+          } else {
+            SegmentZKMetadata segmentZKMetadata =
+                ZKMetadataProvider.getSegmentZKMetadata(_instanceDataManager.getPropertyStore(), realtimeTableName,
+                    segmentName);
+            segmentDataManager.goOnlineFromConsuming(segmentZKMetadata);
+            isConsumingDone = true;
+          }
+        } catch (Exception e) {
+          if (e instanceof InterruptedException) {
+            // Restore the interrupt status before failing the transition so callers up the stack can observe it.
+            Thread.currentThread().interrupt();
+          }
+          String errorMessage =
+              String.format("Caught exception in state transition CONSUMING -> ONLINE for table: %s, segment: %s",
+                  realtimeTableName, segmentName);
+          _logger.error(errorMessage, e);
+          tableDataManager.addSegmentError(segmentName,
+              new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+          Utils.rethrowException(e);
+        } finally {
+          tableDataManager.releaseSegment(acquiredSegment);
+        }
+      }
+    }
+
+    @Transition(from = "CONSUMING", to = "OFFLINE")
+    public void onBecomeOfflineFromConsuming(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromConsuming() : " + message);
+      String realtimeTableName = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.offloadSegment(realtimeTableName, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition CONSUMING -> OFFLINE for table: {}, segment: {}",
+            realtimeTableName, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    @Transition(from = "CONSUMING", to = "DROPPED")
+    public void onBecomeDroppedFromConsuming(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() : " + message);
+      String realtimeTableName = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+      Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName);
+      tableDataManager.onConsumingToDropped(segmentName);
+      try {
+        _instanceDataManager.offloadSegment(realtimeTableName, segmentName);
+        _instanceDataManager.deleteSegment(realtimeTableName, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition CONSUMING -> DROPPED for table: {}, segment: {}",
+            realtimeTableName, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    // TODO: No OFFLINE -> ONLINE handler is defined in this state model. Confirm whether the realtime segment state
+    //       model definition declares that transition; if it does, a handler that loads the segment is required.
+
+    // Remove segment from InstanceDataManager.
+    // Still keep the data files in local.
+    @Transition(from = "ONLINE", to = "OFFLINE")
+    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromOnline() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ONLINE -> OFFLINE for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    // Delete segment from local directory.
+    @Transition(from = "OFFLINE", to = "DROPPED")
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromOffline() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition OFFLINE -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    @Transition(from = "ONLINE", to = "DROPPED")
+    public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ONLINE -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+
+    @Transition(from = "ERROR", to = "OFFLINE")
+    public void onBecomeOfflineFromError(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromError() : " + message);
+    }
+
+    @Transition(from = "ERROR", to = "DROPPED")
+    public void onBecomeDroppedFromError(Message message, NotificationContext context) {
+      _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromError() : " + message);
+      String tableNameWithType = message.getResourceName();
+      String segmentName = message.getPartitionName();
+      try {
+        _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+      } catch (Exception e) {
+        _logger.error("Caught exception in state transition ERROR -> DROPPED for table: {}, segment: {}",
+            tableNameWithType, segmentName, e);
+        Utils.rethrowException(e);
+      }
+    }
+  }
+}
diff --git 
a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
index 9d1dcd7e14..018ee3b6e8 100644
--- 
a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
+++ 
b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
@@ -6,7 +6,7 @@
     "timeColumnName": "DaysSinceEpoch",
     "retentionTimeUnit": "DAYS",
     "retentionTimeValue": "5",
-    "replication": "1"
+    "replication": "3"
   },
   "tableIndexConfig": {},
   "routing": {
@@ -26,7 +26,7 @@
           "stream.kafka.zk.broker.url": "localhost:2191/kafka",
           "stream.kafka.broker.list": "localhost:19092",
           "realtime.segment.flush.threshold.time": "3600000",
-          "realtime.segment.flush.threshold.size": "50000"
+          "realtime.segment.flush.threshold.size": "500"
         }
       ]
     },


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to