This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 48deafdf94 create segment zk metadata cache (#10455)
48deafdf94 is described below

commit 48deafdf944fd320ba3663ee9579c353e1d03c93
Author: Rong Rong <ro...@apache.org>
AuthorDate: Fri Mar 31 07:39:02 2023 -0700

    create segment zk metadata cache (#10455)
    
    * [partition] add partition routing to routing manager
    
    - add SegmentZkMetadataFetchListener interface
    - maintain a seen list of onlineSegments so ZK is not pulled when unnecessary
    - when no pruner registered, do not pull ZK
    - refactor Segment ZNRecord refresher
    - adding tests
    
    
    ---------
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../pinot/broker/routing/BrokerRoutingManager.java |  31 ++--
 .../SegmentZkMetadataFetchListener.java}           |  25 +--
 .../segmentmetadata/SegmentZkMetadataFetcher.java  | 122 ++++++++++++++
 .../routing/segmentpruner/EmptySegmentPruner.java  |  40 ++---
 .../MultiPartitionColumnsSegmentPruner.java        |  43 ++---
 .../routing/segmentpruner/SegmentPruner.java       |  24 +--
 .../segmentpruner/SegmentPrunerFactory.java        |  33 +++-
 .../SinglePartitionColumnSegmentPruner.java        |  42 ++---
 .../routing/segmentpruner/TimeSegmentPruner.java   |  55 ++-----
 .../SegmentZkMetadataFetcherTest.java              | 175 +++++++++++++++++++++
 .../routing/segmentpruner/SegmentPrunerTest.java   |  86 ++++++----
 11 files changed, 467 insertions(+), 209 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index 883e9cfb02..93edc45f52 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -43,6 +43,8 @@ import 
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSele
 import 
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelectorFactory;
 import org.apache.pinot.broker.routing.instanceselector.InstanceSelector;
 import 
org.apache.pinot.broker.routing.instanceselector.InstanceSelectorFactory;
+import 
org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetchListener;
+import 
org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetcher;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
 import 
org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelectorFactory;
 import org.apache.pinot.broker.routing.segmentpruner.SegmentPruner;
@@ -430,10 +432,10 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     Set<String> preSelectedOnlineSegments = 
segmentPreSelector.preSelect(onlineSegments);
     SegmentSelector segmentSelector = 
SegmentSelectorFactory.getSegmentSelector(tableConfig);
     segmentSelector.init(idealState, externalView, preSelectedOnlineSegments);
+
+    // Register segment pruners and initialize segment zk metadata fetcher.
     List<SegmentPruner> segmentPruners = 
SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
-    for (SegmentPruner segmentPruner : segmentPruners) {
-      segmentPruner.init(idealState, externalView, preSelectedOnlineSegments);
-    }
+
     AdaptiveServerSelector adaptiveServerSelector =
         
AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager,
 _pinotConfig);
     InstanceSelector instanceSelector =
@@ -488,10 +490,16 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     QueryConfig queryConfig = tableConfig.getQueryConfig();
     Long queryTimeoutMs = queryConfig != null ? queryConfig.getTimeoutMs() : 
null;
 
