This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 80062305e3a Fix Segment Partition Tracking for Multi-Topic Ingestion (#17217)
80062305e3a is described below
commit 80062305e3adc0f822123fcc2e52e680be1f336d
Author: tarun11Mavani <[email protected]>
AuthorDate: Sun Nov 16 22:15:22 2025 +0530
Fix Segment Partition Tracking for Multi-Topic Ingestion (#17217)
---
.../helix/core/realtime/PinotLLCRealtimeSegmentManager.java | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index a41eb37638b..ebbd8c9e5c2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1061,9 +1061,13 @@ public class PinotLLCRealtimeSegmentManager {
+ "Please update the table config accordingly.",
numPartitionGroups,
columnPartitionConfig.getNumPartitions());
}
+ // For multi-stream tables, convert Pinot partition ID (which includes padding offset) to stream partition ID.
+ // This ensures the partition metadata stored in ZK matches what the broker's partition function computes
+ // during query pruning. For example, stream 1 partition 5 has Pinot partition ID 10005, but should store 5.
+ int streamPartitionId =
IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionId);
ColumnPartitionMetadata columnPartitionMetadata =
new ColumnPartitionMetadata(columnPartitionConfig.getFunctionName(),
numPartitionGroups,
- Collections.singleton(partitionId),
columnPartitionConfig.getFunctionConfig());
+ Collections.singleton(streamPartitionId),
columnPartitionConfig.getFunctionConfig());
return new
SegmentPartitionMetadata(Collections.singletonMap(entry.getKey(),
columnPartitionMetadata));
} else {
LOGGER.warn(
@@ -2402,9 +2406,9 @@ public class PinotLLCRealtimeSegmentManager {
Set<String> invalidSegments = partitionedByIsConsuming.get(false);
if (!invalidSegments.isEmpty()) {
LOGGER.warn("Cannot commit segments that are not in CONSUMING state.
All consuming segments: {}, "
- + "provided segments to commit: {}. Ignoring all non-consuming
segments, sampling 10: {}",
+ + "provided segments to commit: {}. Ignoring all non-consuming
segments, sampling 10: {}",
allConsumingSegments,
- segmentsToCommitStr,
invalidSegments.stream().limit(10).collect(Collectors.toSet()));
+ segmentsToCommitStr,
invalidSegments.stream().limit(10).collect(Collectors.toSet()));
}
return validSegmentsToCommit;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]