xiangfu0 commented on code in PR #14214: URL: https://github.com/apache/pinot/pull/14214#discussion_r1800168227
########## pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java: ########## @@ -124,34 +124,27 @@ public IdealState commit(HelixManager helixManager, String resourceName, Entry first = queue._pending.poll(); processed.add(first); String mergedResourceName = first._resourceName; - HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor(); - PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName); - IdealState idealState = dataAccessor.getProperty(idealStateKey); - - // Make a copy of the idealState above to pass it to the updater - // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and - // list fields - IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState); - /** - * If the local cache does not contain a value, need to check if there is a - * value in ZK; use it as initial value if exists - */ - IdealState updatedIdealState = first._updater.apply(idealStateCopy); - first._updatedIdealState = updatedIdealState; - Iterator<Entry> it = queue._pending.iterator(); - while (it.hasNext()) { - Entry ent = it.next(); - if (!ent._resourceName.equals(mergedResourceName)) { - continue; - } - processed.add(ent); - updatedIdealState = ent._updater.apply(idealStateCopy); - ent._updatedIdealState = updatedIdealState; - it.remove(); - } - IdealState finalUpdatedIdealState = updatedIdealState; - updateIdealState(helixManager, resourceName, anyIdealState -> finalUpdatedIdealState, + updateIdealState(helixManager, mergedResourceName, idealState -> { Review Comment: Good catch! Can you try to update the `IdealStateGroupCommitTest.java` test file and see if it passes? ``` /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. 
The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.pinot.controller.helix; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Function; import org.apache.helix.HelixManager; import org.apache.helix.model.IdealState; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.common.utils.helix.IdealStateGroupCommit; import org.apache.pinot.spi.utils.retry.RetryPolicies; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; public class IdealStateGroupCommitTest { private static final Logger LOGGER = LoggerFactory.getLogger(IdealStateGroupCommit.class); private static final ControllerTest TEST_INSTANCE = ControllerTest.getInstance(); private static final String TABLE_NAME = "potato_OFFLINE"; private static final int NUM_PROCESSORS = 10; private static final int NUM_UPDATES = 2400; @BeforeClass public void setUp() throws Exception { TEST_INSTANCE.setupSharedStateAndValidate(); IdealState idealState = new IdealState(TABLE_NAME); idealState.setStateModelDefRef("OnlineOffline"); 
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); idealState.setReplicas("1"); idealState.setNumPartitions(0); TEST_INSTANCE.getHelixAdmin() .addResource(TEST_INSTANCE.getHelixClusterName(), TABLE_NAME, idealState); } @AfterClass public void tearDown() { TEST_INSTANCE.cleanup(); } @Test public void testGroupCommit() throws InterruptedException { List<IdealStateGroupCommit> groupCommitList = new ArrayList<>(); for (int i = 0; i < NUM_PROCESSORS; i++) { groupCommitList.add(new IdealStateGroupCommit()); } ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(400); for (int i = 0; i < NUM_UPDATES; i++) { Runnable runnable = new IdealStateUpdater(TEST_INSTANCE.getHelixManager(), groupCommitList.get(new Random().nextInt(NUM_PROCESSORS)), TABLE_NAME, i); newFixedThreadPool.submit(runnable); } IdealState idealState = HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), TABLE_NAME); while (idealState.getNumPartitions() < NUM_UPDATES) { Thread.sleep(500); idealState = HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), TABLE_NAME); System.out.println("idealState.getNumPartitions() = " + idealState.getNumPartitions()); } Assert.assertEquals(idealState.getNumPartitions(), NUM_UPDATES); ControllerMetrics controllerMetrics = ControllerMetrics.get(); long idealStateUpdateSuccessCount = controllerMetrics.getMeteredTableValue(TABLE_NAME, ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS).count(); Assert.assertTrue(idealStateUpdateSuccessCount < NUM_UPDATES); LOGGER.info("{} IdealState update are successfully commited with {} times zk updates.", NUM_UPDATES, idealStateUpdateSuccessCount); } } class IdealStateUpdater implements Runnable { private final HelixManager _helixManager; private final IdealStateGroupCommit _commit; private final String _tableName; private final int _i; public IdealStateUpdater(HelixManager helixManager, IdealStateGroupCommit commit, String tableName, int i) { _helixManager = helixManager; _commit = commit; 
_tableName = tableName; _i = i; } @Override public void run() { _commit.commit(_helixManager, _tableName, new Function<IdealState, IdealState>() { @Override public IdealState apply(IdealState idealState) { idealState.setPartitionState("test_id" + _i, "test_id" + _i, "ONLINE"); return idealState; } }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f), false); HelixHelper.getTableIdealState(_helixManager, _tableName); } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org