This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 44dde02db1 Continue Segment Reset post instance errors (#15903) 44dde02db1 is described below commit 44dde02db1765e73e29e448be7bde77249ab3885 Author: NOOB <43700604+noob-se...@users.noreply.github.com> AuthorDate: Thu Jun 12 23:02:29 2025 +0530 Continue Segment Reset post instance errors (#15903) --- .../helix/core/PinotHelixResourceManager.java | 32 +++++- .../controller/helix/core/SegmentResetTest.java | 113 +++++++++++++++++++++ 2 files changed, 142 insertions(+), 3 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index b097ecd05b..a07caac5f1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2912,15 +2912,22 @@ public class PinotHelixResourceManager { Set<String> instanceSet = parseInstanceSet(idealState, segmentName, targetInstance); Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); + List<String> failedInstances = new ArrayList<>(); for (String instance : instanceSet) { if (externalViewStateMap == null || SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) { LOGGER.info("Skipping resetting for segment: {} of table: {} on instance: {}", segmentName, tableNameWithType, instance); } else { LOGGER.info("Resetting segment: {} of table: {} on instance: {}", segmentName, tableNameWithType, instance); - resetPartitionAllState(instance, tableNameWithType, Collections.singleton(segmentName)); + resetPartitionAllState(instance, tableNameWithType, Collections.singleton(segmentName), failedInstances); } } + + if (!failedInstances.isEmpty()) { + throw new RuntimeException( + "Reset segment failed for table: " + tableNameWithType + ", segment: " + segmentName + ", instances: " + + failedInstances); + } } /** @@ -2960,12 +2967,30 @@ public class PinotHelixResourceManager { } LOGGER.info("Resetting segments: {} of table: {}", instanceToResetSegmentsMap, tableNameWithType); + + List<String> failedInstances = new ArrayList<>(); for (Map.Entry<String, Set<String>> entry : instanceToResetSegmentsMap.entrySet()) { - resetPartitionAllState(entry.getKey(), tableNameWithType, entry.getValue()); + resetPartitionAllState(entry.getKey(), tableNameWithType, entry.getValue(), failedInstances); } LOGGER.info("Reset segments for table {} finished. With the following segments skipped: {}", tableNameWithType, instanceToSkippedSegmentsMap); + + if (!failedInstances.isEmpty()) { + throw new RuntimeException( + "Reset segment failed for table: " + tableNameWithType + ", instances: " + failedInstances); + } + } + + private void resetPartitionAllState(String instance, String tableNameWithType, Set<String> segmentNames, + List<String> failedInstances) { + try { + resetPartitionAllState(instance, tableNameWithType, segmentNames); + } catch (Exception e) { + LOGGER.error("Failed to reset segment: {} of table: {} on instance: {}", segmentNames, tableNameWithType, + instance, e); + failedInstances.add(instance); + } } private static Set<String> parseInstanceSet(IdealState idealState, String segmentName, @@ -2984,7 +3009,8 @@ public class PinotHelixResourceManager { * This util is similar to {@link HelixAdmin#resetPartition(String, String, String, List)}. * However instead of resetting only the ERROR state to its initial state. we reset all state regardless. */ - private void resetPartitionAllState(String instanceName, String resourceName, Set<String> resetPartitionNames) { + @VisibleForTesting + void resetPartitionAllState(String instanceName, String resourceName, Set<String> resetPartitionNames) { LOGGER.info("Reset partitions {} for resource {} on instance {} in cluster {}.", resetPartitionNames == null ? "NULL" : resetPartitionNames, resourceName, instanceName, _helixClusterName); HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/SegmentResetTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/SegmentResetTest.java new file mode 100644 index 0000000000..6d795c39d4 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/SegmentResetTest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.spi.utils.CommonConstants; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.when; + + +public class SegmentResetTest { + + private final String _tableName = "myTable_OFFLINE"; + private final String _segmentName = "segment_1"; + private final String _instance1 = "instance_1"; + private final String _instance2 = "instance_2"; + + private class MockPinotHelixResourceManager extends PinotHelixResourceManager { + + private final Set<String> _instanceToFail; + + public MockPinotHelixResourceManager(ControllerConf controllerConf) { + super(controllerConf); + _instanceToFail = new HashSet<>(); + } + + @Override + public IdealState getTableIdealState(String tableNameWithType) { + IdealState idealState = Mockito.mock(IdealState.class); + when(idealState.getInstanceSet(_segmentName)).thenReturn(new HashSet<>(Arrays.asList(_instance1, _instance2))); + when(idealState.getPartitionSet()).thenReturn(new HashSet<>(Collections.singletonList(_segmentName))); + return idealState; + } + + @Override + public ExternalView getTableExternalView(String tableNameWithType) { + ExternalView externalView = Mockito.mock(ExternalView.class); + Map<String, String> segmentStateMap = new HashMap<>(); + segmentStateMap.put(_instance1, CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE); + segmentStateMap.put(_instance2, CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE); + when(externalView.getStateMap(_segmentName)).thenReturn(segmentStateMap); + return externalView; + } + + @Override + public void resetPartitionAllState(String instanceName, String resourceName, Set<String> resetPartitionNames) { + if (_instanceToFail.contains(instanceName)) { + throw new RuntimeException("Test: Fail reset for " + instanceName); + } + } + + public void addInstanceToFail(String instanceName) { + _instanceToFail.add(instanceName); + } + } + + @Test + public void testResetSegmentOneFailureOthersStillInvoked() { + ControllerConf cfg = new ControllerConf(); + cfg.setZkStr("localhost:2181"); + cfg.setHelixClusterName("cluster01"); + MockPinotHelixResourceManager pinotHelixManager = new MockPinotHelixResourceManager(cfg); + pinotHelixManager.addInstanceToFail(_instance1); + + RuntimeException runtimeException = Assert.expectThrows(RuntimeException.class, + () -> pinotHelixManager.resetSegment(_tableName, _segmentName, null)); + Assert.assertEquals(runtimeException.getMessage(), + "Reset segment failed for table: myTable_OFFLINE, segment: segment_1, instances: [instance_1]"); + + runtimeException = + Assert.expectThrows(RuntimeException.class, () -> pinotHelixManager.resetSegments(_tableName, null, false)); + Assert.assertEquals(runtimeException.getMessage(), + "Reset segment failed for table: myTable_OFFLINE, instances: [instance_1]"); + } + + @Test + public void testResetSegmentNoFailure() { + ControllerConf cfg = new ControllerConf(); + cfg.setZkStr("localhost:2181"); + cfg.setHelixClusterName("cluster01"); + MockPinotHelixResourceManager pinotHelixManager = new MockPinotHelixResourceManager(cfg); + + pinotHelixManager.resetSegment(_tableName, _segmentName, null); + pinotHelixManager.resetSegments(_tableName, null, false); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org