This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch full-auto-oss-abstraction
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 3dba068168ad0ae9184d9586a15f7ea4a29721f2
Author: jlli_LinkedIn <j...@linkedin.com>
AuthorDate: Fri Mar 22 16:00:43 2024 -0700

    Extract methods for Pinot table ideal state
---
 .../MultiStageReplicaGroupSelector.java            |  13 +-
 .../FullAutoInstancePartitionsUtils.java           |  52 +++++++
 .../common/assignment/InstancePartitionsUtils.java |  34 +++--
 .../assignment/InstancePartitionsUtilsHelper.java  |  58 ++++++++
 .../InstancePartitionsUtilsHelperFactory.java      |  46 ++++++
 .../pinot/common/utils/config/TierConfigUtils.java |   7 +-
 .../pinot/controller/BaseControllerStarter.java    |  10 ++
 .../apache/pinot/controller/ControllerConf.java    |  11 ++
 .../PinotInstanceAssignmentRestletResource.java    |  59 ++++----
 .../api/resources/PinotTenantRestletResource.java  |  12 +-
 .../helix/core/PinotHelixResourceManager.java      |  98 +++++-------
 .../helix/core/PinotTableIdealStateHelper.java     | 145 ------------------
 .../instance/InstanceAssignmentDriver.java         |   8 +-
 .../DefaultPinotTableIdealStateHelper.java         | 151 +++++++++++++++++++
 .../FullAutoPinotTableIdealStateHelper.java        | 165 +++++++++++++++++++++
 .../PinotTableIdealStateHelper.java                |  87 +++++++++++
 .../PinotTableIdealStateHelperFactory.java         |  50 +++++++
 .../realtime/MissingConsumingSegmentFinder.java    |   4 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  97 +++---------
 .../helix/core/rebalance/TableRebalancer.java      |  80 +++++-----
 .../helix/core/util/HelixSetupUtils.java           |  15 ++
 ...anceAssignmentRestletResourceStatelessTest.java |   4 +-
 ...NonReplicaGroupTieredSegmentAssignmentTest.java |   8 +-
 ...NonReplicaGroupTieredSegmentAssignmentTest.java |   8 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        |   6 +-
 .../TableRebalancerClusterStatelessTest.java       |   4 +-
 .../helix/core/retention/RetentionManagerTest.java |  10 +-
 .../tools/admin/command/MoveReplicaGroup.java      |  20 +--
 28 files changed, 845 insertions(+), 417 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
index 15fb525a8c..3f794c4017 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
@@ -34,7 +34,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@@ -145,12 +145,13 @@ public class MultiStageReplicaGroupSelector extends BaseInstanceSelector {
     Preconditions.checkNotNull(tableType);
     InstancePartitions instancePartitions;
     if (tableType.equals(TableType.OFFLINE)) {
-      instancePartitions = InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
-          InstancePartitionsUtils.getInstancePartitionsName(_tableNameWithType, tableType.name()));
+      instancePartitions = InstancePartitionsUtilsHelperFactory.create().fetchInstancePartitions(_propertyStore,
+          InstancePartitionsUtilsHelperFactory.create()
+
.getInstancePartitionsName(_tableNameWithType, tableType.name())); } else { - instancePartitions = InstancePartitionsUtils.fetchInstancePartitions(_propertyStore, - InstancePartitionsUtils.getInstancePartitionsName(_tableNameWithType, - InstancePartitionsType.CONSUMING.name())); + instancePartitions = InstancePartitionsUtilsHelperFactory.create().fetchInstancePartitions(_propertyStore, + InstancePartitionsUtilsHelperFactory.create() + .getInstancePartitionsName(_tableNameWithType, InstancePartitionsType.CONSUMING.name())); } Preconditions.checkNotNull(instancePartitions); return instancePartitions; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/FullAutoInstancePartitionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/FullAutoInstancePartitionsUtils.java new file mode 100644 index 0000000000..2821cc1c99 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/FullAutoInstancePartitionsUtils.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.assignment; + +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.store.HelixPropertyStore; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; + + +public class FullAutoInstancePartitionsUtils extends InstancePartitionsUtils { + + @Override + public InstancePartitions computeDefaultInstancePartitions(HelixManager helixManager, TableConfig tableConfig, + InstancePartitionsType instancePartitionsType) { + // Use the corresponding method to compute default instance partitions or no-op. + return super.computeDefaultInstancePartitions(helixManager, tableConfig, instancePartitionsType); + } + + @Override + public InstancePartitions computeDefaultInstancePartitionsForTag(HelixManager helixManager, String tableNameWithType, + String instancePartitionsType, String serverTag) { + // Use the corresponding method to compute default instance partitions for tags or no-op. + return super.computeDefaultInstancePartitionsForTag(helixManager, tableNameWithType, instancePartitionsType, + serverTag); + } + + @Override + public void persistInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, ConfigAccessor configAccessor, + String helixClusterName, InstancePartitions instancePartitions) { + // Use the corresponding method to persist instance partitions. 
+ super.persistInstancePartitions(propertyStore, configAccessor, helixClusterName, instancePartitions); + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java index f8bbd08934..3e3ae7c667 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java @@ -43,9 +43,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder; /** * Utility class for instance partitions. */ -public class InstancePartitionsUtils { - private InstancePartitionsUtils() { - } +public class InstancePartitionsUtils implements InstancePartitionsUtilsHelper { public static final char TYPE_SUFFIX_SEPARATOR = '_'; public static final String TIER_SUFFIX = "__TIER__"; @@ -54,14 +52,16 @@ public class InstancePartitionsUtils { * Returns the name of the instance partitions for the given table name (with or without type suffix) and instance * partitions type. */ - public static String getInstancePartitionsName(String tableName, String instancePartitionsType) { + @Override + public String getInstancePartitionsName(String tableName, String instancePartitionsType) { return TableNameBuilder.extractRawTableName(tableName) + TYPE_SUFFIX_SEPARATOR + instancePartitionsType; } /** * Fetches the instance partitions from Helix property store if it exists, or computes it for backward-compatibility. */ - public static InstancePartitions fetchOrComputeInstancePartitions(HelixManager helixManager, TableConfig tableConfig, + @Override + public InstancePartitions fetchOrComputeInstancePartitions(HelixManager helixManager, TableConfig tableConfig, InstancePartitionsType instancePartitionsType) { String tableNameWithType = tableConfig.getTableName(); String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); @@ -88,15 +88,17 @@ public class InstancePartitionsUtils { /** * Fetches the instance partitions from Helix property store. */ + @Override @Nullable - public static InstancePartitions fetchInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, + public InstancePartitions fetchInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, String instancePartitionsName) { String path = ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(instancePartitionsName); ZNRecord znRecord = propertyStore.get(path, null, AccessOption.PERSISTENT); return znRecord != null ? InstancePartitions.fromZNRecord(znRecord) : null; } - public static String getInstancePartitionsNameForTier(String tableName, String tierName) { + @Override + public String getInstancePartitionsNameForTier(String tableName, String tierName) { return TableNameBuilder.extractRawTableName(tableName) + TIER_SUFFIX + tierName; } @@ -106,7 +108,8 @@ public class InstancePartitionsUtils { * This method is useful when we use a table with instancePartitionsMap since in that case * the value of a table's instance partitions are copied over from an existing instancePartitions. 
*/ - public static InstancePartitions fetchInstancePartitionsWithRename(HelixPropertyStore<ZNRecord> propertyStore, + @Override + public InstancePartitions fetchInstancePartitionsWithRename(HelixPropertyStore<ZNRecord> propertyStore, String instancePartitionsName, String newName) { InstancePartitions instancePartitions = fetchInstancePartitions(propertyStore, instancePartitionsName); Preconditions.checkNotNull(instancePartitions, @@ -121,7 +124,8 @@ public class InstancePartitionsUtils { * <p>Choose both enabled and disabled instances with the server tag as the qualified instances to avoid unexpected * data shuffling when instances get disabled. */ - public static InstancePartitions computeDefaultInstancePartitions(HelixManager helixManager, TableConfig tableConfig, + @Override + public InstancePartitions computeDefaultInstancePartitions(HelixManager helixManager, TableConfig tableConfig, InstancePartitionsType instancePartitionsType) { TenantConfig tenantConfig = tableConfig.getTenantConfig(); String serverTag; @@ -148,7 +152,8 @@ public class InstancePartitionsUtils { * <p>Choose both enabled and disabled instances with the server tag as the qualified instances to avoid unexpected * data shuffling when instances get disabled. */ - public static InstancePartitions computeDefaultInstancePartitionsForTag(HelixManager helixManager, + @Override + public InstancePartitions computeDefaultInstancePartitionsForTag(HelixManager helixManager, String tableNameWithType, String instancePartitionsType, String serverTag) { List<String> instances = HelixHelper.getInstancesWithTag(helixManager, serverTag); int numInstances = instances.size(); @@ -166,7 +171,8 @@ public class InstancePartitionsUtils { /** * Persists the instance partitions to Helix property store. */ - public static void persistInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, + @Override + public void persistInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, ConfigAccessor configAccessor, String helixClusterName, InstancePartitions instancePartitions) { String path = ZKMetadataProvider .constructPropertyStorePathForInstancePartitions(instancePartitions.getInstancePartitionsName()); @@ -185,7 +191,8 @@ public class InstancePartitionsUtils { /** * Removes the instance partitions from Helix property store. 
*/ - public static void removeInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, + @Override + public void removeInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, String instancePartitionsName) { String path = ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(instancePartitionsName); if (!propertyStore.remove(path, AccessOption.PERSISTENT)) { @@ -193,7 +200,8 @@ public class InstancePartitionsUtils { } } - public static void removeTierInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, + @Override + public void removeTierInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, String tableNameWithType) { List<InstancePartitions> instancePartitions = ZKMetadataProvider.getAllInstancePartitions(propertyStore); instancePartitions.stream().filter(instancePartition -> instancePartition.getInstancePartitionsName() diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtilsHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtilsHelper.java new file mode 100644 index 0000000000..66a4c5e157 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtilsHelper.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.assignment; + +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.store.HelixPropertyStore; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; + + +public interface InstancePartitionsUtilsHelper { + + String getInstancePartitionsName(String tableName, String instancePartitionsType); + + InstancePartitions fetchOrComputeInstancePartitions(HelixManager helixManager, TableConfig tableConfig, + InstancePartitionsType instancePartitionsType); + + InstancePartitions fetchInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, + String instancePartitionsName); + + InstancePartitions computeDefaultInstancePartitions(HelixManager helixManager, TableConfig tableConfig, + InstancePartitionsType instancePartitionsType); + + InstancePartitions computeDefaultInstancePartitionsForTag(HelixManager helixManager, + String tableNameWithType, String instancePartitionsType, String serverTag); + + String getInstancePartitionsNameForTier(String tableName, String tierName); + + void persistInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, + ConfigAccessor configAccessor, String helixClusterName, InstancePartitions instancePartitions); + + InstancePartitions fetchInstancePartitionsWithRename(HelixPropertyStore<ZNRecord> propertyStore, + String instancePartitionsName, String newName); + + void removeInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, + String instancePartitionsName); + + void removeTierInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, + String tableNameWithType); +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtilsHelperFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtilsHelperFactory.java new file mode 100644 index 0000000000..be3587a374 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtilsHelperFactory.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.assignment; + +import org.apache.pinot.spi.env.PinotConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class InstancePartitionsUtilsHelperFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(InstancePartitionsUtilsHelperFactory.class); + + private static InstancePartitionsUtilsHelper _instance = null; + + private static PinotConfiguration _pinotConfiguration; + + private InstancePartitionsUtilsHelperFactory() { + } + + public static void init(PinotConfiguration pinotConfiguration) { + _pinotConfiguration = pinotConfiguration; + } + + public static InstancePartitionsUtilsHelper create() { + if (_instance == null) { + _instance = new InstancePartitionsUtils(); + } + return _instance; + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java index 31e32113e7..94298917be 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java @@ -28,7 +28,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.helix.HelixManager; import org.apache.pinot.common.assignment.InstancePartitions; -import org.apache.pinot.common.assignment.InstancePartitionsUtils; +import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory; import org.apache.pinot.common.tier.FixedTierSegmentSelector; import org.apache.pinot.common.tier.PinotServerTierStorage; import org.apache.pinot.common.tier.Tier; @@ -79,8 +79,9 @@ public final class TierConfigUtils { if (tier.getSegmentSelector().selectSegment(tableNameWithType, segmentName)) { // Compute default instance partitions PinotServerTierStorage storage = (PinotServerTierStorage) tier.getStorage(); - return InstancePartitionsUtils.computeDefaultInstancePartitionsForTag(helixManager, tableNameWithType, - tier.getName(), storage.getServerTag()); + return InstancePartitionsUtilsHelperFactory.create() + .computeDefaultInstancePartitionsForTag(helixManager, tableNameWithType, tier.getName(), + storage.getServerTag()); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 281c397401..8e2ca24a22 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -57,6 +57,7 @@ import org.apache.http.config.SocketConfig; import org.apache.http.conn.HttpClientConnectionManager; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.pinot.common.Utils; +import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory; import org.apache.pinot.common.config.TlsConfig; import org.apache.pinot.common.function.FunctionRegistry; import org.apache.pinot.common.metadata.ZKMetadataProvider; @@ -88,6 +89,7 @@ import org.apache.pinot.controller.helix.RealtimeConsumerMonitor; import org.apache.pinot.controller.helix.SegmentStatusChecker; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.cleanup.StaleInstancesCleanupTask; +import 
org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter; @@ -200,6 +202,8 @@ public abstract class BaseControllerStarter implements ServiceStartable { ServiceStartableUtils.applyClusterConfig(_config, _helixZkURL, _helixClusterName, ServiceRole.CONTROLLER); setupHelixSystemProperties(); + // Initialize the ideal state helper for Pinot tables. + PinotTableIdealStateHelperFactory.init(_config); HelixHelper.setMinNumCharsInISToTurnOnCompression(_config.getMinNumCharsInISToTurnOnCompression()); _listenerConfigs = ListenerConfigUtil.buildControllerConfigs(_config); _controllerMode = _config.getControllerMode(); @@ -408,6 +412,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { initControllerFilePathProvider(); initSegmentFetcherFactory(); initPinotCrypterFactory(); + initInstancePartitionUtilsHelperFactory(); LOGGER.info("Initializing QueryRewriterFactory"); QueryRewriterFactory.init( @@ -732,6 +737,11 @@ public abstract class BaseControllerStarter implements ServiceStartable { } } + private void initInstancePartitionUtilsHelperFactory() { + LOGGER.info("Initializing InstancePartitionUtilsHelperFactory"); + InstancePartitionsUtilsHelperFactory.init(_config); + } + /** * Registers, connects to Helix cluster as PARTICIPANT role, and adds listeners. */ diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index 4598b48eeb..3cfc4705fd 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -275,6 +275,7 @@ public class ControllerConf extends PinotConfiguration { public static final String ACCESS_CONTROL_USERNAME = "access.control.init.username"; public static final String ACCESS_CONTROL_PASSWORD = "access.control.init.password"; public static final String LINEAGE_MANAGER_CLASS = "controller.lineage.manager.class"; + public static final String PINOT_TABLE_IDEALSTATE_HELPER_CLASS = "controller.pinot.table.idealstate.class"; // Amount of the time the segment can take from the beginning of upload to the end of upload. Used when parallel push // protection is enabled. If the upload does not finish within the timeout, next upload can override the previous one. 
private static final String SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = "controller.segment.upload.timeoutInMillis"; @@ -298,6 +299,8 @@ public class ControllerConf extends PinotConfiguration { private static final String DEFAULT_ACCESS_CONTROL_PASSWORD = "admin"; private static final String DEFAULT_LINEAGE_MANAGER = "org.apache.pinot.controller.helix.core.lineage.DefaultLineageManager"; + private static final String DEFAULT_PINOT_TABLE_IDEALSTATE_HELPER_CLASS = + "org.apache.pinot.controller.helix.core.idealstatehelper.DefaultPinotTableIdealStateHelper"; private static final long DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = 600_000L; // 10 minutes private static final int DEFAULT_MIN_NUM_CHARS_IN_IS_TO_TURN_ON_COMPRESSION = -1; private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64; @@ -872,6 +875,14 @@ public class ControllerConf extends PinotConfiguration { setProperty(LINEAGE_MANAGER_CLASS, lineageModifierClass); } + public String getPinotTableIdealstateHelperClass() { + return getProperty(PINOT_TABLE_IDEALSTATE_HELPER_CLASS, DEFAULT_PINOT_TABLE_IDEALSTATE_HELPER_CLASS); + } + + public void setPinotTableIdealstateHelperClass(String pinotTableIdealstateHelperClass) { + setProperty(PINOT_TABLE_IDEALSTATE_HELPER_CLASS, pinotTableIdealstateHelperClass); + } + public long getSegmentUploadTimeoutInMillis() { return getProperty(SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS, DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java index 6fd157d620..73306c1f35 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java @@ -50,7 +50,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils; import org.apache.pinot.common.assignment.InstancePartitions; -import org.apache.pinot.common.assignment.InstancePartitionsUtils; +import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory; import org.apache.pinot.common.utils.config.TableConfigUtils; import org.apache.pinot.controller.api.access.AccessType; import org.apache.pinot.controller.api.access.Authenticate; @@ -97,7 +97,7 @@ public class PinotInstanceAssignmentRestletResource { if (tableType != TableType.REALTIME) { if (InstancePartitionsType.OFFLINE.toString().equals(type) || type == null) { InstancePartitions offlineInstancePartitions = - InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(), + InstancePartitionsUtilsHelperFactory.create().fetchInstancePartitions(_resourceManager.getPropertyStore(), InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName)); if (offlineInstancePartitions != null) { instancePartitionsMap.put(InstancePartitionsType.OFFLINE.toString(), offlineInstancePartitions); @@ -107,7 +107,7 @@ public class PinotInstanceAssignmentRestletResource { if (tableType != TableType.OFFLINE) { if (InstancePartitionsType.CONSUMING.toString().equals(type) || type == null) { InstancePartitions consumingInstancePartitions = - InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(), + 
InstancePartitionsUtilsHelperFactory.create().fetchInstancePartitions(_resourceManager.getPropertyStore(), InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName)); if (consumingInstancePartitions != null) { instancePartitionsMap.put(InstancePartitionsType.CONSUMING.toString(), consumingInstancePartitions); @@ -115,7 +115,7 @@ public class PinotInstanceAssignmentRestletResource { } if (InstancePartitionsType.COMPLETED.toString().equals(type) || type == null) { InstancePartitions completedInstancePartitions = - InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(), + InstancePartitionsUtilsHelperFactory.create().fetchInstancePartitions(_resourceManager.getPropertyStore(), InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName)); if (completedInstancePartitions != null) { instancePartitionsMap.put(InstancePartitionsType.COMPLETED.toString(), completedInstancePartitions); @@ -130,10 +130,10 @@ public class PinotInstanceAssignmentRestletResource { if (tableConfig != null && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) { for (TierConfig tierConfig : tableConfig.getTierConfigsList()) { if (type == null || type.equals(tierConfig.getName())) { - InstancePartitions instancePartitions = - InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(), - InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(), - tierConfig.getName())); + InstancePartitions instancePartitions = InstancePartitionsUtilsHelperFactory.create() + .fetchInstancePartitions(_resourceManager.getPropertyStore(), + InstancePartitionsUtilsHelperFactory.create() + .getInstancePartitionsNameForTier(tableConfig.getTableName(), tierConfig.getName())); if (instancePartitions != null) { instancePartitionsMap.put(tierConfig.getName(), instancePartitions); } @@ -245,9 +245,10 @@ public class PinotInstanceAssignmentRestletResource { TableConfig tableConfig, List<InstanceConfig> instanceConfigs, InstancePartitionsType instancePartitionsType) { String tableNameWithType = tableConfig.getTableName(); if (!TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType)) { - InstancePartitions existingInstancePartitions = - InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(), - InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString())); + InstancePartitions existingInstancePartitions = InstancePartitionsUtilsHelperFactory.create() + .fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(), + InstancePartitionsUtilsHelperFactory.create() + .getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString())); instancePartitionsMap.put(instancePartitionsType.toString(), new InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions)); @@ -255,14 +256,15 @@ public class PinotInstanceAssignmentRestletResource { if (InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig, instancePartitionsType)) { // fetch the existing instance partitions, if the table, this is referenced in the new instance partitions // generation for minimum difference - InstancePartitions existingInstancePartitions = InstancePartitionsUtils.fetchInstancePartitions( - _resourceManager.getHelixZkManager().getHelixPropertyStore(), - InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, 
instancePartitionsType.toString())); + InstancePartitions existingInstancePartitions = InstancePartitionsUtilsHelperFactory.create() + .fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(), + InstancePartitionsUtilsHelperFactory.create().getInstancePartitionsName(tableNameWithType, + instancePartitionsType.toString())); String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); // fetch the pre-configured instance partitions, the renaming part is irrelevant as we are not really // preserving this preConfigured, but only using it as a reference to generate the new instance partitions - InstancePartitions preConfigured = - InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(), + InstancePartitions preConfigured = InstancePartitionsUtilsHelperFactory.create() + .fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(), tableConfig.getInstancePartitionsMap().get(instancePartitionsType), instancePartitionsType.getInstancePartitionsName(rawTableName)); instancePartitionsMap.put(instancePartitionsType.toString(), @@ -270,8 +272,8 @@ public class PinotInstanceAssignmentRestletResource { existingInstancePartitions, preConfigured)); } else { String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); - instancePartitionsMap.put(instancePartitionsType.toString(), - InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(), + instancePartitionsMap.put(instancePartitionsType.toString(), InstancePartitionsUtilsHelperFactory.create() + .fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(), tableConfig.getInstancePartitionsMap().get(instancePartitionsType), instancePartitionsType.getInstancePartitionsName(rawTableName))); } @@ -285,10 +287,10 @@ public class PinotInstanceAssignmentRestletResource { for (TierConfig tierConfig : tableConfig.getTierConfigsList()) { if ((tierConfig.getName().equals(tierName) || tierName == null) && tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName()) != null) { - InstancePartitions existingInstancePartitions = InstancePartitionsUtils.fetchInstancePartitions( - _resourceManager.getHelixZkManager().getHelixPropertyStore(), - InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(), - tierConfig.getName())); + InstancePartitions existingInstancePartitions = InstancePartitionsUtilsHelperFactory.create() + .fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(), + InstancePartitionsUtilsHelperFactory.create() + .getInstancePartitionsNameForTier(tableConfig.getTableName(), tierConfig.getName())); instancePartitionsMap.put(tierConfig.getName(), new InstanceAssignmentDriver(tableConfig).assignInstances(tierConfig.getName(), instanceConfigs, @@ -301,7 +303,7 @@ public class PinotInstanceAssignmentRestletResource { private void persistInstancePartitionsHelper(InstancePartitions instancePartitions) { try { LOGGER.info("Persisting instance partitions: {}", instancePartitions); - InstancePartitionsUtils.persistInstancePartitions(_resourceManager.getPropertyStore(), + InstancePartitionsUtilsHelperFactory.create().persistInstancePartitions(_resourceManager.getPropertyStore(), _resourceManager.getHelixZkManager().getConfigAccessor(), _resourceManager.getHelixClusterName(), instancePartitions); } catch (Exception e) { @@ -352,7 +354,8 @@ public class PinotInstanceAssignmentRestletResource { for (TableConfig tableConfig : tableConfigs) { if 
(tableConfig != null && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) { for (TierConfig tierConfig : tableConfig.getTierConfigsList()) { - if (InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(), tierConfig.getName()) + if (InstancePartitionsUtilsHelperFactory.create() + .getInstancePartitionsNameForTier(tableConfig.getTableName(), tierConfig.getName()) .equals(instancePartitionsName)) { persistInstancePartitionsHelper(instancePartitions); return Collections.singletonMap(tierConfig.getName(), instancePartitions); @@ -399,9 +402,8 @@ public class PinotInstanceAssignmentRestletResource { if (tableConfig != null && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) { for (TierConfig tierConfig : tableConfig.getTierConfigsList()) { if (instancePartitionsType == null || instancePartitionsType.equals(tierConfig.getName())) { - removeInstancePartitionsHelper( - InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(), - tierConfig.getName())); + removeInstancePartitionsHelper(InstancePartitionsUtilsHelperFactory.create() + .getInstancePartitionsNameForTier(tableConfig.getTableName(), tierConfig.getName())); } } } @@ -412,7 +414,8 @@ public class PinotInstanceAssignmentRestletResource { private void removeInstancePartitionsHelper(String instancePartitionsName) { try { LOGGER.info("Removing instance partitions: {}", instancePartitionsName); - InstancePartitionsUtils.removeInstancePartitions(_resourceManager.getPropertyStore(), instancePartitionsName); + InstancePartitionsUtilsHelperFactory.create() + .removeInstancePartitions(_resourceManager.getPropertyStore(), instancePartitionsName); } catch (Exception e) { throw new ControllerApplicationException(LOGGER, "Caught Exception while removing the instance partitions", Response.Status.INTERNAL_SERVER_ERROR, e); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java index 1fea68c75f..7ef327df18 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java @@ -52,7 +52,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.assignment.InstancePartitions; -import org.apache.pinot.common.assignment.InstancePartitionsUtils; +import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory; import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; @@ -305,9 +305,8 @@ public class PinotTenantRestletResource { @QueryParam("instancePartitionType") String instancePartitionType) { String tenantNameWithType = InstancePartitionsType.valueOf(instancePartitionType) .getInstancePartitionsName(tenantName); - InstancePartitions instancePartitions = - InstancePartitionsUtils.fetchInstancePartitions(_pinotHelixResourceManager.getPropertyStore(), - tenantNameWithType); + InstancePartitions instancePartitions = InstancePartitionsUtilsHelperFactory.create() + .fetchInstancePartitions(_pinotHelixResourceManager.getPropertyStore(), tenantNameWithType); if (instancePartitions == null) { throw new 
ControllerApplicationException(LOGGER, @@ -358,8 +357,9 @@ public class PinotTenantRestletResource { private void persistInstancePartitionsHelper(InstancePartitions instancePartitions) { try { LOGGER.info("Persisting instance partitions: {}", instancePartitions); - InstancePartitionsUtils.persistInstancePartitions(_pinotHelixResourceManager.getPropertyStore(), - _pinotHelixResourceManager.getHelixZkManager().getConfigAccessor(), + InstancePartitionsUtilsHelperFactory.create() + .persistInstancePartitions(_pinotHelixResourceManager.getPropertyStore(), + _pinotHelixResourceManager.getHelixZkManager().getConfigAccessor(), _pinotHelixResourceManager.getHelixClusterName(), instancePartitions); } catch (Exception e) { throw new ControllerApplicationException(LOGGER, "Caught Exception while persisting the instance partitions", diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 57c75d7618..98a8c467f5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -90,7 +90,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils; import org.apache.pinot.common.assignment.InstancePartitions; -import org.apache.pinot.common.assignment.InstancePartitionsUtils; +import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory; import org.apache.pinot.common.config.provider.TableCache; import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.exception.SchemaAlreadyExistsException; @@ -142,6 +142,8 @@ import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssign import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; +import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelper; +import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory; import org.apache.pinot.controller.helix.core.lineage.LineageManager; import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; @@ -232,11 +234,14 @@ public class PinotHelixResourceManager { private SegmentDeletionManager _segmentDeletionManager; private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager; private TableCache _tableCache; + + private final PinotTableIdealStateHelper _pinotTableIdealStateHelper; private final LineageManager _lineageManager; public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable String dataDir, boolean isSingleTenantCluster, boolean enableBatchMessageMode, int deletedSegmentsRetentionInDays, - boolean enableTieredSegmentAssignment, LineageManager lineageManager) { + boolean enableTieredSegmentAssignment, PinotTableIdealStateHelper pinotTableIdealStateHelper, + LineageManager lineageManager) { _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL); _helixClusterName = helixClusterName; _dataDir = 
dataDir; @@ -258,6 +263,7 @@ public class PinotHelixResourceManager { for (int i = 0; i < _tableUpdaterLocks.length; i++) { _tableUpdaterLocks[i] = new Object(); } + _pinotTableIdealStateHelper = pinotTableIdealStateHelper; _lineageManager = lineageManager; } @@ -265,7 +271,7 @@ public class PinotHelixResourceManager { this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), controllerConf.getDataDir(), controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(), controllerConf.getDeletedSegmentsRetentionInDays(), controllerConf.tieredSegmentAssignmentEnabled(), - LineageManagerFactory.create(controllerConf)); + PinotTableIdealStateHelperFactory.create(), LineageManagerFactory.create(controllerConf)); } /** @@ -1583,9 +1589,7 @@ public class PinotHelixResourceManager { Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME, "Invalid table type: %s", tableType); - IdealState idealState = - PinotTableIdealStateHelper.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(), - _enableBatchMessageMode); + IdealState idealState = _pinotTableIdealStateHelper.buildEmptyIdealStateFor(tableConfig, _enableBatchMessageMode); // if (tableType == TableType.REALTIME) { // idealState = // PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(), @@ -1763,7 +1767,7 @@ public class PinotHelixResourceManager { List<InstancePartitionsType> instancePartitionsTypesToAssign = new ArrayList<>(); for (InstancePartitionsType instancePartitionsType : InstancePartitionsType.values()) { if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, instancePartitionsType)) { - if (override || InstancePartitionsUtils.fetchInstancePartitions(_propertyStore, + if (override || InstancePartitionsUtilsHelperFactory.create().fetchInstancePartitions(_propertyStore, instancePartitionsType.getInstancePartitionsName(rawTableName)) == null) { instancePartitionsTypesToAssign.add(instancePartitionsType); } @@ -1786,22 +1790,24 @@ public class PinotHelixResourceManager { } else { String referenceInstancePartitionsName = tableConfig.getInstancePartitionsMap().get(instancePartitionsType); if (isPreConfigurationBasedAssignment) { - InstancePartitions preConfiguredInstancePartitions = - InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore, - referenceInstancePartitionsName, instancePartitionsType.getInstancePartitionsName(rawTableName)); + InstancePartitions preConfiguredInstancePartitions = InstancePartitionsUtilsHelperFactory.create() + .fetchInstancePartitionsWithRename(_propertyStore, referenceInstancePartitionsName, + instancePartitionsType.getInstancePartitionsName(rawTableName)); instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs, null, preConfiguredInstancePartitions); LOGGER.info("Persisting instance partitions: {} (based on {})", instancePartitions, preConfiguredInstancePartitions); } else { - instancePartitions = InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore, - referenceInstancePartitionsName, instancePartitionsType.getInstancePartitionsName(rawTableName)); + instancePartitions = InstancePartitionsUtilsHelperFactory.create() + .fetchInstancePartitionsWithRename(_propertyStore, referenceInstancePartitionsName, + instancePartitionsType.getInstancePartitionsName(rawTableName)); LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions, 
referenceInstancePartitionsName); } } - InstancePartitionsUtils.persistInstancePartitions(_propertyStore, _helixZkManager.getConfigAccessor(), - _helixClusterName, instancePartitions); + InstancePartitionsUtilsHelperFactory.create() + .persistInstancePartitions(_propertyStore, _helixZkManager.getConfigAccessor(), _helixClusterName, + instancePartitions); } } @@ -1810,17 +1816,18 @@ public class PinotHelixResourceManager { && tableConfig.getInstanceAssignmentConfigMap() != null) { for (TierConfig tierConfig : tableConfig.getTierConfigsList()) { if (tableConfig.getInstanceAssignmentConfigMap().containsKey(tierConfig.getName())) { - if (override || InstancePartitionsUtils.fetchInstancePartitions(_propertyStore, - InstancePartitionsUtils.getInstancePartitionsNameForTier(tableNameWithType, tierConfig.getName())) - == null) { + if (override || InstancePartitionsUtilsHelperFactory.create().fetchInstancePartitions(_propertyStore, + InstancePartitionsUtilsHelperFactory.create() + .getInstancePartitionsNameForTier(tableNameWithType, tierConfig.getName())) == null) { LOGGER.info("Calculating instance partitions for tier: {}, table : {}", tierConfig.getName(), tableNameWithType); InstancePartitions instancePartitions = instanceAssignmentDriver.assignInstances(tierConfig.getName(), instanceConfigs, null, tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName())); LOGGER.info("Persisting instance partitions: {}", instancePartitions); - InstancePartitionsUtils.persistInstancePartitions(_propertyStore, _helixZkManager.getConfigAccessor(), - _helixClusterName, instancePartitions); + InstancePartitionsUtilsHelperFactory.create() + .persistInstancePartitions(_propertyStore, _helixZkManager.getConfigAccessor(), _helixClusterName, + instancePartitions); } } } @@ -1971,18 +1978,18 @@ public class PinotHelixResourceManager { // Remove instance partitions if (tableType == TableType.OFFLINE) { - InstancePartitionsUtils.removeInstancePartitions(_propertyStore, tableNameWithType); + InstancePartitionsUtilsHelperFactory.create().removeInstancePartitions(_propertyStore, tableNameWithType); } else { String rawTableName = TableNameBuilder.extractRawTableName(tableName); - InstancePartitionsUtils.removeInstancePartitions(_propertyStore, + InstancePartitionsUtilsHelperFactory.create().removeInstancePartitions(_propertyStore, InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName)); - InstancePartitionsUtils.removeInstancePartitions(_propertyStore, + InstancePartitionsUtilsHelperFactory.create().removeInstancePartitions(_propertyStore, InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName)); } LOGGER.info("Deleting table {}: Removed instance partitions", tableNameWithType); // Remove tier instance partitions - InstancePartitionsUtils.removeTierInstancePartitions(_propertyStore, tableNameWithType); + InstancePartitionsUtilsHelperFactory.create().removeTierInstancePartitions(_propertyStore, tableNameWithType); LOGGER.info("Deleting table {}: Removed tier instance partitions", tableNameWithType); // Remove segment lineage @@ -2256,36 +2263,8 @@ public class PinotHelixResourceManager { SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, _controllerMetrics); synchronized (getTableUpdaterLock(tableNameWithType)) { - Map<InstancePartitionsType, InstancePartitions> finalInstancePartitionsMap = instancePartitionsMap; - HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> { - assert idealState != 
null; - Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields(); - Map<String, List<String>> currentAssignmentList = idealState.getRecord().getListFields(); - if (currentAssignment.containsKey(segmentName) && currentAssignmentList.containsKey(segmentName)) { - LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName, - tableNameWithType); - } else { - List<String> assignedInstances = - segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap); - LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances, - tableNameWithType); - TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); - if (tableType == TableType.REALTIME) { - // TODO: Once REALTIME uses FULL-AUTO only the listFields should be updated - currentAssignmentList.put(segmentName, Collections.emptyList() - /* SegmentAssignmentUtils.getInstanceStateList(assignedInstances) */); -// currentAssignment.put(segmentName, -// SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); - } else { - // TODO: Assess whether to pass in an empty instance list or to set the preferred list - currentAssignmentList.put(segmentName, Collections.emptyList() - /* SegmentAssignmentUtils.getInstanceStateList(assignedInstances) */); - } - // currentAssignment.put(segmentName, - // SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); - } - return idealState; - }); + PinotTableIdealStateHelperFactory.create() + .assignSegment(_helixZkManager, tableNameWithType, segmentName, segmentAssignment, instancePartitionsMap); LOGGER.info("Added segment: {} to IdealState for table: {}", segmentName, tableNameWithType); } } catch (Exception e) { @@ -2306,22 +2285,23 @@ public class PinotHelixResourceManager { TableConfig tableConfig) { if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) { return Collections.singletonMap(InstancePartitionsType.OFFLINE, - InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, + InstancePartitionsUtilsHelperFactory.create().fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, InstancePartitionsType.OFFLINE)); } if (tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE) { // In an upsert enabled LLC realtime table, all segments of the same partition are collocated on the same server // -- consuming or completed. So it is fine to use CONSUMING as the InstancePartitionsType. return Collections.singletonMap(InstancePartitionsType.CONSUMING, - InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, + InstancePartitionsUtilsHelperFactory.create().fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, InstancePartitionsType.CONSUMING)); } // for non-upsert realtime tables, if COMPLETED instance partitions is available or tag override for // completed segments is provided in the tenant config, COMPLETED instance partitions type is used // otherwise CONSUMING instance partitions type is used. 
InstancePartitionsType instancePartitionsType = InstancePartitionsType.COMPLETED; - InstancePartitions instancePartitions = InstancePartitionsUtils.fetchInstancePartitions(_propertyStore, - InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString())); + InstancePartitions instancePartitions = InstancePartitionsUtilsHelperFactory.create() + .fetchInstancePartitions(_propertyStore, InstancePartitionsUtilsHelperFactory.create() + .getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString())); if (instancePartitions != null) { return Collections.singletonMap(instancePartitionsType, instancePartitions); } @@ -2329,8 +2309,8 @@ public class PinotHelixResourceManager { if (tagOverrideConfig == null || tagOverrideConfig.getRealtimeCompleted() == null) { instancePartitionsType = InstancePartitionsType.CONSUMING; } - return Collections.singletonMap(instancePartitionsType, - InstancePartitionsUtils.computeDefaultInstancePartitions(_helixZkManager, tableConfig, instancePartitionsType)); + return Collections.singletonMap(instancePartitionsType, InstancePartitionsUtilsHelperFactory.create() + .computeDefaultInstancePartitions(_helixZkManager, tableConfig, instancePartitionsType)); } public boolean isUpsertTable(String tableName) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java deleted file mode 100644 index 37c4b7555b..0000000000 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.controller.helix.core; - -import java.util.List; -import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.builder.CustomModeISBuilder; -import org.apache.helix.model.builder.FullAutoModeISBuilder; -import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; -import org.apache.pinot.spi.stream.PartitionGroupMetadata; -import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher; -import org.apache.pinot.spi.stream.StreamConfig; -import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.apache.pinot.spi.utils.retry.RetryPolicies; -import org.apache.pinot.spi.utils.retry.RetryPolicy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class PinotTableIdealStateHelper { - private PinotTableIdealStateHelper() { - } - - private static final Logger LOGGER = LoggerFactory.getLogger(PinotTableIdealStateHelper.class); - private static final RetryPolicy DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY = - RetryPolicies.randomDelayRetryPolicy(3, 100L, 200L); - - public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int numReplicas, - boolean enableBatchMessageMode) { - LOGGER.info("Building CUSTOM IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas); - CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(tableNameWithType); - customModeIdealStateBuilder - .setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL) - .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1); - IdealState idealState = customModeIdealStateBuilder.build(); - idealState.setInstanceGroupTag(tableNameWithType); - idealState.setBatchMessageMode(enableBatchMessageMode); - return idealState; - } - - public static IdealState buildEmptyFullAutoIdealStateFor(String tableNameWithType, int numReplicas, - boolean enableBatchMessageMode) { - LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas); - TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); - String stateModel; - if (tableType == null) { - throw new RuntimeException("Failed to get table type from table name: " + tableNameWithType); - } else if (TableType.OFFLINE.equals(tableType)) { - stateModel = - PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; - } else { - stateModel = - PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; - } - - // FULL-AUTO Segment Online-Offline state model with a rebalance strategy, crushed auto-rebalance by default - // TODO: The state model used only works for OFFLINE tables today. Add support for REALTIME state model too - FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(tableNameWithType); - idealStateBuilder - .setStateModel(stateModel) - .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1) - // TODO: Revisit the rebalance strategy to use (maybe we add a custom one) - .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName()); - // The below config guarantees if active number of replicas is no less than minimum active replica, there will - // not be partition movements happened. 
- // Set min active replicas to 0 and rebalance delay to 5 minutes so that if any master goes offline, Helix - // controller waits at most 5 minutes and then re-calculate the participant assignment. - // TODO: Assess which of these values need to be tweaked, removed, and what additional values that need to be added - idealStateBuilder.setMinActiveReplica(numReplicas - 1); - idealStateBuilder.setRebalanceDelay(300_000); - idealStateBuilder.enableDelayRebalance(); - // Set instance group tag - IdealState idealState = idealStateBuilder.build(); - idealState.setInstanceGroupTag(tableNameWithType); - idealState.setBatchMessageMode(enableBatchMessageMode); - return idealState; - } - - /** - * Fetches the list of {@link PartitionGroupMetadata} for the new partition groups for the stream, - * with the help of the {@link PartitionGroupConsumptionStatus} of the current partitionGroups. - * - * Reasons why <code>partitionGroupConsumptionStatusList</code> is needed: - * - * 1) - * The current {@link PartitionGroupConsumptionStatus} is used to determine the offsets that have been consumed for - * a partition group. - * An example of where the offsets would be used: - * e.g. If partition group 1 contains shardId 1, with status DONE and endOffset 150. There's 2 possibilities: - * 1) the stream indicates that shardId's last offset is 200. - * This tells Pinot that partition group 1 still has messages which haven't been consumed, and must be included in - * the response. - * 2) the stream indicates that shardId's last offset is 150, - * This tells Pinot that all messages of partition group 1 have been consumed, and it need not be included in the - * response. - * Thus, this call will skip a partition group when it has reached end of life and all messages from that partition - * group have been consumed. - * - * The current {@link PartitionGroupConsumptionStatus} is also used to know about existing groupings of partitions, - * and accordingly make the new partition groups. - * e.g. Assume that partition group 1 has status IN_PROGRESS and contains shards 0,1,2 - * and partition group 2 has status DONE and contains shards 3,4. - * In the above example, the <code>partitionGroupConsumptionStatusList</code> indicates that - * the collection of shards in partition group 1, should remain unchanged in the response, - * whereas shards 3,4 can be added to new partition groups if needed. - * - * @param streamConfig the streamConfig from the tableConfig - * @param partitionGroupConsumptionStatusList List of {@link PartitionGroupConsumptionStatus} for the current - * partition groups. - * The size of this list is equal to the number of partition groups, - * and is created using the latest segment zk metadata. 
- */ - public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, - List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) { - PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = - new PartitionGroupMetadataFetcher(streamConfig, partitionGroupConsumptionStatusList); - try { - DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher); - return partitionGroupMetadataFetcher.getPartitionGroupMetadataList(); - } catch (Exception e) { - Exception fetcherException = partitionGroupMetadataFetcher.getException(); - LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of table: {}", streamConfig.getTopicName(), - streamConfig.getTableNameWithType(), fetcherException); - throw new RuntimeException(fetcherException); - } - } -} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java index 6d869b86c1..8248a04283 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java @@ -25,7 +25,7 @@ import javax.annotation.Nullable; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils; import org.apache.pinot.common.assignment.InstancePartitions; -import org.apache.pinot.common.assignment.InstancePartitionsUtils; +import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig; @@ -76,9 +76,9 @@ public class InstanceAssignmentDriver { public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig) { - return getInstancePartitions( - InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(), tierName), - instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, null); + return getInstancePartitions(InstancePartitionsUtilsHelperFactory.create() + .getInstancePartitionsNameForTier(_tableConfig.getTableName(), tierName), instanceAssignmentConfig, + instanceConfigs, existingInstancePartitions, null); } private InstancePartitions getInstancePartitions(String instancePartitionsName, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/DefaultPinotTableIdealStateHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/DefaultPinotTableIdealStateHelper.java new file mode 100644 index 0000000000..e6cef02d99 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/DefaultPinotTableIdealStateHelper.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.idealstatehelper; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.helix.HelixManager; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.builder.CustomModeISBuilder; +import org.apache.pinot.common.assignment.InstancePartitions; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator; +import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; +import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; +import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; +import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.retry.RetryPolicies; +import org.apache.pinot.spi.utils.retry.RetryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class DefaultPinotTableIdealStateHelper implements PinotTableIdealStateHelper { + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPinotTableIdealStateHelper.class); + private static final RetryPolicy DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY = + RetryPolicies.randomDelayRetryPolicy(3, 100L, 200L); + + @Override + public IdealState buildEmptyIdealStateFor(TableConfig tableConfig, boolean enableBatchMessageMode) { + String tableNameWithType = tableConfig.getTableName(); + int numReplicas = tableConfig.getReplication(); + LOGGER.info("Building CUSTOM IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas); + CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(tableNameWithType); + customModeIdealStateBuilder + .setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL) + .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1); + IdealState idealState = customModeIdealStateBuilder.build(); + idealState.setInstanceGroupTag(tableNameWithType); + idealState.setBatchMessageMode(enableBatchMessageMode); + return idealState; + } + + @Override + public void assignSegment(HelixManager helixManager, String tableNameWithType, String segmentName, + SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { + HelixHelper.updateIdealState(helixManager, tableNameWithType, idealState -> { + assert idealState != null; + Map<String, Map<String, String>> currentAssignment = 
idealState.getRecord().getMapFields(); + if (currentAssignment.containsKey(segmentName)) { + LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName, + tableNameWithType); + } else { + List<String> assignedInstances = + segmentAssignment.assignSegment(segmentName, currentAssignment, instancePartitionsMap); + LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances, + tableNameWithType); + currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, + CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)); + } + return idealState; + }); + } + + @Override + public void updateInstanceStatesForNewConsumingSegment(IdealState idealState, @Nullable String committingSegmentName, + @Nullable String newSegmentName, SegmentAssignment segmentAssignment, + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { + Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); + if (committingSegmentName != null) { + // Change committing segment state to ONLINE + Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet(); + instanceStatesMap.put(committingSegmentName, SegmentAssignmentUtils.getInstanceStateMap(instances, + CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)); + LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName); + } + + // There used to be a race condition in pinot (caused by heavy GC on the controller during segment commit) + // that ended up creating multiple consuming segments for the same stream partition, named somewhat like + // tableName__1__25__20210920T190005Z and tableName__1__25__20210920T190007Z. It was fixed by checking the + // Zookeeper Stat object before updating the segment metadata. + // These conditions can happen again due to manual operations considered as fixes in Issues #5559 and #5263 + // The following check prevents the table from going into such a state (but does not prevent the root cause + // of attempting such a zk update). 
+ if (newSegmentName != null) { + LLCSegmentName newLLCSegmentName = new LLCSegmentName(newSegmentName); + int partitionId = newLLCSegmentName.getPartitionGroupId(); + int seqNum = newLLCSegmentName.getSequenceNumber(); + for (String segmentNameStr : instanceStatesMap.keySet()) { + LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentNameStr); + if (llcSegmentName == null) { + // skip the segment name if the name is not in low-level consumer format + // such segment name can appear for uploaded segment + LOGGER.debug("Skip segment name {} not in low-level consumer format", segmentNameStr); + continue; + } + if (llcSegmentName.getPartitionGroupId() == partitionId && llcSegmentName.getSequenceNumber() == seqNum) { + String errorMsg = + String.format("Segment %s is a duplicate of existing segment %s", newSegmentName, segmentNameStr); + LOGGER.error(errorMsg); + throw new HelixHelper.PermanentUpdaterException(errorMsg); + } + } + // Assign instances to the new segment and add instances as state CONSUMING + List<String> instancesAssigned = + segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap); + instanceStatesMap.put(newSegmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, + CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING)); + LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned); + } + } + + @Override + public List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, + List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) { + PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = + new PartitionGroupMetadataFetcher(streamConfig, partitionGroupConsumptionStatusList); + try { + DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher); + return partitionGroupMetadataFetcher.getPartitionGroupMetadataList(); + } catch (Exception e) { + Exception fetcherException = partitionGroupMetadataFetcher.getException(); + LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of table: {}", streamConfig.getTopicName(), + streamConfig.getTableNameWithType(), fetcherException); + throw new RuntimeException(fetcherException); + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/FullAutoPinotTableIdealStateHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/FullAutoPinotTableIdealStateHelper.java new file mode 100644 index 0000000000..7b1889fdd7 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/FullAutoPinotTableIdealStateHelper.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.idealstatehelper; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.helix.HelixManager; +import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.builder.FullAutoModeISBuilder; +import org.apache.pinot.common.assignment.InstancePartitions; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator; +import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class FullAutoPinotTableIdealStateHelper extends DefaultPinotTableIdealStateHelper { + private static final Logger LOGGER = LoggerFactory.getLogger(FullAutoPinotTableIdealStateHelper.class); + + @Override + public IdealState buildEmptyIdealStateFor(TableConfig tableConfig, boolean enableBatchMessageMode) { + String tableNameWithType = tableConfig.getTableName(); + int numReplicas = tableConfig.getReplication(); + + LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + String stateModel; + if (tableType == null) { + throw new RuntimeException("Failed to get table type from table name: " + tableNameWithType); + } else if (TableType.OFFLINE.equals(tableType)) { + stateModel = + PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; + } else { + stateModel = + PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; + } + + // FULL-AUTO Segment Online-Offline state model with a rebalance strategy, crushed auto-rebalance by default + // TODO: The state model used only works for OFFLINE tables today. Add support for REALTIME state model too + FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(tableNameWithType); + idealStateBuilder + .setStateModel(stateModel) + .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1) + // TODO: Revisit the rebalance strategy to use (maybe we add a custom one) + .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName()); + // The below config guarantees if active number of replicas is no less than minimum active replica, there will + // not be partition movements happened. + // Set min active replicas to 0 and rebalance delay to 5 minutes so that if any master goes offline, Helix + // controller waits at most 5 minutes and then re-calculate the participant assignment. 
+ // TODO: Assess which of these values need to be tweaked, removed, and what additional values that need to be added + idealStateBuilder.setMinActiveReplica(numReplicas - 1); + idealStateBuilder.setRebalanceDelay(300_000); + idealStateBuilder.enableDelayRebalance(); + // Set instance group tag + IdealState idealState = idealStateBuilder.build(); + idealState.setInstanceGroupTag(tableNameWithType); + idealState.setBatchMessageMode(enableBatchMessageMode); + return idealState; + } + + @Override + public void assignSegment(HelixManager helixManager, String tableNameWithType, String segmentName, + SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { + HelixHelper.updateIdealState(helixManager, tableNameWithType, idealState -> { + assert idealState != null; + Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields(); + Map<String, List<String>> currentAssignmentList = idealState.getRecord().getListFields(); + if (currentAssignment.containsKey(segmentName) && currentAssignmentList.containsKey(segmentName)) { + LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName, + tableNameWithType); + } else { + List<String> assignedInstances = + segmentAssignment.assignSegment(segmentName, currentAssignment, instancePartitionsMap); + LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances, + tableNameWithType); + currentAssignmentList.put(segmentName, Collections.emptyList()); + } + return idealState; + }); + } + + @Override + public void updateInstanceStatesForNewConsumingSegment(IdealState idealState, @Nullable String committingSegmentName, + @Nullable String newSegmentName, SegmentAssignment segmentAssignment, + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { + Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); + Map<String, List<String>> segmentList = idealState.getRecord().getListFields(); + // TODO: Need to figure out the best way to handle committed segments' state change + if (committingSegmentName != null) { + // Change committing segment state to ONLINE +// Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet(); +// instanceStatesMap.put(committingSegmentName, +// SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE)); +// instanceStatesList.put(newSegmentName, Collections.emptyList() +// /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/); + LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName); + } + + // There used to be a race condition in pinot (caused by heavy GC on the controller during segment commit) + // that ended up creating multiple consuming segments for the same stream partition, named somewhat like + // tableName__1__25__20210920T190005Z and tableName__1__25__20210920T190007Z. It was fixed by checking the + // Zookeeper Stat object before updating the segment metadata. + // These conditions can happen again due to manual operations considered as fixes in Issues #5559 and #5263 + // The following check prevents the table from going into such a state (but does not prevent the root cause + // of attempting such a zk update). 
+ if (newSegmentName != null) { + LLCSegmentName newLLCSegmentName = new LLCSegmentName(newSegmentName); + int partitionId = newLLCSegmentName.getPartitionGroupId(); + int seqNum = newLLCSegmentName.getSequenceNumber(); + for (String segmentNameStr : instanceStatesMap.keySet()) { + LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentNameStr); + if (llcSegmentName == null) { + // skip the segment name if the name is not in low-level consumer format + // such segment name can appear for uploaded segment + LOGGER.debug("Skip segment name {} not in low-level consumer format", segmentNameStr); + continue; + } + if (llcSegmentName.getPartitionGroupId() == partitionId && llcSegmentName.getSequenceNumber() == seqNum) { + String errorMsg = + String.format("Segment %s is a duplicate of existing segment %s", newSegmentName, segmentNameStr); + LOGGER.error(errorMsg); + throw new HelixHelper.PermanentUpdaterException(errorMsg); + } + } + // Assign instances to the new segment and add instances as state CONSUMING + List<String> instancesAssigned = + segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap); + // No need to check for tableType as offline tables can never go to CONSUMING state. All callers are for REALTIME +// instanceStatesMap.put(newSegmentName, +// SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); + // TODO: Once REALTIME segments move to FULL-AUTO, we cannot update the map. Uncomment below lines to update list. + // Assess whether we should set am empty InstanceStateList for the segment or not. i.e. do we support + // this preferred list concept, and does Helix-Auto even allow preferred list concept (from code reading it + // looks like it does) + segmentList.put(newSegmentName, Collections.emptyList() + /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/); + LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned); + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelper.java new file mode 100644 index 0000000000..a30872df83 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelper.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.helix.core.idealstatehelper; + +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.helix.HelixManager; +import org.apache.helix.model.IdealState; +import org.apache.pinot.common.assignment.InstancePartitions; +import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; +import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; +import org.apache.pinot.spi.stream.StreamConfig; + + +public interface PinotTableIdealStateHelper { + + /** + * Builds an empty ideal state for the Pinot table. + * @param tableConfig table config. + * @param enableBatchMessageMode whether to enable batch message mode when building the ideal state. + */ + IdealState buildEmptyIdealStateFor(TableConfig tableConfig, boolean enableBatchMessageMode); + + void assignSegment(HelixManager helixManager, String tableNameWithType, String segmentName, + SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap); + + void updateInstanceStatesForNewConsumingSegment(IdealState idealState, @Nullable String committingSegmentName, + @Nullable String newSegmentName, SegmentAssignment segmentAssignment, + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap); + + /** + * Fetches the list of {@link PartitionGroupMetadata} for the new partition groups for the stream, + * with the help of the {@link PartitionGroupConsumptionStatus} of the current partitionGroups. + * + * Reasons why <code>partitionGroupConsumptionStatusList</code> is needed: + * + * 1) + * The current {@link PartitionGroupConsumptionStatus} is used to determine the offsets that have been consumed for + * a partition group. + * An example of where the offsets would be used: + * e.g. If partition group 1 contains shardId 1, with status DONE and endOffset 150. There's 2 possibilities: + * 1) the stream indicates that shardId's last offset is 200. + * This tells Pinot that partition group 1 still has messages which haven't been consumed, and must be included in + * the response. + * 2) the stream indicates that shardId's last offset is 150, + * This tells Pinot that all messages of partition group 1 have been consumed, and it need not be included in the + * response. + * Thus, this call will skip a partition group when it has reached end of life and all messages from that partition + * group have been consumed. + * + * The current {@link PartitionGroupConsumptionStatus} is also used to know about existing groupings of partitions, + * and accordingly make the new partition groups. + * e.g. Assume that partition group 1 has status IN_PROGRESS and contains shards 0,1,2 + * and partition group 2 has status DONE and contains shards 3,4. + * In the above example, the <code>partitionGroupConsumptionStatusList</code> indicates that + * the collection of shards in partition group 1, should remain unchanged in the response, + * whereas shards 3,4 can be added to new partition groups if needed. + * + * @param streamConfig the streamConfig from the tableConfig + * @param partitionGroupConsumptionStatusList List of {@link PartitionGroupConsumptionStatus} for the current + * partition groups. 
+ * The size of this list is equal to the number of partition groups, + * and is created using the latest segment zk metadata. + */ + List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, + List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList); +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelperFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelperFactory.java new file mode 100644 index 0000000000..9327ecba04 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelperFactory.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.idealstatehelper; + +import org.apache.pinot.controller.ControllerConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PinotTableIdealStateHelperFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(PinotTableIdealStateHelperFactory.class); + private static PinotTableIdealStateHelper _instance = null; + private static ControllerConf _controllerConf; + + private PinotTableIdealStateHelperFactory() { + } + + public static void init(ControllerConf controllerConf) { + _controllerConf = controllerConf; + } + + public static PinotTableIdealStateHelper create() { + if (_instance == null) { + String pinotTableIdealstateHelperClassPath = _controllerConf.getPinotTableIdealstateHelperClass(); + try { + _instance = (PinotTableIdealStateHelper) Class.forName(pinotTableIdealstateHelperClassPath).newInstance(); + } catch (Exception e) { + LOGGER.error("PinotTableIdealStateHelper not found: {}", pinotTableIdealstateHelperClassPath); + throw new RuntimeException(e); + } + } + return _instance; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java index c9850856cd..94d158220c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java @@ -36,7 +36,7 @@ import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.utils.LLCSegmentName; -import org.apache.pinot.controller.helix.core.PinotTableIdealStateHelper; +import 
org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory; import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; @@ -79,7 +79,7 @@ public class MissingConsumingSegmentFinder { _partitionGroupIdToLargestStreamOffsetMap = new HashMap<>(); streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA); try { - PinotTableIdealStateHelper.getPartitionGroupMetadataList(streamConfig, Collections.emptyList()) + PinotTableIdealStateHelperFactory.create().getPartitionGroupMetadataList(streamConfig, Collections.emptyList()) .forEach(metadata -> { _partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); }); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 40215c43a4..a30731274c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -50,7 +50,7 @@ import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.assignment.InstancePartitions; -import org.apache.pinot.common.assignment.InstancePartitionsUtils; +import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory; import org.apache.pinot.common.messages.ForceCommitMessage; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; @@ -68,9 +68,9 @@ import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory; import org.apache.pinot.controller.api.resources.Constants; import org.apache.pinot.controller.api.resources.PauseStatus; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; -import org.apache.pinot.controller.helix.core.PinotTableIdealStateHelper; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory; +import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory; import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor; import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager; import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater; @@ -336,7 +336,7 @@ public class PinotLLCRealtimeSegmentManager { String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions, numPartitionGroups, numReplicas); - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instancesStateList, null, segmentName, + updateInstanceStatesForNewConsumingSegment(idealState, null, segmentName, segmentAssignment, instancePartitionsMap); } @@ -362,7 +362,7 @@ public class PinotLLCRealtimeSegmentManager { @VisibleForTesting InstancePartitions getConsumingInstancePartitions(TableConfig tableConfig) { try { - return InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixManager, tableConfig, + return 
InstancePartitionsUtilsHelperFactory.create().fetchOrComputeInstancePartitions(_helixManager, tableConfig, InstancePartitionsType.CONSUMING); } catch (Exception e) { _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, @@ -817,7 +817,7 @@ public class PinotLLCRealtimeSegmentManager { @VisibleForTesting List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig streamConfig, List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList) { - return PinotTableIdealStateHelper.getPartitionGroupMetadataList(streamConfig, + return PinotTableIdealStateHelperFactory.create().getPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); } @@ -1000,8 +1000,7 @@ public class PinotLLCRealtimeSegmentManager { throw new HelixHelper.PermanentUpdaterException( "Exceeded max segment completion time for segment " + committingSegmentName); } - updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), - idealState.getRecord().getListFields(), committingSegmentName, + updateInstanceStatesForNewConsumingSegment(idealState, committingSegmentName, isTablePaused(idealState) ? null : newSegmentName, segmentAssignment, instancePartitionsMap); return idealState; }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f)); @@ -1012,61 +1011,12 @@ public class PinotLLCRealtimeSegmentManager { } @VisibleForTesting - void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap, - Map<String, List<String>> instanceStatesList, - @Nullable String committingSegmentName, @Nullable String newSegmentName, SegmentAssignment segmentAssignment, + void updateInstanceStatesForNewConsumingSegment(IdealState idealState, @Nullable String committingSegmentName, + @Nullable String newSegmentName, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { - // TODO: Need to figure out the best way to handle committed segments' state change - if (committingSegmentName != null) { - // Change committing segment state to ONLINE -// Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet(); -// instanceStatesMap.put(committingSegmentName, -// SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE)); -// instanceStatesList.put(newSegmentName, Collections.emptyList() -// /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/); - LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName); - } - - // There used to be a race condition in pinot (caused by heavy GC on the controller during segment commit) - // that ended up creating multiple consuming segments for the same stream partition, named somewhat like - // tableName__1__25__20210920T190005Z and tableName__1__25__20210920T190007Z. It was fixed by checking the - // Zookeeper Stat object before updating the segment metadata. - // These conditions can happen again due to manual operations considered as fixes in Issues #5559 and #5263 - // The following check prevents the table from going into such a state (but does not prevent the root cause - // of attempting such a zk update). 
- if (newSegmentName != null) { - LLCSegmentName newLLCSegmentName = new LLCSegmentName(newSegmentName); - int partitionId = newLLCSegmentName.getPartitionGroupId(); - int seqNum = newLLCSegmentName.getSequenceNumber(); - for (String segmentNameStr : instanceStatesMap.keySet()) { - LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentNameStr); - if (llcSegmentName == null) { - // skip the segment name if the name is not in low-level consumer format - // such segment name can appear for uploaded segment - LOGGER.debug("Skip segment name {} not in low-level consumer format", segmentNameStr); - continue; - } - if (llcSegmentName.getPartitionGroupId() == partitionId && llcSegmentName.getSequenceNumber() == seqNum) { - String errorMsg = - String.format("Segment %s is a duplicate of existing segment %s", newSegmentName, segmentNameStr); - LOGGER.error(errorMsg); - throw new HelixHelper.PermanentUpdaterException(errorMsg); - } - } - // Assign instances to the new segment and add instances as state CONSUMING - List<String> instancesAssigned = - segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap); - // No need to check for tableType as offline tables can never go to CONSUMING state. All callers are for REALTIME -// instanceStatesMap.put(newSegmentName, -// SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); - // TODO: Once REALTIME segments move to FULL-AUTO, we cannot update the map. Uncomment below lines to update list. - // Assess whether we should set am empty InstanceStateList for the segment or not. i.e. do we support - // this preferred list concept, and does Helix-Auto even allow preferred list concept (from code reading it - // looks like it does) - instanceStatesList.put(newSegmentName, Collections.emptyList() - /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/); - LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned); - } + PinotTableIdealStateHelperFactory.create() + .updateInstanceStatesForNewConsumingSegment(idealState, committingSegmentName, newSegmentName, + segmentAssignment, instancePartitionsMap); } /* @@ -1222,14 +1172,14 @@ public class PinotLLCRealtimeSegmentManager { (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0); createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas); - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, latestSegmentName, - newSegmentName, segmentAssignment, instancePartitionsMap); + updateInstanceStatesForNewConsumingSegment(idealState, latestSegmentName, newSegmentName, + segmentAssignment, instancePartitionsMap); } else { // partition group reached end of life LOGGER.info("PartitionGroup: {} has reached end of life. Updating ideal state for segment: {}. " + "Skipping creation of new ZK metadata and new segment in ideal state", partitionGroupId, latestSegmentName); - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, latestSegmentName, - null, segmentAssignment, instancePartitionsMap); + updateInstanceStatesForNewConsumingSegment(idealState, latestSegmentName, null, segmentAssignment, + instancePartitionsMap); } } // else, the metadata should be IN_PROGRESS, which is the right state for a consuming segment. 
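A minimal, self-contained sketch of the caller-side usage that the hunks above and below converge on; it is illustrative only and not part of the patch. The class and package names here are hypothetical, and it assumes PinotTableIdealStateHelperFactory.init(...) has been invoked with the controller config during startup (that wiring lives in the BaseControllerStarter/ControllerConf changes, which are outside this excerpt).

// Hypothetical example class; the Pinot-specific types and calls below come from the files added in this commit.
package org.apache.pinot.example;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelper;
import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;

public class IdealStateHelperUsageSketch {

  // Must run once before create() is called; otherwise the factory has no ControllerConf to read
  // the configured helper class from.
  public static void initFactory(ControllerConf controllerConf) {
    PinotTableIdealStateHelperFactory.init(controllerConf);
  }

  // Builds the table's empty IdealState through whichever helper the controller config selects
  // (DefaultPinotTableIdealStateHelper for CUSTOM mode, FullAutoPinotTableIdealStateHelper for FULL-AUTO).
  public static IdealState buildIdealState(TableConfig tableConfig, boolean enableBatchMessageMode) {
    PinotTableIdealStateHelper helper = PinotTableIdealStateHelperFactory.create();
    return helper.buildEmptyIdealStateFor(tableConfig, enableBatchMessageMode);
  }

  // Mirrors how PinotLLCRealtimeSegmentManager now delegates: fetch new partition groups for the stream,
  // then let the helper decide whether the new CONSUMING segment lands in the map fields or the list fields.
  public static List<PartitionGroupMetadata> addConsumingSegment(IdealState idealState, String newSegmentName,
      SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
      StreamConfig streamConfig) {
    PinotTableIdealStateHelper helper = PinotTableIdealStateHelperFactory.create();
    helper.updateInstanceStatesForNewConsumingSegment(idealState, null, newSegmentName, segmentAssignment,
        instancePartitionsMap);
    return helper.getPartitionGroupMetadataList(streamConfig, Collections.emptyList());
  }
}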
@@ -1253,7 +1203,7 @@ public class PinotLLCRealtimeSegmentManager { partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory, latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, - newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, instanceStatesList, + newPartitionGroupMetadataList, instancePartitions, idealState, segmentAssignment, instancePartitionsMap, startOffset); } else { if (newPartitionGroupSet.contains(partitionGroupId)) { @@ -1272,7 +1222,7 @@ public class PinotLLCRealtimeSegmentManager { partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory, latestSegmentZKMetadata.getEndOffset()); createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, - newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, instanceStatesList, + newPartitionGroupMetadataList, instancePartitions, idealState, segmentAssignment, instancePartitionsMap, startOffset); } else { LOGGER.error( @@ -1320,8 +1270,8 @@ public class PinotLLCRealtimeSegmentManager { partitionGroupId, realtimeTableName); _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L); } - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, previousConsumingSegment, - latestSegmentName, segmentAssignment, instancePartitionsMap); + updateInstanceStatesForNewConsumingSegment(idealState, previousConsumingSegment, latestSegmentName, + segmentAssignment, instancePartitionsMap); } else { LOGGER.error("Got unexpected status: {} in segment ZK metadata for segment: {}", latestSegmentZKMetadata.getStatus(), latestSegmentName); @@ -1336,8 +1286,8 @@ public class PinotLLCRealtimeSegmentManager { String newSegmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions, numPartitions, numReplicas); - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, null, newSegmentName, - segmentAssignment, instancePartitionsMap); + updateInstanceStatesForNewConsumingSegment(idealState, null, newSegmentName, segmentAssignment, + instancePartitionsMap); } } @@ -1347,8 +1297,7 @@ public class PinotLLCRealtimeSegmentManager { private void createNewConsumingSegment(TableConfig tableConfig, StreamConfig streamConfig, SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs, List<PartitionGroupMetadata> newPartitionGroupMetadataList, InstancePartitions instancePartitions, - Map<String, Map<String, String>> instanceStatesMap, Map<String, List<String>> instancesStateList, - SegmentAssignment segmentAssignment, + IdealState idealState, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, StreamPartitionMsgOffset startOffset) { int numReplicas = getNumReplicas(tableConfig, instancePartitions); int numPartitions = newPartitionGroupMetadataList.size(); @@ -1359,7 +1308,7 @@ public class PinotLLCRealtimeSegmentManager { createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas); String newSegmentName = newLLCSegmentName.getSegmentName(); - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instancesStateList, null, newSegmentName, + 
updateInstanceStatesForNewConsumingSegment(idealState, null, newSegmentName, segmentAssignment, instancePartitionsMap); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index f97aa7b06a..dcba5949d3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -48,7 +48,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException; import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils; import org.apache.pinot.common.assignment.InstancePartitions; -import org.apache.pinot.common.assignment.InstancePartitionsUtils; +import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.metrics.ControllerTimer; import org.apache.pinot.common.tier.PinotServerTierStorage; @@ -589,11 +589,11 @@ public class TableRebalancer { "COMPLETED segments should not be relocated, skipping fetching/computing COMPLETED instance partitions " + "for table: {}", tableNameWithType); if (!dryRun) { - String instancePartitionsName = InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, - InstancePartitionsType.COMPLETED.toString()); + String instancePartitionsName = InstancePartitionsUtilsHelperFactory.create() + .getInstancePartitionsName(tableNameWithType, InstancePartitionsType.COMPLETED.toString()); LOGGER.info("Removing instance partitions: {} from ZK if it exists", instancePartitionsName); - InstancePartitionsUtils.removeInstancePartitions(_helixManager.getHelixPropertyStore(), - instancePartitionsName); + InstancePartitionsUtilsHelperFactory.create() + .removeInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitionsName); } } } @@ -606,10 +606,10 @@ public class TableRebalancer { private Pair<InstancePartitions, Boolean> getInstancePartitions(TableConfig tableConfig, InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean bootstrap, boolean dryRun) { String tableNameWithType = tableConfig.getTableName(); - String instancePartitionsName = - InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString()); - InstancePartitions existingInstancePartitions = - InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitionsName); + String instancePartitionsName = InstancePartitionsUtilsHelperFactory.create() + .getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString()); + InstancePartitions existingInstancePartitions = InstancePartitionsUtilsHelperFactory.create() + .fetchInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitionsName); if (reassignInstances) { if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, instancePartitionsType)) { @@ -630,14 +630,15 @@ public class TableRebalancer { instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions); if (!dryRun && !instancePartitionsUnchanged) { LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions); - InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), - 
_helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions); + InstancePartitionsUtilsHelperFactory.create() + .persistInstancePartitions(_helixManager.getHelixPropertyStore(), _helixManager.getConfigAccessor(), + _helixManager.getClusterName(), instancePartitions); } } else { String referenceInstancePartitionsName = tableConfig.getInstancePartitionsMap().get(instancePartitionsType); if (isPreConfigurationBasedAssignment) { - InstancePartitions preConfiguredInstancePartitions = - InstancePartitionsUtils.fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(), + InstancePartitions preConfiguredInstancePartitions = InstancePartitionsUtilsHelperFactory.create() + .fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(), referenceInstancePartitionsName, instancePartitionsName); instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true), @@ -646,19 +647,21 @@ public class TableRebalancer { if (!dryRun && !instancePartitionsUnchanged) { LOGGER.info("Persisting instance partitions: {} (based on {})", instancePartitions, preConfiguredInstancePartitions); - InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), - _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions); + InstancePartitionsUtilsHelperFactory.create() + .persistInstancePartitions(_helixManager.getHelixPropertyStore(), _helixManager.getConfigAccessor(), + _helixManager.getClusterName(), instancePartitions); } } else { - instancePartitions = - InstancePartitionsUtils.fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(), + instancePartitions = InstancePartitionsUtilsHelperFactory.create() + .fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(), referenceInstancePartitionsName, instancePartitionsName); instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions); if (!dryRun && !instancePartitionsUnchanged) { LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions, referenceInstancePartitionsName); - InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), - _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions); + InstancePartitionsUtilsHelperFactory.create() + .persistInstancePartitions(_helixManager.getHelixPropertyStore(), _helixManager.getConfigAccessor(), + _helixManager.getClusterName(), instancePartitions); } } } @@ -666,22 +669,20 @@ public class TableRebalancer { } else { LOGGER.info("{} instance assignment is not allowed, using default instance partitions for table: {}", instancePartitionsType, tableNameWithType); - InstancePartitions instancePartitions = - InstancePartitionsUtils.computeDefaultInstancePartitions(_helixManager, tableConfig, - instancePartitionsType); + InstancePartitions instancePartitions = InstancePartitionsUtilsHelperFactory.create() + .computeDefaultInstancePartitions(_helixManager, tableConfig, instancePartitionsType); boolean noExistingInstancePartitions = existingInstancePartitions == null; if (!dryRun && !noExistingInstancePartitions) { LOGGER.info("Removing instance partitions: {} from ZK", instancePartitionsName); - InstancePartitionsUtils.removeInstancePartitions(_helixManager.getHelixPropertyStore(), - instancePartitionsName); + InstancePartitionsUtilsHelperFactory.create() + 
.removeInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitionsName); } return Pair.of(instancePartitions, noExistingInstancePartitions); } } else { LOGGER.info("Fetching/computing {} instance partitions for table: {}", instancePartitionsType, tableNameWithType); - return Pair.of( - InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixManager, tableConfig, instancePartitionsType), - true); + return Pair.of(InstancePartitionsUtilsHelperFactory.create() + .fetchOrComputeInstancePartitions(_helixManager, tableConfig, instancePartitionsType), true); } } @@ -730,9 +731,9 @@ public class TableRebalancer { String tableNameWithType = tableConfig.getTableName(); String tierName = tier.getName(); String instancePartitionsName = - InstancePartitionsUtils.getInstancePartitionsNameForTier(tableNameWithType, tierName); - InstancePartitions existingInstancePartitions = - InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitionsName); + InstancePartitionsUtilsHelperFactory.create().getInstancePartitionsNameForTier(tableNameWithType, tierName); + InstancePartitions existingInstancePartitions = InstancePartitionsUtilsHelperFactory.create() + .fetchInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitionsName); if (reassignInstances) { Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap(); @@ -743,14 +744,13 @@ public class TableRebalancer { "Instance assignment config for tier: {} does not exist for table: {}, using default instance partitions", tierName, tableNameWithType); PinotServerTierStorage storage = (PinotServerTierStorage) tier.getStorage(); - InstancePartitions instancePartitions = - InstancePartitionsUtils.computeDefaultInstancePartitionsForTag(_helixManager, tableNameWithType, tierName, - storage.getServerTag()); + InstancePartitions instancePartitions = InstancePartitionsUtilsHelperFactory.create() + .computeDefaultInstancePartitionsForTag(_helixManager, tableNameWithType, tierName, storage.getServerTag()); boolean noExistingInstancePartitions = existingInstancePartitions == null; if (!dryRun && !noExistingInstancePartitions) { LOGGER.info("Removing instance partitions: {} from ZK", instancePartitionsName); - InstancePartitionsUtils.removeInstancePartitions(_helixManager.getHelixPropertyStore(), - instancePartitionsName); + InstancePartitionsUtilsHelperFactory.create() + .removeInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitionsName); } return Pair.of(instancePartitions, noExistingInstancePartitions); } else { @@ -763,8 +763,9 @@ public class TableRebalancer { boolean instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions); if (!dryRun && !instancePartitionsUnchanged) { LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions); - InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), - _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions); + InstancePartitionsUtilsHelperFactory.create() + .persistInstancePartitions(_helixManager.getHelixPropertyStore(), _helixManager.getConfigAccessor(), + _helixManager.getClusterName(), instancePartitions); } return Pair.of(instancePartitions, instancePartitionsUnchanged); } @@ -773,9 +774,8 @@ public class TableRebalancer { return Pair.of(existingInstancePartitions, true); } else { PinotServerTierStorage storage = (PinotServerTierStorage) tier.getStorage(); - 
InstancePartitions instancePartitions = - InstancePartitionsUtils.computeDefaultInstancePartitionsForTag(_helixManager, tableNameWithType, tierName, - storage.getServerTag()); + InstancePartitions instancePartitions = InstancePartitionsUtilsHelperFactory.create() + .computeDefaultInstancePartitionsForTag(_helixManager, tableNameWithType, tierName, storage.getServerTag()); return Pair.of(instancePartitions, true); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java index a14010f17f..778c264d54 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java @@ -48,6 +48,7 @@ import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator; import org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator; import org.apache.pinot.controller.helix.core.PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator; +import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator; import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -167,6 +168,20 @@ public class HelixSetupUtils { helixDataAccessor.createStateModelDef( PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition()); } + + String segmentStateModelName = + PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; + StateModelDefinition defaultStateModelDefinition = + helixAdmin.getStateModelDef(helixClusterName, segmentStateModelName); + if (defaultStateModelDefinition == null || isUpdateStateModel) { + if (defaultStateModelDefinition == null) { + LOGGER.info("Adding state model: {} with CONSUMING state", segmentStateModelName); + } else { + LOGGER.info("Updating state model: {} to contain CONSUMING state", segmentStateModelName); + } + helixDataAccessor.createStateModelDef( + PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition()); + } } private static void createBrokerResourceIfNeeded(String helixClusterName, HelixAdmin helixAdmin, diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java index dedc79384e..a95b1bb40d 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java @@ -25,7 +25,7 @@ import java.util.HashMap; import java.util.Map; import java.util.TreeMap; import org.apache.pinot.common.assignment.InstancePartitions; -import org.apache.pinot.common.assignment.InstancePartitionsUtils; +import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory; import org.apache.pinot.common.tier.TierFactory; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.controller.ControllerConf; @@ -238,7 +238,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control 
sendGetRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, TIER_NAME))); assertEquals(instancePartitionsMap.size(), 1); assertEquals(instancePartitionsMap.get(TIER_NAME).getInstancePartitionsName(), - InstancePartitionsUtils.getInstancePartitionsNameForTier(RAW_TABLE_NAME, TIER_NAME)); + InstancePartitionsUtilsHelperFactory.create().getInstancePartitionsNameForTier(RAW_TABLE_NAME, TIER_NAME)); // Remove the instance partitions for both offline and real-time table sendDeleteRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, null)); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java index c9a42536d3..e74de87dfd 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java @@ -27,7 +27,7 @@ import java.util.Map; import java.util.TreeMap; import java.util.TreeSet; import org.apache.pinot.common.assignment.InstancePartitions; -import org.apache.pinot.common.assignment.InstancePartitionsUtils; +import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory; import org.apache.pinot.common.tier.PinotServerTierStorage; import org.apache.pinot.common.tier.Tier; import org.apache.pinot.common.tier.TierFactory; @@ -77,21 +77,21 @@ public class OfflineNonReplicaGroupTieredSegmentAssignmentTest { private static final List<String> INSTANCES_TIER_A = SegmentAssignmentTestUtils.getNameList(TIER_A_INSTANCE_NAME_PREFIX, NUM_INSTANCES_TIER_A); private static final String TIER_A_INSTANCE_PARTITIONS_NAME = - InstancePartitionsUtils.getInstancePartitionsName(RAW_TABLE_NAME, TIER_A_NAME); + InstancePartitionsUtilsHelperFactory.create().getInstancePartitionsName(RAW_TABLE_NAME, TIER_A_NAME); private static final String TIER_B_INSTANCE_NAME_PREFIX = "tierB_instance_"; private static final int NUM_INSTANCES_TIER_B = 4; private static final List<String> INSTANCES_TIER_B = SegmentAssignmentTestUtils.getNameList(TIER_B_INSTANCE_NAME_PREFIX, NUM_INSTANCES_TIER_B); private static final String TIER_B_INSTANCE_PARTITIONS_NAME = - InstancePartitionsUtils.getInstancePartitionsName(RAW_TABLE_NAME, TIER_B_NAME); + InstancePartitionsUtilsHelperFactory.create().getInstancePartitionsName(RAW_TABLE_NAME, TIER_B_NAME); private static final String TIER_C_INSTANCE_NAME_PREFIX = "tierC_instance_"; private static final int NUM_INSTANCES_TIER_C = 3; private static final List<String> INSTANCES_TIER_C = SegmentAssignmentTestUtils.getNameList(TIER_C_INSTANCE_NAME_PREFIX, NUM_INSTANCES_TIER_C); private static final String TIER_C_INSTANCE_PARTITIONS_NAME = - InstancePartitionsUtils.getInstancePartitionsName(RAW_TABLE_NAME, TIER_C_NAME); + InstancePartitionsUtilsHelperFactory.create().getInstancePartitionsName(RAW_TABLE_NAME, TIER_C_NAME); private Map<InstancePartitionsType, InstancePartitions> _instancePartitionsMap; private Map<String, InstancePartitions> _tierInstancePartitionsMap; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java index e7b5895a47..48c37afb8b 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import org.apache.pinot.common.assignment.InstancePartitions; -import org.apache.pinot.common.assignment.InstancePartitionsUtils; +import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory; import org.apache.pinot.common.tier.PinotServerTierStorage; import org.apache.pinot.common.tier.Tier; import org.apache.pinot.common.tier.TierFactory; @@ -80,21 +80,21 @@ public class RealtimeNonReplicaGroupTieredSegmentAssignmentTest { private static final List<String> INSTANCES_TIER_A = SegmentAssignmentTestUtils.getNameList(TIER_A_INSTANCE_NAME_PREFIX, NUM_INSTANCES_TIER_A); private static final String TIER_A_INSTANCE_PARTITIONS_NAME = - InstancePartitionsUtils.getInstancePartitionsName(RAW_TABLE_NAME, TIER_A_NAME); + InstancePartitionsUtilsHelperFactory.create().getInstancePartitionsName(RAW_TABLE_NAME, TIER_A_NAME); private static final String TIER_B_INSTANCE_NAME_PREFIX = "tierB_instance_"; private static final int NUM_INSTANCES_TIER_B = 4; private static final List<String> INSTANCES_TIER_B = SegmentAssignmentTestUtils.getNameList(TIER_B_INSTANCE_NAME_PREFIX, NUM_INSTANCES_TIER_B); private static final String TIER_B_INSTANCE_PARTITIONS_NAME = - InstancePartitionsUtils.getInstancePartitionsName(RAW_TABLE_NAME, TIER_B_NAME); + InstancePartitionsUtilsHelperFactory.create().getInstancePartitionsName(RAW_TABLE_NAME, TIER_B_NAME); private static final String TIER_C_INSTANCE_NAME_PREFIX = "tierC_instance_"; private static final int NUM_INSTANCES_TIER_C = 3; private static final List<String> INSTANCES_TIER_C = SegmentAssignmentTestUtils.getNameList(TIER_C_INSTANCE_NAME_PREFIX, NUM_INSTANCES_TIER_C); private static final String TIER_C_INSTANCE_PARTITIONS_NAME = - InstancePartitionsUtils.getInstancePartitionsName(RAW_TABLE_NAME, TIER_C_NAME); + InstancePartitionsUtilsHelperFactory.create().getInstancePartitionsName(RAW_TABLE_NAME, TIER_C_NAME); private List<String> _segments; private Map<InstancePartitionsType, InstancePartitions> _instancePartitionsMap; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 26641d515c..ee90649080 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -1220,11 +1220,9 @@ public class PinotLLCRealtimeSegmentManagerTest { void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName, String newSegmentName, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { - updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), - _idealState.getRecord().getListFields(), 
committingSegmentName, null, + updateInstanceStatesForNewConsumingSegment(_idealState, committingSegmentName, null, segmentAssignment, instancePartitionsMap); - updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), - _idealState.getRecord().getListFields(), null, newSegmentName, + updateInstanceStatesForNewConsumingSegment(_idealState, null, newSegmentName, segmentAssignment, instancePartitionsMap); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java index 5d679c0380..1702fb6063 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java @@ -25,7 +25,7 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.pinot.common.assignment.InstancePartitions; -import org.apache.pinot.common.assignment.InstancePartitionsUtils; +import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory; import org.apache.pinot.common.tier.TierFactory; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.controller.helix.ControllerTest; @@ -257,7 +257,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { rebalanceConfig.setReassignInstances(true); rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); - assertNull(InstancePartitionsUtils.fetchInstancePartitions(_propertyStore, + assertNull(InstancePartitionsUtilsHelperFactory.create().fetchInstancePartitions(_propertyStore, InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME))); // All servers should be assigned to the table diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java index dc988ad669..017b0b2a5a 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java @@ -33,8 +33,8 @@ import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; -import org.apache.pinot.controller.helix.core.PinotTableIdealStateHelper; import org.apache.pinot.controller.helix.core.SegmentDeletionManager; +import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -270,9 +270,7 @@ public class RetentionManagerTest { final int replicaCount = tableConfig.getReplication(); List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>(); - - IdealState idealState = - PinotTableIdealStateHelper.buildEmptyIdealStateFor(REALTIME_TABLE_NAME, replicaCount, true); + IdealState idealState = 
PinotTableIdealStateHelperFactory.create().buildEmptyIdealStateFor(tableConfig, true); final int kafkaPartition = 5; final long millisInDays = TimeUnit.DAYS.toMillis(1); @@ -334,9 +332,7 @@ public class RetentionManagerTest { final int replicaCount = tableConfig.getReplication(); List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>(); - - IdealState idealState = - PinotTableIdealStateHelper.buildEmptyIdealStateFor(REALTIME_TABLE_NAME, replicaCount, true); + IdealState idealState = PinotTableIdealStateHelperFactory.create().buildEmptyIdealStateFor(tableConfig, true); final int kafkaPartition = 5; final long millisInDays = TimeUnit.DAYS.toMillis(1); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java index b14cb9e240..08de258d62 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java @@ -46,7 +46,6 @@ import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.common.utils.config.TableConfigUtils; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.spi.utils.retry.RetryPolicies; @@ -211,23 +210,16 @@ public class MoveReplicaGroup extends AbstractBaseAdminCommand implements Comman @Nullable @Override public IdealState apply(@Nullable IdealState input) { - Map<String, Map<String, String>> existingMapField = input.getRecord().getMapFields(); Map<String, List<String>> existingListField = input.getRecord().getListFields(); - TableType tableType = TableNameBuilder.getTableTypeFromTableName(_tableName); for (Map.Entry<String, Map<String, String>> segmentEntry : proposedIdealState.entrySet()) { // existingMapField.put(segmentEntry.getKey(), segmentEntry.getValue()); - if (tableType == TableType.REALTIME) { - // TODO: Update listField only once REALTIME uses FULL-AUTO - existingMapField.put(segmentEntry.getKey(), segmentEntry.getValue()); - } else { - String segmentName = segmentEntry.getKey(); - Map<String, String> segmentMapping = segmentEntry.getValue(); - List<String> listOfHosts = new ArrayList<>(segmentMapping.keySet()); - Collections.sort(listOfHosts); - // TODO: Assess if we want to add the preferred list of hosts or not - existingListField.put(segmentName, Collections.emptyList() /* listOfHosts */); - } + String segmentName = segmentEntry.getKey(); + Map<String, String> segmentMapping = segmentEntry.getValue(); + List<String> listOfHosts = new ArrayList<>(segmentMapping.keySet()); + Collections.sort(listOfHosts); + // TODO: Assess if we want to add the preferred list of hosts or not + existingListField.put(segmentName, Collections.emptyList() /* listOfHosts */); } return input; }
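For context, a minimal sketch of the call-site pattern this commit applies throughout the diff above: static InstancePartitionsUtils and PinotTableIdealStateHelper calls are replaced by helpers obtained from the new factories, so an alternative (e.g. FULL-AUTO oriented) implementation can be plugged in. The class name, field names and table name below are invented for illustration only; the factory and helper method signatures are the ones that appear in the diff.

import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtilsHelperFactory;
import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;

// Hypothetical caller, for illustration only (not part of the patch).
public class IdealStateHelperMigrationExample {
  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;

  public IdealStateHelperMigrationExample(ZkHelixPropertyStore<ZNRecord> propertyStore) {
    _propertyStore = propertyStore;
  }

  public InstancePartitions fetchOfflineInstancePartitions(String tableNameWithType) {
    // Before: InstancePartitionsUtils.getInstancePartitionsName(...) and
    // InstancePartitionsUtils.fetchInstancePartitions(...) were invoked statically.
    // After: the same operations go through the helper returned by the factory.
    String instancePartitionsName = InstancePartitionsUtilsHelperFactory.create()
        .getInstancePartitionsName(tableNameWithType, TableType.OFFLINE.name());
    return InstancePartitionsUtilsHelperFactory.create()
        .fetchInstancePartitions(_propertyStore, instancePartitionsName);
  }

  public IdealState buildEmptyIdealState(TableConfig tableConfig) {
    // Before: PinotTableIdealStateHelper.buildEmptyIdealStateFor(tableName, replicaCount, true).
    // After: the helper takes the table config instead of the table name and replica count
    // (the boolean flag is carried over unchanged from the existing call sites).
    return PinotTableIdealStateHelperFactory.create().buildEmptyIdealStateFor(tableConfig, true);
  }
}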