This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 575398dae0 Remove special routing handling for multiple consuming segments (#11371)
575398dae0 is described below

commit 575398dae0841ba152c31306cb460cfdc776b04b
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Thu Aug 17 10:12:47 2023 -0700

    Remove special routing handling for multiple consuming segments (#11371)
---
 .../instanceselector/BaseInstanceSelector.java     |  20 ++-
 .../StrictReplicaGroupInstanceSelector.java        |  24 ++-
 ...ntSelector.java => DefaultSegmentSelector.java} |  10 +-
 .../segmentselector/RealtimeSegmentSelector.java   | 163 ---------------------
 .../routing/segmentselector/SegmentSelector.java   |  12 --
 .../segmentselector/SegmentSelectorFactory.java    |   7 +-
 .../segmentselector/SegmentSelectorTest.java       | 134 -----------------
 .../common/utils/config/QueryOptionsUtils.java     |   6 -
 .../apache/pinot/spi/utils/CommonConstants.java    |   1 -
 9 files changed, 35 insertions(+), 342 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 667cc766d8..c9219c3ea3 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -156,6 +156,8 @@ abstract class BaseInstanceSelector implements InstanceSelector {
         newSegmentPushTimeMap.put(segmentZKMetadata.getSegmentName(), 
pushTimeMillis);
       }
     }
+    LOGGER.info("Got {} new segments: {} for table: {} by reading ZK metadata, 
current time: {}",
+        newSegmentPushTimeMap.size(), newSegmentPushTimeMap, 
_tableNameWithType, nowMillis);
     return newSegmentPushTimeMap;
   }
 
@@ -294,9 +296,9 @@ abstract class BaseInstanceSelector implements InstanceSelector {
         for (SegmentInstanceCandidate candidate : candidates) {
           candidateInstances.add(candidate.getInstance());
         }
-        LOGGER.warn(
-            "Failed to find servers hosting segment: {} for table: {} (all 
candidate instances: {} are disabled, "
-                + "counting segment as unavailable)", segment, 
_tableNameWithType, candidateInstances);
+        LOGGER.warn("Failed to find servers hosting old segment: {} for table: 
{} "
+                + "(all candidate instances: {} are disabled, counting segment 
as unavailable)", segment,
+            _tableNameWithType, candidateInstances);
         unavailableSegments.add(segment);
         _brokerMetrics.addMeteredTableValue(_tableNameWithType, 
BrokerMeter.NO_SERVING_HOST_FOR_SEGMENT, 1);
       }
@@ -314,8 +316,16 @@ abstract class BaseInstanceSelector implements InstanceSelector {
       }
       if (!enabledCandidates.isEmpty()) {
         instanceCandidatesMap.put(segment, enabledCandidates);
+      } else {
+        // Do not count new segment as unavailable
+        List<String> candidateInstances = new ArrayList<>(candidates.size());
+        for (SegmentInstanceCandidate candidate : candidates) {
+          candidateInstances.add(candidate.getInstance());
+        }
+        LOGGER.info("Failed to find servers hosting new segment: {} for table: 
{} "
+                + "(all candidate instances: {} are disabled, but not counting 
new segment as unavailable)", segment,
+            _tableNameWithType, candidateInstances);
       }
-      // Do not count new segment as unavailable
     }
 
     _segmentStates = new SegmentStates(instanceCandidatesMap, 
unavailableSegments);
@@ -377,6 +387,8 @@ abstract class BaseInstanceSelector implements InstanceSelector {
         }
       }
     }
+    LOGGER.info("Got {} new segments: {} for table: {} by processing existing 
states, current time: {}",
+        newSegmentPushTimeMap.size(), newSegmentPushTimeMap, 
_tableNameWithType, nowMillis);
     return newSegmentPushTimeMap;
   }
 
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
index b00cf851c7..ddb83ce282 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
@@ -34,6 +34,8 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import 
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.utils.HashUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -66,6 +68,7 @@ import org.apache.pinot.common.utils.HashUtil;
  * </pre>
  */
 public class StrictReplicaGroupInstanceSelector extends 
