This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fd39662b083 Fix auto reset and optimize multi-topic ingestion (#16833)
fd39662b083 is described below

commit fd39662b0832d07b6748add2e38f2026ba6e3077
Author: lnbest0707 <[email protected]>
AuthorDate: Wed Sep 17 08:31:57 2025 -0700

    Fix auto reset and optimize multi-topic ingestion (#16833)
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 44 +++++++++++++---------
 .../apache/pinot/spi/config/table/PauseState.java  |  7 +++-
 2 files changed, 31 insertions(+), 20 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index e956d0bf5b3..beadd59f089 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -390,8 +390,10 @@ public class PinotLLCRealtimeSegmentManager {
     long currentTimeMs = getCurrentTimeMs();
     Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
     for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
+      int streamConfigIdx = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
+          partitionGroupMetadata.getPartitionGroupId());
       String segmentName =
-          setupNewPartitionGroup(tableConfig, streamConfigs.get(0), partitionGroupMetadata, currentTimeMs,
+          setupNewPartitionGroup(tableConfig, streamConfigs.get(streamConfigIdx), partitionGroupMetadata, currentTimeMs,
               instancePartitions, numPartitionGroups, numReplicas);
       updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
           instancePartitionsMap);
@@ -785,6 +787,8 @@ public class PinotLLCRealtimeSegmentManager {
     if (!isTablePaused(idealState) && !isTopicPaused(idealState, committingSegmentName)) {
       LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
       int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();
+      int streamConfigIdx = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
+          committingSegmentPartitionGroupId);

       List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
       Set<Integer> partitionIds = getPartitionIds(streamConfigs, idealState);
@@ -795,9 +799,9 @@ public class PinotLLCRealtimeSegmentManager {
         LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
             committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);

-        createNewSegmentZKMetadata(tableConfig, streamConfigs.get(0), newLLCSegment, newSegmentCreationTimeMs,
-            committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(),
-            numReplicas);
+        createNewSegmentZKMetadata(tableConfig, streamConfigs.get(streamConfigIdx), newLLCSegment,
+            newSegmentCreationTimeMs, committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions,
+            partitionIds.size(), numReplicas);
         newConsumingSegmentName = newLLCSegment.getSegmentName();
         LOGGER.info("Created new segment metadata for segment: {} with status: {}.", newConsumingSegmentName,
             Status.IN_PROGRESS);
@@ -989,6 +993,7 @@ public class PinotLLCRealtimeSegmentManager {
           timeThreshold, offsetThreshold);
       return nextOffset;
     }
+    int streamPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionId);
     String clientId = getTableTopicUniqueClientId(streamConfig);
     StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
     StreamPartitionMsgOffsetFactory offsetFactory = consumerFactory.createStreamMsgOffsetFactory();
@@ -996,7 +1001,7 @@ public class PinotLLCRealtimeSegmentManager {
     StreamPartitionMsgOffset offsetAtSLA = null;
     StreamPartitionMsgOffset latestOffset;
     try (StreamMetadataProvider metadataProvider = consumerFactory.createPartitionMetadataProvider(clientId,
-        partitionId)) {
+        streamPartitionId)) {
       // Fetching timestamp from an offset is an expensive operation which requires reading the data,
       // while fetching offset from timestamp is lightweight and only needs to read metadata.
       // Hence, instead of checking if latestOffset's time - nextOffset's time < SLA, we would check
@@ -1005,13 +1010,13 @@ public class PinotLLCRealtimeSegmentManager {
       // get nextOffset's time, we should instead check (nextOffset's time + SLA)'s offset < latestOffset
       latestOffset =
           metadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, STREAM_FETCH_TIMEOUT_MS);
-      LOGGER.info("Latest offset of topic {} and partition {} is {}", streamConfig.getTopicName(), partitionId,
+      LOGGER.info("Latest offset of topic {} and partition {} is {}", streamConfig.getTopicName(), streamPartitionId,
           latestOffset);
       if (timeThreshold > 0) {
         offsetAtSLA =
-            metadataProvider.getOffsetAtTimestamp(partitionId, System.currentTimeMillis() - timeThreshold * 1000,
+            metadataProvider.getOffsetAtTimestamp(streamPartitionId, System.currentTimeMillis() - timeThreshold * 1000,
                 STREAM_FETCH_TIMEOUT_MS);
-        LOGGER.info("Offset at SLA of topic {} and partition {} is {}", streamConfig.getTopicName(), partitionId,
+        LOGGER.info("Offset at SLA of topic {} and partition {} is {}", streamConfig.getTopicName(), streamPartitionId,
             offsetAtSLA);
       }
     } catch (Exception e) {
@@ -1021,13 +1026,13 @@ public class PinotLLCRealtimeSegmentManager {
     try {
       if (timeThreshold > 0 && offsetAtSLA != null && offsetAtSLA.compareTo(nextOffsetWithType) > 0) {
         LOGGER.info("Auto reset offset from {} to {} on partition {} because time threshold reached", nextOffset,
-            latestOffset, partitionId);
+            latestOffset, streamPartitionId);
         return latestOffset.toString();
       }
       if (offsetThreshold > 0
           && Long.parseLong(latestOffset.toString()) - Long.parseLong(nextOffset) > offsetThreshold) {
         LOGGER.info("Auto reset offset from {} to {} on partition {} because number of offsets threshold reached",
-            nextOffset, latestOffset, partitionId);
+            nextOffset, latestOffset, streamPartitionId);
         return latestOffset.toString();
       }
     } catch (Exception e) {
@@ -1660,6 +1665,7 @@ public class PinotLLCRealtimeSegmentManager {
       SegmentZKMetadata latestSegmentZKMetadata = entry.getValue();
       String latestSegmentName = latestSegmentZKMetadata.getSegmentName();
       LLCSegmentName latestLLCSegmentName = new LLCSegmentName(latestSegmentName);
+      int streamConfigIdx = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId);

       Map<String, String> instanceStateMap = instanceStatesMap.get(latestSegmentName);
       if (instanceStateMap != null) {
@@ -1683,7 +1689,8 @@ public class PinotLLCRealtimeSegmentManager {
               CommittingSegmentDescriptor committingSegmentDescriptor =
                   new CommittingSegmentDescriptor(latestSegmentName,
                       (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
-              createNewSegmentZKMetadata(tableConfig, streamConfigs.get(0), newLLCSegmentName, currentTimeMs,
+              createNewSegmentZKMetadata(tableConfig, streamConfigs.get(streamConfigIdx), newLLCSegmentName,
+                  currentTimeMs,
                   committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
               updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
                   segmentAssignment, instancePartitionsMap);
@@ -1733,16 +1740,16 @@ public class PinotLLCRealtimeSegmentManager {
                 selectStartOffset(offsetCriteria, partitionId, partitionIdToStartOffset, partitionIdToSmallestOffset,
                     tableConfig.getTableName(), offsetFactory,
                     latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning
-            createNewConsumingSegment(tableConfig, streamConfigs.get(0), latestSegmentZKMetadata, currentTimeMs,
-                partitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
+            createNewConsumingSegment(tableConfig, streamConfigs.get(streamConfigIdx), latestSegmentZKMetadata,
+                currentTimeMs, partitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
                 instancePartitionsMap, startOffset);
           } else {
             LOGGER.info("Resuming consumption for partition: {} of table: {}", 
partitionId, realtimeTableName);
             StreamPartitionMsgOffset startOffset =
                 selectStartOffset(offsetCriteria, partitionId, 
partitionIdToStartOffset, partitionIdToSmallestOffset,
                     tableConfig.getTableName(), offsetFactory, 
latestSegmentZKMetadata.getEndOffset());
-            createNewConsumingSegment(tableConfig, streamConfigs.get(0), 
latestSegmentZKMetadata, currentTimeMs,
-                partitionGroupMetadataList, instancePartitions, 
instanceStatesMap, segmentAssignment,
+            createNewConsumingSegment(tableConfig, 
streamConfigs.get(streamConfigIdx), latestSegmentZKMetadata,
+                currentTimeMs, partitionGroupMetadataList, instancePartitions, 
instanceStatesMap, segmentAssignment,
                 instancePartitionsMap, startOffset);
           }
         }
@@ -1786,10 +1793,11 @@ public class PinotLLCRealtimeSegmentManager {
     // Set up new partitions if not exist
     for (PartitionGroupMetadata partitionGroupMetadata : partitionGroupMetadataList) {
       int partitionId = partitionGroupMetadata.getPartitionGroupId();
+      int streamConfigIdx = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId);
       if (!latestSegmentZKMetadataMap.containsKey(partitionId)) {
         String newSegmentName =
-            setupNewPartitionGroup(tableConfig, streamConfigs.get(0), partitionGroupMetadata, currentTimeMs,
-                instancePartitions, numPartitions, numReplicas);
+            setupNewPartitionGroup(tableConfig, streamConfigs.get(streamConfigIdx), partitionGroupMetadata,
+                currentTimeMs, instancePartitions, numPartitions, numReplicas);
         updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
             instancePartitionsMap);
       }
@@ -2541,7 +2549,7 @@ public class PinotLLCRealtimeSegmentManager {
     Set<String> consumingSegments = new TreeSet<>();
     idealState.getRecord().getMapFields().forEach((segmentName, instanceToStateMap) -> {
       LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
-      if (llcSegmentName != null && topicIndices.contains(
+      if (llcSegmentName != null && !topicIndices.contains(
           IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(llcSegmentName.getPartitionGroupId()))) {
         return;
       }
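
The call sites above stop hard-coding streamConfigs.get(0) and instead derive the stream-config index (and, for consumer metadata calls, the stream-level partition id) from the Pinot partition id. A minimal sketch of the assumed mapping, for illustration only (the class and constant below are hypothetical; the real helpers are IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId and getStreamPartitionIdFromPinotPartitionId):

    // Hypothetical sketch, not Pinot source: assumes each topic's partitions are
    // packed into the Pinot partition-id space with a fixed per-topic offset.
    final class PartitionIdMapping {
      static final int PARTITION_OFFSET = 10_000; // assumed padding per stream config

      static int toPinotPartitionId(int streamConfigIdx, int streamPartitionId) {
        return streamConfigIdx * PARTITION_OFFSET + streamPartitionId;
      }

      static int streamConfigIndex(int pinotPartitionId) {
        return pinotPartitionId / PARTITION_OFFSET;
      }

      static int streamPartitionId(int pinotPartitionId) {
        return pinotPartitionId % PARTITION_OFFSET;
      }

      public static void main(String[] args) {
        int pinotPartitionId = toPinotPartitionId(1, 7); // second topic, stream partition 7
        System.out.println(streamConfigIndex(pinotPartitionId)); // 1
        System.out.println(streamPartitionId(pinotPartitionId)); // 7
      }
    }

Under that assumption, streamConfigs.get(0) is only correct for single-topic tables; for multi-topic ingestion the index has to be recovered from the partition id, which is what the changes above do at each call site.
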
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java
index 50ae0404854..18af6c329b5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java
@@ -29,7 +29,7 @@ public class PauseState extends BaseJsonConfig {
   private String _comment;
   private String _timestamp;
   // List of inactive topic indices. Index is the index of the topic in the streamConfigMaps.
-  private List<Integer> _indexOfInactiveTopics;
+  private List<Integer> _indexOfInactiveTopics = new ArrayList<>();
 
   public PauseState() {
   }
@@ -80,7 +80,10 @@ public class PauseState extends BaseJsonConfig {
   }
 
   public void setIndexOfInactiveTopics(List<Integer> indexOfInactiveTopics) {
-    _indexOfInactiveTopics = indexOfInactiveTopics == null ? new ArrayList<>() : indexOfInactiveTopics;
+    _indexOfInactiveTopics.clear();
+    if (indexOfInactiveTopics != null) {
+      _indexOfInactiveTopics.addAll(indexOfInactiveTopics);
+    }
   }
 
   public enum ReasonCode {
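
The PauseState change pre-initializes _indexOfInactiveTopics and makes the setter copy into that existing list, so the field is never null and never aliases a caller's (possibly immutable) list. A minimal usage sketch under that assumption (hypothetical demo class, not Pinot test code; it presumes a matching getter exists, which this diff does not show):

    import java.util.List;
    import org.apache.pinot.spi.config.table.PauseState;

    public class PauseStateNullSafetyDemo {
      public static void main(String[] args) {
        PauseState state = new PauseState();
        // Records written before multi-topic pause support may carry no topic indices at all.
        state.setIndexOfInactiveTopics(null);
        System.out.println(state.getIndexOfInactiveTopics()); // expected: [] rather than null
        // Passing an immutable list is also safe because the setter copies the elements.
        state.setIndexOfInactiveTopics(List.of(0, 2));
        System.out.println(state.getIndexOfInactiveTopics()); // expected: [0, 2]
      }
    }
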

