This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 48deafdf94 create segment zk metadata cache (#10455) 48deafdf94 is described below commit 48deafdf944fd320ba3663ee9579c353e1d03c93 Author: Rong Rong <ro...@apache.org> AuthorDate: Fri Mar 31 07:39:02 2023 -0700 create segment zk metadata cache (#10455) * [partition] add partition routing to routing manager - add SegmentZkMetadataFetchListener interface - maintain a seen list of onlineSegments so not pull ZK when unnecessary - when no pruner registered, do not pull ZK - refactor Segment ZNRecord refresher - adding tests --------- Co-authored-by: Rong Rong <ro...@startree.ai> --- .../pinot/broker/routing/BrokerRoutingManager.java | 31 ++-- .../SegmentZkMetadataFetchListener.java} | 25 +-- .../segmentmetadata/SegmentZkMetadataFetcher.java | 122 ++++++++++++++ .../routing/segmentpruner/EmptySegmentPruner.java | 40 ++--- .../MultiPartitionColumnsSegmentPruner.java | 43 ++--- .../routing/segmentpruner/SegmentPruner.java | 24 +-- .../segmentpruner/SegmentPrunerFactory.java | 33 +++- .../SinglePartitionColumnSegmentPruner.java | 42 ++--- .../routing/segmentpruner/TimeSegmentPruner.java | 55 ++----- .../SegmentZkMetadataFetcherTest.java | 175 +++++++++++++++++++++ .../routing/segmentpruner/SegmentPrunerTest.java | 86 ++++++---- 11 files changed, 467 insertions(+), 209 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java index 883e9cfb02..93edc45f52 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java @@ -43,6 +43,8 @@ import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSele import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelectorFactory; import org.apache.pinot.broker.routing.instanceselector.InstanceSelector; import org.apache.pinot.broker.routing.instanceselector.InstanceSelectorFactory; +import org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetchListener; +import org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetcher; import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector; import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelectorFactory; import org.apache.pinot.broker.routing.segmentpruner.SegmentPruner; @@ -430,10 +432,10 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle Set<String> preSelectedOnlineSegments = segmentPreSelector.preSelect(onlineSegments); SegmentSelector segmentSelector = SegmentSelectorFactory.getSegmentSelector(tableConfig); segmentSelector.init(idealState, externalView, preSelectedOnlineSegments); + + // Register segment pruners and initialize segment zk metadata fetcher. List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); - for (SegmentPruner segmentPruner : segmentPruners) { - segmentPruner.init(idealState, externalView, preSelectedOnlineSegments); - } + AdaptiveServerSelector adaptiveServerSelector = AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager, _pinotConfig); InstanceSelector instanceSelector = @@ -488,10 +490,16 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle QueryConfig queryConfig = tableConfig.getQueryConfig(); Long queryTimeoutMs = queryConfig != null ? queryConfig.getTimeoutMs() : null; + SegmentZkMetadataFetcher segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(tableNameWithType, _propertyStore); + for (SegmentZkMetadataFetchListener listener : segmentPruners) { + segmentZkMetadataFetcher.register(listener); + } + segmentZkMetadataFetcher.init(idealState, externalView, preSelectedOnlineSegments); + RoutingEntry routingEntry = new RoutingEntry(tableNameWithType, idealStatePath, externalViewPath, segmentPreSelector, segmentSelector, - segmentPruners, instanceSelector, idealStateVersion, externalViewVersion, timeBoundaryManager, - queryTimeoutMs); + segmentPruners, instanceSelector, idealStateVersion, externalViewVersion, segmentZkMetadataFetcher, + timeBoundaryManager, queryTimeoutMs); if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) { LOGGER.info("Built routing for table: {}", tableNameWithType); } else { @@ -638,6 +646,7 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle final List<SegmentPruner> _segmentPruners; final InstanceSelector _instanceSelector; final Long _queryTimeoutMs; + final SegmentZkMetadataFetcher _segmentZkMetadataFetcher; // Cache IdealState and ExternalView version for the last update transient int _lastUpdateIdealStateVersion; @@ -648,7 +657,8 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle RoutingEntry(String tableNameWithType, String idealStatePath, String externalViewPath, SegmentPreSelector segmentPreSelector, SegmentSelector segmentSelector, List<SegmentPruner> segmentPruners, InstanceSelector instanceSelector, int lastUpdateIdealStateVersion, int lastUpdateExternalViewVersion, - @Nullable TimeBoundaryManager timeBoundaryManager, @Nullable Long queryTimeoutMs) { + SegmentZkMetadataFetcher segmentZkMetadataFetcher, @Nullable TimeBoundaryManager timeBoundaryManager, + @Nullable Long queryTimeoutMs) { _tableNameWithType = tableNameWithType; _idealStatePath = idealStatePath; _externalViewPath = externalViewPath; @@ -660,6 +670,7 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle _lastUpdateExternalViewVersion = lastUpdateExternalViewVersion; _timeBoundaryManager = timeBoundaryManager; _queryTimeoutMs = queryTimeoutMs; + _segmentZkMetadataFetcher = segmentZkMetadataFetcher; } String getTableNameWithType() { @@ -693,10 +704,8 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle void onAssignmentChange(IdealState idealState, ExternalView externalView) { Set<String> onlineSegments = getOnlineSegments(idealState); Set<String> preSelectedOnlineSegments = _segmentPreSelector.preSelect(onlineSegments); + _segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, preSelectedOnlineSegments); _segmentSelector.onAssignmentChange(idealState, externalView, preSelectedOnlineSegments); - for (SegmentPruner segmentPruner : _segmentPruners) { - segmentPruner.onAssignmentChange(idealState, externalView, preSelectedOnlineSegments); - } _instanceSelector.onAssignmentChange(idealState, externalView, preSelectedOnlineSegments); if (_timeBoundaryManager != null) { _timeBoundaryManager.onAssignmentChange(idealState, externalView, preSelectedOnlineSegments); @@ -710,9 +719,7 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle } void refreshSegment(String segment) { - for (SegmentPruner segmentPruner : _segmentPruners) { - segmentPruner.refreshSegment(segment); - } + _segmentZkMetadataFetcher.refreshSegment(segment); if (_timeBoundaryManager != null) { _timeBoundaryManager.refreshSegment(segment); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetchListener.java similarity index 68% copy from pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java copy to pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetchListener.java index 5893e6bd92..138291089d 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetchListener.java @@ -16,40 +16,41 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.broker.routing.segmentpruner; +package org.apache.pinot.broker.routing.segmentmetadata; +import java.util.List; import java.util.Set; +import javax.annotation.Nullable; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; +import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector; -import org.apache.pinot.common.request.BrokerRequest; /** - * The segment pruner prunes the selected segments based on the query. + * Interface to register with {@link SegmentZkMetadataFetcher}. + * + * <p>When registered, SegmentZKMetadataFetcher will fetch {@link ZNRecord} for associated {@code onlineSegments} list + * or refreshed {@code segment}. Thus batch up all ZK access for segment metadata. */ -public interface SegmentPruner { +public interface SegmentZkMetadataFetchListener { /** * Initializes the segment pruner with the ideal state, external view and online segments (segments with * ONLINE/CONSUMING instances in the ideal state and pre-selected by the {@link SegmentPreSelector}). Should be called * only once before calling other methods. */ - void init(IdealState idealState, ExternalView externalView, Set<String> onlineSegments); + void init(IdealState idealState, ExternalView externalView, List<String> onlineSegments, List<ZNRecord> znRecords); /** * Processes the segment assignment (ideal state or external view) change based on the given online segments (segments * with ONLINE/CONSUMING instances in the ideal state and pre-selected by the {@link SegmentPreSelector}). */ - void onAssignmentChange(IdealState idealState, ExternalView externalView, Set<String> onlineSegments); + void onAssignmentChange(IdealState idealState, ExternalView externalView, Set<String> onlineSegments, + List<String> pulledSegments, List<ZNRecord> znRecords); /** * Refreshes the metadata for the given segment (called when segment is getting refreshed). */ - void refreshSegment(String segment); - - /** - * Prunes the segments queried by the given broker request, returns the selected segments to be queried. - */ - Set<String> prune(BrokerRequest brokerRequest, Set<String> segments); + void refreshSegment(String segment, @Nullable ZNRecord znRecord); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java new file mode 100644 index 0000000000..de9ec0a699 --- /dev/null +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.routing.segmentmetadata; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.helix.AccessOption; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.common.metadata.ZKMetadataProvider; + + +/** + * {@code SegmentZkMetadataFetcher} is used to cache {@link ZNRecord} stored in {@link ZkHelixPropertyStore} for + * segments. + */ +public class SegmentZkMetadataFetcher { + private final String _tableNameWithType; + private final ZkHelixPropertyStore<ZNRecord> _propertyStore; + private final String _segmentZKMetadataPathPrefix; + private final List<SegmentZkMetadataFetchListener> _listeners; + private final Set<String> _onlineSegmentsCached; + + private boolean _initialized; + + public SegmentZkMetadataFetcher(String tableNameWithType, ZkHelixPropertyStore<ZNRecord> propertyStore) { + _tableNameWithType = tableNameWithType; + _propertyStore = propertyStore; + _segmentZKMetadataPathPrefix = ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType) + "/"; + _listeners = new ArrayList<>(); + _onlineSegmentsCached = new HashSet<>(); + _initialized = false; + } + + public void init(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) { + if (!_initialized) { + _initialized = true; + if (!_listeners.isEmpty()) { + // Bulk load partition info for all online segments + int numSegments = onlineSegments.size(); + List<String> segments = new ArrayList<>(numSegments); + List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments); + for (String segment : onlineSegments) { + segments.add(segment); + segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment); + } + _onlineSegmentsCached.addAll(onlineSegments); + List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT, false); + for (SegmentZkMetadataFetchListener listener : _listeners) { + listener.init(idealState, externalView, segments, znRecords); + } + } + } else { + throw new RuntimeException("Segment zk metadata fetcher has already been initialized!"); + } + } + + public void register(SegmentZkMetadataFetchListener listener) { + if (!_initialized) { + _listeners.add(listener); + } else { + throw new RuntimeException("Segment zk metadata fetcher has already been initialized! " + + "Unable to register more listeners."); + } + } + + public List<SegmentZkMetadataFetchListener> getListeners() { + return _listeners; + } + + public synchronized void onAssignmentChange(IdealState idealState, ExternalView externalView, + Set<String> onlineSegments) { + if (!_listeners.isEmpty()) { + int numSegments = onlineSegments.size(); + List<String> segments = new ArrayList<>(numSegments); + List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments); + + for (String segment : onlineSegments) { + if (_onlineSegmentsCached.add(segment)) { + segments.add(segment); + segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment); + } + } + _onlineSegmentsCached.addAll(onlineSegments); + List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT, false); + for (SegmentZkMetadataFetchListener listener : _listeners) { + listener.onAssignmentChange(idealState, externalView, onlineSegments, segments, znRecords); + } + _onlineSegmentsCached.retainAll(onlineSegments); + } + } + + public synchronized void refreshSegment(String segment) { + if (!_listeners.isEmpty()) { + ZNRecord znRecord = _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT); + for (SegmentZkMetadataFetchListener listener : _listeners) { + listener.refreshSegment(segment, znRecord); + } + _onlineSegmentsCached.add(segment); + } + } +} diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java index 7a7b66b086..aeb6a01f2e 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java @@ -18,18 +18,14 @@ */ package org.apache.pinot.broker.routing.segmentpruner; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; -import org.apache.helix.AccessOption; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; -import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; -import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.utils.CommonConstants; @@ -45,35 +41,23 @@ public class EmptySegmentPruner implements SegmentPruner { private static final Logger LOGGER = LoggerFactory.getLogger(EmptySegmentPruner.class); private final String _tableNameWithType; - private final ZkHelixPropertyStore<ZNRecord> _propertyStore; - private final String _segmentZKMetadataPathPrefix; private final Set<String> _segmentsLoaded = new HashSet<>(); private final Set<String> _emptySegments = ConcurrentHashMap.newKeySet(); private volatile ResultCache _resultCache; - public EmptySegmentPruner(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore) { + public EmptySegmentPruner(TableConfig tableConfig) { _tableNameWithType = tableConfig.getTableName(); - _propertyStore = propertyStore; - _segmentZKMetadataPathPrefix = ZKMetadataProvider.constructPropertyStorePathForResource(_tableNameWithType) + "/"; } @Override - public void init(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) { + public void init(IdealState idealState, ExternalView externalView, List<String> onlineSegments, + List<ZNRecord> znRecords) { // Bulk load info for all online segments - int numSegments = onlineSegments.size(); - List<String> segments = new ArrayList<>(numSegments); - List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments); - for (String segment : onlineSegments) { - segments.add(segment); - segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment); - } - _segmentsLoaded.addAll(segments); - List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT, false); - for (int i = 0; i < numSegments; i++) { - String segment = segments.get(i); - if (isEmpty(segment, znRecords.get(i))) { + for (int idx = 0; idx < onlineSegments.size(); idx++) { + String segment = onlineSegments.get(idx); + if (isEmpty(segment, znRecords.get(idx))) { _emptySegments.add(segment); } } @@ -81,14 +65,14 @@ public class EmptySegmentPruner implements SegmentPruner { @Override public synchronized void onAssignmentChange(IdealState idealState, ExternalView externalView, - Set<String> onlineSegments) { + Set<String> onlineSegments, List<String> pulledSegments, List<ZNRecord> znRecords) { // NOTE: We don't update all the segment ZK metadata for every external view change, but only the new added/removed // ones. The refreshed segment ZK metadata change won't be picked up. boolean emptySegmentsChanged = false; - for (String segment : onlineSegments) { + for (int idx = 0; idx < pulledSegments.size(); idx++) { + String segment = pulledSegments.get(idx); if (_segmentsLoaded.add(segment)) { - if (isEmpty(segment, - _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT))) { + if (isEmpty(segment, znRecords.get(idx))) { emptySegmentsChanged |= _emptySegments.add(segment); } } @@ -103,9 +87,9 @@ public class EmptySegmentPruner implements SegmentPruner { } @Override - public synchronized void refreshSegment(String segment) { + public synchronized void refreshSegment(String segment, @Nullable ZNRecord znRecord) { _segmentsLoaded.add(segment); - if (isEmpty(segment, _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT))) { + if (isEmpty(segment, znRecord)) { if (_emptySegments.add(segment)) { // Reset the result cache when empty segments changed _resultCache = null; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java index dd4edb686b..2f3216cbf2 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java @@ -19,7 +19,6 @@ package org.apache.pinot.broker.routing.segmentpruner; import com.google.common.annotations.VisibleForTesting; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -28,12 +27,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; -import org.apache.helix.AccessOption; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; -import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; -import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.Expression; @@ -58,33 +54,21 @@ public class MultiPartitionColumnsSegmentPruner implements SegmentPruner { private final String _tableNameWithType; private final Set<String> _partitionColumns; - private final ZkHelixPropertyStore<ZNRecord> _propertyStore; - private final String _segmentZKMetadataPathPrefix; private final Map<String, Map<String, PartitionInfo>> _segmentColumnPartitionInfoMap = new ConcurrentHashMap<>(); - public MultiPartitionColumnsSegmentPruner(String tableNameWithType, Set<String> partitionColumns, - ZkHelixPropertyStore<ZNRecord> propertyStore) { + public MultiPartitionColumnsSegmentPruner(String tableNameWithType, Set<String> partitionColumns) { _tableNameWithType = tableNameWithType; _partitionColumns = partitionColumns; - _propertyStore = propertyStore; - _segmentZKMetadataPathPrefix = ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType) + "/"; } @Override - public void init(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) { + public void init(IdealState idealState, ExternalView externalView, List<String> onlineSegments, + List<ZNRecord> znRecords) { // Bulk load partition info for all online segments - int numSegments = onlineSegments.size(); - List<String> segments = new ArrayList<>(numSegments); - List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments); - for (String segment : onlineSegments) { - segments.add(segment); - segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment); - } - List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT, false); - for (int i = 0; i < numSegments; i++) { - String segment = segments.get(i); + for (int idx = 0; idx < onlineSegments.size(); idx++) { + String segment = onlineSegments.get(idx); Map<String, PartitionInfo> columnPartitionInfoMap = - extractColumnPartitionInfoMapFromSegmentZKMetadataZNRecord(segment, znRecords.get(i)); + extractColumnPartitionInfoMapFromSegmentZKMetadataZNRecord(segment, znRecords.get(idx)); if (columnPartitionInfoMap != null) { _segmentColumnPartitionInfoMap.put(segment, columnPartitionInfoMap); } @@ -143,21 +127,22 @@ public class MultiPartitionColumnsSegmentPruner implements SegmentPruner { @Override public synchronized void onAssignmentChange(IdealState idealState, ExternalView externalView, - Set<String> onlineSegments) { + Set<String> onlineSegments, List<String> pulledSegments, List<ZNRecord> znRecords) { // NOTE: We don't update all the segment ZK metadata for every external view change, but only the new added/removed // ones. The refreshed segment ZK metadata change won't be picked up. - for (String segment : onlineSegments) { + for (int idx = 0; idx < pulledSegments.size(); idx++) { + String segment = pulledSegments.get(idx); + ZNRecord znRecord = znRecords.get(idx); _segmentColumnPartitionInfoMap.computeIfAbsent(segment, - k -> extractColumnPartitionInfoMapFromSegmentZKMetadataZNRecord(k, - _propertyStore.get(_segmentZKMetadataPathPrefix + k, null, AccessOption.PERSISTENT))); + k -> extractColumnPartitionInfoMapFromSegmentZKMetadataZNRecord(k, znRecord)); } _segmentColumnPartitionInfoMap.keySet().retainAll(onlineSegments); } @Override - public synchronized void refreshSegment(String segment) { - Map<String, PartitionInfo> columnPartitionInfo = extractColumnPartitionInfoMapFromSegmentZKMetadataZNRecord(segment, - _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT)); + public synchronized void refreshSegment(String segment, @Nullable ZNRecord znRecord) { + Map<String, PartitionInfo> columnPartitionInfo = + extractColumnPartitionInfoMapFromSegmentZKMetadataZNRecord(segment, znRecord); if (columnPartitionInfo != null) { _segmentColumnPartitionInfoMap.put(segment, columnPartitionInfo); } else { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java index 5893e6bd92..17e92e5c15 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java @@ -19,34 +19,14 @@ package org.apache.pinot.broker.routing.segmentpruner; import java.util.Set; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.IdealState; -import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector; +import org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetchListener; import org.apache.pinot.common.request.BrokerRequest; /** * The segment pruner prunes the selected segments based on the query. */ -public interface SegmentPruner { - - /** - * Initializes the segment pruner with the ideal state, external view and online segments (segments with - * ONLINE/CONSUMING instances in the ideal state and pre-selected by the {@link SegmentPreSelector}). Should be called - * only once before calling other methods. - */ - void init(IdealState idealState, ExternalView externalView, Set<String> onlineSegments); - - /** - * Processes the segment assignment (ideal state or external view) change based on the given online segments (segments - * with ONLINE/CONSUMING instances in the ideal state and pre-selected by the {@link SegmentPreSelector}). - */ - void onAssignmentChange(IdealState idealState, ExternalView externalView, Set<String> onlineSegments); - - /** - * Refreshes the metadata for the given segment (called when segment is getting refreshed). - */ - void refreshSegment(String segment); +public interface SegmentPruner extends SegmentZkMetadataFetchListener { /** * Prunes the segments queried by the given broker request, returns the selected segments to be queried. diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java index f34f55f3c3..6135982e18 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java @@ -18,6 +18,8 @@ */ package org.apache.pinot.broker.routing.segmentpruner; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -26,6 +28,7 @@ import javax.annotation.Nullable; import org.apache.commons.collections.MapUtils; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.segment.local.utils.TableConfigUtils; import org.apache.pinot.spi.config.table.ColumnPartitionConfig; import org.apache.pinot.spi.config.table.RoutingConfig; @@ -33,6 +36,9 @@ import org.apache.pinot.spi.config.table.SegmentPartitionConfig; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.DateTimeFieldSpec; +import org.apache.pinot.spi.data.DateTimeFormatSpec; +import org.apache.pinot.spi.data.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +58,7 @@ public class SegmentPrunerFactory { boolean needsEmptySegment = TableConfigUtils.needsEmptySegmentPruner(tableConfig); if (needsEmptySegment) { // Add EmptySegmentPruner if needed - segmentPruners.add(new EmptySegmentPruner(tableConfig, propertyStore)); + segmentPruners.add(new EmptySegmentPruner(tableConfig)); } RoutingConfig routingConfig = tableConfig.getRoutingConfig(); @@ -113,8 +119,8 @@ public class SegmentPrunerFactory { LOGGER.info("Using PartitionSegmentPruner on partition columns: {} for table: {}", partitionColumns, tableNameWithType); return partitionColumns.size() == 1 ? new SinglePartitionColumnSegmentPruner(tableNameWithType, - partitionColumns.iterator().next(), propertyStore) - : new MultiPartitionColumnsSegmentPruner(tableNameWithType, partitionColumns, propertyStore); + partitionColumns.iterator().next()) + : new MultiPartitionColumnsSegmentPruner(tableNameWithType, partitionColumns); } @Nullable @@ -131,9 +137,26 @@ public class SegmentPrunerFactory { LOGGER.warn("Cannot enable time range pruning without time column for table: {}", tableNameWithType); return null; } + return createTimeSegmentPruner(tableConfig, propertyStore); + } + + @VisibleForTesting + static TimeSegmentPruner createTimeSegmentPruner(TableConfig tableConfig, + ZkHelixPropertyStore<ZNRecord> propertyStore) { + String tableNameWithType = tableConfig.getTableName(); + String timeColumn = tableConfig.getValidationConfig().getTimeColumnName(); + Preconditions.checkNotNull(timeColumn, "Time column must be configured in table config for table: %s", + tableNameWithType); + Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, tableNameWithType); + Preconditions.checkNotNull(schema, "Failed to find schema for table: %s", tableNameWithType); + DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(timeColumn); + Preconditions.checkNotNull(dateTimeSpec, "Field spec must be specified in schema for time column: %s of table: %s", + timeColumn, tableNameWithType); + DateTimeFormatSpec timeFormatSpec = dateTimeSpec.getFormatSpec(); - LOGGER.info("Using TimeRangePruner on time column: {} for table: {}", timeColumn, tableNameWithType); - return new TimeSegmentPruner(tableConfig, propertyStore); + LOGGER.info("Using TimeRangePruner on time column: {} for table: {} with DateTimeFormatSpec: {}", + timeColumn, tableNameWithType, timeFormatSpec); + return new TimeSegmentPruner(tableConfig, timeColumn, timeFormatSpec); } private static List<SegmentPruner> sortSegmentPruners(List<SegmentPruner> pruners) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java index 2c75782ef3..2959537706 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java @@ -18,19 +18,15 @@ */ package org.apache.pinot.broker.routing.segmentpruner; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; -import org.apache.helix.AccessOption; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; -import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; -import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.Expression; @@ -55,32 +51,20 @@ public class SinglePartitionColumnSegmentPruner implements SegmentPruner { private final String _tableNameWithType; private final String _partitionColumn; - private final ZkHelixPropertyStore<ZNRecord> _propertyStore; - private final String _segmentZKMetadataPathPrefix; private final Map<String, PartitionInfo> _partitionInfoMap = new ConcurrentHashMap<>(); - public SinglePartitionColumnSegmentPruner(String tableNameWithType, String partitionColumn, - ZkHelixPropertyStore<ZNRecord> propertyStore) { + public SinglePartitionColumnSegmentPruner(String tableNameWithType, String partitionColumn) { _tableNameWithType = tableNameWithType; _partitionColumn = partitionColumn; - _propertyStore = propertyStore; - _segmentZKMetadataPathPrefix = ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType) + "/"; } @Override - public void init(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) { + public void init(IdealState idealState, ExternalView externalView, List<String> onlineSegments, + List<ZNRecord> znRecords) { // Bulk load partition info for all online segments - int numSegments = onlineSegments.size(); - List<String> segments = new ArrayList<>(numSegments); - List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments); - for (String segment : onlineSegments) { - segments.add(segment); - segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment); - } - List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT, false); - for (int i = 0; i < numSegments; i++) { - String segment = segments.get(i); - PartitionInfo partitionInfo = extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecords.get(i)); + for (int idx = 0; idx < onlineSegments.size(); idx++) { + String segment = onlineSegments.get(idx); + PartitionInfo partitionInfo = extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecords.get(idx)); if (partitionInfo != null) { _partitionInfoMap.put(segment, partitionInfo); } @@ -129,20 +113,20 @@ public class SinglePartitionColumnSegmentPruner implements SegmentPruner { @Override public synchronized void onAssignmentChange(IdealState idealState, ExternalView externalView, - Set<String> onlineSegments) { + Set<String> onlineSegments, List<String> pulledSegments, List<ZNRecord> znRecords) { // NOTE: We don't update all the segment ZK metadata for every external view change, but only the new added/removed // ones. The refreshed segment ZK metadata change won't be picked up. - for (String segment : onlineSegments) { - _partitionInfoMap.computeIfAbsent(segment, k -> extractPartitionInfoFromSegmentZKMetadataZNRecord(k, - _propertyStore.get(_segmentZKMetadataPathPrefix + k, null, AccessOption.PERSISTENT))); + for (int idx = 0; idx < pulledSegments.size(); idx++) { + String segment = pulledSegments.get(idx); + ZNRecord znRecord = znRecords.get(idx); + _partitionInfoMap.computeIfAbsent(segment, k -> extractPartitionInfoFromSegmentZKMetadataZNRecord(k, znRecord)); } _partitionInfoMap.keySet().retainAll(onlineSegments); } @Override - public synchronized void refreshSegment(String segment) { - PartitionInfo partitionInfo = extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, - _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT)); + public synchronized void refreshSegment(String segment, @Nullable ZNRecord znRecord) { + PartitionInfo partitionInfo = extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecord); if (partitionInfo != null) { _partitionInfoMap.put(segment, partitionInfo); } else { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java index c123c65a67..a7ac4fce4b 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.broker.routing.segmentpruner; -import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -29,22 +28,17 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; -import org.apache.helix.AccessOption; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; -import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.broker.routing.segmentpruner.interval.Interval; import org.apache.pinot.broker.routing.segmentpruner.interval.IntervalTree; -import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.Expression; import org.apache.pinot.common.request.Function; import org.apache.pinot.common.request.Identifier; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DateTimeFormatSpec; -import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Query.Range; import org.apache.pinot.sql.FilterKind; @@ -64,44 +58,25 @@ public class TimeSegmentPruner implements SegmentPruner { private static final Interval DEFAULT_INTERVAL = new Interval(MIN_START_TIME, MAX_END_TIME); private final String _tableNameWithType; - private final ZkHelixPropertyStore<ZNRecord> _propertyStore; - private final String _segmentZKMetadataPathPrefix; private final String _timeColumn; private final DateTimeFormatSpec _timeFormatSpec; private volatile IntervalTree<String> _intervalTree; private final Map<String, Interval> _intervalMap = new HashMap<>(); - public TimeSegmentPruner(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore) { + public TimeSegmentPruner(TableConfig tableConfig, String timeColumn, DateTimeFormatSpec timeFormatSpec) { _tableNameWithType = tableConfig.getTableName(); - _propertyStore = propertyStore; - _segmentZKMetadataPathPrefix = ZKMetadataProvider.constructPropertyStorePathForResource(_tableNameWithType) + "/"; - _timeColumn = tableConfig.getValidationConfig().getTimeColumnName(); - Preconditions.checkNotNull(_timeColumn, "Time column must be configured in table config for table: %s", - _tableNameWithType); - - Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); - Preconditions.checkNotNull(schema, "Failed to find schema for table: %s", _tableNameWithType); - DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(_timeColumn); - Preconditions.checkNotNull(dateTimeSpec, "Field spec must be specified in schema for time column: %s of table: %s", - _timeColumn, _tableNameWithType); - _timeFormatSpec = dateTimeSpec.getFormatSpec(); + _timeColumn = timeColumn; + _timeFormatSpec = timeFormatSpec; } @Override - public void init(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) { + public void init(IdealState idealState, ExternalView externalView, List<String> onlineSegments, + List<ZNRecord> znRecords) { // Bulk load time info for all online segments - int numSegments = onlineSegments.size(); - List<String> segments = new ArrayList<>(numSegments); - List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments); - for (String segment : onlineSegments) { - segments.add(segment); - segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment); - } - List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT, false); - for (int i = 0; i < numSegments; i++) { - String segment = segments.get(i); - Interval interval = extractIntervalFromSegmentZKMetaZNRecord(segment, znRecords.get(i)); + for (int idx = 0; idx < onlineSegments.size(); idx++) { + String segment = onlineSegments.get(idx); + Interval interval = extractIntervalFromSegmentZKMetaZNRecord(segment, znRecords.get(idx)); _intervalMap.put(segment, interval); } _intervalTree = new IntervalTree<>(_intervalMap); @@ -128,21 +103,21 @@ public class TimeSegmentPruner implements SegmentPruner { @Override public synchronized void onAssignmentChange(IdealState idealState, ExternalView externalView, - Set<String> onlineSegments) { + Set<String> onlineSegments, List<String> pulledSegments, List<ZNRecord> znRecords) { // NOTE: We don't update all the segment ZK metadata for every external view change, but only the new added/removed // ones. The refreshed segment ZK metadata change won't be picked up. - for (String segment : onlineSegments) { - _intervalMap.computeIfAbsent(segment, k -> extractIntervalFromSegmentZKMetaZNRecord(k, - _propertyStore.get(_segmentZKMetadataPathPrefix + k, null, AccessOption.PERSISTENT))); + for (int idx = 0; idx < pulledSegments.size(); idx++) { + String segment = pulledSegments.get(idx); + ZNRecord zNrecord = znRecords.get(idx); + _intervalMap.computeIfAbsent(segment, k -> extractIntervalFromSegmentZKMetaZNRecord(k, zNrecord)); } _intervalMap.keySet().retainAll(onlineSegments); _intervalTree = new IntervalTree<>(_intervalMap); } @Override - public synchronized void refreshSegment(String segment) { - Interval interval = extractIntervalFromSegmentZKMetaZNRecord(segment, - _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT)); + public synchronized void refreshSegment(String segment, @Nullable ZNRecord znRecord) { + Interval interval = extractIntervalFromSegmentZKMetaZNRecord(segment, znRecord); _intervalMap.put(segment, interval); _intervalTree = new IntervalTree<>(_intervalMap); } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcherTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcherTest.java new file mode 100644 index 0000000000..a63811dccd --- /dev/null +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcherTest.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.routing.segmentmetadata; + +import com.google.common.collect.ImmutableSet; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.broker.routing.segmentpruner.SegmentPruner; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.controller.helix.ControllerTest; +import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.AssertJUnit.fail; + + +public class SegmentZkMetadataFetcherTest extends ControllerTest { + private static final String OFFLINE_TABLE_NAME = "testTable_OFFLINE"; + + @Test + public void testSegmentZkMetadataFetcherShouldNotAllowIncorrectRegisterOrInitBehavior() { + ZkHelixPropertyStore<ZNRecord> mockPropertyStore = Mockito.mock(ZkHelixPropertyStore.class); + IdealState idealState = Mockito.mock(IdealState.class); + ExternalView externalView = Mockito.mock(ExternalView.class); + + // empty listener at beginning + SegmentZkMetadataFetcher segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, + mockPropertyStore); + assertEquals(segmentZkMetadataFetcher.getListeners().size(), 0); + + // should allow register new listener + segmentZkMetadataFetcher.register(mock(SegmentZkMetadataFetchListener.class)); + assertEquals(segmentZkMetadataFetcher.getListeners().size(), 1); + + // should not allow register new listener once initialized + segmentZkMetadataFetcher.init(idealState, externalView, Collections.singleton("foo")); + try { + segmentZkMetadataFetcher.register(mock(SegmentZkMetadataFetchListener.class)); + fail(); + } catch (RuntimeException rte) { + assertTrue(rte.getMessage().contains("has already been initialized")); + } + + // should not allow duplicate init either + try { + segmentZkMetadataFetcher.init(idealState, externalView, Collections.singleton("foo")); + fail(); + } catch (RuntimeException rte) { + assertTrue(rte.getMessage().contains("has already been initialized")); + } + } + + @Test + public void testSegmentZkMetadataFetcherShouldNotPullZkWhenNoPrunerRegistered() { + ZkHelixPropertyStore<ZNRecord> mockPropertyStore = Mockito.mock(ZkHelixPropertyStore.class); + SegmentZkMetadataFetcher segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, + mockPropertyStore); + // NOTE: Ideal state and external view are not used in the current implementation + IdealState idealState = Mockito.mock(IdealState.class); + ExternalView externalView = Mockito.mock(ExternalView.class); + + assertEquals(segmentZkMetadataFetcher.getListeners().size(), 0); + segmentZkMetadataFetcher.init(idealState, externalView, Collections.singleton("foo")); + Mockito.verify(mockPropertyStore, times(0)).get(any(), any(), anyInt(), anyBoolean()); + segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, Collections.singleton("foo")); + Mockito.verify(mockPropertyStore, times(0)).get(any(), any(), anyInt(), anyBoolean()); + segmentZkMetadataFetcher.refreshSegment("foo"); + Mockito.verify(mockPropertyStore, times(0)).get(any(), any(), anyInt(), anyBoolean()); + } + + @Test + public void testSegmentZkMetadataFetcherShouldPullZkOnlyOncePerSegmentWhenMultiplePrunersRegistered() { + ZkHelixPropertyStore<ZNRecord> mockPropertyStore = mock(ZkHelixPropertyStore.class); + when(mockPropertyStore.get(any(), any(), anyInt(), anyBoolean())).thenAnswer(inv -> { + List<String> pathList = inv.getArgument(0); + List<ZNRecord> result = new ArrayList<>(pathList.size()); + for (String path : pathList) { + String[] pathParts = path.split("/"); + String segmentName = pathParts[pathParts.length - 1]; + SegmentZKMetadata fakeSegmentZkMetadata = new SegmentZKMetadata(segmentName); + result.add(fakeSegmentZkMetadata.toZNRecord()); + } + return result; + }); + SegmentPruner pruner1 = mock(SegmentPruner.class); + SegmentPruner pruner2 = mock(SegmentPruner.class); + SegmentZkMetadataFetcher segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, + mockPropertyStore); + segmentZkMetadataFetcher.register(pruner1); + segmentZkMetadataFetcher.register(pruner2); + // NOTE: Ideal state and external view are not used in the current implementation + IdealState idealState = mock(IdealState.class); + ExternalView externalView = mock(ExternalView.class); + + assertEquals(segmentZkMetadataFetcher.getListeners().size(), 2); + // should call property store once for "foo" and "bar" as a batch + segmentZkMetadataFetcher.init(idealState, externalView, ImmutableSet.of("foo", "bar")); + verify(mockPropertyStore, times(1)).get(argThat(new ListMatcher("foo", "bar")), any(), anyInt(), anyBoolean()); + verify(pruner1, times(1)).init(any(), any(), argThat(new ListMatcher("foo", "bar")), any()); + verify(pruner2, times(1)).init(any(), any(), argThat(new ListMatcher("foo", "bar")), any()); + + // should call property store only once b/c "alice" was missing + segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, ImmutableSet.of("bar", "alice")); + verify(mockPropertyStore, times(1)).get(argThat(new ListMatcher("alice")), any(), anyInt(), anyBoolean()); + verify(pruner1, times(1)).onAssignmentChange(any(), any(), any(), argThat(new ListMatcher("alice")), any()); + verify(pruner2, times(1)).onAssignmentChange(any(), any(), any(), argThat(new ListMatcher("alice")), any()); + + // should call property store once more b/c "foo" was cleared when onAssignmentChange called with "bar" and "alice" + segmentZkMetadataFetcher.refreshSegment("foo"); + verify(mockPropertyStore, times(1)).get(endsWith("foo"), any(), anyInt()); + verify(pruner1, times(1)).refreshSegment(eq("foo"), any()); + verify(pruner2, times(1)).refreshSegment(eq("foo"), any()); + clearInvocations(mockPropertyStore, pruner1, pruner2); + + // update with all existing segments will call into property store and pruner with empty list + segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, ImmutableSet.of("bar", "alice")); + verify(mockPropertyStore, times(1)).get(argThat(new ListMatcher()), any(), anyInt(), anyBoolean()); + verify(pruner1, times(1)).onAssignmentChange(any(), any(), any(), argThat(new ListMatcher()), any()); + verify(pruner2, times(1)).onAssignmentChange(any(), any(), any(), argThat(new ListMatcher()), any()); + + // calling refresh will still force pull from property store + segmentZkMetadataFetcher.refreshSegment("foo"); + verify(mockPropertyStore, times(1)).get(endsWith("foo"), any(), anyInt()); + verify(pruner1, times(1)).refreshSegment(eq("foo"), any()); + verify(pruner2, times(1)).refreshSegment(eq("foo"), any()); + } + + private static class ListMatcher implements ArgumentMatcher<List<String>> { + private final List<String> _valueToMatch; + + private ListMatcher(String... values) { + _valueToMatch = Arrays.asList(values); + } + + @Override + public boolean matches(List<String> arg) { + if (arg.size() != _valueToMatch.size()) { + return false; + } + for (int i = 0; i < arg.size(); i++) { + if (!arg.get(i).endsWith(_valueToMatch.get(i))) { + return false; + } + } + return true; + } + } +} diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java index cc60739d36..feaad35169 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java @@ -35,6 +35,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; import org.apache.helix.zookeeper.impl.client.ZkClient; +import org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetcher; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; @@ -263,9 +264,12 @@ public class SegmentPrunerTest extends ControllerTest { ExternalView externalView = Mockito.mock(ExternalView.class); SinglePartitionColumnSegmentPruner singlePartitionColumnSegmentPruner = - new SinglePartitionColumnSegmentPruner(OFFLINE_TABLE_NAME, PARTITION_COLUMN_1, _propertyStore); + new SinglePartitionColumnSegmentPruner(OFFLINE_TABLE_NAME, PARTITION_COLUMN_1); + SegmentZkMetadataFetcher segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, + _propertyStore); + segmentZkMetadataFetcher.register(singlePartitionColumnSegmentPruner); Set<String> onlineSegments = new HashSet<>(); - singlePartitionColumnSegmentPruner.init(idealState, externalView, onlineSegments); + segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments); assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, Collections.emptySet()), Collections.emptySet()); assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, Collections.emptySet()), @@ -289,7 +293,7 @@ public class SegmentPrunerTest extends ControllerTest { new SegmentZKMetadata(segmentWithoutPartitionMetadata); ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segmentZKMetadataWithoutPartitionMetadata); - singlePartitionColumnSegmentPruner.onAssignmentChange(idealState, externalView, onlineSegments); + segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, new HashSet<>(Collections.singletonList(segmentWithoutPartitionMetadata))), Collections.singletonList(segmentWithoutPartitionMetadata)); @@ -309,7 +313,7 @@ public class SegmentPrunerTest extends ControllerTest { String segment1 = "segment1"; onlineSegments.add(segment1); setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment1, "Murmur", 4, 0); - singlePartitionColumnSegmentPruner.onAssignmentChange(idealState, externalView, onlineSegments); + segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); assertEquals( singlePartitionColumnSegmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1))), new HashSet<>(Arrays.asList(segment0, segment1))); @@ -322,7 +326,7 @@ public class SegmentPrunerTest extends ControllerTest { // Update partition metadata without refreshing should have no effect setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment0, "Modulo", 4, 1); - singlePartitionColumnSegmentPruner.onAssignmentChange(idealState, externalView, onlineSegments); + segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); assertEquals( singlePartitionColumnSegmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1))), new HashSet<>(Arrays.asList(segment0, segment1))); @@ -334,7 +338,7 @@ public class SegmentPrunerTest extends ControllerTest { new HashSet<>(Collections.singletonList(segment1))); // Refresh the changed segment should update the segment pruner - singlePartitionColumnSegmentPruner.refreshSegment(segment0); + segmentZkMetadataFetcher.refreshSegment(segment0); assertEquals( singlePartitionColumnSegmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1))), new HashSet<>(Arrays.asList(segment0, segment1))); @@ -348,8 +352,10 @@ public class SegmentPrunerTest extends ControllerTest { // Multi-column partitioned segment. MultiPartitionColumnsSegmentPruner multiPartitionColumnsSegmentPruner = new MultiPartitionColumnsSegmentPruner(OFFLINE_TABLE_NAME, - Stream.of(PARTITION_COLUMN_1, PARTITION_COLUMN_2).collect(Collectors.toSet()), _propertyStore); - multiPartitionColumnsSegmentPruner.init(idealState, externalView, onlineSegments); + Stream.of(PARTITION_COLUMN_1, PARTITION_COLUMN_2).collect(Collectors.toSet())); + segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore); + segmentZkMetadataFetcher.register(multiPartitionColumnsSegmentPruner); + segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments); assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest1, Collections.emptySet()), Collections.emptySet()); assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest2, Collections.emptySet()), @@ -367,10 +373,10 @@ public class SegmentPrunerTest extends ControllerTest { Map<String, String> partitionColumn2FunctionConfig = new HashMap<>(); partitionColumn2FunctionConfig.put("columnValues", "xyz|abc"); partitionColumn2FunctionConfig.put("columnValuesDelimiter", "|"); - columnPartitionMetadataMap.put(PARTITION_COLUMN_2, - new ColumnPartitionMetadata("BoundedColumnValue", 3, Collections.singleton(1), partitionColumn2FunctionConfig)); + columnPartitionMetadataMap.put(PARTITION_COLUMN_2, new ColumnPartitionMetadata( + "BoundedColumnValue", 3, Collections.singleton(1), partitionColumn2FunctionConfig)); setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment2, columnPartitionMetadataMap); - multiPartitionColumnsSegmentPruner.onAssignmentChange(idealState, externalView, onlineSegments); + segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); assertEquals( multiPartitionColumnsSegmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1))), new HashSet<>(Arrays.asList(segment0, segment1))); @@ -399,10 +405,13 @@ public class SegmentPrunerTest extends ControllerTest { TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, TableType.REALTIME); setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.DAYS); - - TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig, _propertyStore); + TimeSegmentPruner segmentPruner = SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, + _propertyStore); Set<String> onlineSegments = new HashSet<>(); - segmentPruner.init(idealState, externalView, onlineSegments); + SegmentZkMetadataFetcher segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, + _propertyStore); + segmentZkMetadataFetcher.register(segmentPruner); + segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments); assertEquals(segmentPruner.prune(brokerRequest1, Collections.emptySet()), Collections.emptySet()); assertEquals(segmentPruner.prune(brokerRequest2, Collections.emptySet()), Collections.emptySet()); assertEquals(segmentPruner.prune(brokerRequest3, Collections.emptySet()), Collections.emptySet()); @@ -413,10 +422,12 @@ public class SegmentPrunerTest extends ControllerTest { // Initialize with non-empty onlineSegments // Segments without metadata (not updated yet) should not be pruned - segmentPruner = new TimeSegmentPruner(tableConfig, _propertyStore); + segmentPruner = SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, _propertyStore); + segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore); + segmentZkMetadataFetcher.register(segmentPruner); String newSegment = "newSegment"; onlineSegments.add(newSegment); - segmentPruner.init(idealState, externalView, onlineSegments); + segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments); assertEquals(segmentPruner.prune(brokerRequest1, Collections.singleton(newSegment)), Collections.singletonList(newSegment)); assertEquals(segmentPruner.prune(brokerRequest2, Collections.singleton(newSegment)), @@ -440,7 +451,7 @@ public class SegmentPrunerTest extends ControllerTest { segmentZKMetadataWithoutTimeRangeMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE); ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, REALTIME_TABLE_NAME, segmentZKMetadataWithoutTimeRangeMetadata); - segmentPruner.onAssignmentChange(idealState, externalView, onlineSegments); + segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); assertEquals( segmentPruner.prune(brokerRequest1, new HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))), Collections.singletonList(segmentWithoutTimeRangeMetadata)); @@ -476,7 +487,7 @@ public class SegmentPrunerTest extends ControllerTest { onlineSegments.add(segment2); setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 50, 65, TimeUnit.DAYS); - segmentPruner.onAssignmentChange(idealState, externalView, onlineSegments); + segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), new HashSet<>(Arrays.asList(segment0, segment1, segment2))); assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), @@ -510,7 +521,7 @@ public class SegmentPrunerTest extends ControllerTest { Collections.emptySet()); // Refresh the changed segment should update the segment pruner - segmentPruner.refreshSegment(segment2); + segmentZkMetadataFetcher.refreshSegment(segment2); assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), new HashSet<>(Arrays.asList(segment0, segment1, segment2))); assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), @@ -541,7 +552,10 @@ public class SegmentPrunerTest extends ControllerTest { TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, TableType.REALTIME); setSchemaDateTimeFieldSpecSDF(RAW_TABLE_NAME, SDF_PATTERN); - TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig, _propertyStore); + TimeSegmentPruner segmentPruner = SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, _propertyStore); + SegmentZkMetadataFetcher segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, + _propertyStore); + segmentZkMetadataFetcher.register(segmentPruner); Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, RAW_TABLE_NAME); DateTimeFormatSpec dateTimeFormatSpec = schema.getSpecForTimeColumn(TIME_COLUMN).getFormatSpec(); @@ -561,7 +575,7 @@ public class SegmentPrunerTest extends ControllerTest { setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, dateTimeFormatSpec.fromFormatToMillis("20200401"), dateTimeFormatSpec.fromFormatToMillis("20200430"), TimeUnit.MILLISECONDS); - segmentPruner.init(idealState, externalView, onlineSegments); + segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments); assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), Collections.singleton(segment0)); assertEquals(segmentPruner.prune(brokerRequest2, onlineSegments), new HashSet<>(Arrays.asList(segment0, segment1))); assertEquals(segmentPruner.prune(brokerRequest3, onlineSegments), Collections.singleton(segment1)); @@ -581,7 +595,10 @@ public class SegmentPrunerTest extends ControllerTest { TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, TableType.REALTIME); setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.DAYS); - TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig, _propertyStore); + TimeSegmentPruner segmentPruner = SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, _propertyStore); + SegmentZkMetadataFetcher segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, + _propertyStore); + segmentZkMetadataFetcher.register(segmentPruner); Set<String> onlineSegments = new HashSet<>(); String segment0 = "segment0"; onlineSegments.add(segment0); @@ -592,7 +609,7 @@ public class SegmentPrunerTest extends ControllerTest { String segment2 = "segment2"; onlineSegments.add(segment2); setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 50, 65, TimeUnit.DAYS); - segmentPruner.init(idealState, externalView, onlineSegments); + segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments); assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), new HashSet<>(Arrays.asList(segment0, segment2))); assertEquals(segmentPruner.prune(brokerRequest2, onlineSegments), new HashSet<>(Arrays.asList(segment0, segment1))); @@ -610,7 +627,10 @@ public class SegmentPrunerTest extends ControllerTest { TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, TableType.REALTIME); // init with list of segments - EmptySegmentPruner segmentPruner = new EmptySegmentPruner(tableConfig, _propertyStore); + EmptySegmentPruner segmentPruner = new EmptySegmentPruner(tableConfig); + SegmentZkMetadataFetcher segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, + _propertyStore); + segmentZkMetadataFetcher.register(segmentPruner); Set<String> onlineSegments = new HashSet<>(); String segment0 = "segment0"; onlineSegments.add(segment0); @@ -618,7 +638,7 @@ public class SegmentPrunerTest extends ControllerTest { String segment1 = "segment1"; onlineSegments.add(segment1); setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment1, 0); - segmentPruner.init(idealState, externalView, onlineSegments); + segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments); assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1))), new HashSet<>(Collections.singletonList(segment0))); assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1))), @@ -627,9 +647,11 @@ public class SegmentPrunerTest extends ControllerTest { new HashSet<>(Collections.singletonList(segment0))); // init with empty list of segments - segmentPruner = new EmptySegmentPruner(tableConfig, _propertyStore); + segmentPruner = new EmptySegmentPruner(tableConfig); + segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore); + segmentZkMetadataFetcher.register(segmentPruner); onlineSegments.clear(); - segmentPruner.init(idealState, externalView, onlineSegments); + segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments); assertEquals(segmentPruner.prune(brokerRequest1, Collections.emptySet()), Collections.emptySet()); assertEquals(segmentPruner.prune(brokerRequest2, Collections.emptySet()), Collections.emptySet()); assertEquals(segmentPruner.prune(brokerRequest3, Collections.emptySet()), Collections.emptySet()); @@ -637,7 +659,7 @@ public class SegmentPrunerTest extends ControllerTest { // Segments without metadata (not updated yet) should not be pruned String newSegment = "newSegment"; onlineSegments.add(newSegment); - segmentPruner.onAssignmentChange(idealState, externalView, onlineSegments); + segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); assertEquals(segmentPruner.prune(brokerRequest1, Collections.singleton(newSegment)), Collections.singleton(newSegment)); assertEquals(segmentPruner.prune(brokerRequest2, Collections.singleton(newSegment)), @@ -654,7 +676,7 @@ public class SegmentPrunerTest extends ControllerTest { segmentZKMetadataWithoutTotalDocsMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS); ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, REALTIME_TABLE_NAME, segmentZKMetadataWithoutTotalDocsMetadata); - segmentPruner.onAssignmentChange(idealState, externalView, onlineSegments); + segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); assertEquals(segmentPruner.prune(brokerRequest1, Collections.singleton(segmentWithoutTotalDocsMetadata)), Collections.singleton(segmentWithoutTotalDocsMetadata)); assertEquals(segmentPruner.prune(brokerRequest2, Collections.singleton(segmentWithoutTotalDocsMetadata)), @@ -667,7 +689,7 @@ public class SegmentPrunerTest extends ControllerTest { String segmentWithNegativeTotalDocsMetadata = "segmentWithNegativeTotalDocsMetadata"; onlineSegments.add(segmentWithNegativeTotalDocsMetadata); setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segmentWithNegativeTotalDocsMetadata, -1); - segmentPruner.onAssignmentChange(idealState, externalView, onlineSegments); + segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); assertEquals(segmentPruner.prune(brokerRequest1, Collections.singleton(segmentWithNegativeTotalDocsMetadata)), Collections.singleton(segmentWithNegativeTotalDocsMetadata)); assertEquals(segmentPruner.prune(brokerRequest2, Collections.singleton(segmentWithNegativeTotalDocsMetadata)), @@ -685,7 +707,7 @@ public class SegmentPrunerTest extends ControllerTest { onlineSegments.add(segment2); setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment2, -1); - segmentPruner.onAssignmentChange(idealState, externalView, onlineSegments); + segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments); assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), new HashSet<>(Arrays.asList(segment0, segment2))); assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), @@ -704,7 +726,7 @@ public class SegmentPrunerTest extends ControllerTest { new HashSet<>(Arrays.asList(segment0, segment2))); // Refresh the changed segment should update the segment pruner - segmentPruner.refreshSegment(segment2); + segmentZkMetadataFetcher.refreshSegment(segment2); assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), new HashSet<>(Collections.singletonList(segment0))); assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org