KKcorps commented on code in PR #13976: URL: https://github.com/apache/pinot/pull/13976#discussion_r1753527635
########## pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java: ########## @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils.helix; + +import it.unimi.dsi.fastutil.objects.ObjectBooleanImmutablePair; +import it.unimi.dsi.fastutil.objects.ObjectBooleanPair; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import org.apache.helix.AccessOption; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.PropertyKey; +import org.apache.helix.model.IdealState; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException; +import org.apache.pinot.common.metrics.ControllerMeter; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.metrics.ControllerTimer; +import 
org.apache.pinot.spi.utils.retry.RetryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class IdealStateGroupCommit { + private static final Logger LOGGER = LoggerFactory.getLogger(IdealStateGroupCommit.class); + + private static final int NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION = 1000; + private static final String ENABLE_COMPRESSIONS_KEY = "enableCompression"; + + private static int _minNumCharsInISToTurnOnCompression = -1; + + private static class Queue { + final AtomicReference<Thread> _running = new AtomicReference<Thread>(); + final ConcurrentLinkedQueue<Entry> _pending = new ConcurrentLinkedQueue<Entry>(); + } + + private static class Entry { + final String _resourceName; + final Function<IdealState, IdealState> _updater; + AtomicBoolean _sent = new AtomicBoolean(false); + + Entry(String resourceName, Function<IdealState, IdealState> updater) { + _resourceName = resourceName; + _updater = updater; + } + } + + private final Queue[] _queues = new Queue[100]; + + /** + * Set up a group committer and its associated queues + */ + public IdealStateGroupCommit() { + // Don't use Arrays.fill(); + for (int i = 0; i < _queues.length; i++) { + _queues[i] = new Queue(); + } + } + + private Queue getQueue(String resourceName) { + return _queues[(resourceName.hashCode() & Integer.MAX_VALUE) % _queues.length]; + } + + public static synchronized void setMinNumCharsInISToTurnOnCompression(int minNumChars) { + _minNumCharsInISToTurnOnCompression = minNumChars; + } + + /** + * Do a group update for idealState associated with a given resource key + * @param helixManager helixManager with the ability to pull from the current data\ + * @param resourceName the resource name to be updated + * @param updater the idealState updater to be applied + * @return true if successful, false otherwise + */ + public ObjectBooleanPair<IdealState> commit(HelixManager helixManager, String resourceName, + Function<IdealState, IdealState> updater, RetryPolicy 
retryPolicy, boolean noChangeOk) { + Queue queue = getQueue(resourceName); + Entry entry = new Entry(resourceName, updater); + + boolean success = true; + queue._pending.add(entry); + IdealState finalIdealState = null; + while (!entry._sent.get()) { + if (queue._running.compareAndSet(null, Thread.currentThread())) { + ArrayList<Entry> processed = new ArrayList<>(); + try { + if (queue._pending.peek() == null) { + return new ObjectBooleanImmutablePair<>(finalIdealState, true); + } + // remove from queue + Entry first = queue._pending.poll(); + processed.add(first); + String mergedResourceName = first._resourceName; + HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor(); + PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName); + IdealState idealState = dataAccessor.getProperty(idealStateKey); + + // Make a copy of the idealState above to pass it to the updater + // NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and + // list fields + IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState); + + /** + * If the local cache does not contain a value, need to check if there is a + * value in ZK; use it as initial value if exists + */ + IdealState updatedIdealState = first._updater.apply(idealStateCopy); + Iterator<Entry> it = queue._pending.iterator(); + while (it.hasNext()) { + Entry ent = it.next(); + if (!ent._resourceName.equals(mergedResourceName)) { + continue; + } + processed.add(ent); + updatedIdealState = ent._updater.apply(idealStateCopy); + // System.out.println("After merging:" + merged); + it.remove(); Review Comment: Is it safe to modify the queue here while we are iterating on it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For queries about this service, please contact Infrastructure at: users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org