ReplicaGroupInstanceSelector {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StrictReplicaGroupInstanceSelector.class);
 
   public StrictReplicaGroupInstanceSelector(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
       BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock) {
@@ -122,12 +125,19 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele
     Map<Set<String>, Set<String>> unavailableInstancesMap = new HashMap<>();
     for (Map.Entry<String, Set<String>> entry : 
oldSegmentToOnlineInstancesMap.entrySet()) {
       String segment = entry.getKey();
-      Set<String> instancesInIdealState = 
idealStateAssignment.get(segment).keySet();
+      Set<String> onlineInstances = entry.getValue();
+      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
+      Set<String> instancesInIdealState = idealStateInstanceStateMap.keySet();
       Set<String> unavailableInstances =
           unavailableInstancesMap.computeIfAbsent(instancesInIdealState, k -> 
new HashSet<>());
       for (String instance : instancesInIdealState) {
-        if (!entry.getValue().contains(instance)) {
-          unavailableInstances.add(instance);
+        if (!onlineInstances.contains(instance)) {
+          if (unavailableInstances.add(instance)) {
+            LOGGER.warn(
+                "Found unavailable instance: {} in instance group: {} for 
segment: {}, table: {} (IS: {}, EV: {})",
+                instance, instancesInIdealState, segment, _tableNameWithType, 
idealStateInstanceStateMap,
+                externalViewAssignment.get(segment));
+          }
         }
       }
     }
@@ -138,8 +148,7 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele
       // NOTE: onlineInstances is either a TreeSet or an EmptySet (sorted)
       Set<String> onlineInstances = entry.getValue();
       Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
-      Set<String> unavailableInstances =
-          
unavailableInstancesMap.getOrDefault(idealStateInstanceStateMap.keySet(), 
Collections.emptySet());
+      Set<String> unavailableInstances = 
unavailableInstancesMap.get(idealStateInstanceStateMap.keySet());
       List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(onlineInstances.size());
       for (String instance : onlineInstances) {
         if (!unavailableInstances.contains(instance)) {
@@ -156,9 +165,8 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele
       Set<String> unavailableInstances =
           
unavailableInstancesMap.getOrDefault(idealStateInstanceStateMap.keySet(), 
Collections.emptySet());
       List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(idealStateInstanceStateMap.size());
-      for (Map.Entry<String, String> instanceStateEntry : 
convertToSortedMap(idealStateInstanceStateMap).entrySet()) {
-        String instance = instanceStateEntry.getKey();
-        if (!unavailableInstances.contains(instance) && 
isOnlineForRouting(instanceStateEntry.getValue())) {
+      for (String instance : 
convertToSortedMap(idealStateInstanceStateMap).keySet()) {
+        if (!unavailableInstances.contains(instance)) {
           candidates.add(new SegmentInstanceCandidate(instance, 
onlineInstances.contains(instance)));
         }
       }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/OfflineSegmentSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/DefaultSegmentSelector.java
similarity index 82%
rename from pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/OfflineSegmentSelector.java
rename to pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/DefaultSegmentSelector.java
index 32f1cd963d..069ce53da3 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/OfflineSegmentSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/DefaultSegmentSelector.java
@@ -25,22 +25,16 @@ import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.request.BrokerRequest;
 
 
-/**
- * Segment selector for offline table.
- */
-public class OfflineSegmentSelector implements SegmentSelector {
+public class DefaultSegmentSelector implements SegmentSelector {
   private volatile Set<String> _segments;
 
   @Override
   public void init(IdealState idealState, ExternalView externalView, 
Set<String> onlineSegments) {
-    onAssignmentChange(idealState, externalView, onlineSegments);
+    _segments = Collections.unmodifiableSet(onlineSegments);
   }
 
   @Override
   public void onAssignmentChange(IdealState idealState, ExternalView 
externalView, Set<String> onlineSegments) {
-    // TODO: for new added segments, before all replicas are up, consider not 
selecting them to avoid causing
-    //       hotspot servers
-
     _segments = Collections.unmodifiableSet(onlineSegments);
   }
 
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
deleted file mode 100644
index 72d44ffd81..0000000000
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.routing.segmentselector;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.utils.HLCSegmentName;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.SegmentName;
-import org.apache.pinot.common.utils.config.QueryOptionsUtils;
-import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-
-
-/**
- * Segment selector for real-time table which handles the following scenarios:
- * <ul>
- *   <li>When HLC and LLC segments coexist (during LLC migration), select only 
HLC segments or LLC segments</li>
- *   <li>For HLC segments, only select segments in one group</li>
- *   <li>
- *     For LLC segments, only select the first CONSUMING segment for each 
partition to avoid duplicate data because in
- *     certain unlikely degenerate scenarios, we can consume overlapping data 
until segments are flushed (at which point
- *     the overlapping data is discarded during the reconciliation process 
with the controller).
- *   </li>
- * </ul>
- */
-public class RealtimeSegmentSelector implements SegmentSelector {
-  private final AtomicLong _requestId = new AtomicLong();
-  private volatile List<Set<String>> _hlcSegments;
-  private volatile Set<String> _llcSegments;
-
-  @Override
-  public void init(IdealState idealState, ExternalView externalView, 
Set<String> onlineSegments) {
-    onAssignmentChange(idealState, externalView, onlineSegments);
-  }
-
-  @Override
-  public void onAssignmentChange(IdealState idealState, ExternalView 
externalView, Set<String> onlineSegments) {
-    // Group HLC segments by their group id
-    // NOTE: Use TreeMap so that group ids are sorted and the result is 
deterministic
-    Map<String, Set<String>> groupIdToHLCSegmentsMap = new TreeMap<>();
-
-    List<String> completedLLCSegments = new ArrayList<>();
-    // Store the first CONSUMING segment for each partition
-    Map<Integer, LLCSegmentName> partitionIdToFirstConsumingLLCSegmentMap = 
new HashMap<>();
-
-    // Iterate over the external view instead of the online segments so that 
the map lookups are performed on the
-    // HashSet instead of the TreeSet for performance. For LLC segments, we 
need the external view to figure out whether
-    // the segments are in CONSUMING state. For the goal of segment selector, 
we should not exclude segments not in the
-    // external view, but it is okay to exclude them as there is no way to 
route them without instance states in
-    // external view.
-    // - New added segment might only exist in ideal state
-    // - New removed segment might only exist in external view
-    for (Map.Entry<String, Map<String, String>> entry : 
externalView.getRecord().getMapFields().entrySet()) {
-      String segment = entry.getKey();
-      if (!onlineSegments.contains(segment)) {
-        continue;
-      }
-
-      // TODO: for new added segments, before all replicas are up, consider 
not selecting them to avoid causing
-      //       hotspot servers
-
-      Map<String, String> instanceStateMap = entry.getValue();
-      if (SegmentName.isHighLevelConsumerSegmentName(segment)) {
-        HLCSegmentName hlcSegmentName = new HLCSegmentName(segment);
-        groupIdToHLCSegmentsMap.computeIfAbsent(hlcSegmentName.getGroupId(), k 
-> new HashSet<>()).add(segment);
-      } else {
-        if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
-          // Keep the first CONSUMING segment for each partition
-          LLCSegmentName llcSegmentName = new LLCSegmentName(segment);
-          partitionIdToFirstConsumingLLCSegmentMap
-              .compute(llcSegmentName.getPartitionGroupId(), (k, 
consumingSegment) -> {
-                if (consumingSegment == null) {
-                  return llcSegmentName;
-                } else {
-                  if (llcSegmentName.getSequenceNumber() < 
consumingSegment.getSequenceNumber()) {
-                    return llcSegmentName;
-                  } else {
-                    return consumingSegment;
-                  }
-                }
-              });
-        } else {
-          completedLLCSegments.add(segment);
-        }
-      }
-    }
-
-    int numHLCGroups = groupIdToHLCSegmentsMap.size();
-    if (numHLCGroups != 0) {
-      List<Set<String>> hlcSegments = new ArrayList<>(numHLCGroups);
-      for (Set<String> hlcSegmentsForGroup : groupIdToHLCSegmentsMap.values()) 
{
-        hlcSegments.add(Collections.unmodifiableSet(hlcSegmentsForGroup));
-      }
-      _hlcSegments = hlcSegments;
-    } else {
-      _hlcSegments = null;
-    }
-
-    if (!completedLLCSegments.isEmpty() || 
!partitionIdToFirstConsumingLLCSegmentMap.isEmpty()) {
-      Set<String> llcSegments =
-          new HashSet<>(completedLLCSegments.size() + 
partitionIdToFirstConsumingLLCSegmentMap.size());
-      llcSegments.addAll(completedLLCSegments);
-      for (LLCSegmentName llcSegmentName : 
partitionIdToFirstConsumingLLCSegmentMap.values()) {
-        llcSegments.add(llcSegmentName.getSegmentName());
-      }
-      _llcSegments = Collections.unmodifiableSet(llcSegments);
-    } else {
-      _llcSegments = null;
-    }
-  }
-
-  @Override
-  public Set<String> select(BrokerRequest brokerRequest) {
-    if (_hlcSegments == null && _llcSegments == null) {
-      return Collections.emptySet();
-    }
-    if (_hlcSegments == null) {
-      return selectLLCSegments();
-    }
-    if (_llcSegments == null) {
-      return selectHLCSegments();
-    }
-
-    // Handle HLC and LLC coexisting scenario, select HLC segments only if it 
is forced in the routing options
-    return 
QueryOptionsUtils.isRoutingForceHLC(brokerRequest.getPinotQuery().getQueryOptions())
 ? selectHLCSegments()
-        : selectLLCSegments();
-  }
-
-  private Set<String> selectHLCSegments() {
-    List<Set<String>> hlcSegments = _hlcSegments;
-    return hlcSegments.get((int) (_requestId.getAndIncrement() % 
hlcSegments.size()));
-  }
-
-  private Set<String> selectLLCSegments() {
-    return _llcSegments;
-  }
-}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelector.java
index 5c12bb961f..76c0ba3b37 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelector.java
@@ -28,18 +28,6 @@ import org.apache.pinot.common.request.BrokerRequest;
 /**
  * The segment selector selects the segments for the query. The segments 
selected should cover the whole dataset (table)
  * without overlap.
- * <p>Segment selector examples:
- * <ul>
- *   <li>
- *     For real-time table, when HLC and LLC segments coexist (during LLC 
migration), select only HLC segments or LLC
- *     segments
- *   </li>
- *   <li>For HLC real-time table, select segments in one group</li>
- *   <li>
- *     For table with segment merge/rollup enabled, select the merged segments 
over the original segments with the same
- *     data
- *   </li>
- * </ul>
  */
 public interface SegmentSelector {
 
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorFactory.java
index 4085a54715..f6bd0ce24b 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorFactory.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.broker.routing.segmentselector;
 
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
 
 
 public class SegmentSelectorFactory {
@@ -27,10 +26,6 @@ public class SegmentSelectorFactory {
   }
 
   public static SegmentSelector getSegmentSelector(TableConfig tableConfig) {
-    if (tableConfig.getTableType() == TableType.OFFLINE) {
-      return new OfflineSegmentSelector();
-    } else {
-      return new RealtimeSegmentSelector();
-    }
+    return new DefaultSegmentSelector();
   }
 }
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java
deleted file mode 100644
index e3ec929d59..0000000000
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.routing.segmentselector;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.request.PinotQuery;
-import org.apache.pinot.common.utils.HLCSegmentName;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
-import org.testng.annotations.Test;
-
-import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING;
-import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEqualsNoOrder;
-import static org.testng.Assert.assertTrue;
-
-
-public class SegmentSelectorTest {
-
-  @Test
-  public void testSegmentSelectorFactory() {
-    TableConfig tableConfig = mock(TableConfig.class);
-
-    when(tableConfig.getTableType()).thenReturn(TableType.OFFLINE);
-    assertTrue(SegmentSelectorFactory.getSegmentSelector(tableConfig) 
instanceof OfflineSegmentSelector);
-
-    when(tableConfig.getTableType()).thenReturn(TableType.REALTIME);
-    assertTrue(SegmentSelectorFactory.getSegmentSelector(tableConfig) 
instanceof RealtimeSegmentSelector);
-  }
-
-  @Test
-  public void testRealtimeSegmentSelector() {
-    String realtimeTableName = "testTable_REALTIME";
-    ExternalView externalView = new ExternalView(realtimeTableName);
-    Map<String, Map<String, String>> segmentAssignment = 
externalView.getRecord().getMapFields();
-    Map<String, String> onlineInstanceStateMap = 
Collections.singletonMap("server", ONLINE);
-    Map<String, String> consumingInstanceStateMap = 
Collections.singletonMap("server", CONSUMING);
-    Set<String> onlineSegments = new HashSet<>();
-    // NOTE: Ideal state is not used in the current implementation
-    IdealState idealState = mock(IdealState.class);
-
-    // Should return an empty list when there is no segment
-    RealtimeSegmentSelector segmentSelector = new RealtimeSegmentSelector();
-    segmentSelector.init(idealState, externalView, onlineSegments);
-    BrokerRequest brokerRequest = mock(BrokerRequest.class);
-    PinotQuery pinotQuery = mock(PinotQuery.class);
-    when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
-    assertTrue(segmentSelector.select(brokerRequest).isEmpty());
-
-    // For HLC segments, only one group of segments should be selected
-    int numHLCGroups = 3;
-    int numHLCSegmentsPerGroup = 5;
-    String[][] hlcSegments = new String[numHLCGroups][];
-    for (int i = 0; i < numHLCGroups; i++) {
-      String groupId = "testTable_REALTIME_" + i;
-      String[] hlcSegmentsForGroup = new String[numHLCSegmentsPerGroup];
-      for (int j = 0; j < numHLCSegmentsPerGroup; j++) {
-        String hlcSegment = new HLCSegmentName(groupId, "0", 
Integer.toString(j)).getSegmentName();
-        segmentAssignment.put(hlcSegment, onlineInstanceStateMap);
-        onlineSegments.add(hlcSegment);
-        hlcSegmentsForGroup[j] = hlcSegment;
-      }
-      hlcSegments[i] = hlcSegmentsForGroup;
-    }
-    segmentSelector.onAssignmentChange(idealState, externalView, 
onlineSegments);
-
-    // Only HLC segments exist, should select the HLC segments from the first 
group
-    assertEqualsNoOrder(segmentSelector.select(brokerRequest).toArray(), 
hlcSegments[0]);
-
-    // For LLC segments, only the first CONSUMING segment for each partition 
should be selected
-    int numLLCPartitions = 3;
-    int numLLCSegmentsPerPartition = 5;
-    int numOnlineLLCSegmentsPerPartition = 3;
-    String[] expectedSelectedLLCSegments = new String[numLLCPartitions * 
(numLLCSegmentsPerPartition - 1)];
-    for (int i = 0; i < numLLCPartitions; i++) {
-      for (int j = 0; j < numLLCSegmentsPerPartition; j++) {
-        String llcSegment = new LLCSegmentName(realtimeTableName, i, j, 
0).getSegmentName();
-        if (j < numOnlineLLCSegmentsPerPartition) {
-          externalView.setStateMap(llcSegment, onlineInstanceStateMap);
-        } else {
-          externalView.setStateMap(llcSegment, consumingInstanceStateMap);
-        }
-        onlineSegments.add(llcSegment);
-        if (j < numLLCSegmentsPerPartition - 1) {
-          expectedSelectedLLCSegments[i * (numLLCSegmentsPerPartition - 1) + 
j] = llcSegment;
-        }
-      }
-    }
-    segmentSelector.onAssignmentChange(idealState, externalView, 
onlineSegments);
-
-    // Both HLC and LLC segments exist, should select the LLC segments
-    assertEqualsNoOrder(segmentSelector.select(brokerRequest).toArray(), 
expectedSelectedLLCSegments);
-
-    // When HLC is forced, should select the HLC segments from the second group
-    when(pinotQuery.getQueryOptions()).thenReturn(
-        Collections.singletonMap(Request.QueryOptionKey.ROUTING_OPTIONS, 
Request.QueryOptionValue.ROUTING_FORCE_HLC));
-    assertEqualsNoOrder(segmentSelector.select(brokerRequest).toArray(), 
hlcSegments[1]);
-
-    // Remove all the HLC segments from ideal state, should select the LLC 
segments even when HLC is forced
-    for (String[] hlcSegmentsForGroup : hlcSegments) {
-      for (String hlcSegment : hlcSegmentsForGroup) {
-        onlineSegments.remove(hlcSegment);
-      }
-    }
-    segmentSelector.onAssignmentChange(idealState, externalView, 
onlineSegments);
-    assertEqualsNoOrder(segmentSelector.select(brokerRequest).toArray(), 
expectedSelectedLLCSegments);
-  }
-}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 2682eb927a..5b239fde33 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.utils.CommonConstants;
 import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
-import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionValue;
 
 
 /**
@@ -118,11 +117,6 @@ public class QueryOptionsUtils {
     return 
"false".equalsIgnoreCase(queryOptions.get(QueryOptionKey.USE_STAR_TREE));
   }
 
-  public static boolean isRoutingForceHLC(Map<String, String> queryOptions) {
-    String routingOptions = queryOptions.get(QueryOptionKey.ROUTING_OPTIONS);
-    return routingOptions != null && 
routingOptions.toUpperCase().contains(QueryOptionValue.ROUTING_FORCE_HLC);
-  }
-
   public static boolean isSkipScanFilterReorder(Map<String, String> 
queryOptions) {
     return 
"false".equalsIgnoreCase(queryOptions.get(QueryOptionKey.USE_SCAN_REORDER_OPTIMIZATION));
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 46b2c5148e..1e10152cfb 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -360,7 +360,6 @@ public class CommonConstants {
       }
 
       public static class QueryOptionValue {
-        public static final String ROUTING_FORCE_HLC = "FORCE_HLC";
         public static final String DEFAULT_IN_PREDICATE_SORT_THRESHOLD = 
"1000";
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to