mcvsubbu commented on a change in pull request #6336: URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r548076840
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java ########## @@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) { return numMessagesSent; } + /** + * Resets a segment by disabling and then enabling the segment + */ + public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) { + IdealState idealState = getTableIdealState(tableNameWithType); + Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); + ExternalView externalView = getTableExternalView(tableNameWithType); + Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); + Set<String> instanceSet = idealState.getInstanceSet(segmentName); + Preconditions + .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s"); + Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); + List<String> partitions = Lists.newArrayList(segmentName); + + // First, disable or reset partition + for (String instance : instanceSet) { + if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { + LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType); Review comment: Can we keep the log messages consistent? Let us call it a segment instead of partition. 
(please check other log messages as well) ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java ########## @@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) { return numMessagesSent; } + /** + * Resets a segment by disabling and then enabling the segment + */ + public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) { + IdealState idealState = getTableIdealState(tableNameWithType); + Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); Review comment: should be 4xx error (unless pinot messed up real bad. :-)) ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java ########## @@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) { return numMessagesSent; } + /** + * Resets a segment by disabling and then enabling the segment + */ + public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) { + IdealState idealState = getTableIdealState(tableNameWithType); + Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); + ExternalView externalView = getTableExternalView(tableNameWithType); + Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); + Set<String> instanceSet = idealState.getInstanceSet(segmentName); + Preconditions + .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s"); + Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); + List<String> partitions = Lists.newArrayList(segmentName); + + // First, disable or reset partition + for (String instance : instanceSet) { + if (externalViewStateMap == null || 
SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { + LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType); + _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions); + } else { + LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType); + _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions); + } + } + + // Wait for external view to stabilize + LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partition: {} of table: {}", + externalViewWaitTimeMs, segmentName, tableNameWithType); + long startTime = System.currentTimeMillis(); + Set<String> instancesToCheck = new HashSet<>(instanceSet); + while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) { + ExternalView newExternalView = getTableExternalView(tableNameWithType); + Preconditions + .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType); + Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName); + if (newExternalViewStateMap == null) { + continue; + } + instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))); + } Review comment: Please add a thread.sleep here instead of a busy-wait loop. Suggestion: ` Thread.sleep(min(100,maxWaitTimeMillis/10)) ` ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java ########## @@ -355,6 +354,61 @@ public SuccessResponse reloadSegment( } } + /** + * Resets the segment of the table, by disabling and then enabling it. + * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state, + * thus effective in resetting segments or consumers in error states. 
+ */ + @POST + @Path("segments/{tableNameWithType}/{segmentName}/reset") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Resets a segment by first disabling it, waiting for external view to stabilize, and finally enabling it again", + notes = "Resets a segment by disabling and then enabling a segment") + public SuccessResponse resetSegment( + @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType, + @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName, + @ApiParam(value = "Time in millis to wait for external view to converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) { + segmentName = URIUtils.decode(segmentName); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + try { + Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType); + _pinotHelixResourceManager.resetSegment(tableNameWithType, segmentName, + externalViewWaitTimeMs > 0 ? externalViewWaitTimeMs + : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000); + return new SuccessResponse("Successfully invoked segment reset"); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, + String.format("Failed to reset segment: %s in table: %s. %s", segmentName, tableNameWithType, e.getMessage()), + Status.NOT_FOUND); + } + } + + /** + * Resets all segments of the given table + * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state, + * thus effective in resetting segments or consumers in error states. 
+ */ + @POST + @Path("segments/{tableNameWithType}/reset") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Resets all segments of the table, by first disabling them, waiting for external view to stabilize, and finally enabling the segments", + notes = "Resets a segment by disabling and then enabling a segment") + public SuccessResponse resetAllSegments( + @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType, + @ApiParam(value = "Time in millis to wait for external view to converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + try { + Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType); + _pinotHelixResourceManager.resetAllSegments(tableNameWithType, externalViewWaitTimeMs > 0 ? externalViewWaitTimeMs + : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000); + return new SuccessResponse("Successfully invoked segment reset"); Review comment: better to include the table name in the message. Also, you may want to word it such that it clearly implies that the reset is completed. ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java ########## @@ -355,6 +354,61 @@ public SuccessResponse reloadSegment( } } + /** + * Resets the segment of the table, by disabling and then enabling it. + * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state, + * thus effective in resetting segments or consumers in error states. 
+ */ + @POST + @Path("segments/{tableNameWithType}/{segmentName}/reset") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Resets a segment by first disabling it, waiting for external view to stabilize, and finally enabling it again", + notes = "Resets a segment by disabling and then enabling a segment") + public SuccessResponse resetSegment( + @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType, + @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName, + @ApiParam(value = "Time in millis to wait for external view to converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) { Review comment: All that this parameter does is to override the admin command wait time. Why not call it something like that? We can then add it to any admin command now or later with the same name. Suggested: ```suggestion @ApiParam(value = "Maximum time in milliseconds to wait for reset to be completed") @QueryParam("maxWaitTimeMs") long externalViewWaitTimeMs) { ``` ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java ########## @@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) { return numMessagesSent; } + /** + * Resets a segment by disabling and then enabling the segment + */ + public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) { + IdealState idealState = getTableIdealState(tableNameWithType); + Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); + ExternalView externalView = getTableExternalView(tableNameWithType); + Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); + Set<String> instanceSet = idealState.getInstanceSet(segmentName); + Preconditions 
+ .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s"); + Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); + List<String> partitions = Lists.newArrayList(segmentName); + + // First, disable or reset partition + for (String instance : instanceSet) { + if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { + LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType); + _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions); + } else { + LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType); + _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions); Review comment: Don't you have to enable the partition (segment) again after this call? ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java ########## @@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) { return numMessagesSent; } + /** + * Resets a segment by disabling and then enabling the segment + */ + public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) { + IdealState idealState = getTableIdealState(tableNameWithType); + Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); + ExternalView externalView = getTableExternalView(tableNameWithType); + Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); + Set<String> instanceSet = idealState.getInstanceSet(segmentName); + Preconditions + .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s"); + Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); +
List<String> partitions = Lists.newArrayList(segmentName); + + // First, disable or reset partition + for (String instance : instanceSet) { + if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { + LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType); + _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions); + } else { + LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType); + _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions); + } + } + + // Wait for external view to stabilize + LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partition: {} of table: {}", + externalViewWaitTimeMs, segmentName, tableNameWithType); + long startTime = System.currentTimeMillis(); + Set<String> instancesToCheck = new HashSet<>(instanceSet); + while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) { + ExternalView newExternalView = getTableExternalView(tableNameWithType); + Preconditions + .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType); + Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName); + if (newExternalViewStateMap == null) { + continue; + } + instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))); + } + if (!instancesToCheck.isEmpty()) { + throw new IllegalStateException(String.format( + "Timed out waiting for external view to stabilize after disable/reset call. 
Skipping enable of partition: %s of table: %s", + segmentName, tableNameWithType)); + } + + // Enable partition + LOGGER.info("Enabling partition: {} of table: {}", segmentName, tableNameWithType); + for (String instance : instanceSet) { + _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, partitions); + } + } + + /** + * Resets all segments of a table by disabling and then enabling the segments + */ + public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) { + IdealState idealState = getTableIdealState(tableNameWithType); + Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); + ExternalView externalView = getTableExternalView(tableNameWithType); + Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); + + Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>(); Review comment: suggest naming the variables with `segments` instead of `partitions`.
We do have two other semantics of partitions that is already confusing (stream partitions, and partitioning of data in segment assignment) ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java ########## @@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) { return numMessagesSent; } + /** + * Resets a segment by disabling and then enabling the segment + */ + public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) { + IdealState idealState = getTableIdealState(tableNameWithType); + Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); + ExternalView externalView = getTableExternalView(tableNameWithType); + Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); + Set<String> instanceSet = idealState.getInstanceSet(segmentName); + Preconditions + .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s"); + Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); + List<String> partitions = Lists.newArrayList(segmentName); + + // First, disable or reset partition + for (String instance : instanceSet) { + if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { + LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType); + _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions); + } else { + LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType); + _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions); + } + } + + // Wait for external view to stabilize + LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partition: {} of 
table: {}", + externalViewWaitTimeMs, segmentName, tableNameWithType); + long startTime = System.currentTimeMillis(); + Set<String> instancesToCheck = new HashSet<>(instanceSet); + while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) { + ExternalView newExternalView = getTableExternalView(tableNameWithType); + Preconditions + .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType); + Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName); + if (newExternalViewStateMap == null) { + continue; + } + instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))); + } + if (!instancesToCheck.isEmpty()) { + throw new IllegalStateException(String.format( + "Timed out waiting for external view to stabilize after disable/reset call. Skipping enable of partition: %s of table: %s", + segmentName, tableNameWithType)); + } + + // Enable partition + LOGGER.info("Enabling partition: {} of table: {}", segmentName, tableNameWithType); + for (String instance : instanceSet) { + _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, partitions); + } + } + + /** + * Resets all segments of a table by disabling and then enabling the segments + */ + public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) { + IdealState idealState = getTableIdealState(tableNameWithType); + Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); + ExternalView externalView = getTableExternalView(tableNameWithType); + Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); + + Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>(); + Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>(); + Map<String, Set<String>> 
partitionInstancesToCheck = new HashMap<>(); + + for (String partition : idealState.getPartitionSet()) { + Set<String> instanceSet = idealState.getInstanceSet(partition); + Map<String, String> externalViewStateMap = externalView.getStateMap(partition); + for (String instance : instanceSet) { + if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { + resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition); + } else { + disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition); + } + } + partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet)); + } + + // First, disable/reset the partitions + LOGGER.info("Disabling/resetting partitions of table: {}", tableNameWithType); + for (Map.Entry<String, Set<String>> entry : resetInstanceToPartitionsMap.entrySet()) { + ArrayList<String> partitions = Lists.newArrayList(entry.getValue()); + _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), tableNameWithType, partitions); + } + for (Map.Entry<String, Set<String>> entry : disableInstanceToPartitionsMap.entrySet()) { + ArrayList<String> partitions = Lists.newArrayList(entry.getValue()); + _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), tableNameWithType, partitions); + } + + // Wait for external view to stabilize + LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partitions of table: {}", + externalViewWaitTimeMs, tableNameWithType); + long startTime = System.currentTimeMillis(); + while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) { + ExternalView newExternalView = getTableExternalView(tableNameWithType); + Preconditions Review comment: Not sure if we can start off with no external view at this point, since helix will still be processing the reset calls.
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java ########## @@ -355,6 +354,61 @@ public SuccessResponse reloadSegment( } } + /** + * Resets the segment of the table, by disabling and then enabling it. + * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state, + * thus effective in resetting segments or consumers in error states. + */ + @POST + @Path("segments/{tableNameWithType}/{segmentName}/reset") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Resets a segment by first disabling it, waiting for external view to stabilize, and finally enabling it again", + notes = "Resets a segment by disabling and then enabling a segment") + public SuccessResponse resetSegment( + @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType, + @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName, + @ApiParam(value = "Time in millis to wait for external view to converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) { + segmentName = URIUtils.decode(segmentName); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + try { + Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType); Review comment: Please make sure that this error message shows up on the console or curl command if table type is not given. 
Sometimes we see that the precondition check error message does not show up, and we get a 5xx error (this should be a 4xx error) ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java ########## @@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) { return numMessagesSent; } + /** + * Resets a segment by disabling and then enabling the segment + */ + public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) { + IdealState idealState = getTableIdealState(tableNameWithType); + Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); + ExternalView externalView = getTableExternalView(tableNameWithType); + Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); + Set<String> instanceSet = idealState.getInstanceSet(segmentName); + Preconditions + .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s"); Review comment: should be a 4xx error ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java ########## @@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) { return numMessagesSent; } + /** + * Resets a segment by disabling and then enabling the segment + */ + public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) { + IdealState idealState = getTableIdealState(tableNameWithType); + Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); + ExternalView externalView = getTableExternalView(tableNameWithType); + Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); + Set<String> instanceSet = 
idealState.getInstanceSet(segmentName); + Preconditions + .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s"); + Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); + List<String> partitions = Lists.newArrayList(segmentName); + + // First, disable or reset partition + for (String instance : instanceSet) { + if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { + LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType); + _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions); + } else { + LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType); + _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions); + } + } + + // Wait for external view to stabilize + LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partition: {} of table: {}", + externalViewWaitTimeMs, segmentName, tableNameWithType); + long startTime = System.currentTimeMillis(); + Set<String> instancesToCheck = new HashSet<>(instanceSet); + while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) { + ExternalView newExternalView = getTableExternalView(tableNameWithType); + Preconditions + .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType); + Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName); + if (newExternalViewStateMap == null) { + continue; + } + instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))); + } + if (!instancesToCheck.isEmpty()) { + throw new IllegalStateException(String.format( + "Timed out waiting for external view to stabilize after disable/reset call. 
Skipping enable of partition: %s of table: %s", + segmentName, tableNameWithType)); + } + + // Enable partition + LOGGER.info("Enabling partition: {} of table: {}", segmentName, tableNameWithType); + for (String instance : instanceSet) { + _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, partitions); + } + } + + /** + * Resets all segments of a table by disabling and then enabling the segments + */ + public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) { + IdealState idealState = getTableIdealState(tableNameWithType); + Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); + ExternalView externalView = getTableExternalView(tableNameWithType); + Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); + + Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>(); + Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>(); + Map<String, Set<String>> partitionInstancesToCheck = new HashMap<>(); + + for (String partition : idealState.getPartitionSet()) { + Set<String> instanceSet = idealState.getInstanceSet(partition); + Map<String, String> externalViewStateMap = externalView.getStateMap(partition); + for (String instance : instanceSet) { + if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { + resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition); + } else { + disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition); + } + } + partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet)); + } + + // First, disable/reset the partitions + LOGGER.info("Disabling/resetting partitions of table: {}", tableNameWithType); + for (Map.Entry<String, Set<String>> entry : resetInstanceToPartitionsMap.entrySet()) { + ArrayList<String> partitions = 
Lists.newArrayList(entry.getValue()); + _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), tableNameWithType, partitions); + } + for (Map.Entry<String, Set<String>> entry : disableInstanceToPartitionsMap.entrySet()) { + ArrayList<String> partitions = Lists.newArrayList(entry.getValue()); + _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), tableNameWithType, partitions); + } + + // Wait for external view to stabilize + LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partitions of table: {}", + externalViewWaitTimeMs, tableNameWithType); + long startTime = System.currentTimeMillis(); + while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) { + ExternalView newExternalView = getTableExternalView(tableNameWithType); + Preconditions + .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType); + Iterator<Map.Entry<String, Set<String>>> iterator = partitionInstancesToCheck.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<String, Set<String>> entryToCheck = iterator.next(); + String partitionToCheck = entryToCheck.getKey(); + Set<String> instancesToCheck = entryToCheck.getValue(); + Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(partitionToCheck); + if (newExternalViewStateMap == null) { + continue; + } + boolean allOffline = true; + for (String instance : instancesToCheck) { + if (!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) { + allOffline = false; + break; + } + } + if (allOffline) { + iterator.remove(); + } + } + } + if (!partitionInstancesToCheck.isEmpty()) { + throw new IllegalStateException(String.format( + "Timed out waiting for external view to stabilize after disable/reset call. 
Skipping enable of segments of table: %s", + tableNameWithType)); + } + + // Enable partitions + LOGGER.info("Enabling partitions of table: {}", tableNameWithType); + for (Map.Entry<String, Set<String>> entry : resetInstanceToPartitionsMap.entrySet()) { + ArrayList<String> partitions = Lists.newArrayList(entry.getValue()); + _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), tableNameWithType, partitions); + } + for (Map.Entry<String, Set<String>> entry : disableInstanceToPartitionsMap.entrySet()) { + ArrayList<String> partitions = Lists.newArrayList(entry.getValue()); + _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), tableNameWithType, partitions); Review comment: Why do we have an enable here and one in line 1869? Can you clarify again if helix expects two of these in the reset api ? ########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java ########## @@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) { return numMessagesSent; } + /** + * Resets a segment by disabling and then enabling the segment + */ + public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) { + IdealState idealState = getTableIdealState(tableNameWithType); + Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); + ExternalView externalView = getTableExternalView(tableNameWithType); + Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); + Set<String> instanceSet = idealState.getInstanceSet(segmentName); + Preconditions + .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s"); + Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); + List<String> partitions = Lists.newArrayList(segmentName); + + // First, disable 
or reset partition + for (String instance : instanceSet) { + if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { + LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType); + _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions); + } else { + LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType); + _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions); + } + } + + // Wait for external view to stabilize + LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partition: {} of table: {}", + externalViewWaitTimeMs, segmentName, tableNameWithType); + long startTime = System.currentTimeMillis(); + Set<String> instancesToCheck = new HashSet<>(instanceSet); + while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) { + ExternalView newExternalView = getTableExternalView(tableNameWithType); + Preconditions + .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType); + Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName); + if (newExternalViewStateMap == null) { + continue; + } + instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))); + } + if (!instancesToCheck.isEmpty()) { + throw new IllegalStateException(String.format( + "Timed out waiting for external view to stabilize after disable/reset call. 
Skipping enable of partition: %s of table: %s", + segmentName, tableNameWithType)); + } + + // Enable partition + LOGGER.info("Enabling partition: {} of table: {}", segmentName, tableNameWithType); + for (String instance : instanceSet) { + _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, partitions); + } + } + + /** + * Resets all segments of a table by disabling and then enabling the segments + */ + public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) { + IdealState idealState = getTableIdealState(tableNameWithType); + Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); + ExternalView externalView = getTableExternalView(tableNameWithType); + Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); + + Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>(); + Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>(); + Map<String, Set<String>> partitionInstancesToCheck = new HashMap<>(); + + for (String partition : idealState.getPartitionSet()) { + Set<String> instanceSet = idealState.getInstanceSet(partition); + Map<String, String> externalViewStateMap = externalView.getStateMap(partition); + for (String instance : instanceSet) { + if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { + resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition); + } else { + disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition); + } + } + partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet)); + } + + // First, disable/reset the partitions + LOGGER.info("Disabling/resetting partitions of table: {}", tableNameWithType); + for (Map.Entry<String, Set<String>> entry : resetInstanceToPartitionsMap.entrySet()) { + ArrayList<String> partitions = 
Lists.newArrayList(entry.getValue()); + _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), tableNameWithType, partitions); + } + for (Map.Entry<String, Set<String>> entry : disableInstanceToPartitionsMap.entrySet()) { + ArrayList<String> partitions = Lists.newArrayList(entry.getValue()); + _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), tableNameWithType, partitions); + } + + // Wait for external view to stabilize + LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partitions of table: {}", + externalViewWaitTimeMs, tableNameWithType); + long startTime = System.currentTimeMillis(); + while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) { + ExternalView newExternalView = getTableExternalView(tableNameWithType); + Preconditions + .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType); + Iterator<Map.Entry<String, Set<String>>> iterator = partitionInstancesToCheck.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<String, Set<String>> entryToCheck = iterator.next(); + String partitionToCheck = entryToCheck.getKey(); + Set<String> instancesToCheck = entryToCheck.getValue(); + Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(partitionToCheck); + if (newExternalViewStateMap == null) { + continue; + } + boolean allOffline = true; + for (String instance : instancesToCheck) { + if (!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) { + allOffline = false; + break; + } + } + if (allOffline) { + iterator.remove(); + } + } + } Review comment: Please add a sleep here like in the other case, instead of busy-waiting. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org