J-HowHuang commented on code in PR #17215:
URL: https://github.com/apache/pinot/pull/17215#discussion_r2653889873
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java:
##########
@@ -124,8 +127,69 @@ public Pair<Map<String, String>, Map<String, String>> select(List<String> segmen
* }
* }
*/
- private Pair<Map<String, String>, Map<String, String>> assign(Set<String> segments,
- SegmentStates segmentStates, InstancePartitions instancePartitions, int preferredReplicaId) {
+ private Pair<Map<String, String>, Map<String, String>> assign(Set<String> segments, SegmentStates segmentStates,
+ InstancePartitions instancePartitions, int preferredReplicaId) {
+ Pair<Map<String, String>, Map<String, String>> assignment =
+ assignUsingIdealStateMetadata(segments, segmentStates, instancePartitions, preferredReplicaId);
+
+ if (assignment != null) {
+ return assignment;
+ } else {
+ return assignUsingInstancePartitions(segments, segmentStates, instancePartitions, preferredReplicaId);
+ }
+ }
+
+ @Nullable
+ private Pair<Map<String, String>, Map<String, String>> assignUsingIdealStateMetadata(Set<String> segments,
+ SegmentStates segmentStates, InstancePartitions instancePartitions, int preferredReplicaId) {
+ Set<String> segmentsToAssign = Sets.newHashSet(segments);
+ int numReplicaGroups = instancePartitions.getNumReplicaGroups();
+
+ Map<String, String> segmentToSelectedInstanceMap = new HashMap<>();
+
+ for (int replicaGroupOffset = 0; replicaGroupOffset < numReplicaGroups; replicaGroupOffset++) {
+ int selectedReplicaGroup = (replicaGroupOffset + preferredReplicaId) % numReplicaGroups;
+
+ // Check if the replica group can serve all the required segments
+ for (Iterator<String> iterator = segmentsToAssign.iterator(); iterator.hasNext();) {
+ String segment = iterator.next();
+ List<SegmentInstanceCandidate> candidates = segmentStates.getCandidates(segment);
+ if (CollectionUtils.isEmpty(candidates)) {
+ if (isNewLLCSegment(segment)) {
+ // New segments might not be tracked in segmentStates yet.
+ iterator.remove();
+ continue;
+ } else {
+ throw new IllegalStateException(String.format("Failed to find servers for segment: %s", segment));
+ }
+ }
+
+ for (SegmentInstanceCandidate candidate : candidates) {
+ if (candidate.getReplicaGroupId() == FALLBACK_REPLICA_GROUP_ID) {
+ // Ideal state instance partitions metadata unavailable fallback to older selection logic
+ return null;
+ }
+ if (candidate.getReplicaGroupId() == selectedReplicaGroup) {
+ segmentToSelectedInstanceMap.put(segment, candidate.getInstance());
+ iterator.remove();
+ break;
+ }
+ }
+ }
+
+ // All segments can be served. If not, continue attempting to assign the remaining segments using other replica
+ // groups.
Review Comment:
I'm trying to understand the difference between the two paths. Does this part
behave differently from the old assignment?
In `assignUsingInstancePartitions`, all segments of the same partition are
guaranteed to be assigned within the same RG. `assignUsingIdealStateMetadata`
doesn't seem to do the same here: it assigns as many segments as it can to the
current RG, then assigns the remaining segments to other RGs.
Is this expected?
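To make the question concrete, here is a minimal, hypothetical Java sketch contrasting the two strategies. It is not Pinot code: `ReplicaGroupAssignmentSketch`, `Candidate`, `instanceIn`, `greedyAssign`, `partitionConsistentAssign`, and the segment-to-partition map are illustrative names assumed here. `greedyAssign` mirrors the loop shown in the diff (walk the replica groups starting at `preferredReplicaId` and give each still-unassigned segment to whichever group can serve it), but omits the new-LLC-segment and fallback handling. `partitionConsistentAssign` follows the description above of `assignUsingInstancePartitions`, where a partition's segments all go to one replica group or to none.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: these names are illustrative and are not Pinot APIs.
class ReplicaGroupAssignmentSketch {

  /** A server that can serve a segment, tagged with its replica group (RG). */
  static final class Candidate {
    final String instance;
    final int replicaGroupId;

    Candidate(String instance, int replicaGroupId) {
      this.instance = instance;
      this.replicaGroupId = replicaGroupId;
    }
  }

  /** Returns the candidate instance in replica group {@code rg}, or null if there is none. */
  private static String instanceIn(List<Candidate> candidates, int rg) {
    for (Candidate c : candidates) {
      if (c.replicaGroupId == rg) {
        return c.instance;
      }
    }
    return null;
  }

  /** Greedy per-segment fill, like the loop in the diff: one partition's segments may span RGs. */
  static Map<String, String> greedyAssign(List<String> segments, Map<String, List<Candidate>> candidates,
      int numReplicaGroups, int preferredReplicaId) {
    Set<String> remaining = new LinkedHashSet<>(segments);
    Map<String, String> result = new HashMap<>();
    for (int offset = 0; offset < numReplicaGroups && !remaining.isEmpty(); offset++) {
      int rg = (offset + preferredReplicaId) % numReplicaGroups;
      // Take every still-unassigned segment this RG can serve; leave the rest for the next RG.
      for (Iterator<String> it = remaining.iterator(); it.hasNext(); ) {
        String segment = it.next();
        String instance = instanceIn(candidates.get(segment), rg);
        if (instance != null) {
          result.put(segment, instance);
          it.remove();
        }
      }
    }
    // This sketch treats leftovers as an error; the diff is cut off before showing what the PR does here.
    if (!remaining.isEmpty()) {
      throw new IllegalStateException("Unassigned segments: " + remaining);
    }
    return result;
  }

  /** All-or-nothing per partition: every segment of a partition goes to the same RG. */
  static Map<String, String> partitionConsistentAssign(List<String> segments, Map<String, Integer> partitionOf,
      Map<String, List<Candidate>> candidates, int numReplicaGroups, int preferredReplicaId) {
    Map<Integer, List<String>> byPartition = new LinkedHashMap<>();
    for (String segment : segments) {
      byPartition.computeIfAbsent(partitionOf.get(segment), p -> new ArrayList<>()).add(segment);
    }
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<Integer, List<String>> entry : byPartition.entrySet()) {
      Map<String, String> chosen = null;
      for (int offset = 0; offset < numReplicaGroups && chosen == null; offset++) {
        int rg = (offset + preferredReplicaId) % numReplicaGroups;
        Map<String, String> attempt = new HashMap<>();
        for (String segment : entry.getValue()) {
          String instance = instanceIn(candidates.get(segment), rg);
          if (instance == null) {
            attempt = null; // this RG cannot serve the whole partition; try the next one
            break;
          }
          attempt.put(segment, instance);
        }
        chosen = attempt;
      }
      if (chosen == null) {
        throw new IllegalStateException("No single RG can serve partition " + entry.getKey());
      }
      result.putAll(chosen);
    }
    return result;
  }

  public static void main(String[] args) {
    // s1 has replicas in RG 0 and RG 1; s2 (same partition) only has a replica in RG 1.
    Map<String, List<Candidate>> candidates = new HashMap<>();
    candidates.put("s1", List.of(new Candidate("server-a", 0), new Candidate("server-b", 1)));
    candidates.put("s2", List.of(new Candidate("server-d", 1)));
    List<String> segments = List.of("s1", "s2");
    System.out.println(greedyAssign(segments, candidates, 2, 0));
    System.out.println(partitionConsistentAssign(segments, Map.of("s1", 0, "s2", 0), candidates, 2, 0));
  }
}
```

In the `main` example, `s2` has no replica in RG 0. `greedyAssign` therefore maps `s1` to `server-a` (RG 0) and `s2` to `server-d` (RG 1), splitting partition 0 across two replica groups, whereas `partitionConsistentAssign` rejects RG 0 for the whole partition and maps `s1` to `server-b` and `s2` to `server-d`, both in RG 1.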
--
This is an automated message from the Apache Git Service.