mcvsubbu commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r548076840



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, 
String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find 
segment: %s in ideal state for table: %s");
+    Map<String, String> externalViewStateMap = 
externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || 
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, 
tableNameWithType);

Review comment:
       Can we keep the log messages consistent? Let us call it a segment 
instead of a partition (please check the other log messages as well).

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, 
String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);

Review comment:
       This should be a 4xx error (unless Pinot messed up really badly :-)).

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, 
String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find 
segment: %s in ideal state for table: %s");
+    Map<String, String> externalViewStateMap = 
externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || 
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, 
tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, 
tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after 
disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - 
startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view 
for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = 
newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> 
SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }

Review comment:
       Please add a `Thread.sleep` here instead of a busy-wait loop. Suggestion:
   ```
   Thread.sleep(min(100,maxWaitTimeMillis/10))
   ```

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -355,6 +354,61 @@ public SuccessResponse reloadSegment(
     }
   }
 
+  /**
+   * Resets the segment of the table, by disabling and then enabling it.
+   * This API will take segments to OFFLINE state, wait for External View to 
stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/{segmentName}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets a segment by first disabling it, waiting for 
external view to stabilize, and finally enabling it again",
+      notes = "Resets a segment by disabling and then enabling a segment")
+  public SuccessResponse resetSegment(
+      @ApiParam(value = "Name of the table with type", required = true) 
@PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) 
@PathParam("segmentName") @Encoded String segmentName,
+      @ApiParam(value = "Time in millis to wait for external view to 
converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) {
+    segmentName = URIUtils.decode(segmentName);
+    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    try {
+      Preconditions.checkState(tableType != null, "Must provide table name 
with type: %s", tableNameWithType);
+      _pinotHelixResourceManager.resetSegment(tableNameWithType, segmentName,
+          externalViewWaitTimeMs > 0 ? externalViewWaitTimeMs
+              : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+      return new SuccessResponse("Successfully invoked segment reset");
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segment: %s in table: %s. %s", 
segmentName, tableNameWithType, e.getMessage()),
+          Status.NOT_FOUND);
+    }
+  }
+
+  /**
+   * Resets all segments of the given table
+   * This API will take segments to OFFLINE state, wait for External View to 
stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets all segments of the table, by first disabling 
them, waiting for external view to stabilize, and finally enabling the 
segments",
+      notes = "Resets a segment by disabling and then enabling a segment")
+  public SuccessResponse resetAllSegments(
+      @ApiParam(value = "Name of the table with type", required = true) 
@PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Time in millis to wait for external view to 
converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) {
+    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    try {
+      Preconditions.checkState(tableType != null, "Must provide table name 
with type: %s", tableNameWithType);
+      _pinotHelixResourceManager.resetAllSegments(tableNameWithType, 
externalViewWaitTimeMs > 0 ? externalViewWaitTimeMs
+          : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+      return new SuccessResponse("Successfully invoked segment reset");

Review comment:
       Better to include the table name in the message. Also, you may want to 
word it so that it clearly implies that the reset has completed.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -355,6 +354,61 @@ public SuccessResponse reloadSegment(
     }
   }
 
+  /**
+   * Resets the segment of the table, by disabling and then enabling it.
+   * This API will take segments to OFFLINE state, wait for External View to 
stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/{segmentName}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets a segment by first disabling it, waiting for 
external view to stabilize, and finally enabling it again",
+      notes = "Resets a segment by disabling and then enabling a segment")
+  public SuccessResponse resetSegment(
+      @ApiParam(value = "Name of the table with type", required = true) 
@PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) 
@PathParam("segmentName") @Encoded String segmentName,
+      @ApiParam(value = "Time in millis to wait for external view to 
converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) {

Review comment:
       All that this parameter does is to override the admin command wait time. 
Why not call it something like that? We can then add it to any admin command 
now or later with the same name.
   Suggested:
   ```suggestion
         @ApiParam(value = "Maximum time in milliseconds to wait for reset to 
be completed") @QueryParam("maxWaitTimeMs") long externalViewWaitTimeMs) {
   ```

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, 
String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find 
segment: %s in ideal state for table: %s");
+    Map<String, String> externalViewStateMap = 
externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || 
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, 
tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, 
tableNameWithType, partitions);

Review comment:
       Don't you have to enable the partition (segment) again after this call?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, 
String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find 
segment: %s in ideal state for table: %s");
+    Map<String, String> externalViewStateMap = 
externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || 
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, 
tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, 
tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after 
disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - 
startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view 
for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = 
newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> 
SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after 
disable/reset call. Skipping enable of partition: %s of table: %s",
+          segmentName, tableNameWithType));
+    }
+
+    // Enable partition
+    LOGGER.info("Enabling partition: {} of table: {}", segmentName, 
tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, 
tableNameWithType, partitions);
+    }
+  }
+
+  /**
+   * Resets all segments of a table by disabling and then enabling the segments
+   */
+  public void resetAllSegments(String tableNameWithType, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+
+    Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();

Review comment:
       Suggest naming the variables with `segments` instead of `partitions`. 
We already have two other meanings of "partition" that are confusing (stream 
partitions, and partitioning of data in segment assignment).

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, 
String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find 
segment: %s in ideal state for table: %s");
+    Map<String, String> externalViewStateMap = 
externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || 
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, 
tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, 
tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after 
disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - 
startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view 
for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = 
newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> 
SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after 
disable/reset call. Skipping enable of partition: %s of table: %s",
+          segmentName, tableNameWithType));
+    }
+
+    // Enable partition
+    LOGGER.info("Enabling partition: {} of table: {}", segmentName, 
tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, 
tableNameWithType, partitions);
+    }
+  }
+
+  /**
+   * Resets all segments of a table by disabling and then enabling the segments
+   */
+  public void resetAllSegments(String tableNameWithType, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+
+    Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> partitionInstancesToCheck = new HashMap<>();
+
+    for (String partition : idealState.getPartitionSet()) {
+      Set<String> instanceSet = idealState.getInstanceSet(partition);
+      Map<String, String> externalViewStateMap = 
externalView.getStateMap(partition);
+      for (String instance : instanceSet) {
+        if (externalViewStateMap == null || 
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+          resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new 
HashSet<>()).add(partition);
+        } else {
+          disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new 
HashSet<>()).add(partition);
+        }
+      }
+      partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet));
+    }
+
+    // First, disable/reset the partitions
+    LOGGER.info("Disabling/resetting partitions of table: {}", 
tableNameWithType);
+    for (Map.Entry<String, Set<String>> entry : 
resetInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), 
tableNameWithType, partitions);
+    }
+    for (Map.Entry<String, Set<String>> entry : 
disableInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), 
tableNameWithType, partitions);
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after 
disable/reset of partitions of table: {}",
+        externalViewWaitTimeMs, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis() 
- startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions

Review comment:
       Not sure if we can start off with no external view at this point, since 
Helix will still be processing the reset calls.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -355,6 +354,61 @@ public SuccessResponse reloadSegment(
     }
   }
 
+  /**
+   * Resets the segment of the table, by disabling and then enabling it.
+   * This API will take segments to OFFLINE state, wait for External View to 
stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/{segmentName}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets a segment by first disabling it, waiting for 
external view to stabilize, and finally enabling it again",
+      notes = "Resets a segment by disabling and then enabling a segment")
+  public SuccessResponse resetSegment(
+      @ApiParam(value = "Name of the table with type", required = true) 
@PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) 
@PathParam("segmentName") @Encoded String segmentName,
+      @ApiParam(value = "Time in millis to wait for external view to 
converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) {
+    segmentName = URIUtils.decode(segmentName);
+    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    try {
+      Preconditions.checkState(tableType != null, "Must provide table name 
with type: %s", tableNameWithType);

Review comment:
       Please make sure that this error message shows up on the console or curl 
command if table type is not given. Sometimes we see that the precondition 
check error message does not show up, and we get a 5xx error (this should be a 
4xx error)

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, 
String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find 
segment: %s in ideal state for table: %s");

Review comment:
       should be a 4xx error

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, 
String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find 
segment: %s in ideal state for table: %s");
+    Map<String, String> externalViewStateMap = 
externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || 
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, 
tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, 
tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after 
disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - 
startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view 
for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = 
newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> 
SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after 
disable/reset call. Skipping enable of partition: %s of table: %s",
+          segmentName, tableNameWithType));
+    }
+
+    // Enable partition
+    LOGGER.info("Enabling partition: {} of table: {}", segmentName, 
tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, 
tableNameWithType, partitions);
+    }
+  }
+
+  /**
+   * Resets all segments of a table by disabling and then enabling the segments
+   */
+  public void resetAllSegments(String tableNameWithType, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+
+    Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> partitionInstancesToCheck = new HashMap<>();
+
+    for (String partition : idealState.getPartitionSet()) {
+      Set<String> instanceSet = idealState.getInstanceSet(partition);
+      Map<String, String> externalViewStateMap = 
externalView.getStateMap(partition);
+      for (String instance : instanceSet) {
+        if (externalViewStateMap == null || 
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+          resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new 
HashSet<>()).add(partition);
+        } else {
+          disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new 
HashSet<>()).add(partition);
+        }
+      }
+      partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet));
+    }
+
+    // First, disable/reset the partitions
+    LOGGER.info("Disabling/resetting partitions of table: {}", 
tableNameWithType);
+    for (Map.Entry<String, Set<String>> entry : 
resetInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), 
tableNameWithType, partitions);
+    }
+    for (Map.Entry<String, Set<String>> entry : 
disableInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), 
tableNameWithType, partitions);
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after 
disable/reset of partitions of table: {}",
+        externalViewWaitTimeMs, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis() 
- startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view 
for table: %s", tableNameWithType);
+      Iterator<Map.Entry<String, Set<String>>> iterator = 
partitionInstancesToCheck.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<String, Set<String>> entryToCheck = iterator.next();
+        String partitionToCheck = entryToCheck.getKey();
+        Set<String> instancesToCheck = entryToCheck.getValue();
+        Map<String, String> newExternalViewStateMap = 
newExternalView.getStateMap(partitionToCheck);
+        if (newExternalViewStateMap == null) {
+          continue;
+        }
+        boolean allOffline = true;
+        for (String instance : instancesToCheck) {
+          if 
(!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) {
+            allOffline = false;
+            break;
+          }
+        }
+        if (allOffline) {
+          iterator.remove();
+        }
+      }
+    }
+    if (!partitionInstancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after 
disable/reset call. Skipping enable of segments of table: %s",
+          tableNameWithType));
+    }
+
+    // Enable partitions
+    LOGGER.info("Enabling partitions of table: {}", tableNameWithType);
+    for (Map.Entry<String, Set<String>> entry : 
resetInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), 
tableNameWithType, partitions);
+    }
+    for (Map.Entry<String, Set<String>> entry : 
disableInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), 
tableNameWithType, partitions);

