This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7247da84bc When reading server to segments map, exclude OFFLINE segments (#11818) 7247da84bc is described below commit 7247da84bcb51c49c8e45354d3e9c2ec05e3e8df Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Mon Oct 16 19:19:18 2023 -0700 When reading server to segments map, exclude OFFLINE segments (#11818) --- .../api/resources/PinotSegmentRestletResource.java | 8 ++-- .../helix/core/PinotHelixResourceManager.java | 52 +++++++--------------- .../util/ConsumingSegmentInfoReader.java | 8 ++-- .../ConsumingSegmentInfoReaderStatelessTest.java | 6 +-- 4 files changed, 28 insertions(+), 46 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java index 33d4b65b5d..7a5961c680 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java @@ -34,12 +34,12 @@ import io.swagger.annotations.SecurityDefinition; import io.swagger.annotations.SwaggerDefinition; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.Executor; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -661,13 +661,13 @@ public class PinotSegmentRestletResource { controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME); if (singleSegmentName != null) { // No need to query servers where this segment is not supposed to be hosted - serverToSegments = new HashMap<>(); - List<String> segmentList = Arrays.asList(singleSegmentName); + serverToSegments = new TreeMap<>(); + List<String> segmentList = Collections.singletonList(singleSegmentName); _pinotHelixResourceManager.getServers(tableNameWithType, singleSegmentName).forEach(server -> { serverToSegments.put(server, segmentList); }); } else { - serverToSegments = _pinotHelixResourceManager.getServerToOnlineSegmentsMap(tableNameWithType); + serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType); } BiMap<String, String> serverEndPoints = diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index c3fde8a85d..bfb4918bf7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2651,29 +2651,8 @@ public class PinotHelixResourceManager { } /** - * Returns a map from server instance to list of online segments it serves for the given table. - */ - public Map<String, List<String>> getServerToOnlineSegmentsMap(String tableNameWithType) { - Map<String, List<String>> serverToSegmentsMap = new TreeMap<>(); - IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType); - if (idealState == null) { - throw new IllegalStateException("Ideal state does not exist for table: " + tableNameWithType); - } - for (Map.Entry<String, Map<String, String>> entry : idealState.getRecord().getMapFields().entrySet()) { - String segmentName = entry.getKey(); - for (Map.Entry<String, String> e : entry.getValue().entrySet()) { - String server = e.getKey(); - String status = e.getValue(); - if (status.equals(SegmentStateModel.CONSUMING) || status.equals(SegmentStateModel.ONLINE)) { - serverToSegmentsMap.computeIfAbsent(server, key -> new ArrayList<>()).add(segmentName); - } - } - } - return serverToSegmentsMap; - } - - /** - * Returns a map from server instance to list of segments it serves for the given table. + * Returns a map from server instance to list of segments it serves for the given table. Ignore OFFLINE segments from + * the ideal state because they are not supposed to be served. */ public Map<String, List<String>> getServerToSegmentsMap(String tableNameWithType) { Map<String, List<String>> serverToSegmentsMap = new TreeMap<>(); @@ -2683,15 +2662,18 @@ public class PinotHelixResourceManager { } for (Map.Entry<String, Map<String, String>> entry : idealState.getRecord().getMapFields().entrySet()) { String segmentName = entry.getKey(); - for (String server : entry.getValue().keySet()) { - serverToSegmentsMap.computeIfAbsent(server, key -> new ArrayList<>()).add(segmentName); + for (Map.Entry<String, String> instanceStateEntry : entry.getValue().entrySet()) { + if (!instanceStateEntry.getValue().equals(SegmentStateModel.OFFLINE)) { + serverToSegmentsMap.computeIfAbsent(instanceStateEntry.getKey(), key -> new ArrayList<>()).add(segmentName); + } } } return serverToSegmentsMap; } /** - * Returns a set of server instances for a given table and segment + * Returns a set of server instances for a given table and segment. Ignore OFFLINE segments from the ideal state + * because they are not supposed to be served. */ public Set<String> getServers(String tableNameWithType, String segmentName) { IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType); @@ -2701,7 +2683,13 @@ public class PinotHelixResourceManager { Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName); Preconditions.checkState(instanceStateMap != null, "Segment: {} does not exist in the ideal state of table: {}", segmentName, tableNameWithType); - return instanceStateMap.keySet(); + Set<String> servers = new TreeSet<>(); + for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { + if (!entry.getValue().equals(SegmentStateModel.OFFLINE)) { + servers.add(entry.getKey()); + } + } + return servers; } /** @@ -2722,15 +2710,9 @@ public class PinotHelixResourceManager { return consumingSegments; } - /** - * Utility function to return set of servers corresponding to a given segment. - */ + @Deprecated public Set<String> getServersForSegment(String tableNameWithType, String segmentName) { - IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType); - if (idealState == null) { - throw new IllegalStateException("Ideal state does not exist for table: " + tableNameWithType); - } - return new HashSet<>(idealState.getInstanceStateMap(segmentName).keySet()); + return getServers(tableNameWithType, segmentName); } public synchronized Map<String, String> getSegmentsCrcForTable(String tableNameWithType) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java index c8ee931580..26dab01955 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java @@ -157,13 +157,13 @@ public class ConsumingSegmentInfoReader { } // Check if any responses are missing - Set<String> serversForSegment = _pinotHelixResourceManager.getServersForSegment(tableNameWithType, segmentName); - if (serversForSegment.size() != consumingSegmentInfoList.size()) { + Set<String> servers = _pinotHelixResourceManager.getServers(tableNameWithType, segmentName); + if (servers.size() != consumingSegmentInfoList.size()) { Set<String> serversResponded = consumingSegmentInfoList.stream().map(c -> c._serverName).collect(Collectors.toSet()); - serversForSegment.removeAll(serversResponded); + servers.removeAll(serversResponded); String errorMessage = - "Not all servers responded for segment: " + segmentName + " Missing servers : " + serversForSegment; + "Not all servers responded for segment: " + segmentName + " Missing servers : " + servers; return TableStatus.IngestionStatus.newIngestionStatus(TableStatus.IngestionState.UNHEALTHY, errorMessage); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java index 3e8d935813..9c29142b46 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java @@ -28,10 +28,10 @@ import java.io.OutputStream; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.stream.Collectors; @@ -197,11 +197,11 @@ public class ConsumingSegmentInfoReaderStatelessTest { private void mockSetup(final String[] servers, final Set<String> consumingSegments) throws InvalidConfigException { when(_helix.getServerToSegmentsMap(anyString())).thenAnswer(invocationOnMock -> subsetOfServerSegments(servers)); + when(_helix.getServers(anyString(), anyString())).thenAnswer( + invocationOnMock -> new TreeSet<>(Arrays.asList(servers))); when(_helix.getDataInstanceAdminEndpoints(ArgumentMatchers.anySet())).thenAnswer( invocationOnMock -> serverEndpoints(servers)); when(_helix.getConsumingSegments(anyString())).thenAnswer(invocationOnMock -> consumingSegments); - when(_helix.getServersForSegment(anyString(), anyString())).thenAnswer( - invocationOnMock -> new HashSet<>(Arrays.asList(servers))); } private ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap testRunner(final String[] servers, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org