mcvsubbu commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r548313544



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -355,6 +354,61 @@ public SuccessResponse reloadSegment(
     }
   }
 
+  /**
+   * Resets the segment of the table, by disabling and then enabling it.
+   * This API will take segments to OFFLINE state, wait for External View to 
stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/{segmentName}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets a segment by first disabling it, waiting for 
external view to stabilize, and finally enabling it again",
+      notes = "Resets a segment by disabling and then enabling a segment")
+  public SuccessResponse resetSegment(
+      @ApiParam(value = "Name of the table with type", required = true) 
@PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) 
@PathParam("segmentName") @Encoded String segmentName,
+      @ApiParam(value = "Time in millis to wait for external view to 
converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) {

Review comment:
       I am trying to avoid documenting what "external view" means. We know the
term as Pinot developers, but it is probably better to minimize its exposure to
beginner users (and I think that is who we are targeting here). For all
practical purposes, the external-view wait translates to the total wait time of
the admin command, so reusing that same term would save us some explanation.
That is my thought, but I am flexible on this.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, 
String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find 
segment: %s in ideal state for table: %s");
+    Map<String, String> externalViewStateMap = 
externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || 
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, 
tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, 
tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after 
disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - 
startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view 
for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = 
newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> 
SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after 
disable/reset call. Skipping enable of partition: %s of table: %s",
+          segmentName, tableNameWithType));
+    }
+
+    // Enable partition
+    LOGGER.info("Enabling partition: {} of table: {}", segmentName, 
tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, 
tableNameWithType, partitions);
+    }
+  }
+
+  /**
+   * Resets all segments of a table by disabling and then enabling the segments
+   */
+  public void resetAllSegments(String tableNameWithType, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+
+    Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> partitionInstancesToCheck = new HashMap<>();
+
+    for (String partition : idealState.getPartitionSet()) {
+      Set<String> instanceSet = idealState.getInstanceSet(partition);
+      Map<String, String> externalViewStateMap = 
externalView.getStateMap(partition);
+      for (String instance : instanceSet) {
+        if (externalViewStateMap == null || 
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+          resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new 
HashSet<>()).add(partition);
+        } else {
+          disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new 
HashSet<>()).add(partition);
+        }
+      }
+      partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet));
+    }
+
+    // First, disable/reset the partitions
+    LOGGER.info("Disabling/resetting partitions of table: {}", 
tableNameWithType);
+    for (Map.Entry<String, Set<String>> entry : 
resetInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), 
tableNameWithType, partitions);
+    }
+    for (Map.Entry<String, Set<String>> entry : 
disableInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), 
tableNameWithType, partitions);
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after 
disable/reset of partitions of table: {}",
+        externalViewWaitTimeMs, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis() 
- startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions

Review comment:
       I see, yes that is good.
   
   My question was: will we ever reach this code with an empty external view
(EV) in a _valid_ case? For example, say a beginner starts running Pinot with a
bad configuration, so the EV never appears. They then correct the config and
click 'reset'. Is this a valid scenario? If it is, don't we want to support it?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, 
String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find 
segment: %s in ideal state for table: %s");
+    Map<String, String> externalViewStateMap = 
externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || 
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, 
tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, 
tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after 
disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - 
startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view 
for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = 
newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> 
SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after 
disable/reset call. Skipping enable of partition: %s of table: %s",
+          segmentName, tableNameWithType));
+    }
+
+    // Enable partition
+    LOGGER.info("Enabling partition: {} of table: {}", segmentName, 
tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, 
tableNameWithType, partitions);
+    }
+  }
+
+  /**
+   * Resets all segments of a table by disabling and then enabling the segments
+   */
+  public void resetAllSegments(String tableNameWithType, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+
+    Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();

Review comment:
       Yes, when we make calls to Helix we have to use the term "partition". But
within Pinot we always refer to a segment as a segment. It will really reduce
confusion if you keep the `segment` term whenever we refer to one.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, 
String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find 
segment: %s in ideal state for table: %s");
+    Map<String, String> externalViewStateMap = 
externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || 
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, 
tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, 
tableNameWithType, partitions);

Review comment:
       My bad. I did not see the 'false' argument there. (God, I wish Helix had
gone the extra inch and provided a disablePartition API.) :-)

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, 
String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find 
segment: %s in ideal state for table: %s");
+    Map<String, String> externalViewStateMap = 
externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || 
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, 
tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, 
tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after 
disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - 
startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view 
for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = 
newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> 
SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after 
disable/reset call. Skipping enable of partition: %s of table: %s",
+          segmentName, tableNameWithType));
+    }
+
+    // Enable partition
+    LOGGER.info("Enabling partition: {} of table: {}", segmentName, 
tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, 
tableNameWithType, partitions);
+    }
+  }
+
+  /**
+   * Resets all segments of a table by disabling and then enabling the segments
+   */
+  public void resetAllSegments(String tableNameWithType, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+
+    Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> partitionInstancesToCheck = new HashMap<>();
+
+    for (String partition : idealState.getPartitionSet()) {
+      Set<String> instanceSet = idealState.getInstanceSet(partition);
+      Map<String, String> externalViewStateMap = 
externalView.getStateMap(partition);
+      for (String instance : instanceSet) {
+        if (externalViewStateMap == null || 
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+          resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new 
HashSet<>()).add(partition);
+        } else {
+          disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new 
HashSet<>()).add(partition);
+        }
+      }
+      partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet));
+    }
+
+    // First, disable/reset the partitions
+    LOGGER.info("Disabling/resetting partitions of table: {}", 
tableNameWithType);
+    for (Map.Entry<String, Set<String>> entry : 
resetInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), 
tableNameWithType, partitions);
+    }
+    for (Map.Entry<String, Set<String>> entry : 
disableInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), 
tableNameWithType, partitions);
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after 
disable/reset of partitions of table: {}",
+        externalViewWaitTimeMs, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis() 
- startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view 
for table: %s", tableNameWithType);
+      Iterator<Map.Entry<String, Set<String>>> iterator = 
partitionInstancesToCheck.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<String, Set<String>> entryToCheck = iterator.next();
+        String partitionToCheck = entryToCheck.getKey();
+        Set<String> instancesToCheck = entryToCheck.getValue();
+        Map<String, String> newExternalViewStateMap = 
newExternalView.getStateMap(partitionToCheck);
+        if (newExternalViewStateMap == null) {
+          continue;
+        }
+        boolean allOffline = true;
+        for (String instance : instancesToCheck) {
+          if 
(!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) {
+            allOffline = false;
+            break;
+          }
+        }
+        if (allOffline) {
+          iterator.remove();
+        }
+      }
+    }
+    if (!partitionInstancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after 
disable/reset call. Skipping enable of segments of table: %s",
+          tableNameWithType));
+    }
+
+    // Enable partitions
+    LOGGER.info("Enabling partitions of table: {}", tableNameWithType);
+    for (Map.Entry<String, Set<String>> entry : 
resetInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), 
tableNameWithType, partitions);
+    }
+    for (Map.Entry<String, Set<String>> entry : 
disableInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), 
tableNameWithType, partitions);

Review comment:
       Agreed. My bad.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to