Review comment:
       Why do we have an enable here and one on line 1869? Can you clarify 
again whether Helix expects two of these in the reset API?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, 
String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find 
segment: %s in ideal state for table: %s");
+    Map<String, String> externalViewStateMap = 
externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || 
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, 
tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, 
tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, 
tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after 
disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - 
startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view 
for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = 
newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> 
SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after 
disable/reset call. Skipping enable of partition: %s of table: %s",
+          segmentName, tableNameWithType));
+    }
+
+    // Enable partition
+    LOGGER.info("Enabling partition: {} of table: {}", segmentName, 
tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, 
tableNameWithType, partitions);
+    }
+  }
+
+  /**
+   * Resets all segments of a table by disabling and then enabling the segments
+   */
+  public void resetAllSegments(String tableNameWithType, long 
externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
+
+    Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> partitionInstancesToCheck = new HashMap<>();
+
+    for (String partition : idealState.getPartitionSet()) {
+      Set<String> instanceSet = idealState.getInstanceSet(partition);
+      Map<String, String> externalViewStateMap = 
externalView.getStateMap(partition);
+      for (String instance : instanceSet) {
+        if (externalViewStateMap == null || 
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+          resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new 
HashSet<>()).add(partition);
+        } else {
+          disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new 
HashSet<>()).add(partition);
+        }
+      }
+      partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet));
+    }
+
+    // First, disable/reset the partitions
+    LOGGER.info("Disabling/resetting partitions of table: {}", 
tableNameWithType);
+    for (Map.Entry<String, Set<String>> entry : 
resetInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), 
tableNameWithType, partitions);
+    }
+    for (Map.Entry<String, Set<String>> entry : 
disableInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), 
tableNameWithType, partitions);
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after 
disable/reset of partitions of table: {}",
+        externalViewWaitTimeMs, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis() 
- startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view 
for table: %s", tableNameWithType);
+      Iterator<Map.Entry<String, Set<String>>> iterator = 
partitionInstancesToCheck.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<String, Set<String>> entryToCheck = iterator.next();
+        String partitionToCheck = entryToCheck.getKey();
+        Set<String> instancesToCheck = entryToCheck.getValue();
+        Map<String, String> newExternalViewStateMap = 
newExternalView.getStateMap(partitionToCheck);
+        if (newExternalViewStateMap == null) {
+          continue;
+        }
+        boolean allOffline = true;
+        for (String instance : instancesToCheck) {
+          if 
(!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) {
+            allOffline = false;
+            break;
+          }
+        }
+        if (allOffline) {
+          iterator.remove();
+        }
+      }
+    }

Review comment:
       Please add a sleep here, as in the other case, instead of busy-waiting.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to