Copilot commented on code in PR #16776:
URL: https://github.com/apache/pinot/pull/16776#discussion_r2331443389
##########
pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java:
##########
@@ -272,6 +272,191 @@ public void
testPartitionMetadataManagerProcessingThroughSegmentChangesSinglePar
assertTrue(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().isEmpty());
}
+ @Test
+ public void testPartitionIdRemappingLogic() {
+ ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
+ Map<String, Map<String, String>> segmentAssignment =
externalView.getRecord().getMapFields();
+ Map<String, String> onlineInstanceStateMap = ImmutableMap.of(SERVER_0,
ONLINE, SERVER_1, ONLINE);
+ Set<String> onlineSegments = new HashSet<>();
+ IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
+
+ // Create partition metadata manager with remapping enabled (4 table
partitions from 8 segment partitions)
+ SegmentPartitionMetadataManager partitionMetadataManager =
+ new SegmentPartitionMetadataManager(OFFLINE_TABLE_NAME,
PARTITION_COLUMN, PARTITION_COLUMN_FUNC, 4, true);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+ new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
+ segmentZkMetadataFetcher.register(partitionMetadataManager);
+
+ // Initial state should be empty
+ segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
+ TablePartitionReplicatedServersInfo tablePartitionReplicatedServersInfo =
partitionMetadataManager
+ .getTablePartitionReplicatedServersInfo();
+ assertEquals(tablePartitionReplicatedServersInfo.getPartitionInfoMap(),
+ new TablePartitionReplicatedServersInfo.PartitionInfo[4]);
+
+ // Add segments with partition IDs 0, 4 (should both map to partition 0
via modulo)
+ String segment0 = "segment_partition_0";
+ String segment4 = "segment_partition_4";
+ onlineSegments.add(segment0);
+ onlineSegments.add(segment4);
+ segmentAssignment.put(segment0, Collections.singletonMap(SERVER_0,
ONLINE));
+ segmentAssignment.put(segment4, Collections.singletonMap(SERVER_1,
ONLINE));
+
+ // Set metadata for segments with 8 total partitions (will be remapped to
4)
+ setSegmentZKMetadata(segment0, PARTITION_COLUMN_FUNC, 8, 0, 0L);
+ setSegmentZKMetadata(segment4, PARTITION_COLUMN_FUNC, 8, 4, 0L);
+
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
+ tablePartitionReplicatedServersInfo =
partitionMetadataManager.getTablePartitionReplicatedServersInfo();
+ TablePartitionReplicatedServersInfo.PartitionInfo[] partitionInfoMap =
tablePartitionReplicatedServersInfo
+ .getPartitionInfoMap();
+
+ // Both segments should be in partition 0 (0 % 4 = 0, 4 % 4 = 0)
+ assertNull(partitionInfoMap[1]);
+ assertNull(partitionInfoMap[2]);
+ assertNull(partitionInfoMap[3]);
+ // No single server has all segments in partition 0, so
fullyReplicatedServers should be empty
+ assertTrue(partitionInfoMap[0]._fullyReplicatedServers.isEmpty());
+ assertEqualsNoOrder(partitionInfoMap[0]._segments.toArray(), new
String[]{segment0, segment4});
+
+ // Add segments with partition IDs 1, 5 (should both map to partition 1)
+ String segment1 = "segment_partition_1";
+ String segment5 = "segment_partition_5";
+ onlineSegments.add(segment1);
+ onlineSegments.add(segment5);
+ segmentAssignment.put(segment1, Collections.singletonMap(SERVER_0,
ONLINE));
+ segmentAssignment.put(segment5, Collections.singletonMap(SERVER_0,
ONLINE));
+
+ setSegmentZKMetadata(segment1, PARTITION_COLUMN_FUNC, 8, 1, 0L);
+ setSegmentZKMetadata(segment5, PARTITION_COLUMN_FUNC, 8, 5, 0L);
+
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
+ tablePartitionReplicatedServersInfo =
partitionMetadataManager.getTablePartitionReplicatedServersInfo();
+ partitionInfoMap =
tablePartitionReplicatedServersInfo.getPartitionInfoMap();
+
+ // Partition 1 should have both segments on SERVER_0, making it fully
replicated
+ assertEquals(partitionInfoMap[1]._fullyReplicatedServers,
Collections.singleton(SERVER_0));
+ assertEqualsNoOrder(partitionInfoMap[1]._segments.toArray(), new
String[]{segment1, segment5});
+
+ // Add segments with partition IDs 2, 6 (should both map to partition 2)
+ String segment2 = "segment_partition_2";
+ String segment6 = "segment_partition_6";
+ onlineSegments.add(segment2);
+ onlineSegments.add(segment6);
+ segmentAssignment.put(segment2, ImmutableMap.of(SERVER_0, ONLINE,
SERVER_1, ONLINE));
+ segmentAssignment.put(segment6, ImmutableMap.of(SERVER_0, ONLINE,
SERVER_1, ONLINE));
+
+ setSegmentZKMetadata(segment2, PARTITION_COLUMN_FUNC, 8, 2, 0L);
+ setSegmentZKMetadata(segment6, PARTITION_COLUMN_FUNC, 8, 6, 0L);
+
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
+ tablePartitionReplicatedServersInfo =
partitionMetadataManager.getTablePartitionReplicatedServersInfo();
+ partitionInfoMap =
tablePartitionReplicatedServersInfo.getPartitionInfoMap();
+
+ // Partition 2 should have both segments on both servers, making both
servers fully replicated
+ assertEquals(partitionInfoMap[2]._fullyReplicatedServers,
ImmutableSet.of(SERVER_0, SERVER_1));
+ assertEqualsNoOrder(partitionInfoMap[2]._segments.toArray(), new
String[]{segment2, segment6});
+
+
assertTrue(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().isEmpty());
+ }
+
+ @Test
+ public void testPartitionIdRemappingInvalidCases() {
+ ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
+ Map<String, Map<String, String>> segmentAssignment =
externalView.getRecord().getMapFields();
+ Set<String> onlineSegments = new HashSet<>();
+ IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
+
+ // Create partition metadata manager with remapping enabled but invalid
divisibility (3 table partitions from 8
+ // segment partitions)
Review Comment:
[nitpick] The comment has a formatting issue with line wrapping. The comment
should be properly formatted on a single line or with proper continuation
indentation.
```suggestion
// segment partitions)
```
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java:
##########
@@ -106,14 +108,28 @@ private int getPartitionId(String segment, @Nullable
ZNRecord znRecord) {
if (!_partitionFunctionName.equalsIgnoreCase(partitionFunction.getName()))
{
return INVALID_PARTITION_ID;
}
- if (_numPartitions != partitionFunction.getNumPartitions()) {
- return INVALID_PARTITION_ID;
- }
- Set<Integer> partitions = segmentPartitionInfo.getPartitions();
- if (partitions.size() != 1) {
- return INVALID_PARTITION_ID;
+ if (_allowPartitionRemapping) {
+ // Ensure segment partitions can be evenly distributed across table
partitions.
+ // For example, 8 Kafka partitions can be remapped to 4 Pinot partitions
(8 % 4 = 0),
+ // but 8 partitions cannot be remapped to 3 partitions (8 % 3 ≠ 0).
+ if (partitionFunction.getNumPartitions() % _numPartitions != 0) {
+ return INVALID_PARTITION_ID;
+ }
+ Set<Integer> partitions = segmentPartitionInfo.getPartitions();
+ if (partitions.size() != 1) {
+ return INVALID_PARTITION_ID;
+ }
+ return partitions.iterator().next() % _numPartitions;
+ } else {
Review Comment:
The comment should explain why segments must have exactly one partition
(lines 119-121). This validation constraint is not obvious and would benefit
from additional documentation explaining the business logic.
##########
pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java:
##########
@@ -272,6 +272,191 @@ public void
testPartitionMetadataManagerProcessingThroughSegmentChangesSinglePar
assertTrue(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().isEmpty());
}
+ @Test
+ public void testPartitionIdRemappingLogic() {
+ ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
+ Map<String, Map<String, String>> segmentAssignment =
externalView.getRecord().getMapFields();
+ Map<String, String> onlineInstanceStateMap = ImmutableMap.of(SERVER_0,
ONLINE, SERVER_1, ONLINE);
+ Set<String> onlineSegments = new HashSet<>();
+ IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
+
+ // Create partition metadata manager with remapping enabled (4 table
partitions from 8 segment partitions)
+ SegmentPartitionMetadataManager partitionMetadataManager =
+ new SegmentPartitionMetadataManager(OFFLINE_TABLE_NAME,
PARTITION_COLUMN, PARTITION_COLUMN_FUNC, 4, true);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+ new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
+ segmentZkMetadataFetcher.register(partitionMetadataManager);
+
+ // Initial state should be empty
+ segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
+ TablePartitionReplicatedServersInfo tablePartitionReplicatedServersInfo =
partitionMetadataManager
+ .getTablePartitionReplicatedServersInfo();
+ assertEquals(tablePartitionReplicatedServersInfo.getPartitionInfoMap(),
+ new TablePartitionReplicatedServersInfo.PartitionInfo[4]);
+
+ // Add segments with partition IDs 0, 4 (should both map to partition 0
via modulo)
+ String segment0 = "segment_partition_0";
+ String segment4 = "segment_partition_4";
+ onlineSegments.add(segment0);
+ onlineSegments.add(segment4);
+ segmentAssignment.put(segment0, Collections.singletonMap(SERVER_0,
ONLINE));
+ segmentAssignment.put(segment4, Collections.singletonMap(SERVER_1,
ONLINE));
+
+ // Set metadata for segments with 8 total partitions (will be remapped to
4)
+ setSegmentZKMetadata(segment0, PARTITION_COLUMN_FUNC, 8, 0, 0L);
+ setSegmentZKMetadata(segment4, PARTITION_COLUMN_FUNC, 8, 4, 0L);
+
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
+ tablePartitionReplicatedServersInfo =
partitionMetadataManager.getTablePartitionReplicatedServersInfo();
+ TablePartitionReplicatedServersInfo.PartitionInfo[] partitionInfoMap =
tablePartitionReplicatedServersInfo
+ .getPartitionInfoMap();
+
+ // Both segments should be in partition 0 (0 % 4 = 0, 4 % 4 = 0)
+ assertNull(partitionInfoMap[1]);
+ assertNull(partitionInfoMap[2]);
+ assertNull(partitionInfoMap[3]);
+ // No single server has all segments in partition 0, so
fullyReplicatedServers should be empty
+ assertTrue(partitionInfoMap[0]._fullyReplicatedServers.isEmpty());
+ assertEqualsNoOrder(partitionInfoMap[0]._segments.toArray(), new
String[]{segment0, segment4});
+
+ // Add segments with partition IDs 1, 5 (should both map to partition 1)
+ String segment1 = "segment_partition_1";
+ String segment5 = "segment_partition_5";
+ onlineSegments.add(segment1);
+ onlineSegments.add(segment5);
+ segmentAssignment.put(segment1, Collections.singletonMap(SERVER_0,
ONLINE));
+ segmentAssignment.put(segment5, Collections.singletonMap(SERVER_0,
ONLINE));
+
+ setSegmentZKMetadata(segment1, PARTITION_COLUMN_FUNC, 8, 1, 0L);
+ setSegmentZKMetadata(segment5, PARTITION_COLUMN_FUNC, 8, 5, 0L);
+
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
+ tablePartitionReplicatedServersInfo =
partitionMetadataManager.getTablePartitionReplicatedServersInfo();
+ partitionInfoMap =
tablePartitionReplicatedServersInfo.getPartitionInfoMap();
+
+ // Partition 1 should have both segments on SERVER_0, making it fully
replicated
+ assertEquals(partitionInfoMap[1]._fullyReplicatedServers,
Collections.singleton(SERVER_0));
+ assertEqualsNoOrder(partitionInfoMap[1]._segments.toArray(), new
String[]{segment1, segment5});
+
+ // Add segments with partition IDs 2, 6 (should both map to partition 2)
+ String segment2 = "segment_partition_2";
+ String segment6 = "segment_partition_6";
+ onlineSegments.add(segment2);
+ onlineSegments.add(segment6);
+ segmentAssignment.put(segment2, ImmutableMap.of(SERVER_0, ONLINE,
SERVER_1, ONLINE));
+ segmentAssignment.put(segment6, ImmutableMap.of(SERVER_0, ONLINE,
SERVER_1, ONLINE));
+
+ setSegmentZKMetadata(segment2, PARTITION_COLUMN_FUNC, 8, 2, 0L);
+ setSegmentZKMetadata(segment6, PARTITION_COLUMN_FUNC, 8, 6, 0L);
+
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
+ tablePartitionReplicatedServersInfo =
partitionMetadataManager.getTablePartitionReplicatedServersInfo();
+ partitionInfoMap =
tablePartitionReplicatedServersInfo.getPartitionInfoMap();
+
+ // Partition 2 should have both segments on both servers, making both
servers fully replicated
+ assertEquals(partitionInfoMap[2]._fullyReplicatedServers,
ImmutableSet.of(SERVER_0, SERVER_1));
+ assertEqualsNoOrder(partitionInfoMap[2]._segments.toArray(), new
String[]{segment2, segment6});
+
+
assertTrue(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().isEmpty());
+ }
+
+ @Test
+ public void testPartitionIdRemappingInvalidCases() {
+ ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
+ Map<String, Map<String, String>> segmentAssignment =
externalView.getRecord().getMapFields();
+ Set<String> onlineSegments = new HashSet<>();
+ IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
+
+ // Create partition metadata manager with remapping enabled but invalid
divisibility (3 table partitions from 8
+ // segment partitions)
+ SegmentPartitionMetadataManager partitionMetadataManager =
+ new SegmentPartitionMetadataManager(OFFLINE_TABLE_NAME,
PARTITION_COLUMN, PARTITION_COLUMN_FUNC, 3, true);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+ new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
+ segmentZkMetadataFetcher.register(partitionMetadataManager);
+
+ segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
+
+ // Add segment with partition ID from 8-partition scheme (should be
invalid because 8 % 3 != 0)
+ String invalidSegment = "invalid_segment";
+ onlineSegments.add(invalidSegment);
+ segmentAssignment.put(invalidSegment, Collections.singletonMap(SERVER_0,
ONLINE));
+ setSegmentZKMetadata(invalidSegment, PARTITION_COLUMN_FUNC, 8, 0, 0L);
+
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
+ TablePartitionReplicatedServersInfo tablePartitionReplicatedServersInfo =
partitionMetadataManager
+ .getTablePartitionReplicatedServersInfo();
+
+ // Segment should be marked as invalid due to non-divisible partition count
+
assertEquals(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition(),
+ Collections.singletonList(invalidSegment));
+ assertEquals(tablePartitionReplicatedServersInfo.getPartitionInfoMap(),
+ new TablePartitionReplicatedServersInfo.PartitionInfo[3]);
+ }
+
+ @Test
+ public void testPartitionIdRemappingDisabled() {
+ ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
+ Map<String, Map<String, String>> segmentAssignment =
externalView.getRecord().getMapFields();
+ Set<String> onlineSegments = new HashSet<>();
+ IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
+
+ // Create partition metadata manager with remapping DISABLED (4 table
partitions, remapping = false)
Review Comment:
[nitpick] Using 'DISABLED' in all caps in comments is inconsistent with the
rest of the codebase style. Consider using 'disabled' in lowercase for
consistency.
```suggestion
// Create partition metadata manager with remapping disabled (4 table
partitions, remapping = false)
```
##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ColumnPartitionConfig.java:
##########
@@ -30,20 +30,27 @@ public class ColumnPartitionConfig extends BaseJsonConfig {
private final String _functionName;
private final int _numPartitions;
private final Map<String, String> _functionConfig;
+ private final boolean _allowPartitionRemapping;
public ColumnPartitionConfig(String functionName, int numPartitions) {
- this(functionName, numPartitions, null);
+ this(functionName, numPartitions, null, null);
+ }
+
+ public ColumnPartitionConfig(String functionName, int numPartitions,
@Nullable Map<String, String> functionConfig) {
+ this(functionName, numPartitions, functionConfig, null);
}
Review Comment:
The constructor overload should include a Javadoc comment explaining the
parameter usage, specifically that the allowPartitionRemapping parameter
is defaulted to null/false.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]