This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 717895be69 Group commit IdealState updates (#13976) 717895be69 is described below commit 717895be6961aaa213f57b1438cd7faed2c0f16d Author: Xiang Fu <xiangfu.1...@gmail.com> AuthorDate: Sat Sep 14 22:58:00 2024 +0800 Group commit IdealState updates (#13976) --- .../pinot/common/metrics/ControllerMeter.java | 3 +- .../pinot/common/utils/helix/HelixHelper.java | 162 +---------- .../common/utils/helix/IdealStateGroupCommit.java | 308 +++++++++++++++++++++ .../pinot/controller/BaseControllerStarter.java | 3 +- .../api/resources/PinotTableRestletResource.java | 9 +- .../helix/core/PinotHelixResourceManager.java | 76 ++--- .../realtime/PinotLLCRealtimeSegmentManager.java | 113 ++++---- .../helix/IdealStateGroupCommitTest.java | 112 ++++++++ .../PinotLLCRealtimeSegmentManagerTest.java | 5 - 9 files changed, 524 insertions(+), 267 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java index b474a44a6d..ee034ec952 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java @@ -67,7 +67,8 @@ public enum ControllerMeter implements AbstractMetrics.Meter { TABLE_REBALANCE_RETRY_TOO_MANY_TIMES("TableRebalanceRetryTooManyTimes", false), NUMBER_ADHOC_TASKS_SUBMITTED("adhocTasks", false), IDEAL_STATE_UPDATE_FAILURE("IdealStateUpdateFailure", false), - IDEAL_STATE_UPDATE_RETRY("IdealStateUpdateRetry", false); + IDEAL_STATE_UPDATE_RETRY("IdealStateUpdateRetry", false), + IDEAL_STATE_UPDATE_SUCCESS("IdealStateUpdateSuccess", false); private final String _brokerMeterName; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java index 4ffad84d61..43e6210e18 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java @@ -22,22 +22,17 @@ import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.commons.collections4.CollectionUtils; -import org.apache.helix.AccessOption; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; -import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.model.ExternalView; import org.apache.helix.model.HelixConfigScope; @@ -47,12 +42,8 @@ import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; -import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException; import org.apache.pinot.common.helix.ExtraInstanceConfig; import org.apache.pinot.common.metadata.ZKMetadataProvider; -import org.apache.pinot.common.metrics.ControllerMeter; -import org.apache.pinot.common.metrics.ControllerMetrics; -import org.apache.pinot.common.metrics.ControllerTimer; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -69,156 +60,38 @@ public class HelixHelper { private HelixHelper() { } - private static final int NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION = 1000; - private static final String ENABLE_COMPRESSIONS_KEY = "enableCompression"; - private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f); private static final RetryPolicy DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY = RetryPolicies.randomDelayRetryPolicy(20, 100L, 200L); + private static final Logger LOGGER = LoggerFactory.getLogger(HelixHelper.class); private static final ZNRecordSerializer ZN_RECORD_SERIALIZER = new ZNRecordSerializer(); + private static final IdealStateGroupCommit IDEAL_STATE_GROUP_COMMIT = new IdealStateGroupCommit(); private static final String ONLINE = "ONLINE"; private static final String OFFLINE = "OFFLINE"; public static final String BROKER_RESOURCE = CommonConstants.Helix.BROKER_RESOURCE_INSTANCE; - private static int _minNumCharsInISToTurnOnCompression = -1; - - public static synchronized void setMinNumCharsInISToTurnOnCompression(int minNumChars) { - _minNumCharsInISToTurnOnCompression = minNumChars; - } - public static IdealState cloneIdealState(IdealState idealState) { return new IdealState( (ZNRecord) ZN_RECORD_SERIALIZER.deserialize(ZN_RECORD_SERIALIZER.serialize(idealState.getRecord()))); } - /** - * Updates the ideal state, retrying if necessary in case of concurrent updates to the ideal state. - * - * @param helixManager The HelixManager used to interact with the Helix cluster - * @param resourceName The resource for which to update the ideal state - * @param updater A function that returns an updated ideal state given an input ideal state - * @return updated ideal state if successful, null if not - */ public static IdealState updateIdealState(HelixManager helixManager, String resourceName, - Function<IdealState, IdealState> updater, RetryPolicy policy, boolean noChangeOk) { - // NOTE: ControllerMetrics could be null because this method might be invoked by Broker. - ControllerMetrics controllerMetrics = ControllerMetrics.get(); - try { - long startTimeMs = System.currentTimeMillis(); - IdealStateWrapper idealStateWrapper = new IdealStateWrapper(); - int retries = policy.attempt(new Callable<>() { - @Override - public Boolean call() { - HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor(); - PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName); - IdealState idealState = dataAccessor.getProperty(idealStateKey); - - // Make a copy of the idealState above to pass it to the updater - // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and - // list fields - IdealState idealStateCopy = cloneIdealState(idealState); - - IdealState updatedIdealState; - try { - updatedIdealState = updater.apply(idealStateCopy); - } catch (PermanentUpdaterException e) { - LOGGER.error("Caught permanent exception while updating ideal state for resource: {}", resourceName, e); - throw e; - } catch (Exception e) { - LOGGER.error("Caught exception while updating ideal state for resource: {}", resourceName, e); - return false; - } - - // If there are changes to apply, apply them - if (updatedIdealState != null && !idealState.equals(updatedIdealState)) { - ZNRecord updatedZNRecord = updatedIdealState.getRecord(); - - // Update number of partitions - int numPartitions = updatedZNRecord.getMapFields().size(); - updatedIdealState.setNumPartitions(numPartitions); - - // If the ideal state is large enough, enable compression - boolean enableCompression = shouldCompress(updatedIdealState); - if (enableCompression) { - updatedZNRecord.setBooleanField(ENABLE_COMPRESSIONS_KEY, true); - } else { - updatedZNRecord.getSimpleFields().remove(ENABLE_COMPRESSIONS_KEY); - } - - // Check version and set ideal state - try { - if (dataAccessor.getBaseDataAccessor() - .set(idealStateKey.getPath(), updatedZNRecord, idealState.getRecord().getVersion(), - AccessOption.PERSISTENT)) { - idealStateWrapper._idealState = updatedIdealState; - return true; - } else { - LOGGER.warn("Failed to update ideal state for resource: {}", resourceName); - return false; - } - } catch (ZkBadVersionException e) { - LOGGER.warn("Version changed while updating ideal state for resource: {}", resourceName); - return false; - } catch (Exception e) { - LOGGER.warn("Caught exception while updating ideal state for resource: {} (compressed={})", resourceName, - enableCompression, e); - return false; - } - } else { - if (noChangeOk) { - LOGGER.info("Idempotent or null ideal state update for resource {}, skipping update.", resourceName); - } else { - LOGGER.warn("Idempotent or null ideal state update for resource {}, skipping update.", resourceName); - } - idealStateWrapper._idealState = idealState; - return true; - } - } - - private boolean shouldCompress(IdealState is) { - if (is.getNumPartitions() > NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION) { - return true; - } + Function<IdealState, IdealState> updater) { + return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater, + DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY, false); + } - // Find the number of characters in one partition in idealstate, and extrapolate - // to estimate the number of characters. - // We could serialize the znode to determine the exact size, but that would mean serializing every - // idealstate znode twice. We avoid some extra GC by estimating the size instead. Such estimations - // should be good for most installations that have similar segment and instance names. - Iterator<String> it = is.getPartitionSet().iterator(); - if (it.hasNext()) { - String partitionName = it.next(); - int numChars = partitionName.length(); - Map<String, String> stateMap = is.getInstanceStateMap(partitionName); - for (Map.Entry<String, String> entry : stateMap.entrySet()) { - numChars += entry.getKey().length(); - numChars += entry.getValue().length(); - } - numChars *= is.getNumPartitions(); - return _minNumCharsInISToTurnOnCompression > 0 && numChars > _minNumCharsInISToTurnOnCompression; - } - return false; - } - }); - if (controllerMetrics != null) { - controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_RETRY, retries); - controllerMetrics.addTimedValue(resourceName, ControllerTimer.IDEAL_STATE_UPDATE_TIME_MS, - System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS); - } - return idealStateWrapper._idealState; - } catch (Exception e) { - if (controllerMetrics != null) { - controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_FAILURE, 1L); - } - throw new RuntimeException("Caught exception while updating ideal state for resource: " + resourceName, e); - } + public static IdealState updateIdealState(HelixManager helixManager, String resourceName, + Function<IdealState, IdealState> updater, RetryPolicy retryPolicy) { + return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater, retryPolicy, false); } - private static class IdealStateWrapper { - IdealState _idealState; + public static IdealState updateIdealState(HelixManager helixManager, String resourceName, + Function<IdealState, IdealState> updater, RetryPolicy retryPolicy, boolean noChangeOk) { + return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater, retryPolicy, noChangeOk); } /** @@ -235,16 +108,6 @@ public class HelixHelper { } } - public static IdealState updateIdealState(HelixManager helixManager, String resourceName, - Function<IdealState, IdealState> updater) { - return updateIdealState(helixManager, resourceName, updater, DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY, false); - } - - public static IdealState updateIdealState(final HelixManager helixManager, final String resourceName, - final Function<IdealState, IdealState> updater, RetryPolicy policy) { - return updateIdealState(helixManager, resourceName, updater, policy, false); - } - /** * Updates broker resource ideal state for the given broker with the given broker tags. Optional {@code tablesAdded} * and {@code tablesRemoved} can be provided to track the tables added/removed during the update. @@ -554,7 +417,6 @@ public class HelixHelper { return instancesWithoutTag.stream().map(InstanceConfig::getInstanceName).collect(Collectors.toList()); } - public static List<InstanceConfig> getInstancesConfigsWithTag(List<InstanceConfig> instanceConfigs, String tag) { List<InstanceConfig> instancesWithTag = new ArrayList<>(); for (InstanceConfig instanceConfig : instanceConfigs) { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java new file mode 100644 index 0000000000..ea74fb18e2 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils.helix; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import org.apache.helix.AccessOption; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.PropertyKey; +import org.apache.helix.model.IdealState; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException; +import org.apache.pinot.common.metrics.ControllerMeter; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.metrics.ControllerTimer; +import org.apache.pinot.spi.utils.retry.RetryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * IdealStateGroupCommit is a utility class to commit group updates to IdealState. + * It is designed to be used in a multi-threaded environment where multiple threads + * may try to update the same IdealState concurrently. + * The implementation is shamelessly borrowed from (<a href= + * "https://github.com/apache/helix/blob/helix-1.4.1/helix-core/src/main/java/org/apache/helix/GroupCommit.java" + * >HelixGroupCommit</a>). + * This is especially useful for updating large IdealState, which each update may + * take a long time, e.g. to update one IdealState with 100k segments may take + * ~4 seconds, then 15 updates will take 1 minute and cause other requests + * (e.g. Segment upload, realtime segment commit, segment deletion, etc) timeout. + */ +public class IdealStateGroupCommit { + private static final Logger LOGGER = LoggerFactory.getLogger(IdealStateGroupCommit.class); + + private static final int NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION = 1000; + private static final String ENABLE_COMPRESSIONS_KEY = "enableCompression"; + + private static int _minNumCharsInISToTurnOnCompression = -1; + + private static class Queue { + final AtomicReference<Thread> _running = new AtomicReference<Thread>(); + final ConcurrentLinkedQueue<Entry> _pending = new ConcurrentLinkedQueue<Entry>(); + } + + private static class Entry { + final String _resourceName; + final Function<IdealState, IdealState> _updater; + IdealState _updatedIdealState = null; + AtomicBoolean _sent = new AtomicBoolean(false); + + Entry(String resourceName, Function<IdealState, IdealState> updater) { + _resourceName = resourceName; + _updater = updater; + } + } + + private final Queue[] _queues = new Queue[100]; + + /** + * Set up a group committer and its associated queues + */ + public IdealStateGroupCommit() { + // Don't use Arrays.fill(); + for (int i = 0; i < _queues.length; i++) { + _queues[i] = new Queue(); + } + } + + private Queue getQueue(String resourceName) { + return _queues[(resourceName.hashCode() & Integer.MAX_VALUE) % _queues.length]; + } + + public static synchronized void setMinNumCharsInISToTurnOnCompression(int minNumChars) { + _minNumCharsInISToTurnOnCompression = minNumChars; + } + + /** + * Do a group update for idealState associated with a given resource key + * @param helixManager helixManager with the ability to pull from the current data\ + * @param resourceName the resource name to be updated + * @param updater the idealState updater to be applied + * @return IdealState if the update is successful, null if not + */ + public IdealState commit(HelixManager helixManager, String resourceName, + Function<IdealState, IdealState> updater, RetryPolicy retryPolicy, boolean noChangeOk) { + Queue queue = getQueue(resourceName); + Entry entry = new Entry(resourceName, updater); + + queue._pending.add(entry); + while (!entry._sent.get()) { + if (queue._running.compareAndSet(null, Thread.currentThread())) { + ArrayList<Entry> processed = new ArrayList<>(); + try { + if (queue._pending.peek() == null) { + // All pending entries have been processed, the updatedIdealState should be set. + return entry._updatedIdealState; + } + // remove from queue + Entry first = queue._pending.poll(); + processed.add(first); + String mergedResourceName = first._resourceName; + HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor(); + PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName); + IdealState idealState = dataAccessor.getProperty(idealStateKey); + + // Make a copy of the idealState above to pass it to the updater + // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and + // list fields + IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState); + + /** + * If the local cache does not contain a value, need to check if there is a + * value in ZK; use it as initial value if exists + */ + IdealState updatedIdealState = first._updater.apply(idealStateCopy); + first._updatedIdealState = updatedIdealState; + Iterator<Entry> it = queue._pending.iterator(); + while (it.hasNext()) { + Entry ent = it.next(); + if (!ent._resourceName.equals(mergedResourceName)) { + continue; + } + processed.add(ent); + updatedIdealState = ent._updater.apply(idealStateCopy); + ent._updatedIdealState = updatedIdealState; + it.remove(); + } + IdealState finalUpdatedIdealState = updatedIdealState; + updateIdealState(helixManager, resourceName, anyIdealState -> finalUpdatedIdealState, + retryPolicy, noChangeOk); + } finally { + queue._running.set(null); + for (Entry e : processed) { + synchronized (e) { + e._sent.set(true); + e.notify(); + } + } + } + } else { + synchronized (entry) { + try { + entry.wait(10); + } catch (InterruptedException e) { + LOGGER.error("Interrupted while committing change, resourceName: " + resourceName + ", updater: " + updater, + e); + // Restore interrupt status + Thread.currentThread().interrupt(); + return null; + } + } + } + } + return entry._updatedIdealState; + } + + private static class IdealStateWrapper { + IdealState _idealState; + } + + /** + * Updates the ideal state, retrying if necessary in case of concurrent updates to the ideal state. + * + * @param helixManager The HelixManager used to interact with the Helix cluster + * @param resourceName The resource for which to update the ideal state + * @param updater A function that returns an updated ideal state given an input ideal state + * @return updated ideal state if successful, null if not + */ + private static IdealState updateIdealState(HelixManager helixManager, String resourceName, + Function<IdealState, IdealState> updater, RetryPolicy policy, boolean noChangeOk) { + // NOTE: ControllerMetrics could be null because this method might be invoked by Broker. + ControllerMetrics controllerMetrics = ControllerMetrics.get(); + try { + long startTimeMs = System.currentTimeMillis(); + IdealStateWrapper idealStateWrapper = new IdealStateWrapper(); + int retries = policy.attempt(new Callable<>() { + @Override + public Boolean call() { + HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor(); + PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName); + IdealState idealState = dataAccessor.getProperty(idealStateKey); + + // Make a copy of the idealState above to pass it to the updater + // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and + // list fields + IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState); + IdealState updatedIdealState; + try { + updatedIdealState = updater.apply(idealStateCopy); + } catch (HelixHelper.PermanentUpdaterException e) { + LOGGER.error("Caught permanent exception while updating ideal state for resource: {}", resourceName, e); + throw e; + } catch (Exception e) { + LOGGER.error("Caught exception while updating ideal state for resource: {}", resourceName, e); + return false; + } + + // If there are changes to apply, apply them + if (updatedIdealState != null && !idealState.equals(updatedIdealState)) { + ZNRecord updatedZNRecord = updatedIdealState.getRecord(); + + // Update number of partitions + int numPartitions = updatedZNRecord.getMapFields().size(); + updatedIdealState.setNumPartitions(numPartitions); + + // If the ideal state is large enough, enable compression + boolean enableCompression = shouldCompress(updatedIdealState); + if (enableCompression) { + updatedZNRecord.setBooleanField(ENABLE_COMPRESSIONS_KEY, true); + } else { + updatedZNRecord.getSimpleFields().remove(ENABLE_COMPRESSIONS_KEY); + } + + // Check version and set ideal state + try { + if (dataAccessor.getBaseDataAccessor() + .set(idealStateKey.getPath(), updatedZNRecord, idealState.getRecord().getVersion(), + AccessOption.PERSISTENT)) { + idealStateWrapper._idealState = updatedIdealState; + return true; + } else { + LOGGER.warn("Failed to update ideal state for resource: {}", resourceName); + return false; + } + } catch (ZkBadVersionException e) { + LOGGER.warn("Version changed while updating ideal state for resource: {}", resourceName); + return false; + } catch (Exception e) { + LOGGER.warn("Caught exception while updating ideal state for resource: {} (compressed={})", resourceName, + enableCompression, e); + return false; + } + } else { + if (noChangeOk) { + LOGGER.info("Idempotent or null ideal state update for resource {}, skipping update.", resourceName); + } else { + LOGGER.warn("Idempotent or null ideal state update for resource {}, skipping update.", resourceName); + } + idealStateWrapper._idealState = idealState; + return true; + } + } + + private boolean shouldCompress(IdealState is) { + if (is.getNumPartitions() > NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION) { + return true; + } + + // Find the number of characters in one partition in idealstate, and extrapolate + // to estimate the number of characters. + // We could serialize the znode to determine the exact size, but that would mean serializing every + // idealstate znode twice. We avoid some extra GC by estimating the size instead. Such estimations + // should be good for most installations that have similar segment and instance names. + Iterator<String> it = is.getPartitionSet().iterator(); + if (it.hasNext()) { + String partitionName = it.next(); + int numChars = partitionName.length(); + Map<String, String> stateMap = is.getInstanceStateMap(partitionName); + for (Map.Entry<String, String> entry : stateMap.entrySet()) { + numChars += entry.getKey().length(); + numChars += entry.getValue().length(); + } + numChars *= is.getNumPartitions(); + return _minNumCharsInISToTurnOnCompression > 0 && numChars > _minNumCharsInISToTurnOnCompression; + } + return false; + } + }); + if (controllerMetrics != null) { + controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_RETRY, retries); + controllerMetrics.addTimedValue(resourceName, ControllerTimer.IDEAL_STATE_UPDATE_TIME_MS, + System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS); + controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS, 1L); + } + return idealStateWrapper._idealState; + } catch (Exception e) { + if (controllerMetrics != null) { + controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_FAILURE, 1L); + } + throw new RuntimeException("Caught exception while updating ideal state for resource: " + resourceName, e); + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index eae4276b84..5e4ff8751f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -76,6 +76,7 @@ import org.apache.pinot.common.utils.ServiceStartableUtils; import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.pinot.common.utils.helix.IdealStateGroupCommit; import org.apache.pinot.common.utils.helix.LeadControllerUtils; import org.apache.pinot.common.utils.log.DummyLogFileServer; import org.apache.pinot.common.utils.log.LocalLogFileServer; @@ -213,7 +214,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { CommonConstants.DEFAULT_PINOT_INSECURE_MODE))); setupHelixSystemProperties(); - HelixHelper.setMinNumCharsInISToTurnOnCompression(_config.getMinNumCharsInISToTurnOnCompression()); + IdealStateGroupCommit.setMinNumCharsInISToTurnOnCompression(_config.getMinNumCharsInISToTurnOnCompression()); _listenerConfigs = ListenerConfigUtil.buildControllerConfigs(_config); _controllerMode = _config.getControllerMode(); inferHostnameIfNeeded(_config); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index bc7d16bde6..dee89efd64 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -129,15 +129,18 @@ import static org.apache.pinot.spi.utils.CommonConstants.DATABASE; import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY; -@Api(tags = Constants.TABLE_TAG, authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY), - @Authorization(value = DATABASE)}) +@Api(tags = Constants.TABLE_TAG, authorizations = { + @Authorization(value = SWAGGER_AUTHORIZATION_KEY), + @Authorization(value = DATABASE) +}) @SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = { @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY, description = "The format of the key is ```\"Basic <token>\" or \"Bearer <token>\"```"), @ApiKeyAuthDefinition(name = DATABASE, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = DATABASE, description = "Database context passed through http header. If no context is provided 'default' database " - + "context will be considered.")})) + + "context will be considered.") +})) @Path("/") public class PinotTableRestletResource { /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index c7eeb341e1..2b59109b89 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -212,7 +212,6 @@ public class PinotHelixResourceManager { private final Map<String, Map<String, Long>> _segmentCrcMap = new HashMap<>(); private final Map<String, Map<String, Integer>> _lastKnownSegmentMetadataVersionMap = new HashMap<>(); - private final Object[] _idealStateUpdaterLocks; private final Object[] _lineageUpdaterLocks; private final LoadingCache<String, String> _instanceAdminEndpointCache; @@ -256,10 +255,6 @@ public class PinotHelixResourceManager { return InstanceUtils.getServerAdminEndpoint(instanceConfig); } }); - _idealStateUpdaterLocks = new Object[DEFAULT_IDEAL_STATE_UPDATER_LOCKERS_SIZE]; - for (int i = 0; i < _idealStateUpdaterLocks.length; i++) { - _idealStateUpdaterLocks[i] = new Object(); - } _lineageUpdaterLocks = new Object[DEFAULT_LINEAGE_UPDATER_LOCKERS_SIZE]; for (int i = 0; i < _lineageUpdaterLocks.length; i++) { _lineageUpdaterLocks[i] = new Object(); @@ -1018,16 +1013,13 @@ public class PinotHelixResourceManager { LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType); Preconditions.checkArgument(TableNameBuilder.isTableResource(tableNameWithType), "Table name: %s is not a valid table name with type suffix", tableNameWithType); - - synchronized (getIdealStateUpdaterLock(tableNameWithType)) { - HelixHelper.removeSegmentsFromIdealState(_helixZkManager, tableNameWithType, segmentNames); - if (retentionPeriod != null) { - _segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames, - TimeUtils.convertPeriodToMillis(retentionPeriod)); - } else { - TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); - _segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames, tableConfig); - } + HelixHelper.removeSegmentsFromIdealState(_helixZkManager, tableNameWithType, segmentNames); + if (retentionPeriod != null) { + _segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames, + TimeUtils.convertPeriodToMillis(retentionPeriod)); + } else { + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); + _segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames, tableConfig); } return PinotResourceManagerResponse.success("Segment " + segmentNames + " deleted"); } catch (final Exception e) { @@ -1980,13 +1972,11 @@ public class PinotHelixResourceManager { IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType); String replicationConfigured = Integer.toString(tableConfig.getReplication()); if (!idealState.getReplicas().equals(replicationConfigured)) { - synchronized (getIdealStateUpdaterLock(tableNameWithType)) { - HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, is -> { - assert is != null; - is.setReplicas(replicationConfigured); - return is; - }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f)); - } + HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, is -> { + assert is != null; + is.setReplicas(replicationConfigured); + return is; + }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f)); } // Assign instances @@ -2328,26 +2318,24 @@ public class PinotHelixResourceManager { SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, _controllerMetrics); - synchronized (getIdealStateUpdaterLock(tableNameWithType)) { - Map<InstancePartitionsType, InstancePartitions> finalInstancePartitionsMap = instancePartitionsMap; - HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> { - assert idealState != null; - Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields(); - if (currentAssignment.containsKey(segmentName)) { - LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName, - tableNameWithType); - } else { - List<String> assignedInstances = - segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap); - LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances, - tableNameWithType); - currentAssignment.put(segmentName, - SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); - } - return idealState; - }); - LOGGER.info("Added segment: {} to IdealState for table: {}", segmentName, tableNameWithType); - } + Map<InstancePartitionsType, InstancePartitions> finalInstancePartitionsMap = instancePartitionsMap; + HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> { + assert idealState != null; + Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields(); + if (currentAssignment.containsKey(segmentName)) { + LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName, + tableNameWithType); + } else { + List<String> assignedInstances = + segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap); + LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances, + tableNameWithType); + currentAssignment.put(segmentName, + SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); + } + return idealState; + }); + LOGGER.info("Added segment: {} to IdealState for table: {}", segmentName, tableNameWithType); } catch (Exception e) { LOGGER.error( "Caught exception while adding segment: {} to IdealState for table: {}, deleting segment ZK metadata", @@ -2402,10 +2390,6 @@ public class PinotHelixResourceManager { return ((upsertConfig != null) && upsertConfig.getMode() != UpsertConfig.Mode.NONE); } - public Object getIdealStateUpdaterLock(String tableNameWithType) { - return _idealStateUpdaterLocks[(tableNameWithType.hashCode() & Integer.MAX_VALUE) % _idealStateUpdaterLocks.length]; - } - public Object getLineageUpdaterLock(String tableNameWithType) { return _lineageUpdaterLocks[(tableNameWithType.hashCode() & Integer.MAX_VALUE) % _lineageUpdaterLocks.length]; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 910291ff00..76bef151a0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -585,11 +585,9 @@ public class PinotLLCRealtimeSegmentManager { // the idealstate update fails due to contention. We serialize the updates to the idealstate // to reduce this contention. We may still contend with RetentionManager, or other updates // to idealstate from other controllers, but then we have the retry mechanism to get around that. - synchronized (_helixResourceManager.getIdealStateUpdaterLock(realtimeTableName)) { - idealState = - updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newConsumingSegmentName, - segmentAssignment, instancePartitionsMap); - } + idealState = + updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newConsumingSegmentName, + segmentAssignment, instancePartitionsMap); long endTimeNs = System.nanoTime(); LOGGER.info( @@ -816,24 +814,22 @@ public class PinotLLCRealtimeSegmentManager { String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(llcSegmentName.getTableName()); String segmentName = llcSegmentName.getSegmentName(); LOGGER.info("Marking CONSUMING segment: {} OFFLINE on instance: {}", segmentName, instanceName); - synchronized (_helixResourceManager.getIdealStateUpdaterLock(realtimeTableName)) { - try { - HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { - assert idealState != null; - Map<String, String> stateMap = idealState.getInstanceStateMap(segmentName); - String state = stateMap.get(instanceName); - if (SegmentStateModel.CONSUMING.equals(state)) { - stateMap.put(instanceName, SegmentStateModel.OFFLINE); - } else { - LOGGER.info("Segment {} in state {} when trying to register consumption stop from {}", segmentName, state, - instanceName); - } - return idealState; - }, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true); - } catch (Exception e) { - _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L); - throw e; - } + try { + HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { + assert idealState != null; + Map<String, String> stateMap = idealState.getInstanceStateMap(segmentName); + String state = stateMap.get(instanceName); + if (SegmentStateModel.CONSUMING.equals(state)) { + stateMap.put(instanceName, SegmentStateModel.OFFLINE); + } else { + LOGGER.info("Segment {} in state {} when trying to register consumption stop from {}", segmentName, state, + instanceName); + } + return idealState; + }, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true); + } catch (Exception e) { + _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L); + throw e; } // We know that we have successfully set the idealstate to be OFFLINE. // We can now do a best effort to reset the externalview to be OFFLINE if it is in ERROR state. @@ -925,33 +921,31 @@ public class PinotLLCRealtimeSegmentManager { Preconditions.checkState(!_isStopping, "Segment manager is stopping"); String realtimeTableName = tableConfig.getTableName(); - synchronized (_helixResourceManager.getIdealStateUpdaterLock(realtimeTableName)) { - HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { - assert idealState != null; - boolean isTableEnabled = idealState.isEnabled(); - boolean isTablePaused = isTablePaused(idealState); - boolean offsetsHaveToChange = offsetCriteria != null; - if (isTableEnabled && !isTablePaused) { - List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = - offsetsHaveToChange - ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions - : getPartitionGroupConsumptionStatusList(idealState, streamConfig); - OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); - // Read the smallest offset when a new partition is detected - streamConfig.setOffsetCriteria( - offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA); - List<PartitionGroupMetadata> newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); - streamConfig.setOffsetCriteria(originalOffsetCriteria); - return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList, - recreateDeletedConsumingSegment, offsetCriteria); - } else { - LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}", - realtimeTableName, isTableEnabled, isTablePaused); - return idealState; - } - }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true); - } + HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { + assert idealState != null; + boolean isTableEnabled = idealState.isEnabled(); + boolean isTablePaused = isTablePaused(idealState); + boolean offsetsHaveToChange = offsetCriteria != null; + if (isTableEnabled && !isTablePaused) { + List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = + offsetsHaveToChange + ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions + : getPartitionGroupConsumptionStatusList(idealState, streamConfig); + OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); + // Read the smallest offset when a new partition is detected + streamConfig.setOffsetCriteria( + offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA); + List<PartitionGroupMetadata> newPartitionGroupMetadataList = + getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); + streamConfig.setOffsetCriteria(originalOffsetCriteria); + return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList, + recreateDeletedConsumingSegment, offsetCriteria); + } else { + LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}", + realtimeTableName, isTableEnabled, isTablePaused); + return idealState; + } + }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true); } /** @@ -961,7 +955,7 @@ public class PinotLLCRealtimeSegmentManager { IdealState updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName, String newSegmentName, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { - return HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { + return HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { assert idealState != null; // When segment completion begins, the zk metadata is updated, followed by ideal state. // We allow only {@link PinotLLCRealtimeSegmentManager::MAX_SEGMENT_COMPLETION_TIME_MILLIS} ms for a segment to @@ -1760,16 +1754,13 @@ public class PinotLLCRealtimeSegmentManager { PauseState.ReasonCode reasonCode, @Nullable String comment) { PauseState pauseState = new PauseState(pause, reasonCode, comment, new Timestamp(System.currentTimeMillis()).toString()); - IdealState updatedIdealState; - synchronized (_helixResourceManager.getIdealStateUpdaterLock(tableNameWithType)) { - updatedIdealState = HelixHelper.updateIdealState(_helixManager, tableNameWithType, idealState -> { - ZNRecord znRecord = idealState.getRecord(); - znRecord.setSimpleField(PAUSE_STATE, pauseState.toJsonString()); - // maintain for backward compatibility - znRecord.setSimpleField(IS_TABLE_PAUSED, Boolean.valueOf(pause).toString()); - return new IdealState(znRecord); - }, RetryPolicies.noDelayRetryPolicy(3)); - } + IdealState updatedIdealState = HelixHelper.updateIdealState(_helixManager, tableNameWithType, idealState -> { + ZNRecord znRecord = idealState.getRecord(); + znRecord.setSimpleField(PAUSE_STATE, pauseState.toJsonString()); + // maintain for backward compatibility + znRecord.setSimpleField(IS_TABLE_PAUSED, Boolean.valueOf(pause).toString()); + return new IdealState(znRecord); + }, RetryPolicies.noDelayRetryPolicy(3)); LOGGER.info("Set 'pauseState' to {} in the Ideal State for table {}. " + "Also set 'isTablePaused' to {} for backward compatibility.", pauseState, tableNameWithType, pause); return updatedIdealState; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java new file mode 100644 index 0000000000..ffe39764a4 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Function; +import org.apache.helix.HelixManager; +import org.apache.helix.model.IdealState; +import org.apache.pinot.common.metrics.ControllerMeter; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.pinot.common.utils.helix.IdealStateGroupCommit; +import org.apache.pinot.spi.utils.retry.RetryPolicies; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class IdealStateGroupCommitTest { + private static final Logger LOGGER = LoggerFactory.getLogger(IdealStateGroupCommit.class); + private static final ControllerTest TEST_INSTANCE = ControllerTest.getInstance(); + private static final String TABLE_NAME = "potato_OFFLINE"; + private static final int NUM_UPDATES = 2400; + + @BeforeClass + public void setUp() + throws Exception { + TEST_INSTANCE.setupSharedStateAndValidate(); + + IdealState idealState = new IdealState(TABLE_NAME); + idealState.setStateModelDefRef("OnlineOffline"); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + idealState.setReplicas("1"); + idealState.setNumPartitions(0); + TEST_INSTANCE.getHelixAdmin() + .addResource(TEST_INSTANCE.getHelixClusterName(), TABLE_NAME, idealState); + } + + @AfterClass + public void tearDown() { + TEST_INSTANCE.cleanup(); + } + + @Test + public void testGroupCommit() + throws InterruptedException { + final IdealStateGroupCommit commit = new IdealStateGroupCommit(); + ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(400); + for (int i = 0; i < NUM_UPDATES; i++) { + Runnable runnable = new IdealStateUpdater(TEST_INSTANCE.getHelixManager(), commit, TABLE_NAME, i); + newFixedThreadPool.submit(runnable); + } + IdealState idealState = HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), TABLE_NAME); + while (idealState.getNumPartitions() < NUM_UPDATES) { + Thread.sleep(500); + idealState = HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), TABLE_NAME); + } + Assert.assertEquals(idealState.getNumPartitions(), NUM_UPDATES); + ControllerMetrics controllerMetrics = ControllerMetrics.get(); + long idealStateUpdateSuccessCount = + controllerMetrics.getMeteredTableValue(TABLE_NAME, ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS).count(); + Assert.assertTrue(idealStateUpdateSuccessCount < NUM_UPDATES); + LOGGER.info("{} IdealState update are successfully commited with {} times zk updates.", NUM_UPDATES, + idealStateUpdateSuccessCount); + } +} + +class IdealStateUpdater implements Runnable { + private final HelixManager _helixManager; + private final IdealStateGroupCommit _commit; + private final String _tableName; + private final int _i; + + public IdealStateUpdater(HelixManager helixManager, IdealStateGroupCommit commit, String tableName, int i) { + _helixManager = helixManager; + _commit = commit; + _tableName = tableName; + _i = i; + } + + @Override + public void run() { + _commit.commit(_helixManager, _tableName, new Function<IdealState, IdealState>() { + @Override + public IdealState apply(IdealState idealState) { + idealState.setPartitionState("test_id" + _i, "test_id" + _i, "ONLINE"); + return idealState; + } + }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f), false); + HelixHelper.getTableIdealState(_helixManager, _tableName); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 16d1983a21..b1f8520aad 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -90,7 +90,6 @@ import org.testng.annotations.Test; import static org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION; import static org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS; import static org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.*; @@ -202,7 +201,6 @@ public class PinotLLCRealtimeSegmentManagerTest { public void testCommitSegment() { // Set up a new table with 2 replicas, 5 instances, 4 partition PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class); - when(mockHelixResourceManager.getIdealStateUpdaterLock(anyString())).thenReturn(new Object()); FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager); setUpNewTable(segmentManager, 2, 5, 4); @@ -325,7 +323,6 @@ public class PinotLLCRealtimeSegmentManagerTest { public void testSetUpNewPartitions() { // Set up a new table with 2 replicas, 5 instances, 0 partition PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class); - when(mockHelixResourceManager.getIdealStateUpdaterLock(anyString())).thenReturn(new Object()); FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager); setUpNewTable(segmentManager, 2, 5, 0); @@ -499,7 +496,6 @@ public class PinotLLCRealtimeSegmentManagerTest { public void testRepairs() { // Set up a new table with 2 replicas, 5 instances, 4 partitions PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class); - when(mockHelixResourceManager.getIdealStateUpdaterLock(anyString())).thenReturn(new Object()); FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager); setUpNewTable(segmentManager, 2, 5, 4); @@ -894,7 +890,6 @@ public class PinotLLCRealtimeSegmentManagerTest { public void testCommitSegmentMetadata() { // Set up a new table with 2 replicas, 5 instances, 4 partition PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class); - when(mockHelixResourceManager.getIdealStateUpdaterLock(anyString())).thenReturn(new Object()); FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager); setUpNewTable(segmentManager, 2, 5, 4); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org