Jackie-Jiang commented on code in PR #9203: URL: https://github.com/apache/pinot/pull/9203#discussion_r945993260
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java: ########## @@ -140,6 +140,26 @@ public long getTableSize(String tableName) } } + public void resetTable(String tableNameWithType, String targetInstance) Review Comment: (minor) Annotate `targetInstance` as `@Nullable`, same for other places ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java: ########## @@ -507,14 +507,12 @@ public SuccessResponse resetSegment( @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType, @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName, - @ApiParam(value = "Maximum time in milliseconds to wait for reset to be completed. By default, uses " - + "serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long maxWaitTimeMs) { + @ApiParam(value = "Name of the target instance to reset") @QueryParam("targetInstance") String targetInstance) { Review Comment: (minor) Annotate `targetInstance` as `@Nullable`, same for other places ########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java: ########## @@ -223,6 +223,11 @@ public String forTableReload(String tableName, TableType tableType, boolean forc return StringUtil.join("/", _baseUrl, "segments", tableName, query); } + public String forTableReset(String tableNameWithType, String targetInstance) { Review Comment: (minor) Annotate `targetInstance` as `@Nullable`, same for other places ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -2369,143 +2372,152 @@ public void resetSegment(String tableNameWithType, String segmentName, long exte "Could not find segment: %s in ideal state for table: %s", segmentName, tableNameWithType); Map<String, String> externalViewStateMap = 
externalView.getStateMap(segmentName); - // First, disable or reset the segment for (String instance : instanceSet) { - if (externalViewStateMap == null || !SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { - LOGGER.info("Disabling segment: {} of table: {}", segmentName, tableNameWithType); - // enablePartition takes a segment which is NOT in ERROR state, to OFFLINE state - // TODO: If the controller fails to re-enable the partition, it will be left in disabled state - _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, - Lists.newArrayList(segmentName)); - } else { - LOGGER.info("Resetting segment: {} of table: {}", segmentName, tableNameWithType); - // resetPartition takes a segment which is in ERROR state, to OFFLINE state - _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, Lists.newArrayList(segmentName)); - } - } - - // Wait for external view to stabilize - LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of segment: {} of table: {}", - externalViewWaitTimeMs, segmentName, tableNameWithType); - long startTime = System.currentTimeMillis(); - Set<String> instancesToCheck = new HashSet<>(instanceSet); - while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) { - ExternalView newExternalView = getTableExternalView(tableNameWithType); - Preconditions.checkState(newExternalView != null, "Could not find external view for table: %s", - tableNameWithType); - Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName); - if (newExternalViewStateMap == null) { - continue; + if (targetInstance == null || targetInstance.equals(instance)) { + if (externalViewStateMap == null || SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) { + LOGGER.info("Skipping reset for segment: {} of table: {} on instance: {}", segmentName, tableNameWithType, + instance); + } else { + 
LOGGER.info("Resetting segment: {} of table: {} on instance: {}", segmentName, tableNameWithType, instance); + resetPartitionAllState(_helixClusterName, instance, tableNameWithType, Lists.newArrayList(segmentName)); + } } - instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))); - Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS); - } - if (!instancesToCheck.isEmpty()) { - throw new TimeoutException(String.format( - "Timed out waiting for external view to stabilize after call to disable/reset segment: %s of table: %s. " - + "Disable/reset might complete in the background, but skipping enable of segment.", segmentName, - tableNameWithType)); - } - - // Lastly, enable segment - LOGGER.info("Enabling segment: {} of table: {}", segmentName, tableNameWithType); - for (String instance : instanceSet) { - _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, - Lists.newArrayList(segmentName)); } } /** - * Resets all segments of a table. The steps involved are - * 1. If segment is in ERROR state in the External View, invoke resetPartition, else invoke disablePartition - * 2. Wait for the external view to stabilize. Step 1 should turn all segments to OFFLINE state - * 3. Invoke enablePartition on the segments + * Resets all segments of a table. This operation invoke resetPartition via state transition message. 
*/ - public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) + public void resetAllSegments(String tableNameWithType, @Nullable String targetInstance) throws InterruptedException, TimeoutException { IdealState idealState = getTableIdealState(tableNameWithType); Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); ExternalView externalView = getTableExternalView(tableNameWithType); Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); Map<String, Set<String>> instanceToResetSegmentsMap = new HashMap<>(); - Map<String, Set<String>> instanceToDisableSegmentsMap = new HashMap<>(); - Map<String, Set<String>> segmentInstancesToCheck = new HashMap<>(); + Map<String, Set<String>> instanceToSkippedSegmentsMap = new HashMap<>(); for (String segmentName : idealState.getPartitionSet()) { Set<String> instanceSet = idealState.getInstanceSet(segmentName); Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); for (String instance : instanceSet) { - if (externalViewStateMap == null || !SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { - instanceToDisableSegmentsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(segmentName); + if (externalViewStateMap == null || SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) { + instanceToSkippedSegmentsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(segmentName); } else { instanceToResetSegmentsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(segmentName); } } - segmentInstancesToCheck.put(segmentName, new HashSet<>(instanceSet)); } - // First, disable/reset the segments - LOGGER.info("Disabling/resetting segments of table: {}", tableNameWithType); + LOGGER.info("Resetting segments of table: {}", tableNameWithType); for (Map.Entry<String, Set<String>> entry : instanceToResetSegmentsMap.entrySet()) { - // 
resetPartition takes a segment which is in ERROR state, to OFFLINE state - _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), tableNameWithType, - Lists.newArrayList(entry.getValue())); - } - for (Map.Entry<String, Set<String>> entry : instanceToDisableSegmentsMap.entrySet()) { - // enablePartition takes a segment which is NOT in ERROR state, to OFFLINE state - // TODO: If the controller fails to re-enable the partition, it will be left in disabled state - _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), tableNameWithType, - Lists.newArrayList(entry.getValue())); - } - - // Wait for external view to stabilize - LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of segments of table: {}", - externalViewWaitTimeMs, tableNameWithType); - long startTime = System.currentTimeMillis(); - while (!segmentInstancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) { - ExternalView newExternalView = getTableExternalView(tableNameWithType); - Preconditions.checkState(newExternalView != null, "Could not find external view for table: %s", - tableNameWithType); - Iterator<Map.Entry<String, Set<String>>> iterator = segmentInstancesToCheck.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry<String, Set<String>> entryToCheck = iterator.next(); - String segmentToCheck = entryToCheck.getKey(); - Set<String> instancesToCheck = entryToCheck.getValue(); - Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentToCheck); - if (newExternalViewStateMap == null) { - continue; - } - boolean allOffline = true; - for (String instance : instancesToCheck) { - if (!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) { - allOffline = false; - break; - } - } - if (allOffline) { - iterator.remove(); - } + if (targetInstance == null || targetInstance.equals(entry.getKey())) { + resetPartitionAllState(_helixClusterName, entry.getKey(), 
tableNameWithType, + Lists.newArrayList(entry.getValue())); Review Comment: Suggest passing in the set directly instead of first converting it to a list and then converting it back in `resetPartitionAllState()`. ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java: ########## @@ -531,6 +538,46 @@ private void checkForEmptyRoutingTable(boolean shouldBeEmpty) { }, 60_000L, errorMessage); } + public void testReset(TableType tableType) + throws Exception { + String rawTableName = getTableName(); + + // reset the table. + resetTable(rawTableName, tableType, null); + + // wait for all live messages clear the queue. + List<String> instances = _helixResourceManager.getServerInstancesForTable(rawTableName, tableType); + PropertyKey.Builder keyBuilder = _helixDataAccessor.keyBuilder(); + TestUtils.waitForCondition(aVoid -> { + int liveMessageCount = 0; + for (String instanceName : instances) { + List<Message> messages = _helixDataAccessor.getChildValues(keyBuilder.messages(instanceName), true); + liveMessageCount += messages.size(); + } + return liveMessageCount == 0; + }, 30_000L, "Failed to wait for all segment reset messages clear helix state transition!"); + + // Check that all states comes back to ONLINE.
+ TestUtils.waitForCondition(aVoid -> { + try { + // check external view and wait for everything to come back online + ExternalView externalView = _helixAdmin.getResourceExternalView(getHelixClusterName(), + TableNameBuilder.forType(tableType).tableNameWithType(rawTableName)); + for (String segmentName : externalView.getPartitionSet()) { + Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); + Preconditions.checkNotNull(externalViewStateMap); + for (String state : externalViewStateMap.values()) { + Preconditions.checkState(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state) + || CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING.equals(state)); + } + } + return true; + } catch (Throwable t) { + return false; + } Review Comment: Let's not catch every exception and return false to trigger a retry. Instead, return false explicitly when a state is not yet ONLINE or CONSUMING, so the condition is retried, and let any unexpected exception propagate so the test fails if it happens. ```suggestion // check external view and wait for everything to come back online ExternalView externalView = _helixAdmin.getResourceExternalView(getHelixClusterName(), TableNameBuilder.forType(tableType).tableNameWithType(rawTableName)); for (Map<String, String> externalViewStateMap : externalView.getRecord().getMapFields().values()) { for (String state : externalViewStateMap.values()) { if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state) && !CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING.equals(state)) { return false; } } } return true; ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -2369,143 +2372,152 @@ public void resetSegment(String tableNameWithType, String segmentName, long exte "Could not find segment: %s in ideal state for table: %s", segmentName, tableNameWithType); Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); - // First, disable or reset the segment for (String instance : instanceSet)
{ - if (externalViewStateMap == null || !SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { - LOGGER.info("Disabling segment: {} of table: {}", segmentName, tableNameWithType); - // enablePartition takes a segment which is NOT in ERROR state, to OFFLINE state - // TODO: If the controller fails to re-enable the partition, it will be left in disabled state - _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, - Lists.newArrayList(segmentName)); - } else { - LOGGER.info("Resetting segment: {} of table: {}", segmentName, tableNameWithType); - // resetPartition takes a segment which is in ERROR state, to OFFLINE state - _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, Lists.newArrayList(segmentName)); - } - } - - // Wait for external view to stabilize - LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of segment: {} of table: {}", - externalViewWaitTimeMs, segmentName, tableNameWithType); - long startTime = System.currentTimeMillis(); - Set<String> instancesToCheck = new HashSet<>(instanceSet); - while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) { - ExternalView newExternalView = getTableExternalView(tableNameWithType); - Preconditions.checkState(newExternalView != null, "Could not find external view for table: %s", - tableNameWithType); - Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName); - if (newExternalViewStateMap == null) { - continue; + if (targetInstance == null || targetInstance.equals(instance)) { + if (externalViewStateMap == null || SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) { + LOGGER.info("Skipping reset for segment: {} of table: {} on instance: {}", segmentName, tableNameWithType, + instance); + } else { + LOGGER.info("Resetting segment: {} of table: {} on instance: {}", segmentName, tableNameWithType, instance); + 
resetPartitionAllState(_helixClusterName, instance, tableNameWithType, Lists.newArrayList(segmentName)); + } } - instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))); - Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS); - } - if (!instancesToCheck.isEmpty()) { - throw new TimeoutException(String.format( - "Timed out waiting for external view to stabilize after call to disable/reset segment: %s of table: %s. " - + "Disable/reset might complete in the background, but skipping enable of segment.", segmentName, - tableNameWithType)); - } - - // Lastly, enable segment - LOGGER.info("Enabling segment: {} of table: {}", segmentName, tableNameWithType); - for (String instance : instanceSet) { - _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, - Lists.newArrayList(segmentName)); } } /** - * Resets all segments of a table. The steps involved are - * 1. If segment is in ERROR state in the External View, invoke resetPartition, else invoke disablePartition - * 2. Wait for the external view to stabilize. Step 1 should turn all segments to OFFLINE state - * 3. Invoke enablePartition on the segments + * Resets all segments of a table. This operation invoke resetPartition via state transition message. 
*/ - public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) + public void resetAllSegments(String tableNameWithType, @Nullable String targetInstance) throws InterruptedException, TimeoutException { IdealState idealState = getTableIdealState(tableNameWithType); Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); ExternalView externalView = getTableExternalView(tableNameWithType); Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); Map<String, Set<String>> instanceToResetSegmentsMap = new HashMap<>(); - Map<String, Set<String>> instanceToDisableSegmentsMap = new HashMap<>(); - Map<String, Set<String>> segmentInstancesToCheck = new HashMap<>(); + Map<String, Set<String>> instanceToSkippedSegmentsMap = new HashMap<>(); Review Comment: Why do we maintain this map? ########## pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java: ########## @@ -129,12 +129,23 @@ public void onBecomeOnlineFromConsuming(Message message, NotificationContext con public void onBecomeOfflineFromConsuming(Message message, NotificationContext context) { _logger.info("SegmentOnlineOfflineStateModel.onBecomeOfflineFromConsuming() : " + message); String realtimeTableName = message.getResourceName(); - String segmentName = message.getPartitionName(); + String segmentNameStr = message.getPartitionName(); try { - _instanceDataManager.removeSegment(realtimeTableName, segmentName); + // Acquire the segment and release it first. 
+ LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); + + TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName); + Preconditions.checkNotNull(tableDataManager); + SegmentDataManager acquiredSegment = tableDataManager.acquireSegment(segmentNameStr); + if (acquiredSegment == null) { + throw new RuntimeException("Segment " + segmentNameStr + " + not present "); + } + tableDataManager.releaseSegment(acquiredSegment); + // Remove the segment from instance data manager + _instanceDataManager.removeSegment(realtimeTableName, segmentNameStr); Review Comment: I don't follow this part. Why do we need to acquire and release the segment first? Do we simply want to throw an exception when the segment does not exist? That can leave the segment in ERROR state, which is not desired. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -2369,143 +2372,152 @@ public void resetSegment(String tableNameWithType, String segmentName, long exte "Could not find segment: %s in ideal state for table: %s", segmentName, tableNameWithType); Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); - // First, disable or reset the segment for (String instance : instanceSet) { - if (externalViewStateMap == null || !SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { - LOGGER.info("Disabling segment: {} of table: {}", segmentName, tableNameWithType); - // enablePartition takes a segment which is NOT in ERROR state, to OFFLINE state - // TODO: If the controller fails to re-enable the partition, it will be left in disabled state - _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, - Lists.newArrayList(segmentName)); - } else { - LOGGER.info("Resetting segment: {} of table: {}", segmentName, tableNameWithType); - // resetPartition takes a segment which is in ERROR state, to OFFLINE state -
_helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, Lists.newArrayList(segmentName)); - } - } - - // Wait for external view to stabilize - LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of segment: {} of table: {}", - externalViewWaitTimeMs, segmentName, tableNameWithType); - long startTime = System.currentTimeMillis(); - Set<String> instancesToCheck = new HashSet<>(instanceSet); - while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) { - ExternalView newExternalView = getTableExternalView(tableNameWithType); - Preconditions.checkState(newExternalView != null, "Could not find external view for table: %s", - tableNameWithType); - Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName); - if (newExternalViewStateMap == null) { - continue; + if (targetInstance == null || targetInstance.equals(instance)) { + if (externalViewStateMap == null || SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) { + LOGGER.info("Skipping reset for segment: {} of table: {} on instance: {}", segmentName, tableNameWithType, + instance); + } else { + LOGGER.info("Resetting segment: {} of table: {} on instance: {}", segmentName, tableNameWithType, instance); + resetPartitionAllState(_helixClusterName, instance, tableNameWithType, Lists.newArrayList(segmentName)); + } } - instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))); - Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS); - } - if (!instancesToCheck.isEmpty()) { - throw new TimeoutException(String.format( - "Timed out waiting for external view to stabilize after call to disable/reset segment: %s of table: %s. 
" - + "Disable/reset might complete in the background, but skipping enable of segment.", segmentName, - tableNameWithType)); - } - - // Lastly, enable segment - LOGGER.info("Enabling segment: {} of table: {}", segmentName, tableNameWithType); - for (String instance : instanceSet) { - _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, - Lists.newArrayList(segmentName)); } } /** - * Resets all segments of a table. The steps involved are - * 1. If segment is in ERROR state in the External View, invoke resetPartition, else invoke disablePartition - * 2. Wait for the external view to stabilize. Step 1 should turn all segments to OFFLINE state - * 3. Invoke enablePartition on the segments + * Resets all segments of a table. This operation invoke resetPartition via state transition message. */ - public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) + public void resetAllSegments(String tableNameWithType, @Nullable String targetInstance) throws InterruptedException, TimeoutException { IdealState idealState = getTableIdealState(tableNameWithType); Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); ExternalView externalView = getTableExternalView(tableNameWithType); Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); Map<String, Set<String>> instanceToResetSegmentsMap = new HashMap<>(); - Map<String, Set<String>> instanceToDisableSegmentsMap = new HashMap<>(); - Map<String, Set<String>> segmentInstancesToCheck = new HashMap<>(); + Map<String, Set<String>> instanceToSkippedSegmentsMap = new HashMap<>(); for (String segmentName : idealState.getPartitionSet()) { Set<String> instanceSet = idealState.getInstanceSet(segmentName); Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); for (String instance : instanceSet) { - if (externalViewStateMap == null || 
!SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { - instanceToDisableSegmentsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(segmentName); + if (externalViewStateMap == null || SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) { + instanceToSkippedSegmentsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(segmentName); } else { instanceToResetSegmentsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(segmentName); } } - segmentInstancesToCheck.put(segmentName, new HashSet<>(instanceSet)); } - // First, disable/reset the segments - LOGGER.info("Disabling/resetting segments of table: {}", tableNameWithType); + LOGGER.info("Resetting segments of table: {}", tableNameWithType); for (Map.Entry<String, Set<String>> entry : instanceToResetSegmentsMap.entrySet()) { - // resetPartition takes a segment which is in ERROR state, to OFFLINE state - _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), tableNameWithType, - Lists.newArrayList(entry.getValue())); - } - for (Map.Entry<String, Set<String>> entry : instanceToDisableSegmentsMap.entrySet()) { - // enablePartition takes a segment which is NOT in ERROR state, to OFFLINE state - // TODO: If the controller fails to re-enable the partition, it will be left in disabled state - _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), tableNameWithType, - Lists.newArrayList(entry.getValue())); - } - - // Wait for external view to stabilize - LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of segments of table: {}", - externalViewWaitTimeMs, tableNameWithType); - long startTime = System.currentTimeMillis(); - while (!segmentInstancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) { - ExternalView newExternalView = getTableExternalView(tableNameWithType); - Preconditions.checkState(newExternalView != null, "Could not find external view for table: %s", - tableNameWithType); - 
Iterator<Map.Entry<String, Set<String>>> iterator = segmentInstancesToCheck.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry<String, Set<String>> entryToCheck = iterator.next(); - String segmentToCheck = entryToCheck.getKey(); - Set<String> instancesToCheck = entryToCheck.getValue(); - Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentToCheck); - if (newExternalViewStateMap == null) { - continue; - } - boolean allOffline = true; - for (String instance : instancesToCheck) { - if (!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) { - allOffline = false; - break; - } - } - if (allOffline) { - iterator.remove(); - } + if (targetInstance == null || targetInstance.equals(entry.getKey())) { + resetPartitionAllState(_helixClusterName, entry.getKey(), tableNameWithType, + Lists.newArrayList(entry.getValue())); } - Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS); } - if (!segmentInstancesToCheck.isEmpty()) { - throw new TimeoutException(String.format( - "Timed out waiting for external view to stabilize after call to disable/reset segments. " - + "Disable/reset might complete in the background, but skipping enable of segments of table: %s", - tableNameWithType)); + } + + /** + * This util is similar to {@link HelixAdmin#resetPartition(String, String, String, List)}. + * However instead of resetting only the ERROR state to its initial state. we reset all state regardless. 
+ */ + private void resetPartitionAllState(String clusterName, String instanceName, String resourceName, Review Comment: (minor) No need to pass in `clusterName` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -2369,143 +2372,152 @@ public void resetSegment(String tableNameWithType, String segmentName, long exte "Could not find segment: %s in ideal state for table: %s", segmentName, tableNameWithType); Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); - // First, disable or reset the segment for (String instance : instanceSet) { - if (externalViewStateMap == null || !SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { - LOGGER.info("Disabling segment: {} of table: {}", segmentName, tableNameWithType); - // enablePartition takes a segment which is NOT in ERROR state, to OFFLINE state - // TODO: If the controller fails to re-enable the partition, it will be left in disabled state - _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, - Lists.newArrayList(segmentName)); - } else { - LOGGER.info("Resetting segment: {} of table: {}", segmentName, tableNameWithType); - // resetPartition takes a segment which is in ERROR state, to OFFLINE state - _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, Lists.newArrayList(segmentName)); - } - } - - // Wait for external view to stabilize - LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of segment: {} of table: {}", - externalViewWaitTimeMs, segmentName, tableNameWithType); - long startTime = System.currentTimeMillis(); - Set<String> instancesToCheck = new HashSet<>(instanceSet); - while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) { - ExternalView newExternalView = getTableExternalView(tableNameWithType); - Preconditions.checkState(newExternalView != null, "Could not 
find external view for table: %s", - tableNameWithType); - Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName); - if (newExternalViewStateMap == null) { - continue; + if (targetInstance == null || targetInstance.equals(instance)) { + if (externalViewStateMap == null || SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) { + LOGGER.info("Skipping reset for segment: {} of table: {} on instance: {}", segmentName, tableNameWithType, + instance); + } else { + LOGGER.info("Resetting segment: {} of table: {} on instance: {}", segmentName, tableNameWithType, instance); + resetPartitionAllState(_helixClusterName, instance, tableNameWithType, Lists.newArrayList(segmentName)); + } } - instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))); - Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS); - } - if (!instancesToCheck.isEmpty()) { - throw new TimeoutException(String.format( - "Timed out waiting for external view to stabilize after call to disable/reset segment: %s of table: %s. " - + "Disable/reset might complete in the background, but skipping enable of segment.", segmentName, - tableNameWithType)); - } - - // Lastly, enable segment - LOGGER.info("Enabling segment: {} of table: {}", segmentName, tableNameWithType); - for (String instance : instanceSet) { - _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, - Lists.newArrayList(segmentName)); } } /** - * Resets all segments of a table. The steps involved are - * 1. If segment is in ERROR state in the External View, invoke resetPartition, else invoke disablePartition - * 2. Wait for the external view to stabilize. Step 1 should turn all segments to OFFLINE state - * 3. Invoke enablePartition on the segments + * Resets all segments of a table. This operation invoke resetPartition via state transition message. 
*/ - public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) + public void resetAllSegments(String tableNameWithType, @Nullable String targetInstance) throws InterruptedException, TimeoutException { IdealState idealState = getTableIdealState(tableNameWithType); Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); ExternalView externalView = getTableExternalView(tableNameWithType); Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); Map<String, Set<String>> instanceToResetSegmentsMap = new HashMap<>(); - Map<String, Set<String>> instanceToDisableSegmentsMap = new HashMap<>(); - Map<String, Set<String>> segmentInstancesToCheck = new HashMap<>(); + Map<String, Set<String>> instanceToSkippedSegmentsMap = new HashMap<>(); for (String segmentName : idealState.getPartitionSet()) { Set<String> instanceSet = idealState.getInstanceSet(segmentName); Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); for (String instance : instanceSet) { - if (externalViewStateMap == null || !SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { - instanceToDisableSegmentsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(segmentName); + if (externalViewStateMap == null || SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) { + instanceToSkippedSegmentsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(segmentName); } else { instanceToResetSegmentsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(segmentName); } } - segmentInstancesToCheck.put(segmentName, new HashSet<>(instanceSet)); } - // First, disable/reset the segments - LOGGER.info("Disabling/resetting segments of table: {}", tableNameWithType); + LOGGER.info("Resetting segments of table: {}", tableNameWithType); for (Map.Entry<String, Set<String>> entry : instanceToResetSegmentsMap.entrySet()) { - // 
resetPartition takes a segment which is in ERROR state, to OFFLINE state - _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), tableNameWithType, - Lists.newArrayList(entry.getValue())); - } - for (Map.Entry<String, Set<String>> entry : instanceToDisableSegmentsMap.entrySet()) { - // enablePartition takes a segment which is NOT in ERROR state, to OFFLINE state - // TODO: If the controller fails to re-enable the partition, it will be left in disabled state - _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), tableNameWithType, - Lists.newArrayList(entry.getValue())); - } - - // Wait for external view to stabilize - LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of segments of table: {}", - externalViewWaitTimeMs, tableNameWithType); - long startTime = System.currentTimeMillis(); - while (!segmentInstancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) { - ExternalView newExternalView = getTableExternalView(tableNameWithType); - Preconditions.checkState(newExternalView != null, "Could not find external view for table: %s", - tableNameWithType); - Iterator<Map.Entry<String, Set<String>>> iterator = segmentInstancesToCheck.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry<String, Set<String>> entryToCheck = iterator.next(); - String segmentToCheck = entryToCheck.getKey(); - Set<String> instancesToCheck = entryToCheck.getValue(); - Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentToCheck); - if (newExternalViewStateMap == null) { - continue; - } - boolean allOffline = true; - for (String instance : instancesToCheck) { - if (!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) { - allOffline = false; - break; - } - } - if (allOffline) { - iterator.remove(); - } + if (targetInstance == null || targetInstance.equals(entry.getKey())) { + resetPartitionAllState(_helixClusterName, entry.getKey(), 
tableNameWithType, + Lists.newArrayList(entry.getValue())); } - Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS); } - if (!segmentInstancesToCheck.isEmpty()) { - throw new TimeoutException(String.format( - "Timed out waiting for external view to stabilize after call to disable/reset segments. " - + "Disable/reset might complete in the background, but skipping enable of segments of table: %s", - tableNameWithType)); + } + + /** + * This util is similar to {@link HelixAdmin#resetPartition(String, String, String, List)}. + * However instead of resetting only the ERROR state to its initial state. we reset all state regardless. + */ + private void resetPartitionAllState(String clusterName, String instanceName, String resourceName, + List<String> partitionNames) { + LOGGER.info("Reset partitions {} for resource {} on instance {} in cluster {}.", + partitionNames == null ? "NULL" : HelixUtil.serializeByComma(partitionNames), resourceName, + instanceName, clusterName); + HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + + // check the instance is alive + LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName)); + if (liveInstance == null) { + // check if the instance exists in the cluster + String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName); + throw new RuntimeException(String.format("Can't find instance: %s on %s", instanceName, instanceConfigPath)); + } + + // check resource group exists + IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName)); + if (idealState == null) { + throw new RuntimeException("RESOURCE_NON_EXISTENT"); } - // Lastly, enable segments - LOGGER.info("Enabling segments of table: {}", tableNameWithType); - for (Map.Entry<String, Set<String>> entry : instanceToResetSegmentsMap.entrySet()) { - _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), tableNameWithType, - 
Lists.newArrayList(entry.getValue())); + // check partition exists in resource group + Set<String> resetPartitionNames = new HashSet<String>(partitionNames); + Set<String> partitions = + (idealState.getRebalanceMode() == IdealState.RebalanceMode.CUSTOMIZED) ? idealState.getRecord() + .getMapFields().keySet() : idealState.getRecord().getListFields().keySet(); + if (!partitions.containsAll(resetPartitionNames)) { + throw new RuntimeException("PARTITION_NON_EXISTENT"); } - for (Map.Entry<String, Set<String>> entry : instanceToDisableSegmentsMap.entrySet()) { - _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), tableNameWithType, - Lists.newArrayList(entry.getValue())); + + // check current partition state for the transition message. + String sessionId = liveInstance.getEphemeralOwner(); + CurrentState curState = + accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, resourceName)); + + // check stateModelDef exists and get initial state + String stateModelDef = idealState.getStateModelDefRef(); + StateModelDefinition stateModel = accessor.getProperty(keyBuilder.stateModelDef(stateModelDef)); + if (stateModel == null) { + throw new RuntimeException("STATE_MODEL_NON_EXISTENT"); } Review Comment: This part is redundant -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org