Copilot commented on code in PR #16776:
URL: https://github.com/apache/pinot/pull/16776#discussion_r2461874932
##########
pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java:
##########
@@ -77,6 +78,54 @@ public void tearDown() {
stopZk();
}
+ @Test
+ public void testConsumingSegmentPartitionRemappingFromName() {
+ ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME);
+ Map<String, Map<String, String>> segmentAssignment = externalView.getRecord().getMapFields();
+ Set<String> onlineSegments = new HashSet<>();
+ IdealState idealState = new IdealState(REALTIME_TABLE_NAME);
+
+ // Enable remapping to 4 table partitions, and do not set any ZK partition metadata for segments.
+ // The partition id should be derived from the LLC segment name and modulo-applied.
+ SegmentPartitionMetadataManager partitionMetadataManager =
+ new SegmentPartitionMetadataManager(REALTIME_TABLE_NAME, PARTITION_COLUMN, PARTITION_COLUMN_FUNC, 4, true);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+ new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
+ segmentZkMetadataFetcher.register(partitionMetadataManager);
+
+ // Init with empty state
+ segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
+
+ // Add two consuming (LLC) segments whose names encode partition ids 1 and 5 respectively.
+ // With 4 table partitions and remapping enabled, both should map to partition 1 (1 % 4 = 1, 5 % 4 = 1).
+ String llcSegmentP1 = "testTable__1__100__1716185755000";
+ String llcSegmentP5 = "testTable__5__101__1716185756000";
Review Comment:
[nitpick] The LLC segment names use hardcoded timestamps (1716185755000,
1716185756000) that don't add semantic value to the test. Consider using
constants with descriptive names or a timestamp generation helper to make the
test data more maintainable and the intent clearer.
```suggestion
final long LLC_SEGMENT_TIMESTAMP_1 = 1716185755000L;
final long LLC_SEGMENT_TIMESTAMP_2 = 1716185756000L;
String llcSegmentP1 = "testTable__1__100__" + LLC_SEGMENT_TIMESTAMP_1;
String llcSegmentP5 = "testTable__5__101__" + LLC_SEGMENT_TIMESTAMP_2;
```
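The test's expectation can be reproduced outside Pinot with a small sketch. It assumes the `table__partitionId__sequenceNumber__creationTime` layout used by the test names and treats the fourth field as opaque. `LlcNameRemapSketch` and its methods are hypothetical helpers, not Pinot APIs; the diff below delegates the real parsing to `SegmentUtils.getPartitionIdFromSegmentName`. The hypothetical `llcSegmentName` helper shows one way to build test names from named parts, in the spirit of the nitpick.

```java
import java.util.OptionalInt;

/**
 * Hypothetical sketch, not a Pinot API: parses the partition id out of an
 * LLC-style name "table__partitionId__sequenceNumber__creationTime" and folds
 * it onto a smaller number of table partitions.
 */
final class LlcNameRemapSketch {
  // Separator between the four fields of the LLC-style names used in the test.
  private static final String SEPARATOR = "__";

  private LlcNameRemapSketch() {
  }

  /** Returns the id in the second field, or empty if the name is not four "__"-separated fields with a numeric id. */
  static OptionalInt partitionIdFromName(String segmentName) {
    String[] parts = segmentName.split(SEPARATOR);
    if (parts.length != 4) {
      return OptionalInt.empty();
    }
    try {
      return OptionalInt.of(Integer.parseInt(parts[1]));
    } catch (NumberFormatException e) {
      return OptionalInt.empty();
    }
  }

  /** Folds a stream partition id onto one of numTablePartitions table partitions. */
  static int remap(int streamPartitionId, int numTablePartitions) {
    return streamPartitionId % numTablePartitions;
  }

  /** Hypothetical test helper: builds a segment name from named parts instead of a string literal. */
  static String llcSegmentName(String rawTableName, int partitionId, int sequenceNumber, long creationTimeMs) {
    return rawTableName + SEPARATOR + partitionId + SEPARATOR + sequenceNumber + SEPARATOR + creationTimeMs;
  }
}
```

Both test segments fold onto table partition 1, which matches the comment in the test (1 % 4 = 1, 5 % 4 = 1).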
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java:
##########
@@ -106,14 +113,28 @@ private int getPartitionId(String segment, @Nullable ZNRecord znRecord) {
if (!_partitionFunctionName.equalsIgnoreCase(partitionFunction.getName())) {
return INVALID_PARTITION_ID;
}
- if (_numPartitions != partitionFunction.getNumPartitions()) {
- return INVALID_PARTITION_ID;
- }
- Set<Integer> partitions = segmentPartitionInfo.getPartitions();
- if (partitions.size() != 1) {
- return INVALID_PARTITION_ID;
+ if (_allowPartitionRemapping) {
+ // Ensure segment partitions can be evenly distributed across table partitions.
+ // For example, 8 Kafka partitions can be remapped to 4 Pinot partitions (8 % 4 = 0),
+ // but 8 partitions cannot be remapped to 3 partitions (8 % 3 ≠ 0).
Review Comment:
The comment describes the divisibility check but uses the reverse modulo
operation. The check `partitionFunction.getNumPartitions() % _numPartitions !=
0` verifies that the segment partition count is evenly divisible by the table
partition count. The comment should clarify this is checking `segmentPartitions
% tablePartitions` rather than implying `8 % 4`.
```suggestion
// This checks: segmentPartitions % tablePartitions == 0.
// For example, 8 Kafka partitions (segmentPartitions) can be remapped to 4 Pinot partitions (tablePartitions) because 8 % 4 = 0,
// but 8 partitions cannot be remapped to 3 partitions because 8 % 3 ≠ 0.
```
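The divisibility rule can be checked in isolation. This is a minimal sketch assuming remapping is `id % tablePartitions`, as in the diff. `RemapDivisibilitySketch` and its methods are hypothetical. The second method counts how many stream partitions land on each table partition, which shows why 8 → 3 is rejected: the load would be uneven.

```java
/**
 * Hypothetical sketch of the rule segmentPartitions % tablePartitions == 0,
 * assuming stream partition ids are folded with id % tablePartitions.
 */
final class RemapDivisibilitySketch {
  private RemapDivisibilitySketch() {
  }

  /** True when every table partition receives the same number of stream partitions. */
  static boolean canRemap(int segmentPartitions, int tablePartitions) {
    if (segmentPartitions <= 0 || tablePartitions <= 0) {
      return false;
    }
    return segmentPartitions % tablePartitions == 0;
  }

  /** Counts how many stream partition ids 0..segmentPartitions-1 fold onto each table partition. */
  static int[] streamPartitionsPerTablePartition(int segmentPartitions, int tablePartitions) {
    int[] counts = new int[tablePartitions];
    for (int id = 0; id < segmentPartitions; id++) {
      counts[id % tablePartitions]++;
    }
    return counts;
  }
}
```

With 8 stream partitions, 4 table partitions each receive 2. With 3 table partitions, the split is 3, 3, 2.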
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java:
##########
@@ -97,6 +99,11 @@ private int getPartitionId(String segment, @Nullable ZNRecord znRecord) {
SegmentPartitionInfo segmentPartitionInfo =
SegmentPartitionUtils.extractPartitionInfo(_tableNameWithType, _partitionColumn, segment, znRecord);
if (segmentPartitionInfo == null || segmentPartitionInfo == SegmentPartitionUtils.INVALID_PARTITION_INFO) {
+ // Fall back to derive partition id from segment name (works for consuming LLC segments)
+ Integer partitionIdFromName = SegmentUtils.getPartitionIdFromSegmentName(segment);
+ if (partitionIdFromName != null) {
+ return _allowPartitionRemapping ? (partitionIdFromName % _numPartitions) : partitionIdFromName;
Review Comment:
The partition ID extraction logic from the segment name bypasses partition
function and partition count validation. When remapping is disabled, this could
return partition IDs that exceed the configured `_numPartitions`, leading to
array index issues. Consider validating that `partitionIdFromName <
_numPartitions` when remapping is disabled before returning the value.
```suggestion
if (_allowPartitionRemapping) {
return partitionIdFromName % _numPartitions;
} else {
// Validate that partitionIdFromName is within [0, _numPartitions)
if (partitionIdFromName >= 0 && partitionIdFromName < _numPartitions) {
return partitionIdFromName;
} else {
return INVALID_PARTITION_ID;
}
}
```
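The suggested fallback can be exercised as a standalone sketch. It assumes `INVALID_PARTITION_ID` is `-1`; the diff does not show its value. It also assumes a `null` (or negative) id from the name maps to `INVALID_PARTITION_ID`; the diff does not show what follows the `null` check. `NameFallbackSketch` is a hypothetical class that mirrors the `_numPartitions` and `_allowPartitionRemapping` fields from the diff.

```java
/**
 * Hypothetical sketch of the suggested fallback: remap when allowed,
 * otherwise accept only ids already inside [0, numPartitions).
 */
final class NameFallbackSketch {
  // Assumed sentinel value; the diff only references the constant name.
  static final int INVALID_PARTITION_ID = -1;

  private final int _numPartitions;
  private final boolean _allowPartitionRemapping;

  NameFallbackSketch(int numPartitions, boolean allowPartitionRemapping) {
    _numPartitions = numPartitions;
    _allowPartitionRemapping = allowPartitionRemapping;
  }

  /** partitionIdFromName is null when the segment name carries no partition id. */
  int resolve(Integer partitionIdFromName) {
    if (partitionIdFromName == null || partitionIdFromName < 0) {
      return INVALID_PARTITION_ID;
    }
    if (_allowPartitionRemapping) {
      // Fold the stream partition onto one of the table partitions.
      return partitionIdFromName % _numPartitions;
    }
    // Without remapping, an id outside the table's partition range cannot be routed.
    return partitionIdFromName < _numPartitions ? partitionIdFromName : INVALID_PARTITION_ID;
  }
}
```

With 4 table partitions, a name-derived id of 5 resolves to 1 when remapping is enabled. With remapping disabled, it resolves to `INVALID_PARTITION_ID` instead of an out-of-range partition.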
--
This is an automated message from the Apache Git Service.