This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 717895be69 Group commit IdealState updates (#13976)
717895be69 is described below

commit 717895be6961aaa213f57b1438cd7faed2c0f16d
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Sat Sep 14 22:58:00 2024 +0800

    Group commit IdealState updates (#13976)
---
 .../pinot/common/metrics/ControllerMeter.java      |   3 +-
 .../pinot/common/utils/helix/HelixHelper.java      | 162 +----------
 .../common/utils/helix/IdealStateGroupCommit.java  | 308 +++++++++++++++++++++
 .../pinot/controller/BaseControllerStarter.java    |   3 +-
 .../api/resources/PinotTableRestletResource.java   |   9 +-
 .../helix/core/PinotHelixResourceManager.java      |  76 ++---
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 113 ++++----
 .../helix/IdealStateGroupCommitTest.java           | 112 ++++++++
 .../PinotLLCRealtimeSegmentManagerTest.java        |   5 -
 9 files changed, 524 insertions(+), 267 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index b474a44a6d..ee034ec952 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -67,7 +67,8 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
   TABLE_REBALANCE_RETRY_TOO_MANY_TIMES("TableRebalanceRetryTooManyTimes", 
false),
   NUMBER_ADHOC_TASKS_SUBMITTED("adhocTasks", false),
   IDEAL_STATE_UPDATE_FAILURE("IdealStateUpdateFailure", false),
-  IDEAL_STATE_UPDATE_RETRY("IdealStateUpdateRetry", false);
+  IDEAL_STATE_UPDATE_RETRY("IdealStateUpdateRetry", false),
+  IDEAL_STATE_UPDATE_SUCCESS("IdealStateUpdateSuccess", false);
 
 
   private final String _brokerMeterName;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
index 4ffad84d61..43e6210e18 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
@@ -22,22 +22,17 @@ import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.helix.AccessOption;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HelixConfigScope;
@@ -47,12 +42,8 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
-import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
 import org.apache.pinot.common.helix.ExtraInstanceConfig;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.metrics.ControllerMeter;
-import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.common.metrics.ControllerTimer;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -69,156 +60,38 @@ public class HelixHelper {
   private HelixHelper() {
   }
 
-  private static final int NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION = 
1000;
-  private static final String ENABLE_COMPRESSIONS_KEY = "enableCompression";
-
   private static final RetryPolicy DEFAULT_RETRY_POLICY = 
RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
   private static final RetryPolicy 
DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY =
       RetryPolicies.randomDelayRetryPolicy(20, 100L, 200L);
+
   private static final Logger LOGGER = 
LoggerFactory.getLogger(HelixHelper.class);
   private static final ZNRecordSerializer ZN_RECORD_SERIALIZER = new 
ZNRecordSerializer();
+  private static final IdealStateGroupCommit IDEAL_STATE_GROUP_COMMIT = new 
IdealStateGroupCommit();
 
   private static final String ONLINE = "ONLINE";
   private static final String OFFLINE = "OFFLINE";
 
   public static final String BROKER_RESOURCE = 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE;
 
-  private static int _minNumCharsInISToTurnOnCompression = -1;
-
-  public static synchronized void setMinNumCharsInISToTurnOnCompression(int 
minNumChars) {
-    _minNumCharsInISToTurnOnCompression = minNumChars;
-  }
-
   public static IdealState cloneIdealState(IdealState idealState) {
     return new IdealState(
         (ZNRecord) 
ZN_RECORD_SERIALIZER.deserialize(ZN_RECORD_SERIALIZER.serialize(idealState.getRecord())));
   }
 
-  /**
-   * Updates the ideal state, retrying if necessary in case of concurrent 
updates to the ideal state.
-   *
-   * @param helixManager The HelixManager used to interact with the Helix 
cluster
-   * @param resourceName The resource for which to update the ideal state
-   * @param updater A function that returns an updated ideal state given an 
input ideal state
-   * @return updated ideal state if successful, null if not
-   */
   public static IdealState updateIdealState(HelixManager helixManager, String 
resourceName,
-      Function<IdealState, IdealState> updater, RetryPolicy policy, boolean 
noChangeOk) {
-    // NOTE: ControllerMetrics could be null because this method might be 
invoked by Broker.
-    ControllerMetrics controllerMetrics = ControllerMetrics.get();
-    try {
-      long startTimeMs = System.currentTimeMillis();
-      IdealStateWrapper idealStateWrapper = new IdealStateWrapper();
-      int retries = policy.attempt(new Callable<>() {
-        @Override
-        public Boolean call() {
-          HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
-          PropertyKey idealStateKey = 
dataAccessor.keyBuilder().idealStates(resourceName);
-          IdealState idealState = dataAccessor.getProperty(idealStateKey);
-
-          // Make a copy of the idealState above to pass it to the updater
-          // NOTE: new IdealState(idealState.getRecord()) does not work 
because it's shallow copy for map fields and
-          // list fields
-          IdealState idealStateCopy = cloneIdealState(idealState);
-
-          IdealState updatedIdealState;
-          try {
-            updatedIdealState = updater.apply(idealStateCopy);
-          } catch (PermanentUpdaterException e) {
-            LOGGER.error("Caught permanent exception while updating ideal 
state for resource: {}", resourceName, e);
-            throw e;
-          } catch (Exception e) {
-            LOGGER.error("Caught exception while updating ideal state for 
resource: {}", resourceName, e);
-            return false;
-          }
-
-          // If there are changes to apply, apply them
-          if (updatedIdealState != null && 
!idealState.equals(updatedIdealState)) {
-            ZNRecord updatedZNRecord = updatedIdealState.getRecord();
-
-            // Update number of partitions
-            int numPartitions = updatedZNRecord.getMapFields().size();
-            updatedIdealState.setNumPartitions(numPartitions);
-
-            // If the ideal state is large enough, enable compression
-            boolean enableCompression = shouldCompress(updatedIdealState);
-            if (enableCompression) {
-              updatedZNRecord.setBooleanField(ENABLE_COMPRESSIONS_KEY, true);
-            } else {
-              
updatedZNRecord.getSimpleFields().remove(ENABLE_COMPRESSIONS_KEY);
-            }
-
-            // Check version and set ideal state
-            try {
-              if (dataAccessor.getBaseDataAccessor()
-                  .set(idealStateKey.getPath(), updatedZNRecord, 
idealState.getRecord().getVersion(),
-                      AccessOption.PERSISTENT)) {
-                idealStateWrapper._idealState = updatedIdealState;
-                return true;
-              } else {
-                LOGGER.warn("Failed to update ideal state for resource: {}", 
resourceName);
-                return false;
-              }
-            } catch (ZkBadVersionException e) {
-              LOGGER.warn("Version changed while updating ideal state for 
resource: {}", resourceName);
-              return false;
-            } catch (Exception e) {
-              LOGGER.warn("Caught exception while updating ideal state for 
resource: {} (compressed={})", resourceName,
-                  enableCompression, e);
-              return false;
-            }
-          } else {
-            if (noChangeOk) {
-              LOGGER.info("Idempotent or null ideal state update for resource 
{}, skipping update.", resourceName);
-            } else {
-              LOGGER.warn("Idempotent or null ideal state update for resource 
{}, skipping update.", resourceName);
-            }
-            idealStateWrapper._idealState = idealState;
-            return true;
-          }
-        }
-
-        private boolean shouldCompress(IdealState is) {
-          if (is.getNumPartitions() > 
NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION) {
-            return true;
-          }
+      Function<IdealState, IdealState> updater) {
+    return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater,
+        DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY, false);
+  }
 
-          // Find the number of characters in one partition in idealstate, and 
extrapolate
-          // to estimate the number of characters.
-          // We could serialize the znode to determine the exact size, but 
that would mean serializing every
-          // idealstate znode twice. We avoid some extra GC by estimating the 
size instead. Such estimations
-          // should be good for most installations that have similar segment 
and instance names.
-          Iterator<String> it = is.getPartitionSet().iterator();
-          if (it.hasNext()) {
-            String partitionName = it.next();
-            int numChars = partitionName.length();
-            Map<String, String> stateMap = 
is.getInstanceStateMap(partitionName);
-            for (Map.Entry<String, String> entry : stateMap.entrySet()) {
-              numChars += entry.getKey().length();
-              numChars += entry.getValue().length();
-            }
-            numChars *= is.getNumPartitions();
-            return _minNumCharsInISToTurnOnCompression > 0 && numChars > 
_minNumCharsInISToTurnOnCompression;
-          }
-          return false;
-        }
-      });
-      if (controllerMetrics != null) {
-        controllerMetrics.addMeteredValue(resourceName, 
ControllerMeter.IDEAL_STATE_UPDATE_RETRY, retries);
-        controllerMetrics.addTimedValue(resourceName, 
ControllerTimer.IDEAL_STATE_UPDATE_TIME_MS,
-            System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
-      }
-      return idealStateWrapper._idealState;
-    } catch (Exception e) {
-      if (controllerMetrics != null) {
-        controllerMetrics.addMeteredValue(resourceName, 
ControllerMeter.IDEAL_STATE_UPDATE_FAILURE, 1L);
-      }
-      throw new RuntimeException("Caught exception while updating ideal state 
for resource: " + resourceName, e);
-    }
+  public static IdealState updateIdealState(HelixManager helixManager, String 
resourceName,
+      Function<IdealState, IdealState> updater, RetryPolicy retryPolicy) {
+    return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, 
updater, retryPolicy, false);
   }
 
-  private static class IdealStateWrapper {
-    IdealState _idealState;
+  public static IdealState updateIdealState(HelixManager helixManager, String 
resourceName,
+      Function<IdealState, IdealState> updater, RetryPolicy retryPolicy, 
boolean noChangeOk) {
+    return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, 
updater, retryPolicy, noChangeOk);
   }
 
   /**
@@ -235,16 +108,6 @@ public class HelixHelper {
     }
   }
 
-  public static IdealState updateIdealState(HelixManager helixManager, String 
resourceName,
-      Function<IdealState, IdealState> updater) {
-    return updateIdealState(helixManager, resourceName, updater, 
DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY, false);
-  }
-
-  public static IdealState updateIdealState(final HelixManager helixManager, 
final String resourceName,
-      final Function<IdealState, IdealState> updater, RetryPolicy policy) {
-    return updateIdealState(helixManager, resourceName, updater, policy, 
false);
-  }
-
   /**
    * Updates broker resource ideal state for the given broker with the given 
broker tags. Optional {@code tablesAdded}
    * and {@code tablesRemoved} can be provided to track the tables 
added/removed during the update.
@@ -554,7 +417,6 @@ public class HelixHelper {
     return 
instancesWithoutTag.stream().map(InstanceConfig::getInstanceName).collect(Collectors.toList());
   }
 
-
   public static List<InstanceConfig> 
getInstancesConfigsWithTag(List<InstanceConfig> instanceConfigs, String tag) {
     List<InstanceConfig> instancesWithTag = new ArrayList<>();
     for (InstanceConfig instanceConfig : instanceConfigs) {
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
new file mode 100644
index 0000000000..ea74fb18e2
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.helix;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.ControllerTimer;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * IdealStateGroupCommit is a utility class to commit group updates to 
IdealState.
+ * It is designed to be used in a multi-threaded environment where multiple 
threads
+ * may try to update the same IdealState concurrently.
+ * The implementation is shamelessly borrowed from (<a href=
+ * 
"https://github.com/apache/helix/blob/helix-1.4.1/helix-core/src/main/java/org/apache/helix/GroupCommit.java"
+ * >HelixGroupCommit</a>).
+ * This is especially useful for updating a large IdealState, where each update 
may
+ * take a long time, e.g. updating one IdealState with 100k segments may take
+ * ~4 seconds, so 15 updates will take 1 minute and cause other requests
+ * (e.g. segment upload, realtime segment commit, segment deletion, etc.) to 
time out.
+ */
+public class IdealStateGroupCommit {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IdealStateGroupCommit.class);
+
+  private static final int NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION = 
1000;
+  private static final String ENABLE_COMPRESSIONS_KEY = "enableCompression";
+
+  private static int _minNumCharsInISToTurnOnCompression = -1;
+
+  private static class Queue {
+    final AtomicReference<Thread> _running = new AtomicReference<Thread>();
+    final ConcurrentLinkedQueue<Entry> _pending = new 
ConcurrentLinkedQueue<Entry>();
+  }
+
+  private static class Entry {
+    final String _resourceName;
+    final Function<IdealState, IdealState> _updater;
+    IdealState _updatedIdealState = null;
+    AtomicBoolean _sent = new AtomicBoolean(false);
+
+    Entry(String resourceName, Function<IdealState, IdealState> updater) {
+      _resourceName = resourceName;
+      _updater = updater;
+    }
+  }
+
+  private final Queue[] _queues = new Queue[100];
+
+  /**
+   * Set up a group committer and its associated queues
+   */
+  public IdealStateGroupCommit() {
+    // Don't use Arrays.fill();
+    for (int i = 0; i < _queues.length; i++) {
+      _queues[i] = new Queue();
+    }
+  }
+
+  private Queue getQueue(String resourceName) {
+    return _queues[(resourceName.hashCode() & Integer.MAX_VALUE) % 
_queues.length];
+  }
+
+  public static synchronized void setMinNumCharsInISToTurnOnCompression(int 
minNumChars) {
+    _minNumCharsInISToTurnOnCompression = minNumChars;
+  }
+
+  /**
+   * Do a group update for idealState associated with a given resource key
+   * @param helixManager helixManager with the ability to pull from the 
current data
+   * @param resourceName the resource name to be updated
+   * @param updater the idealState updater to be applied
+   * @return IdealState if the update is successful, null if not
+   */
+  public IdealState commit(HelixManager helixManager, String resourceName,
+      Function<IdealState, IdealState> updater, RetryPolicy retryPolicy, 
boolean noChangeOk) {
+    Queue queue = getQueue(resourceName);
+    Entry entry = new Entry(resourceName, updater);
+
+    queue._pending.add(entry);
+    while (!entry._sent.get()) {
+      if (queue._running.compareAndSet(null, Thread.currentThread())) {
+        ArrayList<Entry> processed = new ArrayList<>();
+        try {
+          if (queue._pending.peek() == null) {
+            // All pending entries have been processed, the updatedIdealState 
should be set.
+            return entry._updatedIdealState;
+          }
+          // remove from queue
+          Entry first = queue._pending.poll();
+          processed.add(first);
+          String mergedResourceName = first._resourceName;
+          HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
+          PropertyKey idealStateKey = 
dataAccessor.keyBuilder().idealStates(resourceName);
+          IdealState idealState = dataAccessor.getProperty(idealStateKey);
+
+          // Make a copy of the idealState above to pass it to the updater
+          // NOTE: new IdealState(idealState.getRecord()) does not work 
because it's shallow copy for map fields and
+          // list fields
+          IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);
+
+          /**
+           * If the local cache does not contain a value, need to check if 
there is a
+           * value in ZK; use it as initial value if exists
+           */
+          IdealState updatedIdealState = first._updater.apply(idealStateCopy);
+          first._updatedIdealState = updatedIdealState;
+          Iterator<Entry> it = queue._pending.iterator();
+          while (it.hasNext()) {
+            Entry ent = it.next();
+            if (!ent._resourceName.equals(mergedResourceName)) {
+              continue;
+            }
+            processed.add(ent);
+            updatedIdealState = ent._updater.apply(idealStateCopy);
+            ent._updatedIdealState = updatedIdealState;
+            it.remove();
+          }
+          IdealState finalUpdatedIdealState = updatedIdealState;
+          updateIdealState(helixManager, resourceName, anyIdealState -> 
finalUpdatedIdealState,
+              retryPolicy, noChangeOk);
+        } finally {
+          queue._running.set(null);
+          for (Entry e : processed) {
+            synchronized (e) {
+              e._sent.set(true);
+              e.notify();
+            }
+          }
+        }
+      } else {
+        synchronized (entry) {
+          try {
+            entry.wait(10);
+          } catch (InterruptedException e) {
+            LOGGER.error("Interrupted while committing change, resourceName: " 
+ resourceName + ", updater: " + updater,
+                e);
+            // Restore interrupt status
+            Thread.currentThread().interrupt();
+            return null;
+          }
+        }
+      }
+    }
+    return entry._updatedIdealState;
+  }
+
+  private static class IdealStateWrapper {
+    IdealState _idealState;
+  }
+
+  /**
+   * Updates the ideal state, retrying if necessary in case of concurrent 
updates to the ideal state.
+   *
+   * @param helixManager The HelixManager used to interact with the Helix 
cluster
+   * @param resourceName The resource for which to update the ideal state
+   * @param updater A function that returns an updated ideal state given an 
input ideal state
+   * @return updated ideal state if successful, null if not
+   */
+  private static IdealState updateIdealState(HelixManager helixManager, String 
resourceName,
+      Function<IdealState, IdealState> updater, RetryPolicy policy, boolean 
noChangeOk) {
+    // NOTE: ControllerMetrics could be null because this method might be 
invoked by Broker.
+    ControllerMetrics controllerMetrics = ControllerMetrics.get();
+    try {
+      long startTimeMs = System.currentTimeMillis();
+      IdealStateWrapper idealStateWrapper = new IdealStateWrapper();
+      int retries = policy.attempt(new Callable<>() {
+        @Override
+        public Boolean call() {
+          HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
+          PropertyKey idealStateKey = 
dataAccessor.keyBuilder().idealStates(resourceName);
+          IdealState idealState = dataAccessor.getProperty(idealStateKey);
+
+          // Make a copy of the idealState above to pass it to the updater
+          // NOTE: new IdealState(idealState.getRecord()) does not work 
because it's shallow copy for map fields and
+          // list fields
+          IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);
+          IdealState updatedIdealState;
+          try {
+            updatedIdealState = updater.apply(idealStateCopy);
+          } catch (HelixHelper.PermanentUpdaterException e) {
+            LOGGER.error("Caught permanent exception while updating ideal 
state for resource: {}", resourceName, e);
+            throw e;
+          } catch (Exception e) {
+            LOGGER.error("Caught exception while updating ideal state for 
resource: {}", resourceName, e);
+            return false;
+          }
+
+          // If there are changes to apply, apply them
+          if (updatedIdealState != null && 
!idealState.equals(updatedIdealState)) {
+            ZNRecord updatedZNRecord = updatedIdealState.getRecord();
+
+            // Update number of partitions
+            int numPartitions = updatedZNRecord.getMapFields().size();
+            updatedIdealState.setNumPartitions(numPartitions);
+
+            // If the ideal state is large enough, enable compression
+            boolean enableCompression = shouldCompress(updatedIdealState);
+            if (enableCompression) {
+              updatedZNRecord.setBooleanField(ENABLE_COMPRESSIONS_KEY, true);
+            } else {
+              
updatedZNRecord.getSimpleFields().remove(ENABLE_COMPRESSIONS_KEY);
+            }
+
+            // Check version and set ideal state
+            try {
+              if (dataAccessor.getBaseDataAccessor()
+                  .set(idealStateKey.getPath(), updatedZNRecord, 
idealState.getRecord().getVersion(),
+                      AccessOption.PERSISTENT)) {
+                idealStateWrapper._idealState = updatedIdealState;
+                return true;
+              } else {
+                LOGGER.warn("Failed to update ideal state for resource: {}", 
resourceName);
+                return false;
+              }
+            } catch (ZkBadVersionException e) {
+              LOGGER.warn("Version changed while updating ideal state for 
resource: {}", resourceName);
+              return false;
+            } catch (Exception e) {
+              LOGGER.warn("Caught exception while updating ideal state for 
resource: {} (compressed={})", resourceName,
+                  enableCompression, e);
+              return false;
+            }
+          } else {
+            if (noChangeOk) {
+              LOGGER.info("Idempotent or null ideal state update for resource 
{}, skipping update.", resourceName);
+            } else {
+              LOGGER.warn("Idempotent or null ideal state update for resource 
{}, skipping update.", resourceName);
+            }
+            idealStateWrapper._idealState = idealState;
+            return true;
+          }
+        }
+
+        private boolean shouldCompress(IdealState is) {
+          if (is.getNumPartitions() > 
NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION) {
+            return true;
+          }
+
+          // Find the number of characters in one partition in idealstate, and 
extrapolate
+          // to estimate the number of characters.
+          // We could serialize the znode to determine the exact size, but 
that would mean serializing every
+          // idealstate znode twice. We avoid some extra GC by estimating the 
size instead. Such estimations
+          // should be good for most installations that have similar segment 
and instance names.
+          Iterator<String> it = is.getPartitionSet().iterator();
+          if (it.hasNext()) {
+            String partitionName = it.next();
+            int numChars = partitionName.length();
+            Map<String, String> stateMap = 
is.getInstanceStateMap(partitionName);
+            for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+              numChars += entry.getKey().length();
+              numChars += entry.getValue().length();
+            }
+            numChars *= is.getNumPartitions();
+            return _minNumCharsInISToTurnOnCompression > 0 && numChars > 
_minNumCharsInISToTurnOnCompression;
+          }
+          return false;
+        }
+      });
+      if (controllerMetrics != null) {
+        controllerMetrics.addMeteredValue(resourceName, 
ControllerMeter.IDEAL_STATE_UPDATE_RETRY, retries);
+        controllerMetrics.addTimedValue(resourceName, 
ControllerTimer.IDEAL_STATE_UPDATE_TIME_MS,
+            System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
+        controllerMetrics.addMeteredValue(resourceName, 
ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS, 1L);
+      }
+      return idealStateWrapper._idealState;
+    } catch (Exception e) {
+      if (controllerMetrics != null) {
+        controllerMetrics.addMeteredValue(resourceName, 
ControllerMeter.IDEAL_STATE_UPDATE_FAILURE, 1L);
+      }
+      throw new RuntimeException("Caught exception while updating ideal state 
for resource: " + resourceName, e);
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index eae4276b84..5e4ff8751f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -76,6 +76,7 @@ import org.apache.pinot.common.utils.ServiceStartableUtils;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.common.utils.helix.IdealStateGroupCommit;
 import org.apache.pinot.common.utils.helix.LeadControllerUtils;
 import org.apache.pinot.common.utils.log.DummyLogFileServer;
 import org.apache.pinot.common.utils.log.LocalLogFileServer;
@@ -213,7 +214,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
             CommonConstants.DEFAULT_PINOT_INSECURE_MODE)));
 
     setupHelixSystemProperties();
-    
HelixHelper.setMinNumCharsInISToTurnOnCompression(_config.getMinNumCharsInISToTurnOnCompression());
+    
IdealStateGroupCommit.setMinNumCharsInISToTurnOnCompression(_config.getMinNumCharsInISToTurnOnCompression());
     _listenerConfigs = ListenerConfigUtil.buildControllerConfigs(_config);
     _controllerMode = _config.getControllerMode();
     inferHostnameIfNeeded(_config);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index bc7d16bde6..dee89efd64 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -129,15 +129,18 @@ import static 
org.apache.pinot.spi.utils.CommonConstants.DATABASE;
 import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
 
 
-@Api(tags = Constants.TABLE_TAG, authorizations = {@Authorization(value = 
SWAGGER_AUTHORIZATION_KEY),
-    @Authorization(value = DATABASE)})
+@Api(tags = Constants.TABLE_TAG, authorizations = {
+    @Authorization(value = SWAGGER_AUTHORIZATION_KEY),
+    @Authorization(value = DATABASE)
+})
 @SwaggerDefinition(securityDefinition = 
@SecurityDefinition(apiKeyAuthDefinitions = {
     @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER,
         key = SWAGGER_AUTHORIZATION_KEY,
         description = "The format of the key is  ```\"Basic <token>\" or 
\"Bearer <token>\"```"),
     @ApiKeyAuthDefinition(name = DATABASE, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = DATABASE,
         description = "Database context passed through http header. If no 
context is provided 'default' database "
-            + "context will be considered.")}))
+            + "context will be considered.")
+}))
 @Path("/")
 public class PinotTableRestletResource {
   /**
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index c7eeb341e1..2b59109b89 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -212,7 +212,6 @@ public class PinotHelixResourceManager {
 
   private final Map<String, Map<String, Long>> _segmentCrcMap = new 
HashMap<>();
   private final Map<String, Map<String, Integer>> 
_lastKnownSegmentMetadataVersionMap = new HashMap<>();
-  private final Object[] _idealStateUpdaterLocks;
   private final Object[] _lineageUpdaterLocks;
 
   private final LoadingCache<String, String> _instanceAdminEndpointCache;
@@ -256,10 +255,6 @@ public class PinotHelixResourceManager {
                 return InstanceUtils.getServerAdminEndpoint(instanceConfig);
               }
             });
-    _idealStateUpdaterLocks = new 
Object[DEFAULT_IDEAL_STATE_UPDATER_LOCKERS_SIZE];
-    for (int i = 0; i < _idealStateUpdaterLocks.length; i++) {
-      _idealStateUpdaterLocks[i] = new Object();
-    }
     _lineageUpdaterLocks = new Object[DEFAULT_LINEAGE_UPDATER_LOCKERS_SIZE];
     for (int i = 0; i < _lineageUpdaterLocks.length; i++) {
       _lineageUpdaterLocks[i] = new Object();
@@ -1018,16 +1013,13 @@ public class PinotHelixResourceManager {
       LOGGER.info("Trying to delete segments: {} from table: {} ", 
segmentNames, tableNameWithType);
       
Preconditions.checkArgument(TableNameBuilder.isTableResource(tableNameWithType),
           "Table name: %s is not a valid table name with type suffix", 
tableNameWithType);
-
-      synchronized (getIdealStateUpdaterLock(tableNameWithType)) {
-        HelixHelper.removeSegmentsFromIdealState(_helixZkManager, 
tableNameWithType, segmentNames);
-        if (retentionPeriod != null) {
-          _segmentDeletionManager.deleteSegments(tableNameWithType, 
segmentNames,
-              TimeUtils.convertPeriodToMillis(retentionPeriod));
-        } else {
-          TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
-          _segmentDeletionManager.deleteSegments(tableNameWithType, 
segmentNames, tableConfig);
-        }
+      HelixHelper.removeSegmentsFromIdealState(_helixZkManager, 
tableNameWithType, segmentNames);
+      if (retentionPeriod != null) {
+        _segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames,
+            TimeUtils.convertPeriodToMillis(retentionPeriod));
+      } else {
+        TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+        _segmentDeletionManager.deleteSegments(tableNameWithType, 
segmentNames, tableConfig);
       }
       return PinotResourceManagerResponse.success("Segment " + segmentNames + 
" deleted");
     } catch (final Exception e) {
@@ -1980,13 +1972,11 @@ public class PinotHelixResourceManager {
     IdealState idealState = 
_helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
     String replicationConfigured = 
Integer.toString(tableConfig.getReplication());
     if (!idealState.getReplicas().equals(replicationConfigured)) {
-      synchronized (getIdealStateUpdaterLock(tableNameWithType)) {
-        HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, is -> 
{
-          assert is != null;
-          is.setReplicas(replicationConfigured);
-          return is;
-        }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));
-      }
+      HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, is -> {
+        assert is != null;
+        is.setReplicas(replicationConfigured);
+        return is;
+      }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));
     }
 
     // Assign instances
@@ -2328,26 +2318,24 @@ public class PinotHelixResourceManager {
 
       SegmentAssignment segmentAssignment =
           SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, 
tableConfig, _controllerMetrics);
-      synchronized (getIdealStateUpdaterLock(tableNameWithType)) {
-        Map<InstancePartitionsType, InstancePartitions> 
finalInstancePartitionsMap = instancePartitionsMap;
-        HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, 
idealState -> {
-          assert idealState != null;
-          Map<String, Map<String, String>> currentAssignment = 
idealState.getRecord().getMapFields();
-          if (currentAssignment.containsKey(segmentName)) {
-            LOGGER.warn("Segment: {} already exists in the IdealState for 
table: {}, do not update", segmentName,
-                tableNameWithType);
-          } else {
-            List<String> assignedInstances =
-                segmentAssignment.assignSegment(segmentName, 
currentAssignment, finalInstancePartitionsMap);
-            LOGGER.info("Assigning segment: {} to instances: {} for table: 
{}", segmentName, assignedInstances,
-                tableNameWithType);
-            currentAssignment.put(segmentName,
-                SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, 
SegmentStateModel.ONLINE));
-          }
-          return idealState;
-        });
-        LOGGER.info("Added segment: {} to IdealState for table: {}", 
segmentName, tableNameWithType);
-      }
+      Map<InstancePartitionsType, InstancePartitions> 
finalInstancePartitionsMap = instancePartitionsMap;
+      HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, 
idealState -> {
+        assert idealState != null;
+        Map<String, Map<String, String>> currentAssignment = 
idealState.getRecord().getMapFields();
+        if (currentAssignment.containsKey(segmentName)) {
+          LOGGER.warn("Segment: {} already exists in the IdealState for table: 
{}, do not update", segmentName,
+              tableNameWithType);
+        } else {
+          List<String> assignedInstances =
+              segmentAssignment.assignSegment(segmentName, currentAssignment, 
finalInstancePartitionsMap);
+          LOGGER.info("Assigning segment: {} to instances: {} for table: {}", 
segmentName, assignedInstances,
+              tableNameWithType);
+          currentAssignment.put(segmentName,
+              SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, 
SegmentStateModel.ONLINE));
+        }
+        return idealState;
+      });
+      LOGGER.info("Added segment: {} to IdealState for table: {}", 
segmentName, tableNameWithType);
     } catch (Exception e) {
       LOGGER.error(
           "Caught exception while adding segment: {} to IdealState for table: 
{}, deleting segment ZK metadata",
@@ -2402,10 +2390,6 @@ public class PinotHelixResourceManager {
     return ((upsertConfig != null) && upsertConfig.getMode() != 
UpsertConfig.Mode.NONE);
   }
 
-  public Object getIdealStateUpdaterLock(String tableNameWithType) {
-    return _idealStateUpdaterLocks[(tableNameWithType.hashCode() & 
Integer.MAX_VALUE) % _idealStateUpdaterLocks.length];
-  }
-
   public Object getLineageUpdaterLock(String tableNameWithType) {
     return _lineageUpdaterLocks[(tableNameWithType.hashCode() & 
Integer.MAX_VALUE) % _lineageUpdaterLocks.length];
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 910291ff00..76bef151a0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -585,11 +585,9 @@ public class PinotLLCRealtimeSegmentManager {
     // the idealstate update fails due to contention. We serialize the updates 
to the idealstate
     // to reduce this contention. We may still contend with RetentionManager, 
or other updates
     // to idealstate from other controllers, but then we have the retry 
mechanism to get around that.
-    synchronized 
(_helixResourceManager.getIdealStateUpdaterLock(realtimeTableName)) {
-      idealState =
-          updateIdealStateOnSegmentCompletion(realtimeTableName, 
committingSegmentName, newConsumingSegmentName,
-              segmentAssignment, instancePartitionsMap);
-    }
+    idealState =
+        updateIdealStateOnSegmentCompletion(realtimeTableName, 
committingSegmentName, newConsumingSegmentName,
+            segmentAssignment, instancePartitionsMap);
 
     long endTimeNs = System.nanoTime();
     LOGGER.info(
@@ -816,24 +814,22 @@ public class PinotLLCRealtimeSegmentManager {
     String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(llcSegmentName.getTableName());
     String segmentName = llcSegmentName.getSegmentName();
     LOGGER.info("Marking CONSUMING segment: {} OFFLINE on instance: {}", 
segmentName, instanceName);
-    synchronized 
(_helixResourceManager.getIdealStateUpdaterLock(realtimeTableName)) {
-      try {
-        HelixHelper.updateIdealState(_helixManager, realtimeTableName, 
idealState -> {
-          assert idealState != null;
-          Map<String, String> stateMap = 
idealState.getInstanceStateMap(segmentName);
-          String state = stateMap.get(instanceName);
-          if (SegmentStateModel.CONSUMING.equals(state)) {
-            stateMap.put(instanceName, SegmentStateModel.OFFLINE);
-          } else {
-            LOGGER.info("Segment {} in state {} when trying to register 
consumption stop from {}", segmentName, state,
-                instanceName);
-          }
-          return idealState;
-        }, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true);
-      } catch (Exception e) {
-        _controllerMetrics.addMeteredTableValue(realtimeTableName, 
ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
-        throw e;
-      }
+    try {
+      HelixHelper.updateIdealState(_helixManager, realtimeTableName, 
idealState -> {
+        assert idealState != null;
+        Map<String, String> stateMap = 
idealState.getInstanceStateMap(segmentName);
+        String state = stateMap.get(instanceName);
+        if (SegmentStateModel.CONSUMING.equals(state)) {
+          stateMap.put(instanceName, SegmentStateModel.OFFLINE);
+        } else {
+          LOGGER.info("Segment {} in state {} when trying to register 
consumption stop from {}", segmentName, state,
+              instanceName);
+        }
+        return idealState;
+      }, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true);
+    } catch (Exception e) {
+      _controllerMetrics.addMeteredTableValue(realtimeTableName, 
ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
+      throw e;
     }
     // We know that we have successfully set the idealstate to be OFFLINE.
     // We can now do a best effort to reset the externalview to be OFFLINE if 
it is in ERROR state.
@@ -925,33 +921,31 @@ public class PinotLLCRealtimeSegmentManager {
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
     String realtimeTableName = tableConfig.getTableName();
-    synchronized 
(_helixResourceManager.getIdealStateUpdaterLock(realtimeTableName)) {
-      HelixHelper.updateIdealState(_helixManager, realtimeTableName, 
idealState -> {
-        assert idealState != null;
-        boolean isTableEnabled = idealState.isEnabled();
-        boolean isTablePaused = isTablePaused(idealState);
-        boolean offsetsHaveToChange = offsetCriteria != null;
-        if (isTableEnabled && !isTablePaused) {
-          List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList =
-              offsetsHaveToChange
-                  ? Collections.emptyList() // offsets from metadata are not 
valid anymore; fetch for all partitions
-                  : getPartitionGroupConsumptionStatusList(idealState, 
streamConfig);
-          OffsetCriteria originalOffsetCriteria = 
streamConfig.getOffsetCriteria();
-          // Read the smallest offset when a new partition is detected
-          streamConfig.setOffsetCriteria(
-              offsetsHaveToChange ? offsetCriteria : 
OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
-          List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-              getNewPartitionGroupMetadataList(streamConfig, 
currentPartitionGroupConsumptionStatusList);
-          streamConfig.setOffsetCriteria(originalOffsetCriteria);
-          return ensureAllPartitionsConsuming(tableConfig, streamConfig, 
idealState, newPartitionGroupMetadataList,
-              recreateDeletedConsumingSegment, offsetCriteria);
-        } else {
-          LOGGER.info("Skipping LLC segments validation for table: {}, 
isTableEnabled: {}, isTablePaused: {}",
-              realtimeTableName, isTableEnabled, isTablePaused);
-          return idealState;
-        }
-      }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
-    }
+    HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState 
-> {
+      assert idealState != null;
+      boolean isTableEnabled = idealState.isEnabled();
+      boolean isTablePaused = isTablePaused(idealState);
+      boolean offsetsHaveToChange = offsetCriteria != null;
+      if (isTableEnabled && !isTablePaused) {
+        List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList =
+            offsetsHaveToChange
+                ? Collections.emptyList() // offsets from metadata are not 
valid anymore; fetch for all partitions
+                : getPartitionGroupConsumptionStatusList(idealState, 
streamConfig);
+        OffsetCriteria originalOffsetCriteria = 
streamConfig.getOffsetCriteria();
+        // Read the smallest offset when a new partition is detected
+        streamConfig.setOffsetCriteria(
+            offsetsHaveToChange ? offsetCriteria : 
OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
+        List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+            getNewPartitionGroupMetadataList(streamConfig, 
currentPartitionGroupConsumptionStatusList);
+        streamConfig.setOffsetCriteria(originalOffsetCriteria);
+        return ensureAllPartitionsConsuming(tableConfig, streamConfig, 
idealState, newPartitionGroupMetadataList,
+            recreateDeletedConsumingSegment, offsetCriteria);
+      } else {
+        LOGGER.info("Skipping LLC segments validation for table: {}, 
isTableEnabled: {}, isTablePaused: {}",
+            realtimeTableName, isTableEnabled, isTablePaused);
+        return idealState;
+      }
+    }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
   }
 
   /**
@@ -961,7 +955,7 @@ public class PinotLLCRealtimeSegmentManager {
   IdealState updateIdealStateOnSegmentCompletion(String realtimeTableName, 
String committingSegmentName,
       String newSegmentName, SegmentAssignment segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
-    return HelixHelper.updateIdealState(_helixManager, realtimeTableName, 
idealState -> {
+   return HelixHelper.updateIdealState(_helixManager, realtimeTableName, 
idealState -> {
       assert idealState != null;
       // When segment completion begins, the zk metadata is updated, followed 
by ideal state.
       // We allow only {@link 
PinotLLCRealtimeSegmentManager::MAX_SEGMENT_COMPLETION_TIME_MILLIS} ms for a 
segment to
@@ -1760,16 +1754,13 @@ public class PinotLLCRealtimeSegmentManager {
       PauseState.ReasonCode reasonCode, @Nullable String comment) {
     PauseState pauseState = new PauseState(pause, reasonCode, comment,
         new Timestamp(System.currentTimeMillis()).toString());
-    IdealState updatedIdealState;
-    synchronized 
(_helixResourceManager.getIdealStateUpdaterLock(tableNameWithType)) {
-      updatedIdealState = HelixHelper.updateIdealState(_helixManager, 
tableNameWithType, idealState -> {
-        ZNRecord znRecord = idealState.getRecord();
-        znRecord.setSimpleField(PAUSE_STATE, pauseState.toJsonString());
-        // maintain for backward compatibility
-        znRecord.setSimpleField(IS_TABLE_PAUSED, 
Boolean.valueOf(pause).toString());
-        return new IdealState(znRecord);
-      }, RetryPolicies.noDelayRetryPolicy(3));
-    }
+    IdealState updatedIdealState = HelixHelper.updateIdealState(_helixManager, 
tableNameWithType, idealState -> {
+      ZNRecord znRecord = idealState.getRecord();
+      znRecord.setSimpleField(PAUSE_STATE, pauseState.toJsonString());
+      // maintain for backward compatibility
+      znRecord.setSimpleField(IS_TABLE_PAUSED, 
Boolean.valueOf(pause).toString());
+      return new IdealState(znRecord);
+    }, RetryPolicies.noDelayRetryPolicy(3));
     LOGGER.info("Set 'pauseState' to {} in the Ideal State for table {}. "
         + "Also set 'isTablePaused' to {} for backward compatibility.", 
pauseState, tableNameWithType, pause);
     return updatedIdealState;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java
new file mode 100644
index 0000000000..ffe39764a4
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Function;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.common.utils.helix.IdealStateGroupCommit;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class IdealStateGroupCommitTest {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IdealStateGroupCommit.class);
+  private static final ControllerTest TEST_INSTANCE = 
ControllerTest.getInstance();
+  private static final String TABLE_NAME = "potato_OFFLINE";
+  private static final int NUM_UPDATES = 2400;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TEST_INSTANCE.setupSharedStateAndValidate();
+
+    IdealState idealState = new IdealState(TABLE_NAME);
+    idealState.setStateModelDefRef("OnlineOffline");
+    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+    idealState.setReplicas("1");
+    idealState.setNumPartitions(0);
+    TEST_INSTANCE.getHelixAdmin()
+        .addResource(TEST_INSTANCE.getHelixClusterName(), TABLE_NAME, 
idealState);
+  }
+
+  @AfterClass
+  public void tearDown() {
+    TEST_INSTANCE.cleanup();
+  }
+
+  @Test
+  public void testGroupCommit()
+      throws InterruptedException {
+    final IdealStateGroupCommit commit = new IdealStateGroupCommit();
+    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(400);
+    for (int i = 0; i < NUM_UPDATES; i++) {
+      Runnable runnable = new 
IdealStateUpdater(TEST_INSTANCE.getHelixManager(), commit, TABLE_NAME, i);
+      newFixedThreadPool.submit(runnable);
+    }
+    IdealState idealState = 
HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), TABLE_NAME);
+    while (idealState.getNumPartitions() < NUM_UPDATES) {
+      Thread.sleep(500);
+      idealState = 
HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), TABLE_NAME);
+    }
+    Assert.assertEquals(idealState.getNumPartitions(), NUM_UPDATES);
+    ControllerMetrics controllerMetrics = ControllerMetrics.get();
+    long idealStateUpdateSuccessCount =
+        controllerMetrics.getMeteredTableValue(TABLE_NAME, 
ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS).count();
+    Assert.assertTrue(idealStateUpdateSuccessCount < NUM_UPDATES);
+    LOGGER.info("{} IdealState update are successfully commited with {} times 
zk updates.", NUM_UPDATES,
+        idealStateUpdateSuccessCount);
+  }
+}
+
+class IdealStateUpdater implements Runnable {
+  private final HelixManager _helixManager;
+  private final IdealStateGroupCommit _commit;
+  private final String _tableName;
+  private final int _i;
+
+  public IdealStateUpdater(HelixManager helixManager, IdealStateGroupCommit 
commit, String tableName, int i) {
+    _helixManager = helixManager;
+    _commit = commit;
+    _tableName = tableName;
+    _i = i;
+  }
+
+  @Override
+  public void run() {
+    _commit.commit(_helixManager, _tableName, new Function<IdealState, 
IdealState>() {
+      @Override
+      public IdealState apply(IdealState idealState) {
+        idealState.setPartitionState("test_id" + _i, "test_id" + _i, "ONLINE");
+        return idealState;
+      }
+    }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f), false);
+    HelixHelper.getTableIdealState(_helixManager, _tableName);
+  }
+}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 16d1983a21..b1f8520aad 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -90,7 +90,6 @@ import org.testng.annotations.Test;
 import static 
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION;
 import static 
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.*;
@@ -202,7 +201,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
   public void testCommitSegment() {
     // Set up a new table with 2 replicas, 5 instances, 4 partition
     PinotHelixResourceManager mockHelixResourceManager = 
mock(PinotHelixResourceManager.class);
-    when(mockHelixResourceManager.getIdealStateUpdaterLock(anyString())).thenReturn(new Object());
     FakePinotLLCRealtimeSegmentManager segmentManager =
         new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
     setUpNewTable(segmentManager, 2, 5, 4);
@@ -325,7 +323,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
   public void testSetUpNewPartitions() {
     // Set up a new table with 2 replicas, 5 instances, 0 partition
     PinotHelixResourceManager mockHelixResourceManager = 
mock(PinotHelixResourceManager.class);
-    when(mockHelixResourceManager.getIdealStateUpdaterLock(anyString())).thenReturn(new Object());
     FakePinotLLCRealtimeSegmentManager segmentManager =
         new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
     setUpNewTable(segmentManager, 2, 5, 0);
@@ -499,7 +496,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
   public void testRepairs() {
     // Set up a new table with 2 replicas, 5 instances, 4 partitions
     PinotHelixResourceManager mockHelixResourceManager = 
mock(PinotHelixResourceManager.class);
-    when(mockHelixResourceManager.getIdealStateUpdaterLock(anyString())).thenReturn(new Object());
     FakePinotLLCRealtimeSegmentManager segmentManager =
         new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
     setUpNewTable(segmentManager, 2, 5, 4);
@@ -894,7 +890,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
   public void testCommitSegmentMetadata() {
     // Set up a new table with 2 replicas, 5 instances, 4 partition
     PinotHelixResourceManager mockHelixResourceManager = 
mock(PinotHelixResourceManager.class);
-    when(mockHelixResourceManager.getIdealStateUpdaterLock(anyString())).thenReturn(new Object());
     FakePinotLLCRealtimeSegmentManager segmentManager =
         new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
     setUpNewTable(segmentManager, 2, 5, 4);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to