This is an automated email from the ASF dual-hosted git repository.

richardstartin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f67b5aa  Cache ideal state and external view paths in RoutingEntry (#8296)
f67b5aa is described below

commit f67b5aacc5b494a2f5d78e87d67a84ea3aadc99a
Author: Richard Startin <rich...@startree.ai>
AuthorDate: Sun Mar 6 10:06:30 2022 +0000

    Cache ideal state and external view paths in RoutingEntry (#8296)
    
    * cache ideal state and external view paths in RoutingEntry
    
    * review comments
---
 .../pinot/broker/routing/RoutingManager.java       | 51 ++++++++++++++--------
 1 file changed, 33 insertions(+), 18 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
index ca14dd0..ef97aee 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
@@ -134,10 +134,9 @@ public class RoutingManager implements ClusterChangeHandler {
     List<String> idealStatePaths = new ArrayList<>(numTables);
     List<String> externalViewPaths = new ArrayList<>(numTables);
     for (Map.Entry<String, RoutingEntry> entry : _routingEntryMap.entrySet()) {
-      String tableNameWithType = entry.getKey();
       routingEntries.add(entry.getValue());
-      idealStatePaths.add(_idealStatePathPrefix + tableNameWithType);
-      externalViewPaths.add(_externalViewPathPrefix + tableNameWithType);
+      idealStatePaths.add(entry.getValue()._idealStatePath);
+      externalViewPaths.add(entry.getValue()._externalViewPath);
     }
     Stat[] idealStateStats = _zkDataAccessor.getStats(idealStatePaths, 
AccessOption.PERSISTENT);
     Stat[] externalViewStats = _zkDataAccessor.getStats(externalViewPaths, 
AccessOption.PERSISTENT);
@@ -154,13 +153,13 @@ public class RoutingManager implements ClusterChangeHandler {
           String tableNameWithType = routingEntry.getTableNameWithType();
           tablesToUpdate.add(tableNameWithType);
           try {
-            IdealState idealState = getIdealState(tableNameWithType);
+            IdealState idealState = 
getIdealState(routingEntry._idealStatePath);
             if (idealState == null) {
               LOGGER.warn("Failed to find ideal state for table: {}, skipping 
updating routing entry",
                   tableNameWithType);
               continue;
             }
-            ExternalView externalView = getExternalView(tableNameWithType);
+            ExternalView externalView = 
getExternalView(routingEntry._externalViewPath);
             if (externalView == null) {
               LOGGER.warn("Failed to find external view for table: {}, 
skipping updating routing entry",
                   tableNameWithType);
@@ -185,9 +184,9 @@ public class RoutingManager implements ClusterChangeHandler {
   }
 
   @Nullable
-  private IdealState getIdealState(String tableNameWithType) {
+  private IdealState getIdealState(String idealStatePath) {
     Stat stat = new Stat();
-    ZNRecord znRecord = _zkDataAccessor.get(_idealStatePathPrefix + 
tableNameWithType, stat, AccessOption.PERSISTENT);
+    ZNRecord znRecord = _zkDataAccessor.get(idealStatePath, stat, 
AccessOption.PERSISTENT);
     if (znRecord != null) {
       znRecord.setVersion(stat.getVersion());
       return new IdealState(znRecord);
@@ -197,9 +196,9 @@ public class RoutingManager implements ClusterChangeHandler {
   }
 
   @Nullable
-  private ExternalView getExternalView(String tableNameWithType) {
+  private ExternalView getExternalView(String externalViewPath) {
     Stat stat = new Stat();
-    ZNRecord znRecord = _zkDataAccessor.get(_externalViewPathPrefix + 
tableNameWithType, stat, AccessOption.PERSISTENT);
+    ZNRecord znRecord = _zkDataAccessor.get(externalViewPath, stat, 
AccessOption.PERSISTENT);
     if (znRecord != null) {
       znRecord.setVersion(stat.getVersion());
       return new ExternalView(znRecord);
@@ -303,11 +302,13 @@ public class RoutingManager implements ClusterChangeHandler {
     TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
     Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: %s", tableNameWithType);
 
-    IdealState idealState = getIdealState(tableNameWithType);
+    String idealStatePath = getIdealStatePath(tableNameWithType);
+    IdealState idealState = getIdealState(idealStatePath);
     Preconditions.checkState(idealState != null, "Failed to find ideal state 
for table: %s", tableNameWithType);
     int idealStateVersion = idealState.getRecord().getVersion();
 
-    ExternalView externalView = getExternalView(tableNameWithType);
+    String externalViewPath = getExternalViewPath(tableNameWithType);
+    ExternalView externalView = getExternalView(externalViewPath);
     int externalViewVersion;
     // NOTE: External view might be null for new created tables. In such case, 
create an empty one and set the version
     // to -1 to ensure the version does not match the next external view
@@ -355,11 +356,11 @@ public class RoutingManager implements ClusterChangeHandler {
         TableConfig offlineTableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, offlineTableName);
         Preconditions.checkState(offlineTableConfig != null, "Failed to find 
table config for table: %s",
             offlineTableName);
-        IdealState offlineTableIdealState = getIdealState(offlineTableName);
+        IdealState offlineTableIdealState = 
getIdealState(getIdealStatePath(offlineTableName));
         Preconditions.checkState(offlineTableIdealState != null, "Failed to 
find ideal state for table: %s",
             offlineTableName);
         // NOTE: External view might be null for new created tables. In such 
case, create an empty one.
-        ExternalView offlineTableExternalView = 
getExternalView(offlineTableName);
+        ExternalView offlineTableExternalView = 
getExternalView(getExternalViewPath(offlineTableName));
         if (offlineTableExternalView == null) {
           offlineTableExternalView = new ExternalView(offlineTableName);
         }
@@ -380,8 +381,9 @@ public class RoutingManager implements ClusterChangeHandler {
     Long queryTimeoutMs = queryConfig != null ? queryConfig.getTimeoutMs() : 
null;
 
     RoutingEntry routingEntry =
-        new RoutingEntry(tableNameWithType, segmentPreSelector, 
segmentSelector, segmentPruners, instanceSelector,
-            idealStateVersion, externalViewVersion, timeBoundaryManager, 
queryTimeoutMs);
+        new RoutingEntry(tableNameWithType, idealStatePath, externalViewPath, 
segmentPreSelector, segmentSelector,
+            segmentPruners, instanceSelector, idealStateVersion, 
externalViewVersion, timeBoundaryManager,
+            queryTimeoutMs);
     if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
       LOGGER.info("Built routing for table: {}", tableNameWithType);
     } else {
@@ -477,6 +479,14 @@ public class RoutingManager implements ClusterChangeHandler {
     return new RoutingTable(serverInstanceToSegmentsMap, 
selectionResult.getUnavailableSegments());
   }
 
+  private String getIdealStatePath(String tableNameWithType) {
+    return _idealStatePathPrefix + tableNameWithType;
+  }
+
+  private String getExternalViewPath(String tableNameWithType) {
+    return _externalViewPathPrefix + tableNameWithType;
+  }
+
   /**
    * Returns the time boundary info for the given offline table, or {@code 
null} if the routing or time boundary does
    * not exist.
@@ -504,6 +514,8 @@ public class RoutingManager implements ClusterChangeHandler {
 
   private static class RoutingEntry {
     final String _tableNameWithType;
+    final String _idealStatePath;
+    final String _externalViewPath;
     final SegmentPreSelector _segmentPreSelector;
     final SegmentSelector _segmentSelector;
     final List<SegmentPruner> _segmentPruners;
@@ -516,11 +528,14 @@ public class RoutingManager implements ClusterChangeHandler {
     // Time boundary manager is only available for the offline part of the 
hybrid table
     transient TimeBoundaryManager _timeBoundaryManager;
 
-    RoutingEntry(String tableNameWithType, SegmentPreSelector 
segmentPreSelector, SegmentSelector segmentSelector,
-        List<SegmentPruner> segmentPruners, InstanceSelector instanceSelector, 
int lastUpdateIdealStateVersion,
-        int lastUpdateExternalViewVersion, @Nullable TimeBoundaryManager 
timeBoundaryManager,
+    RoutingEntry(String tableNameWithType, String idealStatePath, String 
externalViewPath,
+        SegmentPreSelector segmentPreSelector, SegmentSelector 
segmentSelector, List<SegmentPruner> segmentPruners,
+        InstanceSelector instanceSelector, int lastUpdateIdealStateVersion, 
int lastUpdateExternalViewVersion,
+        @Nullable TimeBoundaryManager timeBoundaryManager,
         @Nullable Long queryTimeoutMs) {
       _tableNameWithType = tableNameWithType;
+      _idealStatePath = idealStatePath;
+      _externalViewPath = externalViewPath;
       _segmentPreSelector = segmentPreSelector;
       _segmentSelector = segmentSelector;
       _segmentPruners = segmentPruners;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to