This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6061b8926a Fix race condition in IdealStateGroupCommit (#14237)
6061b8926a is described below

commit 6061b8926a605012c65803b91c1039858a56b018
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Wed Oct 16 20:24:54 2024 +0800

    Fix race condition in IdealStateGroupCommit (#14237)
---
 .../common/utils/helix/IdealStateGroupCommit.java  |  75 ++++++++-------
 .../helix/IdealStateGroupCommitTest.java           | 103 +++++++++++++++------
 2 files changed, 115 insertions(+), 63 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
index ea74fb18e2..f7e7981a1a 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
@@ -72,6 +72,7 @@ public class IdealStateGroupCommit {
     final Function<IdealState, IdealState> _updater;
     IdealState _updatedIdealState = null;
     AtomicBoolean _sent = new AtomicBoolean(false);
+    Throwable _exception;
 
     Entry(String resourceName, Function<IdealState, IdealState> updater) {
       _resourceName = resourceName;
@@ -106,8 +107,8 @@ public class IdealStateGroupCommit {
    * @param updater the idealState updater to be applied
    * @return IdealState if the update is successful, null if not
    */
-  public IdealState commit(HelixManager helixManager, String resourceName,
-      Function<IdealState, IdealState> updater, RetryPolicy retryPolicy, 
boolean noChangeOk) {
+  public IdealState commit(HelixManager helixManager, String resourceName, 
Function<IdealState, IdealState> updater,
+      RetryPolicy retryPolicy, boolean noChangeOk) {
     Queue queue = getQueue(resourceName);
     Entry entry = new Entry(resourceName, updater);
 
@@ -120,39 +121,41 @@ public class IdealStateGroupCommit {
             // All pending entries have been processed, the updatedIdealState 
should be set.
             return entry._updatedIdealState;
           }
-          // remove from queue
-          Entry first = queue._pending.poll();
-          processed.add(first);
-          String mergedResourceName = first._resourceName;
-          HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
-          PropertyKey idealStateKey = 
dataAccessor.keyBuilder().idealStates(resourceName);
-          IdealState idealState = dataAccessor.getProperty(idealStateKey);
-
-          // Make a copy of the idealState above to pass it to the updater
-          // NOTE: new IdealState(idealState.getRecord()) does not work 
because it's shallow copy for map fields and
-          // list fields
-          IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);
-
-          /**
-           * If the local cache does not contain a value, need to check if 
there is a
-           * value in ZK; use it as initial value if exists
-           */
-          IdealState updatedIdealState = first._updater.apply(idealStateCopy);
-          first._updatedIdealState = updatedIdealState;
-          Iterator<Entry> it = queue._pending.iterator();
-          while (it.hasNext()) {
-            Entry ent = it.next();
-            if (!ent._resourceName.equals(mergedResourceName)) {
-              continue;
+          IdealState response = updateIdealState(helixManager, resourceName, 
idealState -> {
+            IdealState updatedIdealState = idealState;
+            if (!processed.isEmpty()) {
+              queue._pending.addAll(processed);
+              processed.clear();
+            }
+            Iterator<Entry> it = queue._pending.iterator();
+            while (it.hasNext()) {
+              Entry ent = it.next();
+              if (!ent._resourceName.equals(resourceName)) {
+                continue;
+              }
+              processed.add(ent);
+              it.remove();
+              updatedIdealState = ent._updater.apply(updatedIdealState);
+              ent._updatedIdealState = updatedIdealState;
+              ent._exception = null;
             }
-            processed.add(ent);
-            updatedIdealState = ent._updater.apply(idealStateCopy);
-            ent._updatedIdealState = updatedIdealState;
-            it.remove();
+            return updatedIdealState;
+          }, retryPolicy, noChangeOk);
+          if (response == null) {
+            RuntimeException ex = new RuntimeException("Failed to update 
IdealState");
+            for (Entry ent : processed) {
+              ent._exception = ex;
+              ent._updatedIdealState = null;
+            }
+            throw ex;
+          }
+        } catch (Throwable e) {
+          // If there is an exception, set the exception for all processed 
entries
+          for (Entry ent : processed) {
+            ent._exception = e;
+            ent._updatedIdealState = null;
           }
-          IdealState finalUpdatedIdealState = updatedIdealState;
-          updateIdealState(helixManager, resourceName, anyIdealState -> 
finalUpdatedIdealState,
-              retryPolicy, noChangeOk);
+          throw e;
         } finally {
           queue._running.set(null);
           for (Entry e : processed) {
@@ -176,6 +179,10 @@ public class IdealStateGroupCommit {
         }
       }
     }
+    if (entry._exception != null) {
+      throw new RuntimeException("Caught exception while updating ideal state 
for resource: " + resourceName,
+          entry._exception);
+    }
     return entry._updatedIdealState;
   }
 
@@ -298,7 +305,7 @@ public class IdealStateGroupCommit {
         controllerMetrics.addMeteredValue(resourceName, 
ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS, 1L);
       }
       return idealStateWrapper._idealState;
-    } catch (Exception e) {
+    } catch (Throwable e) {
       if (controllerMetrics != null) {
         controllerMetrics.addMeteredValue(resourceName, 
ControllerMeter.IDEAL_STATE_UPDATE_FAILURE, 1L);
       }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java
index ffe39764a4..40304d14c6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.controller.helix;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Function;
@@ -32,60 +35,93 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 
 public class IdealStateGroupCommitTest {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(IdealStateGroupCommit.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IdealStateGroupCommitTest.class);
   private static final ControllerTest TEST_INSTANCE = 
ControllerTest.getInstance();
-  private static final String TABLE_NAME = "potato_OFFLINE";
-  private static final int NUM_UPDATES = 2400;
+  private static final String TABLE_NAME_PREFIX = "potato_";
+  private static final int NUM_PROCESSORS = 100;
+  private static final int NUM_UPDATES = 2000;
+  private static final int NUM_TABLES = 20;
+
+  private ExecutorService _executorService;
 
   @BeforeClass
   public void setUp()
       throws Exception {
     TEST_INSTANCE.setupSharedStateAndValidate();
+    _executorService = Executors.newFixedThreadPool(100);
+  }
+
+  @BeforeMethod
+  public void beforeTest() {
+    for (int i = 0; i < NUM_UPDATES; i++) {
+      String tableName = TABLE_NAME_PREFIX + i + "_OFFLINE";
+      IdealState idealState = new IdealState(tableName);
+      idealState.setStateModelDefRef("OnlineOffline");
+      idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+      idealState.setReplicas("1");
+      idealState.setNumPartitions(0);
+      
TEST_INSTANCE.getHelixAdmin().addResource(TEST_INSTANCE.getHelixClusterName(), 
tableName, idealState);
+      ControllerMetrics.get().removeTableMeter(tableName, 
ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS);
+    }
+  }
 
-    IdealState idealState = new IdealState(TABLE_NAME);
-    idealState.setStateModelDefRef("OnlineOffline");
-    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
-    idealState.setReplicas("1");
-    idealState.setNumPartitions(0);
-    TEST_INSTANCE.getHelixAdmin()
-        .addResource(TEST_INSTANCE.getHelixClusterName(), TABLE_NAME, 
idealState);
+  @AfterMethod
+  public void afterTest() {
+    for (int i = 0; i < NUM_UPDATES; i++) {
+      String tableName = TABLE_NAME_PREFIX + i + "_OFFLINE";
+      
TEST_INSTANCE.getHelixAdmin().dropResource(TEST_INSTANCE.getHelixClusterName(), 
tableName);
+    }
   }
 
   @AfterClass
   public void tearDown() {
+    _executorService.shutdown();
     TEST_INSTANCE.cleanup();
   }
 
-  @Test
+  @Test(invocationCount = 5)
   public void testGroupCommit()
       throws InterruptedException {
-    final IdealStateGroupCommit commit = new IdealStateGroupCommit();
-    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(400);
+    List<IdealStateGroupCommit> groupCommitList = new ArrayList<>();
+    for (int i = 0; i < NUM_PROCESSORS; i++) {
+      groupCommitList.add(new IdealStateGroupCommit());
+    }
     for (int i = 0; i < NUM_UPDATES; i++) {
-      Runnable runnable = new 
IdealStateUpdater(TEST_INSTANCE.getHelixManager(), commit, TABLE_NAME, i);
-      newFixedThreadPool.submit(runnable);
+      for (int j = 0; j < NUM_TABLES; j++) {
+        String tableName = TABLE_NAME_PREFIX + j + "_OFFLINE";
+        IdealStateGroupCommit commit = groupCommitList.get(new 
Random().nextInt(NUM_PROCESSORS));
+        Runnable runnable = new 
IdealStateUpdater(TEST_INSTANCE.getHelixManager(), commit, tableName, i);
+        _executorService.submit(runnable);
+      }
     }
-    IdealState idealState = 
HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), TABLE_NAME);
-    while (idealState.getNumPartitions() < NUM_UPDATES) {
-      Thread.sleep(500);
-      idealState = 
HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), TABLE_NAME);
+    for (int i = 0; i < NUM_TABLES; i++) {
+      String tableName = TABLE_NAME_PREFIX + i + "_OFFLINE";
+      IdealState idealState = 
HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), tableName);
+      while (idealState.getNumPartitions() < NUM_UPDATES) {
+        Thread.sleep(500);
+        idealState = 
HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), tableName);
+      }
+      Assert.assertEquals(idealState.getNumPartitions(), NUM_UPDATES);
+      ControllerMetrics controllerMetrics = ControllerMetrics.get();
+      long idealStateUpdateSuccessCount =
+          controllerMetrics.getMeteredTableValue(tableName, 
ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS).count();
+      Assert.assertTrue(idealStateUpdateSuccessCount < NUM_UPDATES);
+      LOGGER.info("{} IdealState update are successfully commited with {} 
times zk updates.", NUM_UPDATES,
+          idealStateUpdateSuccessCount);
     }
-    Assert.assertEquals(idealState.getNumPartitions(), NUM_UPDATES);
-    ControllerMetrics controllerMetrics = ControllerMetrics.get();
-    long idealStateUpdateSuccessCount =
-        controllerMetrics.getMeteredTableValue(TABLE_NAME, 
ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS).count();
-    Assert.assertTrue(idealStateUpdateSuccessCount < NUM_UPDATES);
-    LOGGER.info("{} IdealState update are successfully commited with {} times 
zk updates.", NUM_UPDATES,
-        idealStateUpdateSuccessCount);
   }
 }
 
 class IdealStateUpdater implements Runnable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IdealStateGroupCommitTest.class);
+
   private final HelixManager _helixManager;
   private final IdealStateGroupCommit _commit;
   private final String _tableName;
@@ -100,13 +136,22 @@ class IdealStateUpdater implements Runnable {
 
   @Override
   public void run() {
-    _commit.commit(_helixManager, _tableName, new Function<IdealState, 
IdealState>() {
+    Function<IdealState, IdealState> updater = new Function<IdealState, 
IdealState>() {
       @Override
       public IdealState apply(IdealState idealState) {
         idealState.setPartitionState("test_id" + _i, "test_id" + _i, "ONLINE");
         return idealState;
       }
-    }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f), false);
-    HelixHelper.getTableIdealState(_helixManager, _tableName);
+    };
+
+    while (true) {
+      try {
+        if (_commit.commit(_helixManager, _tableName, updater, 
RetryPolicies.noDelayRetryPolicy(1), false) != null) {
+          break;
+        }
+      } catch (Throwable e) {
+        LOGGER.warn("IdealState updater {} failed to commit.", _i, e);
+      }
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to