xiangfu0 commented on code in PR #14214:
URL: https://github.com/apache/pinot/pull/14214#discussion_r1800168227


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java:
##########
@@ -124,34 +124,27 @@ public IdealState commit(HelixManager helixManager, 
String resourceName,
           Entry first = queue._pending.poll();
           processed.add(first);
           String mergedResourceName = first._resourceName;
-          HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
-          PropertyKey idealStateKey = 
dataAccessor.keyBuilder().idealStates(resourceName);
-          IdealState idealState = dataAccessor.getProperty(idealStateKey);
-
-          // Make a copy of the idealState above to pass it to the updater
-          // NOTE: new IdealState(idealState.getRecord()) does not work 
because it's shallow copy for map fields and
-          // list fields
-          IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);
 
-          /**
-           * If the local cache does not contain a value, need to check if 
there is a
-           * value in ZK; use it as initial value if exists
-           */
-          IdealState updatedIdealState = first._updater.apply(idealStateCopy);
-          first._updatedIdealState = updatedIdealState;
-          Iterator<Entry> it = queue._pending.iterator();
-          while (it.hasNext()) {
-            Entry ent = it.next();
-            if (!ent._resourceName.equals(mergedResourceName)) {
-              continue;
-            }
-            processed.add(ent);
-            updatedIdealState = ent._updater.apply(idealStateCopy);
-            ent._updatedIdealState = updatedIdealState;
-            it.remove();
-          }
-          IdealState finalUpdatedIdealState = updatedIdealState;
-          updateIdealState(helixManager, resourceName, anyIdealState -> 
finalUpdatedIdealState,
+          updateIdealState(helixManager, mergedResourceName, idealState -> {

Review Comment:
   Good catch!
   Can you try updating the `IdealStateGroupCommitTest.java` test file as shown 
below and see if it passes?
   ```
   /**
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *   http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
   package org.apache.pinot.controller.helix;
   
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Random;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.function.Function;
   import org.apache.helix.HelixManager;
   import org.apache.helix.model.IdealState;
   import org.apache.pinot.common.metrics.ControllerMeter;
   import org.apache.pinot.common.metrics.ControllerMetrics;
   import org.apache.pinot.common.utils.helix.HelixHelper;
   import org.apache.pinot.common.utils.helix.IdealStateGroupCommit;
   import org.apache.pinot.spi.utils.retry.RetryPolicies;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   import org.testng.Assert;
   import org.testng.annotations.AfterClass;
   import org.testng.annotations.BeforeClass;
   import org.testng.annotations.Test;
   
   
   public class IdealStateGroupCommitTest {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(IdealStateGroupCommit.class);
     private static final ControllerTest TEST_INSTANCE = 
ControllerTest.getInstance();
     private static final String TABLE_NAME = "potato_OFFLINE";
     private static final int NUM_PROCESSORS = 10;
     private static final int NUM_UPDATES = 2400;
   
     @BeforeClass
     public void setUp()
         throws Exception {
       TEST_INSTANCE.setupSharedStateAndValidate();
   
       IdealState idealState = new IdealState(TABLE_NAME);
       idealState.setStateModelDefRef("OnlineOffline");
       idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
       idealState.setReplicas("1");
       idealState.setNumPartitions(0);
       TEST_INSTANCE.getHelixAdmin()
           .addResource(TEST_INSTANCE.getHelixClusterName(), TABLE_NAME, 
idealState);
     }
   
     @AfterClass
     public void tearDown() {
       TEST_INSTANCE.cleanup();
     }
   
     @Test
     public void testGroupCommit()
         throws InterruptedException {
       List<IdealStateGroupCommit> groupCommitList = new ArrayList<>();
       for (int i = 0; i < NUM_PROCESSORS; i++) {
         groupCommitList.add(new IdealStateGroupCommit());
       }
       ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(400);
       for (int i = 0; i < NUM_UPDATES; i++) {
         Runnable runnable = new 
IdealStateUpdater(TEST_INSTANCE.getHelixManager(), groupCommitList.get(new 
Random().nextInt(NUM_PROCESSORS)), TABLE_NAME, i);
         newFixedThreadPool.submit(runnable);
       }
       IdealState idealState = 
HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), TABLE_NAME);
       while (idealState.getNumPartitions() < NUM_UPDATES) {
         Thread.sleep(500);
         idealState = 
HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), TABLE_NAME);
         System.out.println("idealState.getNumPartitions() = " + 
idealState.getNumPartitions());
       }
       Assert.assertEquals(idealState.getNumPartitions(), NUM_UPDATES);
       ControllerMetrics controllerMetrics = ControllerMetrics.get();
       long idealStateUpdateSuccessCount =
           controllerMetrics.getMeteredTableValue(TABLE_NAME, 
ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS).count();
       Assert.assertTrue(idealStateUpdateSuccessCount < NUM_UPDATES);
       LOGGER.info("{} IdealState update are successfully commited with {} 
times zk updates.", NUM_UPDATES,
           idealStateUpdateSuccessCount);
     }
   }
   
   /**
    * Task that pushes one partition-state change for a table through an {@link IdealStateGroupCommit} and then
    * reads the table's IdealState back.
    */
   class IdealStateUpdater implements Runnable {
     private final HelixManager _helixManager;
     private final IdealStateGroupCommit _commit;
     private final String _tableName;
     private final int _i;
   
     /**
      * @param helixManager Helix manager passed to the commit and to the IdealState read
      * @param commit group-commit instance the update is submitted through
      * @param tableName name of the table whose IdealState is updated
      * @param i index used to build the partition and instance name of this update
      */
     public IdealStateUpdater(HelixManager helixManager, IdealStateGroupCommit commit, String tableName, int i) {
       _helixManager = helixManager;
       _commit = commit;
       _tableName = tableName;
       _i = i;
     }
   
     /**
      * Submits an update that marks partition {@code "test_id" + i} as ONLINE on the instance of the same name,
      * then fetches the table's IdealState (the fetched value is not used).
      */
     @Override
     public void run() {
       // The same generated id serves as both the partition name and the instance name.
       String testId = "test_id" + _i;
       Function<IdealState, IdealState> markOnline = current -> {
         current.setPartitionState(testId, testId, "ONLINE");
         return current;
       };
       _commit.commit(_helixManager, _tableName, markOnline,
           RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f), false);
       HelixHelper.getTableIdealState(_helixManager, _tableName);
     }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to