Copilot commented on code in PR #16776:
URL: https://github.com/apache/pinot/pull/16776#discussion_r2331443389


##########
pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java:
##########
@@ -272,6 +272,191 @@ public void 
testPartitionMetadataManagerProcessingThroughSegmentChangesSinglePar
     
assertTrue(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().isEmpty());
   }
 
+  @Test
+  public void testPartitionIdRemappingLogic() {
+    ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
+    Map<String, Map<String, String>> segmentAssignment = 
externalView.getRecord().getMapFields();
+    Map<String, String> onlineInstanceStateMap = ImmutableMap.of(SERVER_0, 
ONLINE, SERVER_1, ONLINE);
+    Set<String> onlineSegments = new HashSet<>();
+    IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
+
+    // Create partition metadata manager with remapping enabled (4 table 
partitions from 8 segment partitions)
+    SegmentPartitionMetadataManager partitionMetadataManager =
+        new SegmentPartitionMetadataManager(OFFLINE_TABLE_NAME, 
PARTITION_COLUMN, PARTITION_COLUMN_FUNC, 4, true);
+    SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+        new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
+    segmentZkMetadataFetcher.register(partitionMetadataManager);
+
+    // Initial state should be empty
+    segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
+    TablePartitionReplicatedServersInfo tablePartitionReplicatedServersInfo = 
partitionMetadataManager
+        .getTablePartitionReplicatedServersInfo();
+    assertEquals(tablePartitionReplicatedServersInfo.getPartitionInfoMap(),
+        new TablePartitionReplicatedServersInfo.PartitionInfo[4]);
+
+    // Add segments with partition IDs 0, 4 (should both map to partition 0 
via modulo)
+    String segment0 = "segment_partition_0";
+    String segment4 = "segment_partition_4";
+    onlineSegments.add(segment0);
+    onlineSegments.add(segment4);
+    segmentAssignment.put(segment0, Collections.singletonMap(SERVER_0, 
ONLINE));
+    segmentAssignment.put(segment4, Collections.singletonMap(SERVER_1, 
ONLINE));
+
+    // Set metadata for segments with 8 total partitions (will be remapped to 
4)
+    setSegmentZKMetadata(segment0, PARTITION_COLUMN_FUNC, 8, 0, 0L);
+    setSegmentZKMetadata(segment4, PARTITION_COLUMN_FUNC, 8, 4, 0L);
+
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
+    tablePartitionReplicatedServersInfo = 
partitionMetadataManager.getTablePartitionReplicatedServersInfo();
+    TablePartitionReplicatedServersInfo.PartitionInfo[] partitionInfoMap = 
tablePartitionReplicatedServersInfo
+        .getPartitionInfoMap();
+
+    // Both segments should be in partition 0 (0 % 4 = 0, 4 % 4 = 0)
+    assertNull(partitionInfoMap[1]);
+    assertNull(partitionInfoMap[2]);
+    assertNull(partitionInfoMap[3]);
+    // No single server has all segments in partition 0, so 
fullyReplicatedServers should be empty
+    assertTrue(partitionInfoMap[0]._fullyReplicatedServers.isEmpty());
+    assertEqualsNoOrder(partitionInfoMap[0]._segments.toArray(), new 
String[]{segment0, segment4});
+
+    // Add segments with partition IDs 1, 5 (should both map to partition 1)
+    String segment1 = "segment_partition_1";
+    String segment5 = "segment_partition_5";
+    onlineSegments.add(segment1);
+    onlineSegments.add(segment5);
+    segmentAssignment.put(segment1, Collections.singletonMap(SERVER_0, 
ONLINE));
+    segmentAssignment.put(segment5, Collections.singletonMap(SERVER_0, 
ONLINE));
+
+    setSegmentZKMetadata(segment1, PARTITION_COLUMN_FUNC, 8, 1, 0L);
+    setSegmentZKMetadata(segment5, PARTITION_COLUMN_FUNC, 8, 5, 0L);
+
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
+    tablePartitionReplicatedServersInfo = 
partitionMetadataManager.getTablePartitionReplicatedServersInfo();
+    partitionInfoMap = 
tablePartitionReplicatedServersInfo.getPartitionInfoMap();
+
+    // Partition 1 should have both segments on SERVER_0, making it fully 
replicated
+    assertEquals(partitionInfoMap[1]._fullyReplicatedServers, 
Collections.singleton(SERVER_0));
+    assertEqualsNoOrder(partitionInfoMap[1]._segments.toArray(), new 
String[]{segment1, segment5});
+
+    // Add segments with partition IDs 2, 6 (should both map to partition 2)
+    String segment2 = "segment_partition_2";
+    String segment6 = "segment_partition_6";
+    onlineSegments.add(segment2);
+    onlineSegments.add(segment6);
+    segmentAssignment.put(segment2, ImmutableMap.of(SERVER_0, ONLINE, 
SERVER_1, ONLINE));
+    segmentAssignment.put(segment6, ImmutableMap.of(SERVER_0, ONLINE, 
SERVER_1, ONLINE));
+
+    setSegmentZKMetadata(segment2, PARTITION_COLUMN_FUNC, 8, 2, 0L);
+    setSegmentZKMetadata(segment6, PARTITION_COLUMN_FUNC, 8, 6, 0L);
+
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
+    tablePartitionReplicatedServersInfo = 
partitionMetadataManager.getTablePartitionReplicatedServersInfo();
+    partitionInfoMap = 
tablePartitionReplicatedServersInfo.getPartitionInfoMap();
+
+    // Partition 2 should have both segments on both servers, making both 
servers fully replicated
+    assertEquals(partitionInfoMap[2]._fullyReplicatedServers, 
ImmutableSet.of(SERVER_0, SERVER_1));
+    assertEqualsNoOrder(partitionInfoMap[2]._segments.toArray(), new 
String[]{segment2, segment6});
+
+    
assertTrue(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().isEmpty());
+  }
+
+  @Test
+  public void testPartitionIdRemappingInvalidCases() {
+    ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
+    Map<String, Map<String, String>> segmentAssignment = 
externalView.getRecord().getMapFields();
+    Set<String> onlineSegments = new HashSet<>();
+    IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
+
+    // Create partition metadata manager with remapping enabled but invalid 
divisibility (3 table partitions from 8
+    // segment partitions)

Review Comment:
   [nitpick] The comment has a formatting issue with line wrapping. The comment 
should be properly formatted on a single line or with proper continuation 
indentation.
   ```suggestion
       //   segment partitions)
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java:
##########
@@ -106,14 +108,28 @@ private int getPartitionId(String segment, @Nullable 
ZNRecord znRecord) {
     if (!_partitionFunctionName.equalsIgnoreCase(partitionFunction.getName())) 
{
       return INVALID_PARTITION_ID;
     }
-    if (_numPartitions != partitionFunction.getNumPartitions()) {
-      return INVALID_PARTITION_ID;
-    }
-    Set<Integer> partitions = segmentPartitionInfo.getPartitions();
-    if (partitions.size() != 1) {
-      return INVALID_PARTITION_ID;
+    if (_allowPartitionRemapping) {
+      // Ensure segment partitions can be evenly distributed across table 
partitions.
+      // For example, 8 Kafka partitions can be remapped to 4 Pinot partitions 
(8 % 4 = 0),
+      // but 8 partitions cannot be remapped to 3 partitions (8 % 3 ≠ 0).
+      if (partitionFunction.getNumPartitions() % _numPartitions != 0) {
+        return INVALID_PARTITION_ID;
+      }
+      Set<Integer> partitions = segmentPartitionInfo.getPartitions();
+      if (partitions.size() != 1) {
+        return INVALID_PARTITION_ID;
+      }
+      return partitions.iterator().next() % _numPartitions;
+    } else {

Review Comment:
   The comment should explain why segments must have exactly one partition 
(lines 119-121). This validation constraint is not obvious and would benefit 
from additional documentation explaining the business logic.



##########
pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java:
##########
@@ -272,6 +272,191 @@ public void 
testPartitionMetadataManagerProcessingThroughSegmentChangesSinglePar
     
assertTrue(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().isEmpty());
   }
 
+  @Test
+  public void testPartitionIdRemappingLogic() {
+    ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
+    Map<String, Map<String, String>> segmentAssignment = 
externalView.getRecord().getMapFields();
+    Map<String, String> onlineInstanceStateMap = ImmutableMap.of(SERVER_0, 
ONLINE, SERVER_1, ONLINE);
+    Set<String> onlineSegments = new HashSet<>();
+    IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
+
+    // Create partition metadata manager with remapping enabled (4 table 
partitions from 8 segment partitions)
+    SegmentPartitionMetadataManager partitionMetadataManager =
+        new SegmentPartitionMetadataManager(OFFLINE_TABLE_NAME, 
PARTITION_COLUMN, PARTITION_COLUMN_FUNC, 4, true);
+    SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+        new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
+    segmentZkMetadataFetcher.register(partitionMetadataManager);
+
+    // Initial state should be empty
+    segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
+    TablePartitionReplicatedServersInfo tablePartitionReplicatedServersInfo = 
partitionMetadataManager
+        .getTablePartitionReplicatedServersInfo();
+    assertEquals(tablePartitionReplicatedServersInfo.getPartitionInfoMap(),
+        new TablePartitionReplicatedServersInfo.PartitionInfo[4]);
+
+    // Add segments with partition IDs 0, 4 (should both map to partition 0 
via modulo)
+    String segment0 = "segment_partition_0";
+    String segment4 = "segment_partition_4";
+    onlineSegments.add(segment0);
+    onlineSegments.add(segment4);
+    segmentAssignment.put(segment0, Collections.singletonMap(SERVER_0, 
ONLINE));
+    segmentAssignment.put(segment4, Collections.singletonMap(SERVER_1, 
ONLINE));
+
+    // Set metadata for segments with 8 total partitions (will be remapped to 
4)
+    setSegmentZKMetadata(segment0, PARTITION_COLUMN_FUNC, 8, 0, 0L);
+    setSegmentZKMetadata(segment4, PARTITION_COLUMN_FUNC, 8, 4, 0L);
+
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
+    tablePartitionReplicatedServersInfo = 
partitionMetadataManager.getTablePartitionReplicatedServersInfo();
+    TablePartitionReplicatedServersInfo.PartitionInfo[] partitionInfoMap = 
tablePartitionReplicatedServersInfo
+        .getPartitionInfoMap();
+
+    // Both segments should be in partition 0 (0 % 4 = 0, 4 % 4 = 0)
+    assertNull(partitionInfoMap[1]);
+    assertNull(partitionInfoMap[2]);
+    assertNull(partitionInfoMap[3]);
+    // No single server has all segments in partition 0, so 
fullyReplicatedServers should be empty
+    assertTrue(partitionInfoMap[0]._fullyReplicatedServers.isEmpty());
+    assertEqualsNoOrder(partitionInfoMap[0]._segments.toArray(), new 
String[]{segment0, segment4});
+
+    // Add segments with partition IDs 1, 5 (should both map to partition 1)
+    String segment1 = "segment_partition_1";
+    String segment5 = "segment_partition_5";
+    onlineSegments.add(segment1);
+    onlineSegments.add(segment5);
+    segmentAssignment.put(segment1, Collections.singletonMap(SERVER_0, 
ONLINE));
+    segmentAssignment.put(segment5, Collections.singletonMap(SERVER_0, 
ONLINE));
+
+    setSegmentZKMetadata(segment1, PARTITION_COLUMN_FUNC, 8, 1, 0L);
+    setSegmentZKMetadata(segment5, PARTITION_COLUMN_FUNC, 8, 5, 0L);
+
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
+    tablePartitionReplicatedServersInfo = 
partitionMetadataManager.getTablePartitionReplicatedServersInfo();
+    partitionInfoMap = 
tablePartitionReplicatedServersInfo.getPartitionInfoMap();
+
+    // Partition 1 should have both segments on SERVER_0, making it fully 
replicated
+    assertEquals(partitionInfoMap[1]._fullyReplicatedServers, 
Collections.singleton(SERVER_0));
+    assertEqualsNoOrder(partitionInfoMap[1]._segments.toArray(), new 
String[]{segment1, segment5});
+
+    // Add segments with partition IDs 2, 6 (should both map to partition 2)
+    String segment2 = "segment_partition_2";
+    String segment6 = "segment_partition_6";
+    onlineSegments.add(segment2);
+    onlineSegments.add(segment6);
+    segmentAssignment.put(segment2, ImmutableMap.of(SERVER_0, ONLINE, 
SERVER_1, ONLINE));
+    segmentAssignment.put(segment6, ImmutableMap.of(SERVER_0, ONLINE, 
SERVER_1, ONLINE));
+
+    setSegmentZKMetadata(segment2, PARTITION_COLUMN_FUNC, 8, 2, 0L);
+    setSegmentZKMetadata(segment6, PARTITION_COLUMN_FUNC, 8, 6, 0L);
+
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
+    tablePartitionReplicatedServersInfo = 
partitionMetadataManager.getTablePartitionReplicatedServersInfo();
+    partitionInfoMap = 
tablePartitionReplicatedServersInfo.getPartitionInfoMap();
+
+    // Partition 2 should have both segments on both servers, making both 
servers fully replicated
+    assertEquals(partitionInfoMap[2]._fullyReplicatedServers, 
ImmutableSet.of(SERVER_0, SERVER_1));
+    assertEqualsNoOrder(partitionInfoMap[2]._segments.toArray(), new 
String[]{segment2, segment6});
+
+    
assertTrue(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().isEmpty());
+  }
+
+  @Test
+  public void testPartitionIdRemappingInvalidCases() {
+    ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
+    Map<String, Map<String, String>> segmentAssignment = 
externalView.getRecord().getMapFields();
+    Set<String> onlineSegments = new HashSet<>();
+    IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
+
+    // Create partition metadata manager with remapping enabled but invalid 
divisibility (3 table partitions from 8
+    // segment partitions)
+    SegmentPartitionMetadataManager partitionMetadataManager =
+        new SegmentPartitionMetadataManager(OFFLINE_TABLE_NAME, 
PARTITION_COLUMN, PARTITION_COLUMN_FUNC, 3, true);
+    SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+        new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
+    segmentZkMetadataFetcher.register(partitionMetadataManager);
+
+    segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
+
+    // Add segment with partition ID from 8-partition scheme (should be 
invalid because 8 % 3 != 0)
+    String invalidSegment = "invalid_segment";
+    onlineSegments.add(invalidSegment);
+    segmentAssignment.put(invalidSegment, Collections.singletonMap(SERVER_0, 
ONLINE));
+    setSegmentZKMetadata(invalidSegment, PARTITION_COLUMN_FUNC, 8, 0, 0L);
+
+    segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, 
onlineSegments);
+    TablePartitionReplicatedServersInfo tablePartitionReplicatedServersInfo = 
partitionMetadataManager
+        .getTablePartitionReplicatedServersInfo();
+
+    // Segment should be marked as invalid due to non-divisible partition count
+    
assertEquals(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition(),
+        Collections.singletonList(invalidSegment));
+    assertEquals(tablePartitionReplicatedServersInfo.getPartitionInfoMap(),
+        new TablePartitionReplicatedServersInfo.PartitionInfo[3]);
+  }
+
+  @Test
+  public void testPartitionIdRemappingDisabled() {
+    ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
+    Map<String, Map<String, String>> segmentAssignment = 
externalView.getRecord().getMapFields();
+    Set<String> onlineSegments = new HashSet<>();
+    IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
+
+    // Create partition metadata manager with remapping DISABLED (4 table 
partitions, remapping = false)

Review Comment:
   [nitpick] Using 'DISABLED' in all caps in comments is inconsistent with the 
rest of the codebase style. Consider using 'disabled' in lowercase for 
consistency.
   ```suggestion
       // Create partition metadata manager with remapping disabled (4 table 
partitions, remapping = false)
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ColumnPartitionConfig.java:
##########
@@ -30,20 +30,27 @@ public class ColumnPartitionConfig extends BaseJsonConfig {
   private final String _functionName;
   private final int _numPartitions;
   private final Map<String, String> _functionConfig;
+  private final boolean _allowPartitionRemapping;
 
   public ColumnPartitionConfig(String functionName, int numPartitions) {
-    this(functionName, numPartitions, null);
+    this(functionName, numPartitions, null, null);
+  }
+
+  public ColumnPartitionConfig(String functionName, int numPartitions, 
@Nullable Map<String, String> functionConfig) {
+    this(functionName, numPartitions, functionConfig, null);
   }

Review Comment:
   The constructor overload should include a Javadoc comment explaining the 
parameter usage, specifically that the allowPartitionRemapping parameter is 
defaulted to null/false.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to