+    SegmentZkMetadataFetcher segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(tableNameWithType, _propertyStore);
+    for (SegmentZkMetadataFetchListener listener : segmentPruners) {
+      segmentZkMetadataFetcher.register(listener);
+    }
+    segmentZkMetadataFetcher.init(idealState, externalView, 
preSelectedOnlineSegments);
+
     RoutingEntry routingEntry =
         new RoutingEntry(tableNameWithType, idealStatePath, externalViewPath, 
segmentPreSelector, segmentSelector,
-            segmentPruners, instanceSelector, idealStateVersion, 
externalViewVersion, timeBoundaryManager,
-            queryTimeoutMs);
+            segmentPruners, instanceSelector, idealStateVersion, 
externalViewVersion, segmentZkMetadataFetcher,
+            timeBoundaryManager, queryTimeoutMs);
     if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
       LOGGER.info("Built routing for table: {}", tableNameWithType);
     } else {
@@ -638,6 +646,7 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     final List<SegmentPruner> _segmentPruners;
     final InstanceSelector _instanceSelector;
     final Long _queryTimeoutMs;
+    final SegmentZkMetadataFetcher _segmentZkMetadataFetcher;
 
     // Cache IdealState and ExternalView version for the last update
     transient int _lastUpdateIdealStateVersion;
@@ -648,7 +657,8 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     RoutingEntry(String tableNameWithType, String idealStatePath, String 
externalViewPath,
         SegmentPreSelector segmentPreSelector, SegmentSelector 
segmentSelector, List<SegmentPruner> segmentPruners,
         InstanceSelector instanceSelector, int lastUpdateIdealStateVersion, 
int lastUpdateExternalViewVersion,
-        @Nullable TimeBoundaryManager timeBoundaryManager, @Nullable Long 
queryTimeoutMs) {
+        SegmentZkMetadataFetcher segmentZkMetadataFetcher, @Nullable 
TimeBoundaryManager timeBoundaryManager,
+        @Nullable Long queryTimeoutMs) {
       _tableNameWithType = tableNameWithType;
       _idealStatePath = idealStatePath;
       _externalViewPath = externalViewPath;
@@ -660,6 +670,7 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
       _lastUpdateExternalViewVersion = lastUpdateExternalViewVersion;
       _timeBoundaryManager = timeBoundaryManager;
       _queryTimeoutMs = queryTimeoutMs;
+      _segmentZkMetadataFetcher = segmentZkMetadataFetcher;
     }
 
     String getTableNameWithType() {
@@ -693,10 +704,8 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     void onAssignmentChange(IdealState idealState, ExternalView externalView) {
       Set<String> onlineSegments = getOnlineSegments(idealState);
       Set<String> preSelectedOnlineSegments = 
_segmentPreSelector.preSelect(onlineSegments);
+      _segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
preSelectedOnlineSegments);
       _segmentSelector.onAssignmentChange(idealState, externalView, 
preSelectedOnlineSegments);
-      for (SegmentPruner segmentPruner : _segmentPruners) {
-        segmentPruner.onAssignmentChange(idealState, externalView, 
preSelectedOnlineSegments);
-      }
       _instanceSelector.onAssignmentChange(idealState, externalView, 
preSelectedOnlineSegments);
       if (_timeBoundaryManager != null) {
         _timeBoundaryManager.onAssignmentChange(idealState, externalView, 
preSelectedOnlineSegments);
@@ -710,9 +719,7 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     }
 
     void refreshSegment(String segment) {
-      for (SegmentPruner segmentPruner : _segmentPruners) {
-        segmentPruner.refreshSegment(segment);
-      }
+      _segmentZkMetadataFetcher.refreshSegment(segment);
       if (_timeBoundaryManager != null) {
         _timeBoundaryManager.refreshSegment(segment);
       }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetchListener.java
similarity index 68%
copy from 
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
copy to 
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetchListener.java
index 5893e6bd92..138291089d 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetchListener.java
@@ -16,40 +16,41 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.broker.routing.segmentpruner;
+package org.apache.pinot.broker.routing.segmentmetadata;
 
+import java.util.List;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.request.BrokerRequest;
 
 
 /**
- * The segment pruner prunes the selected segments based on the query.
+ * Interface to register with {@link SegmentZkMetadataFetcher}.
+ *
+ * <p>When registered, SegmentZkMetadataFetcher will fetch {@link ZNRecord} 
for associated {@code onlineSegments} list
+ * or refreshed {@code segment}, thus batching up all ZK access for segment 
metadata.
  */
-public interface SegmentPruner {
+public interface SegmentZkMetadataFetchListener {
 
   /**
    * Initializes the segment pruner with the ideal state, external view and 
online segments (segments with
    * ONLINE/CONSUMING instances in the ideal state and pre-selected by the 
{@link SegmentPreSelector}). Should be called
    * only once before calling other methods.
    */
-  void init(IdealState idealState, ExternalView externalView, Set<String> 
onlineSegments);
+  void init(IdealState idealState, ExternalView externalView, List<String> 
onlineSegments, List<ZNRecord> znRecords);
 
   /**
    * Processes the segment assignment (ideal state or external view) change 
based on the given online segments (segments
    * with ONLINE/CONSUMING instances in the ideal state and pre-selected by 
the {@link SegmentPreSelector}).
    */
-  void onAssignmentChange(IdealState idealState, ExternalView externalView, 
Set<String> onlineSegments);
+  void onAssignmentChange(IdealState idealState, ExternalView externalView, 
Set<String> onlineSegments,
+      List<String> pulledSegments, List<ZNRecord> znRecords);
 
   /**
    * Refreshes the metadata for the given segment (called when segment is 
getting refreshed).
    */
-  void refreshSegment(String segment);
-
-  /**
-   * Prunes the segments queried by the given broker request, returns the 
selected segments to be queried.
-   */
-  Set<String> prune(BrokerRequest brokerRequest, Set<String> segments);
+  void refreshSegment(String segment, @Nullable ZNRecord znRecord);
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java
new file mode 100644
index 0000000000..de9ec0a699
--- /dev/null
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.segmentmetadata;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.AccessOption;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+
+
+/**
+ * {@code SegmentZkMetadataFetcher} is used to cache {@link ZNRecord} stored 
in {@link ZkHelixPropertyStore} for
+ * segments.
+ */
+public class SegmentZkMetadataFetcher {
+  private final String _tableNameWithType;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private final String _segmentZKMetadataPathPrefix;
+  private final List<SegmentZkMetadataFetchListener> _listeners;
+  private final Set<String> _onlineSegmentsCached;
+
+  private boolean _initialized;
+
+  public SegmentZkMetadataFetcher(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    _tableNameWithType = tableNameWithType;
+    _propertyStore = propertyStore;
+    _segmentZKMetadataPathPrefix = 
ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType) + 
"/";
+    _listeners = new ArrayList<>();
+    _onlineSegmentsCached = new HashSet<>();
+    _initialized = false;
+  }
+
+  public void init(IdealState idealState, ExternalView externalView, 
Set<String> onlineSegments) {
+    if (!_initialized) {
+      _initialized = true;
+      if (!_listeners.isEmpty()) {
+        // Bulk load segment ZK metadata for all online segments
+        int numSegments = onlineSegments.size();
+        List<String> segments = new ArrayList<>(numSegments);
+        List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments);
+        for (String segment : onlineSegments) {
+          segments.add(segment);
+          segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
+        }
+        _onlineSegmentsCached.addAll(onlineSegments);
+        List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, 
null, AccessOption.PERSISTENT, false);
+        for (SegmentZkMetadataFetchListener listener : _listeners) {
+          listener.init(idealState, externalView, segments, znRecords);
+        }
+      }
+    } else {
+      throw new RuntimeException("Segment zk metadata fetcher has already been 
initialized!");
+    }
+  }
+
+  public void register(SegmentZkMetadataFetchListener listener) {
+    if (!_initialized) {
+      _listeners.add(listener);
+    } else {
+      throw new RuntimeException("Segment zk metadata fetcher has already been 
initialized! "
+          + "Unable to register more listeners.");
+    }
+  }
+
+  public List<SegmentZkMetadataFetchListener> getListeners() {
+    return _listeners;
+  }
+
+  public synchronized void onAssignmentChange(IdealState idealState, 
ExternalView externalView,
+      Set<String> onlineSegments) {
+    if (!_listeners.isEmpty()) {
+      int numSegments = onlineSegments.size();
+      List<String> segments = new ArrayList<>(numSegments);
+      List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments);
+
+      for (String segment : onlineSegments) {
+        if (_onlineSegmentsCached.add(segment)) {
+          segments.add(segment);
+          segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
+        }
+      }
+      _onlineSegmentsCached.addAll(onlineSegments);
+      List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, 
null, AccessOption.PERSISTENT, false);
+      for (SegmentZkMetadataFetchListener listener : _listeners) {
+        listener.onAssignmentChange(idealState, externalView, onlineSegments, 
segments, znRecords);
+      }
+      _onlineSegmentsCached.retainAll(onlineSegments);
+    }
+  }
+
+  public synchronized void refreshSegment(String segment) {
+    if (!_listeners.isEmpty()) {
+      ZNRecord znRecord = _propertyStore.get(_segmentZKMetadataPathPrefix + 
segment, null, AccessOption.PERSISTENT);
+      for (SegmentZkMetadataFetchListener listener : _listeners) {
+        listener.refreshSegment(segment, znRecord);
+      }
+      _onlineSegmentsCached.add(segment);
+    }
+  }
+}
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java
index 7a7b66b086..aeb6a01f2e 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java
@@ -18,18 +18,14 @@
  */
 package org.apache.pinot.broker.routing.segmentpruner;
 
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
-import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -45,35 +41,23 @@ public class EmptySegmentPruner implements SegmentPruner {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(EmptySegmentPruner.class);
 
   private final String _tableNameWithType;
-  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
-  private final String _segmentZKMetadataPathPrefix;
 
   private final Set<String> _segmentsLoaded = new HashSet<>();
   private final Set<String> _emptySegments = ConcurrentHashMap.newKeySet();
 
   private volatile ResultCache _resultCache;
 
-  public EmptySegmentPruner(TableConfig tableConfig, 
ZkHelixPropertyStore<ZNRecord> propertyStore) {
+  public EmptySegmentPruner(TableConfig tableConfig) {
     _tableNameWithType = tableConfig.getTableName();
-    _propertyStore = propertyStore;
-    _segmentZKMetadataPathPrefix = 
ZKMetadataProvider.constructPropertyStorePathForResource(_tableNameWithType) + 
"/";
   }
 
   @Override
-  public void init(IdealState idealState, ExternalView externalView, 
Set<String> onlineSegments) {
+  public void init(IdealState idealState, ExternalView externalView, 
List<String> onlineSegments,
+      List<ZNRecord> znRecords) {
     // Bulk load info for all online segments
-    int numSegments = onlineSegments.size();
-    List<String> segments = new ArrayList<>(numSegments);
-    List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments);
-    for (String segment : onlineSegments) {
-      segments.add(segment);
-      segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
-    }
-    _segmentsLoaded.addAll(segments);
-    List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, 
null, AccessOption.PERSISTENT, false);
-    for (int i = 0; i < numSegments; i++) {
-      String segment = segments.get(i);
-      if (isEmpty(segment, znRecords.get(i))) {
+    for (int idx = 0; idx < onlineSegments.size(); idx++) {
+      String segment = onlineSegments.get(idx);
+      if (isEmpty(segment, znRecords.get(idx))) {
         _emptySegments.add(segment);
       }
     }
@@ -81,14 +65,14 @@ public class EmptySegmentPruner implements SegmentPruner {
 
   @Override
   public synchronized void onAssignmentChange(IdealState idealState, 
ExternalView externalView,
-      Set<String> onlineSegments) {
+      Set<String> onlineSegments, List<String> pulledSegments, List<ZNRecord> 
znRecords) {
     // NOTE: We don't update all the segment ZK metadata for every external 
view change, but only the new added/removed
     //       ones. The refreshed segment ZK metadata change won't be picked up.
     boolean emptySegmentsChanged = false;
-    for (String segment : onlineSegments) {
+    for (int idx = 0; idx < pulledSegments.size(); idx++) {
+      String segment = pulledSegments.get(idx);
       if (_segmentsLoaded.add(segment)) {
-        if (isEmpty(segment,
-            _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, 
AccessOption.PERSISTENT))) {
+        if (isEmpty(segment, znRecords.get(idx))) {
           emptySegmentsChanged |= _emptySegments.add(segment);
         }
       }
@@ -103,9 +87,9 @@ public class EmptySegmentPruner implements SegmentPruner {
   }
 
   @Override
-  public synchronized void refreshSegment(String segment) {
+  public synchronized void refreshSegment(String segment, @Nullable ZNRecord 
znRecord) {
     _segmentsLoaded.add(segment);
-    if (isEmpty(segment, _propertyStore.get(_segmentZKMetadataPathPrefix + 
segment, null, AccessOption.PERSISTENT))) {
+    if (isEmpty(segment, znRecord)) {
       if (_emptySegments.add(segment)) {
         // Reset the result cache when empty segments changed
         _resultCache = null;
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java
index dd4edb686b..2f3216cbf2 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.broker.routing.segmentpruner;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -28,12 +27,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
-import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.Expression;
@@ -58,33 +54,21 @@ public class MultiPartitionColumnsSegmentPruner implements 
SegmentPruner {
 
   private final String _tableNameWithType;
   private final Set<String> _partitionColumns;
-  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
-  private final String _segmentZKMetadataPathPrefix;
   private final Map<String, Map<String, PartitionInfo>> 
_segmentColumnPartitionInfoMap = new ConcurrentHashMap<>();
 
-  public MultiPartitionColumnsSegmentPruner(String tableNameWithType, 
Set<String> partitionColumns,
-      ZkHelixPropertyStore<ZNRecord> propertyStore) {
+  public MultiPartitionColumnsSegmentPruner(String tableNameWithType, 
Set<String> partitionColumns) {
     _tableNameWithType = tableNameWithType;
     _partitionColumns = partitionColumns;
-    _propertyStore = propertyStore;
-    _segmentZKMetadataPathPrefix = 
ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType) + 
"/";
   }
 
   @Override
-  public void init(IdealState idealState, ExternalView externalView, 
Set<String> onlineSegments) {
+  public void init(IdealState idealState, ExternalView externalView, 
List<String> onlineSegments,
+      List<ZNRecord> znRecords) {
     // Bulk load partition info for all online segments
-    int numSegments = onlineSegments.size();
-    List<String> segments = new ArrayList<>(numSegments);
-    List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments);
-    for (String segment : onlineSegments) {
-      segments.add(segment);
-      segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
-    }
-    List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, 
null, AccessOption.PERSISTENT, false);
-    for (int i = 0; i < numSegments; i++) {
-      String segment = segments.get(i);
+    for (int idx = 0; idx < onlineSegments.size(); idx++) {
+      String segment = onlineSegments.get(idx);
       Map<String, PartitionInfo> columnPartitionInfoMap =
-          extractColumnPartitionInfoMapFromSegmentZKMetadataZNRecord(segment, 
znRecords.get(i));
+          extractColumnPartitionInfoMapFromSegmentZKMetadataZNRecord(segment, 
znRecords.get(idx));
       if (columnPartitionInfoMap != null) {
         _segmentColumnPartitionInfoMap.put(segment, columnPartitionInfoMap);
       }
@@ -143,21 +127,22 @@ public class MultiPartitionColumnsSegmentPruner 
implements SegmentPruner {
 
   @Override
   public synchronized void onAssignmentChange(IdealState idealState, 
ExternalView externalView,
-      Set<String> onlineSegments) {
+      Set<String> onlineSegments, List<String> pulledSegments, List<ZNRecord> 
znRecords) {
     // NOTE: We don't update all the segment ZK metadata for every external 
view change, but only the new added/removed
     //       ones. The refreshed segment ZK metadata change won't be picked up.
-    for (String segment : onlineSegments) {
+    for (int idx = 0; idx < pulledSegments.size(); idx++) {
+      String segment = pulledSegments.get(idx);
+      ZNRecord znRecord = znRecords.get(idx);
       _segmentColumnPartitionInfoMap.computeIfAbsent(segment,
-          k -> extractColumnPartitionInfoMapFromSegmentZKMetadataZNRecord(k,
-              _propertyStore.get(_segmentZKMetadataPathPrefix + k, null, 
AccessOption.PERSISTENT)));
+          k -> extractColumnPartitionInfoMapFromSegmentZKMetadataZNRecord(k, 
znRecord));
     }
     _segmentColumnPartitionInfoMap.keySet().retainAll(onlineSegments);
   }
 
   @Override
-  public synchronized void refreshSegment(String segment) {
-    Map<String, PartitionInfo> columnPartitionInfo = 
extractColumnPartitionInfoMapFromSegmentZKMetadataZNRecord(segment,
-        _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, 
AccessOption.PERSISTENT));
+  public synchronized void refreshSegment(String segment, @Nullable ZNRecord 
znRecord) {
+    Map<String, PartitionInfo> columnPartitionInfo =
+        extractColumnPartitionInfoMapFromSegmentZKMetadataZNRecord(segment, 
znRecord);
     if (columnPartitionInfo != null) {
       _segmentColumnPartitionInfoMap.put(segment, columnPartitionInfo);
     } else {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
index 5893e6bd92..17e92e5c15 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
@@ -19,34 +19,14 @@
 package org.apache.pinot.broker.routing.segmentpruner;
 
 import java.util.Set;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
+import 
org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetchListener;
 import org.apache.pinot.common.request.BrokerRequest;
 
 
 /**
  * The segment pruner prunes the selected segments based on the query.
  */
-public interface SegmentPruner {
-
-  /**
-   * Initializes the segment pruner with the ideal state, external view and 
online segments (segments with
-   * ONLINE/CONSUMING instances in the ideal state and pre-selected by the 
{@link SegmentPreSelector}). Should be called
-   * only once before calling other methods.
-   */
-  void init(IdealState idealState, ExternalView externalView, Set<String> 
onlineSegments);
-
-  /**
-   * Processes the segment assignment (ideal state or external view) change 
based on the given online segments (segments
-   * with ONLINE/CONSUMING instances in the ideal state and pre-selected by 
the {@link SegmentPreSelector}).
-   */
-  void onAssignmentChange(IdealState idealState, ExternalView externalView, 
Set<String> onlineSegments);
-
-  /**
-   * Refreshes the metadata for the given segment (called when segment is 
getting refreshed).
-   */
-  void refreshSegment(String segment);
+public interface SegmentPruner extends SegmentZkMetadataFetchListener {
 
   /**
    * Prunes the segments queried by the given broker request, returns the 
selected segments to be queried.
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
index f34f55f3c3..6135982e18 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.broker.routing.segmentpruner;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -26,6 +28,7 @@ import javax.annotation.Nullable;
 import org.apache.commons.collections.MapUtils;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.segment.local.utils.TableConfigUtils;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.RoutingConfig;
@@ -33,6 +36,9 @@ import 
org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,7 +58,7 @@ public class SegmentPrunerFactory {
     boolean needsEmptySegment = 
TableConfigUtils.needsEmptySegmentPruner(tableConfig);
     if (needsEmptySegment) {
       // Add EmptySegmentPruner if needed
-      segmentPruners.add(new EmptySegmentPruner(tableConfig, propertyStore));
+      segmentPruners.add(new EmptySegmentPruner(tableConfig));
     }
 
     RoutingConfig routingConfig = tableConfig.getRoutingConfig();
@@ -113,8 +119,8 @@ public class SegmentPrunerFactory {
     LOGGER.info("Using PartitionSegmentPruner on partition columns: {} for 
table: {}", partitionColumns,
         tableNameWithType);
     return partitionColumns.size() == 1 ? new 
SinglePartitionColumnSegmentPruner(tableNameWithType,
-        partitionColumns.iterator().next(), propertyStore)
-        : new MultiPartitionColumnsSegmentPruner(tableNameWithType, 
partitionColumns, propertyStore);
+        partitionColumns.iterator().next())
+        : new MultiPartitionColumnsSegmentPruner(tableNameWithType, 
partitionColumns);
   }
 
   @Nullable
@@ -131,9 +137,26 @@ public class SegmentPrunerFactory {
       LOGGER.warn("Cannot enable time range pruning without time column for 
table: {}", tableNameWithType);
       return null;
     }
+    return createTimeSegmentPruner(tableConfig, propertyStore);
+  }
+
+  @VisibleForTesting
+  static TimeSegmentPruner createTimeSegmentPruner(TableConfig tableConfig,
+      ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    String tableNameWithType = tableConfig.getTableName();
+    String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
+    Preconditions.checkNotNull(timeColumn, "Time column must be configured in 
table config for table: %s",
+        tableNameWithType);
+    Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, 
tableNameWithType);
+    Preconditions.checkNotNull(schema, "Failed to find schema for table: %s", 
tableNameWithType);
+    DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(timeColumn);
+    Preconditions.checkNotNull(dateTimeSpec, "Field spec must be specified in 
schema for time column: %s of table: %s",
+        timeColumn, tableNameWithType);
+    DateTimeFormatSpec timeFormatSpec = dateTimeSpec.getFormatSpec();
 
-    LOGGER.info("Using TimeRangePruner on time column: {} for table: {}", 
timeColumn, tableNameWithType);
-    return new TimeSegmentPruner(tableConfig, propertyStore);
+    LOGGER.info("Using TimeRangePruner on time column: {} for table: {} with 
DateTimeFormatSpec: {}",
+        timeColumn, tableNameWithType, timeFormatSpec);
+    return new TimeSegmentPruner(tableConfig, timeColumn, timeFormatSpec);
   }
 
   private static List<SegmentPruner> sortSegmentPruners(List<SegmentPruner> 
pruners) {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java
index 2c75782ef3..2959537706 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java
@@ -18,19 +18,15 @@
  */
 package org.apache.pinot.broker.routing.segmentpruner;
 
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
-import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.Expression;
@@ -55,32 +51,20 @@ public class SinglePartitionColumnSegmentPruner implements 
SegmentPruner {
 
   private final String _tableNameWithType;
   private final String _partitionColumn;
-  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
-  private final String _segmentZKMetadataPathPrefix;
   private final Map<String, PartitionInfo> _partitionInfoMap = new 
ConcurrentHashMap<>();
 
-  public SinglePartitionColumnSegmentPruner(String tableNameWithType, String 
partitionColumn,
-      ZkHelixPropertyStore<ZNRecord> propertyStore) {
+  public SinglePartitionColumnSegmentPruner(String tableNameWithType, String 
partitionColumn) {
     _tableNameWithType = tableNameWithType;
     _partitionColumn = partitionColumn;
-    _propertyStore = propertyStore;
-    _segmentZKMetadataPathPrefix = 
ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType) + 
"/";
   }
 
   @Override
-  public void init(IdealState idealState, ExternalView externalView, 
Set<String> onlineSegments) {
+  public void init(IdealState idealState, ExternalView externalView, 
List<String> onlineSegments,
+      List<ZNRecord> znRecords) {
     // Bulk load partition info for all online segments
-    int numSegments = onlineSegments.size();
-    List<String> segments = new ArrayList<>(numSegments);
-    List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments);
-    for (String segment : onlineSegments) {
-      segments.add(segment);
-      segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
-    }
-    List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, 
null, AccessOption.PERSISTENT, false);
-    for (int i = 0; i < numSegments; i++) {
-      String segment = segments.get(i);
-      PartitionInfo partitionInfo = 
extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecords.get(i));
+    for (int idx = 0; idx < onlineSegments.size(); idx++) {
+      String segment = onlineSegments.get(idx);
+      PartitionInfo partitionInfo = 
extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecords.get(idx));
       if (partitionInfo != null) {
         _partitionInfoMap.put(segment, partitionInfo);
       }
@@ -129,20 +113,20 @@ public class SinglePartitionColumnSegmentPruner 
implements SegmentPruner {
 
   @Override
   public synchronized void onAssignmentChange(IdealState idealState, 
ExternalView externalView,
-      Set<String> onlineSegments) {
+      Set<String> onlineSegments, List<String> pulledSegments, List<ZNRecord> 
znRecords) {
     // NOTE: We don't update all the segment ZK metadata for every external 
view change, but only the new added/removed
     //       ones. The refreshed segment ZK metadata change won't be picked up.
-    for (String segment : onlineSegments) {
-      _partitionInfoMap.computeIfAbsent(segment, k -> 
extractPartitionInfoFromSegmentZKMetadataZNRecord(k,
-          _propertyStore.get(_segmentZKMetadataPathPrefix + k, null, 
AccessOption.PERSISTENT)));
+    for (int idx = 0; idx < pulledSegments.size(); idx++) {
+      String segment = pulledSegments.get(idx);
+      ZNRecord znRecord = znRecords.get(idx);
+      _partitionInfoMap.computeIfAbsent(segment, k -> 
extractPartitionInfoFromSegmentZKMetadataZNRecord(k, znRecord));
     }
     _partitionInfoMap.keySet().retainAll(onlineSegments);
   }
 
   @Override
-  public synchronized void refreshSegment(String segment) {
-    PartitionInfo partitionInfo = 
extractPartitionInfoFromSegmentZKMetadataZNRecord(segment,
-        _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, 
AccessOption.PERSISTENT));
+  public synchronized void refreshSegment(String segment, @Nullable ZNRecord 
znRecord) {
+    PartitionInfo partitionInfo = 
extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecord);
     if (partitionInfo != null) {
       _partitionInfoMap.put(segment, partitionInfo);
     } else {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
index c123c65a67..a7ac4fce4b 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.broker.routing.segmentpruner;
 
-import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -29,22 +28,17 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.broker.routing.segmentpruner.interval.Interval;
 import org.apache.pinot.broker.routing.segmentpruner.interval.IntervalTree;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.Expression;
 import org.apache.pinot.common.request.Function;
 import org.apache.pinot.common.request.Identifier;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Query.Range;
 import org.apache.pinot.sql.FilterKind;
@@ -64,44 +58,25 @@ public class TimeSegmentPruner implements SegmentPruner {
   private static final Interval DEFAULT_INTERVAL = new 
Interval(MIN_START_TIME, MAX_END_TIME);
 
   private final String _tableNameWithType;
-  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
-  private final String _segmentZKMetadataPathPrefix;
   private final String _timeColumn;
   private final DateTimeFormatSpec _timeFormatSpec;
 
   private volatile IntervalTree<String> _intervalTree;
   private final Map<String, Interval> _intervalMap = new HashMap<>();
 
-  public TimeSegmentPruner(TableConfig tableConfig, 
ZkHelixPropertyStore<ZNRecord> propertyStore) {
+  public TimeSegmentPruner(TableConfig tableConfig, String timeColumn, 
DateTimeFormatSpec timeFormatSpec) {
     _tableNameWithType = tableConfig.getTableName();
-    _propertyStore = propertyStore;
-    _segmentZKMetadataPathPrefix = 
ZKMetadataProvider.constructPropertyStorePathForResource(_tableNameWithType) + 
"/";
-    _timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
-    Preconditions.checkNotNull(_timeColumn, "Time column must be configured in 
table config for table: %s",
-        _tableNameWithType);
-
-    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
-    Preconditions.checkNotNull(schema, "Failed to find schema for table: %s", 
_tableNameWithType);
-    DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(_timeColumn);
-    Preconditions.checkNotNull(dateTimeSpec, "Field spec must be specified in 
schema for time column: %s of table: %s",
-        _timeColumn, _tableNameWithType);
-    _timeFormatSpec = dateTimeSpec.getFormatSpec();
+    _timeColumn = timeColumn;
+    _timeFormatSpec = timeFormatSpec;
   }
 
   @Override
-  public void init(IdealState idealState, ExternalView externalView, 
Set<String> onlineSegments) {
+  public void init(IdealState idealState, ExternalView externalView, 
List<String> onlineSegments,
+      List<ZNRecord> znRecords) {
     // Bulk load time info for all online segments
-    int numSegments = onlineSegments.size();
-    List<String> segments = new ArrayList<>(numSegments);
-    List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments);
-    for (String segment : onlineSegments) {
-      segments.add(segment);
-      segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
-    }
-    List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, 
null, AccessOption.PERSISTENT, false);
-    for (int i = 0; i < numSegments; i++) {
-      String segment = segments.get(i);
-      Interval interval = extractIntervalFromSegmentZKMetaZNRecord(segment, 
znRecords.get(i));
+    for (int idx = 0; idx < onlineSegments.size(); idx++) {
+      String segment = onlineSegments.get(idx);
+      Interval interval = extractIntervalFromSegmentZKMetaZNRecord(segment, 
znRecords.get(idx));
       _intervalMap.put(segment, interval);
     }
     _intervalTree = new IntervalTree<>(_intervalMap);
@@ -128,21 +103,21 @@ public class TimeSegmentPruner implements SegmentPruner {
 
   @Override
   public synchronized void onAssignmentChange(IdealState idealState, 
ExternalView externalView,
-      Set<String> onlineSegments) {
+      Set<String> onlineSegments, List<String> pulledSegments, List<ZNRecord> 
znRecords) {
     // NOTE: We don't update all the segment ZK metadata for every external 
view change, but only the new added/removed
     //       ones. The refreshed segment ZK metadata change won't be picked up.
-    for (String segment : onlineSegments) {
-      _intervalMap.computeIfAbsent(segment, k -> 
extractIntervalFromSegmentZKMetaZNRecord(k,
-          _propertyStore.get(_segmentZKMetadataPathPrefix + k, null, 
AccessOption.PERSISTENT)));
+    for (int idx = 0; idx < pulledSegments.size(); idx++) {
+      String segment = pulledSegments.get(idx);
+      ZNRecord zNrecord = znRecords.get(idx);
+      _intervalMap.computeIfAbsent(segment, k -> 
extractIntervalFromSegmentZKMetaZNRecord(k, zNrecord));
     }
     _intervalMap.keySet().retainAll(onlineSegments);
     _intervalTree = new IntervalTree<>(_intervalMap);
   }
 
   @Override
-  public synchronized void refreshSegment(String segment) {
-    Interval interval = extractIntervalFromSegmentZKMetaZNRecord(segment,
-        _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, 
AccessOption.PERSISTENT));
+  public synchronized void refreshSegment(String segment, @Nullable ZNRecord 
znRecord) {
+    Interval interval = extractIntervalFromSegmentZKMetaZNRecord(segment, 
znRecord);
     _intervalMap.put(segment, interval);
     _intervalTree = new IntervalTree<>(_intervalMap);
   }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcherTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcherTest.java
new file mode 100644
index 0000000000..a63811dccd
--- /dev/null
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcherTest.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.segmentmetadata;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.broker.routing.segmentpruner.SegmentPruner;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.fail;
+
+
+public class SegmentZkMetadataFetcherTest extends ControllerTest {
+  private static final String OFFLINE_TABLE_NAME = "testTable_OFFLINE";
+
+  @Test
+  public void 
testSegmentZkMetadataFetcherShouldNotAllowIncorrectRegisterOrInitBehavior() {
+    ZkHelixPropertyStore<ZNRecord> mockPropertyStore = 
Mockito.mock(ZkHelixPropertyStore.class);
+    IdealState idealState = Mockito.mock(IdealState.class);
+    ExternalView externalView = Mockito.mock(ExternalView.class);
+
+    // empty listener at beginning
+    SegmentZkMetadataFetcher segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME,
+        mockPropertyStore);
+    assertEquals(segmentZkMetadataFetcher.getListeners().size(), 0);
+
+    // should allow register new listener
+    
segmentZkMetadataFetcher.register(mock(SegmentZkMetadataFetchListener.class));
+    assertEquals(segmentZkMetadataFetcher.getListeners().size(), 1);
+
+    // should not allow register new listener once initialized
+    segmentZkMetadataFetcher.init(idealState, externalView, 
Collections.singleton("foo"));
+    try {
+      
segmentZkMetadataFetcher.register(mock(SegmentZkMetadataFetchListener.class));
+      fail();
+    } catch (RuntimeException rte) {
+      assertTrue(rte.getMessage().contains("has already been initialized"));
+    }
+
+    // should not allow duplicate init either
+    try {
+      segmentZkMetadataFetcher.init(idealState, externalView, 
Collections.singleton("foo"));
+      fail();
+    } catch (RuntimeException rte) {
+      assertTrue(rte.getMessage().contains("has already been initialized"));
+    }
+  }
+
+  @Test
+  public void 
testSegmentZkMetadataFetcherShouldNotPullZkWhenNoPrunerRegistered() {
+    ZkHelixPropertyStore<ZNRecord> mockPropertyStore = 
Mockito.mock(ZkHelixPropertyStore.class);
+    SegmentZkMetadataFetcher segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME,
+        mockPropertyStore);
+    // NOTE: Ideal state and external view are not used in the current 
implementation
+    IdealState idealState = Mockito.mock(IdealState.class);
+    ExternalView externalView = Mockito.mock(ExternalView.class);
+
+    assertEquals(segmentZkMetadataFetcher.getListeners().size(), 0);
+    segmentZkMetadataFetcher.init(idealState, externalView, 
Collections.singleton("foo"));
+    Mockito.verify(mockPropertyStore, times(0)).get(any(), any(), anyInt(), 
anyBoolean());
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
Collections.singleton("foo"));
+    Mockito.verify(mockPropertyStore, times(0)).get(any(), any(), anyInt(), 
anyBoolean());
+    segmentZkMetadataFetcher.refreshSegment("foo");
+    Mockito.verify(mockPropertyStore, times(0)).get(any(), any(), anyInt(), 
anyBoolean());
+  }
+
+  @Test
+  public void 
testSegmentZkMetadataFetcherShouldPullZkOnlyOncePerSegmentWhenMultiplePrunersRegistered()
 {
+    ZkHelixPropertyStore<ZNRecord> mockPropertyStore = 
mock(ZkHelixPropertyStore.class);
+    when(mockPropertyStore.get(any(), any(), anyInt(), 
anyBoolean())).thenAnswer(inv -> {
+      List<String> pathList = inv.getArgument(0);
+      List<ZNRecord> result = new ArrayList<>(pathList.size());
+      for (String path : pathList) {
+        String[] pathParts = path.split("/");
+        String segmentName = pathParts[pathParts.length - 1];
+        SegmentZKMetadata fakeSegmentZkMetadata = new 
SegmentZKMetadata(segmentName);
+        result.add(fakeSegmentZkMetadata.toZNRecord());
+      }
+      return result;
+    });
+    SegmentPruner pruner1 = mock(SegmentPruner.class);
+    SegmentPruner pruner2 = mock(SegmentPruner.class);
+    SegmentZkMetadataFetcher segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME,
+        mockPropertyStore);
+    segmentZkMetadataFetcher.register(pruner1);
+    segmentZkMetadataFetcher.register(pruner2);
+    // NOTE: Ideal state and external view are not used in the current 
implementation
+    IdealState idealState = mock(IdealState.class);
+    ExternalView externalView = mock(ExternalView.class);
+
+    assertEquals(segmentZkMetadataFetcher.getListeners().size(), 2);
+    // should call property store once for "foo" and "bar" as a batch
+    segmentZkMetadataFetcher.init(idealState, externalView, 
ImmutableSet.of("foo", "bar"));
+    verify(mockPropertyStore, times(1)).get(argThat(new ListMatcher("foo", 
"bar")), any(), anyInt(), anyBoolean());
+    verify(pruner1, times(1)).init(any(), any(), argThat(new 
ListMatcher("foo", "bar")), any());
+    verify(pruner2, times(1)).init(any(), any(), argThat(new 
ListMatcher("foo", "bar")), any());
+
+    // should call property store only once b/c "alice" was missing
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
ImmutableSet.of("bar", "alice"));
+    verify(mockPropertyStore, times(1)).get(argThat(new ListMatcher("alice")), 
any(), anyInt(), anyBoolean());
+    verify(pruner1, times(1)).onAssignmentChange(any(), any(), any(), 
argThat(new ListMatcher("alice")), any());
+    verify(pruner2, times(1)).onAssignmentChange(any(), any(), any(), 
argThat(new ListMatcher("alice")), any());
+
+    // should call property store once more b/c "foo" was cleared when 
onAssignmentChange called with "bar" and "alice"
+    segmentZkMetadataFetcher.refreshSegment("foo");
+    verify(mockPropertyStore, times(1)).get(endsWith("foo"), any(), anyInt());
+    verify(pruner1, times(1)).refreshSegment(eq("foo"), any());
+    verify(pruner2, times(1)).refreshSegment(eq("foo"), any());
+    clearInvocations(mockPropertyStore, pruner1, pruner2);
+
+    // update with all existing segments will call into property store and 
pruner with empty list
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
ImmutableSet.of("bar", "alice"));
+    verify(mockPropertyStore, times(1)).get(argThat(new ListMatcher()), any(), 
anyInt(), anyBoolean());
+    verify(pruner1, times(1)).onAssignmentChange(any(), any(), any(), 
argThat(new ListMatcher()), any());
+    verify(pruner2, times(1)).onAssignmentChange(any(), any(), any(), 
argThat(new ListMatcher()), any());
+
+    // calling refresh will still force pull from property store
+    segmentZkMetadataFetcher.refreshSegment("foo");
+    verify(mockPropertyStore, times(1)).get(endsWith("foo"), any(), anyInt());
+    verify(pruner1, times(1)).refreshSegment(eq("foo"), any());
+    verify(pruner2, times(1)).refreshSegment(eq("foo"), any());
+  }
+
+  private static class ListMatcher implements ArgumentMatcher<List<String>> {
+    private final List<String> _valueToMatch;
+
+    private ListMatcher(String... values) {
+      _valueToMatch = Arrays.asList(values);
+    }
+
+    @Override
+    public boolean matches(List<String> arg) {
+      if (arg.size() != _valueToMatch.size()) {
+        return false;
+      }
+      for (int i = 0; i < arg.size(); i++) {
+        if (!arg.get(i).endsWith(_valueToMatch.get(i))) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+}
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
index cc60739d36..feaad35169 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
@@ -35,6 +35,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
+import 
org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetcher;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -263,9 +264,12 @@ public class SegmentPrunerTest extends ControllerTest {
     ExternalView externalView = Mockito.mock(ExternalView.class);
 
     SinglePartitionColumnSegmentPruner singlePartitionColumnSegmentPruner =
-        new SinglePartitionColumnSegmentPruner(OFFLINE_TABLE_NAME, 
PARTITION_COLUMN_1, _propertyStore);
+        new SinglePartitionColumnSegmentPruner(OFFLINE_TABLE_NAME, 
PARTITION_COLUMN_1);
+    SegmentZkMetadataFetcher segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME,
+        _propertyStore);
+    segmentZkMetadataFetcher.register(singlePartitionColumnSegmentPruner);
     Set<String> onlineSegments = new HashSet<>();
-    singlePartitionColumnSegmentPruner.init(idealState, externalView, 
onlineSegments);
+    segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
     assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, 
Collections.emptySet()),
         Collections.emptySet());
     assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, 
Collections.emptySet()),
@@ -289,7 +293,7 @@ public class SegmentPrunerTest extends ControllerTest {
         new SegmentZKMetadata(segmentWithoutPartitionMetadata);
     ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME,
         segmentZKMetadataWithoutPartitionMetadata);
-    singlePartitionColumnSegmentPruner.onAssignmentChange(idealState, 
externalView, onlineSegments);
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
     assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1,
             new 
HashSet<>(Collections.singletonList(segmentWithoutPartitionMetadata))),
         Collections.singletonList(segmentWithoutPartitionMetadata));
@@ -309,7 +313,7 @@ public class SegmentPrunerTest extends ControllerTest {
     String segment1 = "segment1";
     onlineSegments.add(segment1);
     setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment1, "Murmur", 4, 
0);
-    singlePartitionColumnSegmentPruner.onAssignmentChange(idealState, 
externalView, onlineSegments);
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
     assertEquals(
         singlePartitionColumnSegmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1))),
         new HashSet<>(Arrays.asList(segment0, segment1)));
@@ -322,7 +326,7 @@ public class SegmentPrunerTest extends ControllerTest {
 
     // Update partition metadata without refreshing should have no effect
     setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment0, "Modulo", 4, 
1);
-    singlePartitionColumnSegmentPruner.onAssignmentChange(idealState, 
externalView, onlineSegments);
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
     assertEquals(
         singlePartitionColumnSegmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1))),
         new HashSet<>(Arrays.asList(segment0, segment1)));
@@ -334,7 +338,7 @@ public class SegmentPrunerTest extends ControllerTest {
         new HashSet<>(Collections.singletonList(segment1)));
 
     // Refresh the changed segment should update the segment pruner
-    singlePartitionColumnSegmentPruner.refreshSegment(segment0);
+    segmentZkMetadataFetcher.refreshSegment(segment0);
     assertEquals(
         singlePartitionColumnSegmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1))),
         new HashSet<>(Arrays.asList(segment0, segment1)));
@@ -348,8 +352,10 @@ public class SegmentPrunerTest extends ControllerTest {
     // Multi-column partitioned segment.
     MultiPartitionColumnsSegmentPruner multiPartitionColumnsSegmentPruner =
         new MultiPartitionColumnsSegmentPruner(OFFLINE_TABLE_NAME,
-            Stream.of(PARTITION_COLUMN_1, 
PARTITION_COLUMN_2).collect(Collectors.toSet()), _propertyStore);
-    multiPartitionColumnsSegmentPruner.init(idealState, externalView, 
onlineSegments);
+            Stream.of(PARTITION_COLUMN_1, 
PARTITION_COLUMN_2).collect(Collectors.toSet()));
+    segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
+    segmentZkMetadataFetcher.register(multiPartitionColumnsSegmentPruner);
+    segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
     assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest1, 
Collections.emptySet()),
         Collections.emptySet());
     assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest2, 
Collections.emptySet()),
@@ -367,10 +373,10 @@ public class SegmentPrunerTest extends ControllerTest {
     Map<String, String> partitionColumn2FunctionConfig = new HashMap<>();
     partitionColumn2FunctionConfig.put("columnValues", "xyz|abc");
     partitionColumn2FunctionConfig.put("columnValuesDelimiter", "|");
-    columnPartitionMetadataMap.put(PARTITION_COLUMN_2,
-        new ColumnPartitionMetadata("BoundedColumnValue", 3, 
Collections.singleton(1), partitionColumn2FunctionConfig));
+    columnPartitionMetadataMap.put(PARTITION_COLUMN_2, new 
ColumnPartitionMetadata(
+        "BoundedColumnValue", 3, Collections.singleton(1), 
partitionColumn2FunctionConfig));
     setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment2, 
columnPartitionMetadataMap);
-    multiPartitionColumnsSegmentPruner.onAssignmentChange(idealState, 
externalView, onlineSegments);
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
     assertEquals(
         multiPartitionColumnsSegmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1))),
         new HashSet<>(Arrays.asList(segment0, segment1)));
@@ -399,10 +405,13 @@ public class SegmentPrunerTest extends ControllerTest {
 
     TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, 
TableType.REALTIME);
     setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.DAYS);
-
-    TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig, 
_propertyStore);
+    TimeSegmentPruner segmentPruner = 
SegmentPrunerFactory.createTimeSegmentPruner(tableConfig,
+        _propertyStore);
     Set<String> onlineSegments = new HashSet<>();
-    segmentPruner.init(idealState, externalView, onlineSegments);
+    SegmentZkMetadataFetcher segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
+        _propertyStore);
+    segmentZkMetadataFetcher.register(segmentPruner);
+    segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
     assertEquals(segmentPruner.prune(brokerRequest1, Collections.emptySet()), 
Collections.emptySet());
     assertEquals(segmentPruner.prune(brokerRequest2, Collections.emptySet()), 
Collections.emptySet());
     assertEquals(segmentPruner.prune(brokerRequest3, Collections.emptySet()), 
Collections.emptySet());
@@ -413,10 +422,12 @@ public class SegmentPrunerTest extends ControllerTest {
 
     // Initialize with non-empty onlineSegments
     // Segments without metadata (not updated yet) should not be pruned
-    segmentPruner = new TimeSegmentPruner(tableConfig, _propertyStore);
+    segmentPruner = SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, 
_propertyStore);
+    segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
+    segmentZkMetadataFetcher.register(segmentPruner);
     String newSegment = "newSegment";
     onlineSegments.add(newSegment);
-    segmentPruner.init(idealState, externalView, onlineSegments);
+    segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
     assertEquals(segmentPruner.prune(brokerRequest1, 
Collections.singleton(newSegment)),
         Collections.singletonList(newSegment));
     assertEquals(segmentPruner.prune(brokerRequest2, 
Collections.singleton(newSegment)),
@@ -440,7 +451,7 @@ public class SegmentPrunerTest extends ControllerTest {
     
segmentZKMetadataWithoutTimeRangeMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
     ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, 
REALTIME_TABLE_NAME,
         segmentZKMetadataWithoutTimeRangeMetadata);
-    segmentPruner.onAssignmentChange(idealState, externalView, onlineSegments);
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
     assertEquals(
         segmentPruner.prune(brokerRequest1, new 
HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
         Collections.singletonList(segmentWithoutTimeRangeMetadata));
@@ -476,7 +487,7 @@ public class SegmentPrunerTest extends ControllerTest {
     onlineSegments.add(segment2);
     setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 50, 65, 
TimeUnit.DAYS);
 
-    segmentPruner.onAssignmentChange(idealState, externalView, onlineSegments);
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
     assertEquals(segmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
         new HashSet<>(Arrays.asList(segment0, segment1, segment2)));
     assertEquals(segmentPruner.prune(brokerRequest2, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
@@ -510,7 +521,7 @@ public class SegmentPrunerTest extends ControllerTest {
         Collections.emptySet());
 
     // Refresh the changed segment should update the segment pruner
-    segmentPruner.refreshSegment(segment2);
+    segmentZkMetadataFetcher.refreshSegment(segment2);
     assertEquals(segmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
         new HashSet<>(Arrays.asList(segment0, segment1, segment2)));
     assertEquals(segmentPruner.prune(brokerRequest2, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
@@ -541,7 +552,10 @@ public class SegmentPrunerTest extends ControllerTest {
     TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, 
TableType.REALTIME);
     setSchemaDateTimeFieldSpecSDF(RAW_TABLE_NAME, SDF_PATTERN);
 
-    TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig, 
_propertyStore);
+    TimeSegmentPruner segmentPruner = 
SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, _propertyStore);
+    SegmentZkMetadataFetcher segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
+        _propertyStore);
+    segmentZkMetadataFetcher.register(segmentPruner);
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
RAW_TABLE_NAME);
     DateTimeFormatSpec dateTimeFormatSpec = 
schema.getSpecForTimeColumn(TIME_COLUMN).getFormatSpec();
 
@@ -561,7 +575,7 @@ public class SegmentPrunerTest extends ControllerTest {
     setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 
dateTimeFormatSpec.fromFormatToMillis("20200401"),
         dateTimeFormatSpec.fromFormatToMillis("20200430"), 
TimeUnit.MILLISECONDS);
 
-    segmentPruner.init(idealState, externalView, onlineSegments);
+    segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
     assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), 
Collections.singleton(segment0));
     assertEquals(segmentPruner.prune(brokerRequest2, onlineSegments), new 
HashSet<>(Arrays.asList(segment0, segment1)));
     assertEquals(segmentPruner.prune(brokerRequest3, onlineSegments), 
Collections.singleton(segment1));
@@ -581,7 +595,10 @@ public class SegmentPrunerTest extends ControllerTest {
     TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, 
TableType.REALTIME);
     setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.DAYS);
 
-    TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig, 
_propertyStore);
+    TimeSegmentPruner segmentPruner = 
SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, _propertyStore);
+    SegmentZkMetadataFetcher segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
+        _propertyStore);
+    segmentZkMetadataFetcher.register(segmentPruner);
     Set<String> onlineSegments = new HashSet<>();
     String segment0 = "segment0";
     onlineSegments.add(segment0);
@@ -592,7 +609,7 @@ public class SegmentPrunerTest extends ControllerTest {
     String segment2 = "segment2";
     onlineSegments.add(segment2);
     setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 50, 65, 
TimeUnit.DAYS);
-    segmentPruner.init(idealState, externalView, onlineSegments);
+    segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
 
     assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), new 
