This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 44dde02db1 Continue Segment Reset post instance errors (#15903)
44dde02db1 is described below

commit 44dde02db1765e73e29e448be7bde77249ab3885
Author: NOOB <43700604+noob-se...@users.noreply.github.com>
AuthorDate: Thu Jun 12 23:02:29 2025 +0530

    Continue Segment Reset post instance errors (#15903)
---
 .../helix/core/PinotHelixResourceManager.java      |  32 +++++-
 .../controller/helix/core/SegmentResetTest.java    | 113 +++++++++++++++++++++
 2 files changed, 142 insertions(+), 3 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index b097ecd05b..a07caac5f1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2912,15 +2912,22 @@ public class PinotHelixResourceManager {
     Set<String> instanceSet = parseInstanceSet(idealState, segmentName, targetInstance);
     Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
 
+    List<String> failedInstances = new ArrayList<>();
     for (String instance : instanceSet) {
       if (externalViewStateMap == null || SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) {
         LOGGER.info("Skipping resetting for segment: {} of table: {} on instance: {}", segmentName, tableNameWithType,
             instance);
       } else {
         LOGGER.info("Resetting segment: {} of table: {} on instance: {}", segmentName, tableNameWithType, instance);
-        resetPartitionAllState(instance, tableNameWithType, Collections.singleton(segmentName));
+        resetPartitionAllState(instance, tableNameWithType, Collections.singleton(segmentName), failedInstances);
       }
     }
+
+    if (!failedInstances.isEmpty()) {
+      throw new RuntimeException(
+          "Reset segment failed for table: " + tableNameWithType + ", segment: " + segmentName + ", instances: "
+              + failedInstances);
+    }
   }
 
   /**
@@ -2960,12 +2967,30 @@ public class PinotHelixResourceManager {
     }
 
     LOGGER.info("Resetting segments: {} of table: {}", instanceToResetSegmentsMap, tableNameWithType);
+
+    List<String> failedInstances = new ArrayList<>();
     for (Map.Entry<String, Set<String>> entry : instanceToResetSegmentsMap.entrySet()) {
-      resetPartitionAllState(entry.getKey(), tableNameWithType, entry.getValue());
+      resetPartitionAllState(entry.getKey(), tableNameWithType, entry.getValue(), failedInstances);
     }
 
     LOGGER.info("Reset segments for table {} finished. With the following segments skipped: {}", tableNameWithType,
         instanceToSkippedSegmentsMap);
+
+    if (!failedInstances.isEmpty()) {
+      throw new RuntimeException(
+          "Reset segment failed for table: " + tableNameWithType + ", instances: " + failedInstances);
+    }
+  }
+
+  private void resetPartitionAllState(String instance, String tableNameWithType, Set<String> segmentNames,
+      List<String> failedInstances) {
+    try {
+      resetPartitionAllState(instance, tableNameWithType, segmentNames);
+    } catch (Exception e) {
+      LOGGER.error("Failed to reset segment: {} of table: {} on instance: {}", segmentNames, tableNameWithType,
+          instance, e);
+      failedInstances.add(instance);
+    }
   }
 
   private static Set<String> parseInstanceSet(IdealState idealState, String segmentName,
@@ -2984,7 +3009,8 @@ public class PinotHelixResourceManager {
    * This util is similar to {@link HelixAdmin#resetPartition(String, String, String, List)}.
    * However instead of resetting only the ERROR state to its initial state. we reset all state regardless.
    */
-  private void resetPartitionAllState(String instanceName, String resourceName, Set<String> resetPartitionNames) {
+  @VisibleForTesting
+  void resetPartitionAllState(String instanceName, String resourceName, Set<String> resetPartitionNames) {
     LOGGER.info("Reset partitions {} for resource {} on instance {} in cluster {}.",
         resetPartitionNames == null ? "NULL" : resetPartitionNames, resourceName, instanceName, _helixClusterName);
     HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/SegmentResetTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/SegmentResetTest.java
new file mode 100644
index 0000000000..6d795c39d4
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/SegmentResetTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.when;
+
+
+public class SegmentResetTest {
+
+  private final String _tableName = "myTable_OFFLINE";
+  private final String _segmentName = "segment_1";
+  private final String _instance1 = "instance_1";
+  private final String _instance2 = "instance_2";
+
+  private class MockPinotHelixResourceManager extends PinotHelixResourceManager {
+
+    private final Set<String> _instanceToFail;
+
+    public MockPinotHelixResourceManager(ControllerConf controllerConf) {
+      super(controllerConf);
+      _instanceToFail = new HashSet<>();
+    }
+
+    @Override
+    public IdealState getTableIdealState(String tableNameWithType) {
+      IdealState idealState = Mockito.mock(IdealState.class);
+      when(idealState.getInstanceSet(_segmentName)).thenReturn(new HashSet<>(Arrays.asList(_instance1, _instance2)));
+      when(idealState.getPartitionSet()).thenReturn(new HashSet<>(Collections.singletonList(_segmentName)));
+      return idealState;
+    }
+
+    @Override
+    public ExternalView getTableExternalView(String tableNameWithType) {
+      ExternalView externalView = Mockito.mock(ExternalView.class);
+      Map<String, String> segmentStateMap = new HashMap<>();
+      segmentStateMap.put(_instance1, CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
+      segmentStateMap.put(_instance2, CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
+      when(externalView.getStateMap(_segmentName)).thenReturn(segmentStateMap);
+      return externalView;
+    }
+
+    @Override
+    public void resetPartitionAllState(String instanceName, String resourceName, Set<String> resetPartitionNames) {
+      if (_instanceToFail.contains(instanceName)) {
+        throw new RuntimeException("Test: Fail reset for " + instanceName);
+      }
+    }
+
+    public void addInstanceToFail(String instanceName) {
+      _instanceToFail.add(instanceName);
+    }
+  }
+
+  @Test
+  public void testResetSegmentOneFailureOthersStillInvoked() {
+    ControllerConf cfg = new ControllerConf();
+    cfg.setZkStr("localhost:2181");
+    cfg.setHelixClusterName("cluster01");
+    MockPinotHelixResourceManager pinotHelixManager = new MockPinotHelixResourceManager(cfg);
+    pinotHelixManager.addInstanceToFail(_instance1);
+
+    RuntimeException runtimeException = Assert.expectThrows(RuntimeException.class,
+        () -> pinotHelixManager.resetSegment(_tableName, _segmentName, null));
+    Assert.assertEquals(runtimeException.getMessage(),
+        "Reset segment failed for table: myTable_OFFLINE, segment: segment_1, instances: [instance_1]");
+
+    runtimeException =
+        Assert.expectThrows(RuntimeException.class, () -> pinotHelixManager.resetSegments(_tableName, null, false));
+    Assert.assertEquals(runtimeException.getMessage(),
+        "Reset segment failed for table: myTable_OFFLINE, instances: [instance_1]");
+  }
+
+  @Test
+  public void testResetSegmentNoFailure() {
+    ControllerConf cfg = new ControllerConf();
+    cfg.setZkStr("localhost:2181");
+    cfg.setHelixClusterName("cluster01");
+    MockPinotHelixResourceManager pinotHelixManager = new MockPinotHelixResourceManager(cfg);
+
+    pinotHelixManager.resetSegment(_tableName, _segmentName, null);
+    pinotHelixManager.resetSegments(_tableName, null, false);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to