This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 80062305e3a Fix Segment Partition Tracking for Multi-Topic Ingestion 
(#17217)
80062305e3a is described below

commit 80062305e3adc0f822123fcc2e52e680be1f336d
Author: tarun11Mavani <[email protected]>
AuthorDate: Sun Nov 16 22:15:22 2025 +0530

    Fix Segment Partition Tracking for Multi-Topic Ingestion (#17217)
---
 .../helix/core/realtime/PinotLLCRealtimeSegmentManager.java    | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index a41eb37638b..ebbd8c9e5c2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1061,9 +1061,13 @@ public class PinotLLCRealtimeSegmentManager {
                 + "Please update the table config accordingly.", 
numPartitionGroups,
             columnPartitionConfig.getNumPartitions());
       }
+      // For multi-stream tables, convert Pinot partition ID (which includes 
padding offset) to stream partition ID.
+      // This ensures the partition metadata stored in ZK matches what the 
broker's partition function computes
+      // during query pruning. For example, stream 1 partition 5 has Pinot 
partition ID 10005, but should store 5.
+      int streamPartitionId = 
IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionId);
       ColumnPartitionMetadata columnPartitionMetadata =
           new ColumnPartitionMetadata(columnPartitionConfig.getFunctionName(), 
numPartitionGroups,
-              Collections.singleton(partitionId), 
columnPartitionConfig.getFunctionConfig());
+              Collections.singleton(streamPartitionId), 
columnPartitionConfig.getFunctionConfig());
       return new 
SegmentPartitionMetadata(Collections.singletonMap(entry.getKey(), 
columnPartitionMetadata));
     } else {
       LOGGER.warn(
@@ -2402,9 +2406,9 @@ public class PinotLLCRealtimeSegmentManager {
       Set<String> invalidSegments = partitionedByIsConsuming.get(false);
       if (!invalidSegments.isEmpty()) {
         LOGGER.warn("Cannot commit segments that are not in CONSUMING state. 
All consuming segments: {}, "
-            + "provided segments to commit: {}. Ignoring all non-consuming 
segments, sampling 10: {}",
+                + "provided segments to commit: {}. Ignoring all non-consuming 
segments, sampling 10: {}",
             allConsumingSegments,
-                segmentsToCommitStr, 
invalidSegments.stream().limit(10).collect(Collectors.toSet()));
+            segmentsToCommitStr, 
invalidSegments.stream().limit(10).collect(Collectors.toSet()));
       }
       return validSegmentsToCommit;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to