HashSet<>(Arrays.asList(segment0, segment2)));
     assertEquals(segmentPruner.prune(brokerRequest2, onlineSegments), new 
HashSet<>(Arrays.asList(segment0, segment1)));
@@ -610,7 +627,10 @@ public class SegmentPrunerTest extends ControllerTest {
     TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, 
TableType.REALTIME);
 
     // init with list of segments
-    EmptySegmentPruner segmentPruner = new EmptySegmentPruner(tableConfig, 
_propertyStore);
+    EmptySegmentPruner segmentPruner = new EmptySegmentPruner(tableConfig);
+    SegmentZkMetadataFetcher segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
+        _propertyStore);
+    segmentZkMetadataFetcher.register(segmentPruner);
     Set<String> onlineSegments = new HashSet<>();
     String segment0 = "segment0";
     onlineSegments.add(segment0);
@@ -618,7 +638,7 @@ public class SegmentPrunerTest extends ControllerTest {
     String segment1 = "segment1";
     onlineSegments.add(segment1);
     setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment1, 0);
-    segmentPruner.init(idealState, externalView, onlineSegments);
+    segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
     assertEquals(segmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1))),
         new HashSet<>(Collections.singletonList(segment0)));
     assertEquals(segmentPruner.prune(brokerRequest2, new 
HashSet<>(Arrays.asList(segment0, segment1))),
@@ -627,9 +647,11 @@ public class SegmentPrunerTest extends ControllerTest {
         new HashSet<>(Collections.singletonList(segment0)));
 
     // init with empty list of segments
-    segmentPruner = new EmptySegmentPruner(tableConfig, _propertyStore);
+    segmentPruner = new EmptySegmentPruner(tableConfig);
+    segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
+    segmentZkMetadataFetcher.register(segmentPruner);
     onlineSegments.clear();
-    segmentPruner.init(idealState, externalView, onlineSegments);
+    segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
     assertEquals(segmentPruner.prune(brokerRequest1, Collections.emptySet()), 
Collections.emptySet());
     assertEquals(segmentPruner.prune(brokerRequest2, Collections.emptySet()), 
Collections.emptySet());
     assertEquals(segmentPruner.prune(brokerRequest3, Collections.emptySet()), 
Collections.emptySet());
@@ -637,7 +659,7 @@ public class SegmentPrunerTest extends ControllerTest {
     // Segments without metadata (not updated yet) should not be pruned
     String newSegment = "newSegment";
     onlineSegments.add(newSegment);
-    segmentPruner.onAssignmentChange(idealState, externalView, onlineSegments);
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
     assertEquals(segmentPruner.prune(brokerRequest1, 
Collections.singleton(newSegment)),
         Collections.singleton(newSegment));
     assertEquals(segmentPruner.prune(brokerRequest2, 
Collections.singleton(newSegment)),
@@ -654,7 +676,7 @@ public class SegmentPrunerTest extends ControllerTest {
     
segmentZKMetadataWithoutTotalDocsMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
     ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, 
REALTIME_TABLE_NAME,
         segmentZKMetadataWithoutTotalDocsMetadata);
-    segmentPruner.onAssignmentChange(idealState, externalView, onlineSegments);
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
     assertEquals(segmentPruner.prune(brokerRequest1, 
Collections.singleton(segmentWithoutTotalDocsMetadata)),
         Collections.singleton(segmentWithoutTotalDocsMetadata));
     assertEquals(segmentPruner.prune(brokerRequest2, 
Collections.singleton(segmentWithoutTotalDocsMetadata)),
@@ -667,7 +689,7 @@ public class SegmentPrunerTest extends ControllerTest {
     String segmentWithNegativeTotalDocsMetadata = 
"segmentWithNegativeTotalDocsMetadata";
     onlineSegments.add(segmentWithNegativeTotalDocsMetadata);
     setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, 
segmentWithNegativeTotalDocsMetadata, -1);
-    segmentPruner.onAssignmentChange(idealState, externalView, onlineSegments);
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
     assertEquals(segmentPruner.prune(brokerRequest1, 
Collections.singleton(segmentWithNegativeTotalDocsMetadata)),
         Collections.singleton(segmentWithNegativeTotalDocsMetadata));
     assertEquals(segmentPruner.prune(brokerRequest2, 
Collections.singleton(segmentWithNegativeTotalDocsMetadata)),
@@ -685,7 +707,7 @@ public class SegmentPrunerTest extends ControllerTest {
     onlineSegments.add(segment2);
     setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment2, -1);
 
-    segmentPruner.onAssignmentChange(idealState, externalView, onlineSegments);
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
     assertEquals(segmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
         new HashSet<>(Arrays.asList(segment0, segment2)));
     assertEquals(segmentPruner.prune(brokerRequest2, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
@@ -704,7 +726,7 @@ public class SegmentPrunerTest extends ControllerTest {
         new HashSet<>(Arrays.asList(segment0, segment2)));
 
     // Refresh the changed segment should update the segment pruner
-    segmentPruner.refreshSegment(segment2);
+    segmentZkMetadataFetcher.refreshSegment(segment2);
     assertEquals(segmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
         new HashSet<>(Collections.singletonList(segment0)));
     assertEquals(segmentPruner.prune(brokerRequest2, new 
HashSet<>(Arrays.asList(segment0, segment1, segment2))),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to