This is an automated email from the ASF dual-hosted git repository. mcvsubbu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 24147dd [Issue 6068] Fixing the calls to Helix to throw exception if zk conne… (#6069) 24147dd is described below commit 24147dd5b7dc188e01af283bd0d025a6cec3527b Author: Subbu Subramaniam <mcvsu...@users.noreply.github.com> AuthorDate: Mon Oct 5 17:07:53 2020 -0700 [Issue 6068] Fixing the calls to Helix to throw exception if zk conne… (#6069) * [Issue 6068] Fixing the calls to Helix to throw exception if zk connection is broken See Issue #6068 * Update zk metadata APIs to use the zk calls that take a timeout (retry count and retry interval) These APIs will ensure that if there is a zk disconnect we will get an exception after a minimal number of retries. We can change this to retry once and implement a backoff retry if needed later on. Note that the underlying helix library ends up calling the previous API (as yet), but we will upgrade to a helix version soon that actually implements these. * Fixing a unit test mock call * Updating more calls to getChildren --- .../java/org/apache/pinot/broker/routing/RoutingManager.java | 3 ++- .../org/apache/pinot/common/metadata/ZKMetadataProvider.java | 10 +++++++--- .../java/org/apache/pinot/common/utils/CommonConstants.java | 3 +++ .../java/org/apache/pinot/common/utils/helix/HelixHelper.java | 2 +- .../pinot/controller/helix/core/PinotHelixResourceManager.java | 2 +- .../pinot/controller/helix/core/rebalance/TableRebalancer.java | 2 +- .../segment/OfflineReplicaGroupSegmentAssignmentTest.java | 2 +- .../main/java/org/apache/pinot/tools/UpdateSegmentState.java | 4 +++- 8 files changed, 19 insertions(+), 9 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java index 8fe3f52..940e40c 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java @@ -213,7 +213,8 
@@ public class RoutingManager implements ClusterChangeHandler { long startTimeMs = System.currentTimeMillis(); List<ZNRecord> instanceConfigZNRecords = - _zkDataAccessor.getChildren(_instanceConfigsPath, null, AccessOption.PERSISTENT); + _zkDataAccessor.getChildren(_instanceConfigsPath, null, AccessOption.PERSISTENT, + CommonConstants.Helix.ZkClient.RETRY_COUNT, CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS); long fetchInstanceConfigsEndTimeMs = System.currentTimeMillis(); // Calculate new enabled and disabled instances diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index 8278777..af5a5d6 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -31,6 +31,7 @@ import org.apache.pinot.common.metadata.instance.InstanceZKMetadata; import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata; import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; +import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.SchemaUtils; import org.apache.pinot.common.utils.SegmentName; import org.apache.pinot.common.utils.StringUtil; @@ -305,7 +306,8 @@ public class ZKMetadataProvider { ZkHelixPropertyStore<ZNRecord> propertyStore, String tableName) { String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName); String parentPath = constructPropertyStorePathForResource(offlineTableName); - List<ZNRecord> znRecords = propertyStore.getChildren(parentPath, null, AccessOption.PERSISTENT); + List<ZNRecord> znRecords = propertyStore.getChildren(parentPath, null, AccessOption.PERSISTENT, + CommonConstants.Helix.ZkClient.RETRY_COUNT, 
CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS); if (znRecords != null) { int numZNRecords = znRecords.size(); List<OfflineSegmentZKMetadata> offlineSegmentZKMetadataList = new ArrayList<>(numZNRecords); @@ -335,7 +337,8 @@ public class ZKMetadataProvider { ZkHelixPropertyStore<ZNRecord> propertyStore, String tableName) { String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName); String parentPath = constructPropertyStorePathForResource(realtimeTableName); - List<ZNRecord> znRecords = propertyStore.getChildren(parentPath, null, AccessOption.PERSISTENT); + List<ZNRecord> znRecords = propertyStore.getChildren(parentPath, null, AccessOption.PERSISTENT, + CommonConstants.Helix.ZkClient.RETRY_COUNT, CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS); if (znRecords != null) { int numZNRecords = znRecords.size(); List<RealtimeSegmentZKMetadata> realtimeSegmentZKMetadataList = new ArrayList<>(numZNRecords); @@ -365,7 +368,8 @@ public class ZKMetadataProvider { ZkHelixPropertyStore<ZNRecord> propertyStore, String tableName) { String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName); String parentPath = constructPropertyStorePathForResource(realtimeTableName); - List<ZNRecord> znRecords = propertyStore.getChildren(parentPath, null, AccessOption.PERSISTENT); + List<ZNRecord> znRecords = propertyStore.getChildren(parentPath, null, AccessOption.PERSISTENT, + CommonConstants.Helix.ZkClient.RETRY_COUNT, CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS); if (znRecords != null) { int numZNRecords = znRecords.size(); List<LLCRealtimeSegmentZKMetadata> llcRealtimeSegmentZKMetadataList = new ArrayList<>(numZNRecords); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java index 9b18af7..2ed1259 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java +++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java @@ -90,6 +90,9 @@ public class CommonConstants { public static class ZkClient { public static final long DEFAULT_CONNECT_TIMEOUT_SEC = 60L; + // Retry interval and count for ZK operations where we would rather fail than get an empty (wrong) result back + public static final int RETRY_INTERVAL_MS = 50; + public static final int RETRY_COUNT = 2; } public static class DataSource { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java index 064a699..7430723 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java @@ -405,7 +405,7 @@ public class HelixHelper { */ public static List<InstanceConfig> getInstanceConfigs(HelixManager helixManager) { HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor(); - return helixDataAccessor.getChildValues(helixDataAccessor.keyBuilder().instanceConfigs()); + return helixDataAccessor.getChildValues(helixDataAccessor.keyBuilder().instanceConfigs(), true); } /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 0cc0a34..b631a01 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2159,7 +2159,7 @@ public class PinotHelixResourceManager { boolean toggleSucceeded = true; // Checks all the current states fall into the target states PropertyKey instanceCurrentStatesKey = _keyBuilder.currentStates(instanceName, liveInstance.getSessionId()); - List<CurrentState> 
instanceCurrentStates = _helixDataAccessor.getChildValues(instanceCurrentStatesKey); + List<CurrentState> instanceCurrentStates = _helixDataAccessor.getChildValues(instanceCurrentStatesKey, true); if (instanceCurrentStates.isEmpty()) { return PinotResourceManagerResponse.SUCCESS; } else { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index e23aab6..644e990 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -421,7 +421,7 @@ public class TableRebalancer { LOGGER.info("Reassigning {} instances for table: {}", instancePartitionsType, tableNameWithType); InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig); InstancePartitions instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, - _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs())); + _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true)); if (!dryRun) { LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions); InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java index 340a46a..f6728c4 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java +++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java @@ -128,7 +128,7 @@ public class OfflineReplicaGroupSegmentAssignmentTest { } when(propertyStoreWithPartitions .getChildren(eq(ZKMetadataProvider.constructPropertyStorePathForResource(OFFLINE_TABLE_NAME_WITH_PARTITION)), - any(), anyInt())).thenReturn(segmentZKMetadataZNRecords); + any(), anyInt(), anyInt(), anyInt())).thenReturn(segmentZKMetadataZNRecords); HelixManager helixManagerWithPartitions = mock(HelixManager.class); when(helixManagerWithPartitions.getHelixPropertyStore()).thenReturn(propertyStoreWithPartitions); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpdateSegmentState.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpdateSegmentState.java index fe7b090..e4bcc8a 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpdateSegmentState.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpdateSegmentState.java @@ -28,6 +28,7 @@ import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZNRecordSerializer; import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.config.TableConfigUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.kohsuke.args4j.Option; @@ -133,7 +134,8 @@ public class UpdateSegmentState extends AbstractBaseCommand implements Command { public List<String> getAllTenantTables() throws Exception { String tableConfigPath = "/CONFIGS/TABLE"; - List<ZNRecord> tableConfigs = _propertyStore.getChildren(tableConfigPath, null, 0); + List<ZNRecord> tableConfigs = _propertyStore.getChildren(tableConfigPath, null, 0, + CommonConstants.Helix.ZkClient.RETRY_COUNT, CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS); List<String> tables = new ArrayList<>(128); for (ZNRecord znRecord : tableConfigs) { 
TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org