This is an automated email from the ASF dual-hosted git repository. richardstartin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new f67b5aa Cache ideal state and external view paths in RoutingEntry (#8296) f67b5aa is described below commit f67b5aacc5b494a2f5d78e87d67a84ea3aadc99a Author: Richard Startin <rich...@startree.ai> AuthorDate: Sun Mar 6 10:06:30 2022 +0000 Cache ideal state and external view paths in RoutingEntry (#8296) * cache ideal state and external view paths in RoutingEntry * review comments --- .../pinot/broker/routing/RoutingManager.java | 51 ++++++++++++++-------- 1 file changed, 33 insertions(+), 18 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java index ca14dd0..ef97aee 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java @@ -134,10 +134,9 @@ public class RoutingManager implements ClusterChangeHandler { List<String> idealStatePaths = new ArrayList<>(numTables); List<String> externalViewPaths = new ArrayList<>(numTables); for (Map.Entry<String, RoutingEntry> entry : _routingEntryMap.entrySet()) { - String tableNameWithType = entry.getKey(); routingEntries.add(entry.getValue()); - idealStatePaths.add(_idealStatePathPrefix + tableNameWithType); - externalViewPaths.add(_externalViewPathPrefix + tableNameWithType); + idealStatePaths.add(entry.getValue()._idealStatePath); + externalViewPaths.add(entry.getValue()._externalViewPath); } Stat[] idealStateStats = _zkDataAccessor.getStats(idealStatePaths, AccessOption.PERSISTENT); Stat[] externalViewStats = _zkDataAccessor.getStats(externalViewPaths, AccessOption.PERSISTENT); @@ -154,13 +153,13 @@ public class RoutingManager implements ClusterChangeHandler { String tableNameWithType = routingEntry.getTableNameWithType(); tablesToUpdate.add(tableNameWithType); try { - IdealState idealState = getIdealState(tableNameWithType); + IdealState idealState = getIdealState(routingEntry._idealStatePath); if (idealState == null) { LOGGER.warn("Failed to find ideal state for table: {}, skipping updating routing entry", tableNameWithType); continue; } - ExternalView externalView = getExternalView(tableNameWithType); + ExternalView externalView = getExternalView(routingEntry._externalViewPath); if (externalView == null) { LOGGER.warn("Failed to find external view for table: {}, skipping updating routing entry", tableNameWithType); @@ -185,9 +184,9 @@ public class RoutingManager implements ClusterChangeHandler { } @Nullable - private IdealState getIdealState(String tableNameWithType) { + private IdealState getIdealState(String idealStatePath) { Stat stat = new Stat(); - ZNRecord znRecord = _zkDataAccessor.get(_idealStatePathPrefix + tableNameWithType, stat, AccessOption.PERSISTENT); + ZNRecord znRecord = _zkDataAccessor.get(idealStatePath, stat, AccessOption.PERSISTENT); if (znRecord != null) { znRecord.setVersion(stat.getVersion()); return new IdealState(znRecord); @@ -197,9 +196,9 @@ public class RoutingManager implements ClusterChangeHandler { } @Nullable - private ExternalView getExternalView(String tableNameWithType) { + private ExternalView getExternalView(String externalViewPath) { Stat stat = new Stat(); - ZNRecord znRecord = _zkDataAccessor.get(_externalViewPathPrefix + tableNameWithType, stat, AccessOption.PERSISTENT); + ZNRecord znRecord = _zkDataAccessor.get(externalViewPath, stat, AccessOption.PERSISTENT); if (znRecord != null) { znRecord.setVersion(stat.getVersion()); return new ExternalView(znRecord); @@ -303,11 +302,13 @@ public class RoutingManager implements ClusterChangeHandler { TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType); - IdealState idealState = getIdealState(tableNameWithType); + String idealStatePath = getIdealStatePath(tableNameWithType); + IdealState idealState = getIdealState(idealStatePath); Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", tableNameWithType); int idealStateVersion = idealState.getRecord().getVersion(); - ExternalView externalView = getExternalView(tableNameWithType); + String externalViewPath = getExternalViewPath(tableNameWithType); + ExternalView externalView = getExternalView(externalViewPath); int externalViewVersion; // NOTE: External view might be null for new created tables. In such case, create an empty one and set the version // to -1 to ensure the version does not match the next external view @@ -355,11 +356,11 @@ public class RoutingManager implements ClusterChangeHandler { TableConfig offlineTableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, offlineTableName); Preconditions.checkState(offlineTableConfig != null, "Failed to find table config for table: %s", offlineTableName); - IdealState offlineTableIdealState = getIdealState(offlineTableName); + IdealState offlineTableIdealState = getIdealState(getIdealStatePath(offlineTableName)); Preconditions.checkState(offlineTableIdealState != null, "Failed to find ideal state for table: %s", offlineTableName); // NOTE: External view might be null for new created tables. In such case, create an empty one. - ExternalView offlineTableExternalView = getExternalView(offlineTableName); + ExternalView offlineTableExternalView = getExternalView(getExternalViewPath(offlineTableName)); if (offlineTableExternalView == null) { offlineTableExternalView = new ExternalView(offlineTableName); } @@ -380,8 +381,9 @@ public class RoutingManager implements ClusterChangeHandler { Long queryTimeoutMs = queryConfig != null ? queryConfig.getTimeoutMs() : null; RoutingEntry routingEntry = - new RoutingEntry(tableNameWithType, segmentPreSelector, segmentSelector, segmentPruners, instanceSelector, - idealStateVersion, externalViewVersion, timeBoundaryManager, queryTimeoutMs); + new RoutingEntry(tableNameWithType, idealStatePath, externalViewPath, segmentPreSelector, segmentSelector, + segmentPruners, instanceSelector, idealStateVersion, externalViewVersion, timeBoundaryManager, + queryTimeoutMs); if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) { LOGGER.info("Built routing for table: {}", tableNameWithType); } else { @@ -477,6 +479,14 @@ public class RoutingManager implements ClusterChangeHandler { return new RoutingTable(serverInstanceToSegmentsMap, selectionResult.getUnavailableSegments()); } + private String getIdealStatePath(String tableNameWithType) { + return _idealStatePathPrefix + tableNameWithType; + } + + private String getExternalViewPath(String tableNameWithType) { + return _externalViewPathPrefix + tableNameWithType; + } + /** * Returns the time boundary info for the given offline table, or {@code null} if the routing or time boundary does * not exist. @@ -504,6 +514,8 @@ public class RoutingManager implements ClusterChangeHandler { private static class RoutingEntry { final String _tableNameWithType; + final String _idealStatePath; + final String _externalViewPath; final SegmentPreSelector _segmentPreSelector; final SegmentSelector _segmentSelector; final List<SegmentPruner> _segmentPruners; @@ -516,11 +528,14 @@ public class RoutingManager implements ClusterChangeHandler { // Time boundary manager is only available for the offline part of the hybrid table transient TimeBoundaryManager _timeBoundaryManager; - RoutingEntry(String tableNameWithType, SegmentPreSelector segmentPreSelector, SegmentSelector segmentSelector, - List<SegmentPruner> segmentPruners, InstanceSelector instanceSelector, int lastUpdateIdealStateVersion, - int lastUpdateExternalViewVersion, @Nullable TimeBoundaryManager timeBoundaryManager, + RoutingEntry(String tableNameWithType, String idealStatePath, String externalViewPath, + SegmentPreSelector segmentPreSelector, SegmentSelector segmentSelector, List<SegmentPruner> segmentPruners, + InstanceSelector instanceSelector, int lastUpdateIdealStateVersion, int lastUpdateExternalViewVersion, + @Nullable TimeBoundaryManager timeBoundaryManager, @Nullable Long queryTimeoutMs) { _tableNameWithType = tableNameWithType; + _idealStatePath = idealStatePath; + _externalViewPath = externalViewPath; _segmentPreSelector = segmentPreSelector; _segmentSelector = segmentSelector; _segmentPruners = segmentPruners; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org