Jackie-Jiang commented on code in PR #13976: URL: https://github.com/apache/pinot/pull/13976#discussion_r1756107098
########## pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java: ########## @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils.helix; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import org.apache.helix.AccessOption; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.PropertyKey; +import org.apache.helix.model.IdealState; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException; +import org.apache.pinot.common.metrics.ControllerMeter; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.metrics.ControllerTimer; +import org.apache.pinot.spi.utils.retry.RetryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class IdealStateGroupCommit { Review Comment: Consider putting some comments so that we can trace back to Helix code? ########## pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java: ########## @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils.helix; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import org.apache.helix.AccessOption; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.PropertyKey; +import org.apache.helix.model.IdealState; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException; +import org.apache.pinot.common.metrics.ControllerMeter; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.metrics.ControllerTimer; +import org.apache.pinot.spi.utils.retry.RetryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class IdealStateGroupCommit { + private static final Logger LOGGER = LoggerFactory.getLogger(IdealStateGroupCommit.class); + + private static final int NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION = 1000; + private static final String ENABLE_COMPRESSIONS_KEY = "enableCompression"; + + private static int _minNumCharsInISToTurnOnCompression = -1; + + private static class Queue { + final AtomicReference<Thread> _running = new AtomicReference<Thread>(); + final ConcurrentLinkedQueue<Entry> _pending = new ConcurrentLinkedQueue<Entry>(); + } + + private static class Entry { + final String _resourceName; + final Function<IdealState, IdealState> _updater; + AtomicBoolean _sent = new AtomicBoolean(false); + + Entry(String resourceName, Function<IdealState, IdealState> updater) { + _resourceName = resourceName; + _updater = updater; + } + } + + private final Queue[] _queues = new Queue[100]; + + /** + * Set up a group committer and its associated queues + */ + public IdealStateGroupCommit() { + // Don't use Arrays.fill(); + for (int i = 0; i < _queues.length; i++) { + _queues[i] = new Queue(); + } + } + + private Queue getQueue(String resourceName) { + return _queues[(resourceName.hashCode() & Integer.MAX_VALUE) % _queues.length]; + } + + public static synchronized void setMinNumCharsInISToTurnOnCompression(int minNumChars) { + _minNumCharsInISToTurnOnCompression = minNumChars; + } + + /** + * Do a group update for idealState associated with a given resource key + * @param helixManager helixManager with the ability to pull from the current data\ + * @param resourceName the resource name to be updated + * @param updater the idealState updater to be applied + * @return true if successful, false otherwise + */ + public boolean commit(HelixManager helixManager, String resourceName, Review Comment: We should be able to achieve that by modifying the `_sent` in `Entry` to store the updated IS ########## pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java: ########## @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils.helix; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import org.apache.helix.AccessOption; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.PropertyKey; +import org.apache.helix.model.IdealState; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException; +import org.apache.pinot.common.metrics.ControllerMeter; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.metrics.ControllerTimer; +import org.apache.pinot.spi.utils.retry.RetryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class IdealStateGroupCommit { + private static final Logger LOGGER = LoggerFactory.getLogger(IdealStateGroupCommit.class); + + private static final int NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION = 1000; + private static final String ENABLE_COMPRESSIONS_KEY = "enableCompression"; + + private static int _minNumCharsInISToTurnOnCompression = -1; + + private static class Queue { + final AtomicReference<Thread> _running = new AtomicReference<Thread>(); + final ConcurrentLinkedQueue<Entry> _pending = new ConcurrentLinkedQueue<Entry>(); + } + + private static class Entry { + final String _resourceName; + final Function<IdealState, IdealState> _updater; + AtomicBoolean _sent = new AtomicBoolean(false); + + Entry(String resourceName, Function<IdealState, IdealState> updater) { + _resourceName = resourceName; + _updater = updater; + } + } + + private final Queue[] _queues = new Queue[100]; + + /** + * Set up a group committer and its associated queues + */ + public IdealStateGroupCommit() { + // Don't use Arrays.fill(); + for (int i = 0; i < _queues.length; i++) { + _queues[i] = new Queue(); + } + } + + private Queue getQueue(String resourceName) { + return _queues[(resourceName.hashCode() & Integer.MAX_VALUE) % _queues.length]; + } + + public static synchronized void setMinNumCharsInISToTurnOnCompression(int minNumChars) { + _minNumCharsInISToTurnOnCompression = minNumChars; + } + + /** + * Do a group update for idealState associated with a given resource key + * @param helixManager helixManager with the ability to pull from the current data\ + * @param resourceName the resource name to be updated + * @param updater the idealState updater to be applied + * @return true if successful, false otherwise + */ + public boolean commit(HelixManager helixManager, String resourceName, Review Comment: Shall we keep the existing behavior by returning the updated IS, or throw exception? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org