This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c44fd796a5b [Auto reset 2/3]Introduce topic 'inactive' status (#16692)
c44fd796a5b is described below

commit c44fd796a5b1ec3a509c791fa3340af794c9d43f
Author: lnbest0707 <[email protected]>
AuthorDate: Fri Aug 29 15:48:37 2025 -0700

    [Auto reset 2/3]Introduce topic 'inactive' status (#16692)
    
    * Introduce topic 'inactive' status
    
    * Expose the topic pause through API and add UT for  
PartitionGroupMetadataFetcher
    
    * Fix style
    
    * Fix idealstate update
    
    * Change variable naming
---
 .../api/resources/PinotRealtimeTableResource.java  |  68 +++++++
 .../controller/helix/SegmentStatusChecker.java     |   2 +-
 .../helix/core/PinotTableIdealStateBuilder.java    |   6 +-
 .../realtime/MissingConsumingSegmentFinder.java    |   8 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 126 ++++++++++--
 .../PinotLLCRealtimeSegmentManagerTest.java        |  25 +--
 .../apache/pinot/spi/config/table/PauseState.java  |  16 +-
 .../spi/stream/PartitionGroupMetadataFetcher.java  |  10 +-
 .../stream/PartitionGroupMetadataFetcherTest.java  | 216 +++++++++++++++++++++
 9 files changed, 438 insertions(+), 39 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index b442b883d99..bd262a9f995 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -29,12 +29,15 @@ import io.swagger.annotations.ApiResponses;
 import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 import javax.inject.Inject;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
@@ -123,6 +126,38 @@ public class PinotRealtimeTableResource {
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/pauseTopicConsumption")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.PAUSE_CONSUMPTION)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Pause consumption of some topics of a realtime 
table", notes = "Pause the consumption of "
+      + "some topics of a realtime table.")
+  public Response pauseTopicConsumption(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+      @ApiParam(value = "Comma separated list of index of the topics", 
required = true) @QueryParam("topicIndices")
+      String topicIndices,
+      @Context HttpHeaders headers) {
+    tableName = DatabaseUtils.translateTableName(tableName, headers);
+    String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    validateTable(tableNameWithType);
+    List<Integer> topicIndexList;
+    try {
+      topicIndexList = Arrays.stream(topicIndices.split(","))
+          .map(String::trim)
+          .map(idx -> Integer.parseInt(idx))
+          .collect(Collectors.toList());
+    } catch (NumberFormatException nfe) {
+      throw new ControllerApplicationException(LOGGER, "topicIndices should be 
a comma separated list of integers",
+          Response.Status.BAD_REQUEST, nfe);
+    }
+    try {
+      return 
Response.ok(_pinotLLCRealtimeSegmentManager.pauseTopicsConsumption(tableNameWithType,
 topicIndexList))
+          .build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
   @POST
   @Path("/tables/{tableName}/resumeConsumption")
   @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.RESUME_CONSUMPTION)
@@ -160,6 +195,39 @@ public class PinotRealtimeTableResource {
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/resumeTopicConsumption")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.RESUME_CONSUMPTION)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resume consumption of some topics of a realtime 
table", notes =
+      "Resume the consumption for some topics of a realtime table. There are 
two independent pause mechanism, "
+          + "table pause and topic pause. The topics is resumed only if both 
table and topics are resumed.")
+  public Response resumeTopicConsumption(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+      @ApiParam(value = "Comma separated list of index of the topics", 
required = true) @QueryParam("topicIndices")
+      String topicIndices,
+      @Context HttpHeaders headers) {
+    tableName = DatabaseUtils.translateTableName(tableName, headers);
+    String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    validateTable(tableNameWithType);
+    List<Integer> topicIndexList;
+    try {
+      topicIndexList = Arrays.stream(topicIndices.split(","))
+          .map(String::trim)
+          .map(idx -> Integer.parseInt(idx))
+          .collect(Collectors.toList());
+    } catch (NumberFormatException nfe) {
+      throw new ControllerApplicationException(LOGGER, "topicIndices should be 
a comma separated list of integers",
+          Response.Status.BAD_REQUEST, nfe);
+    }
+    try {
+      return 
Response.ok(_pinotLLCRealtimeSegmentManager.resumeTopicsConsumption(
+          tableNameWithType, topicIndexList)).build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
   @POST
   @Path("/tables/{tableName}/forceCommit")
   @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.FORCE_COMMIT)
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index 553dada84f8..9a09761b815 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -479,7 +479,7 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask<SegmentStatusCh
     if (tableType == TableType.REALTIME && tableConfig != null) {
       List<StreamConfig> streamConfigs = 
IngestionConfigUtils.getStreamConfigs(tableConfig);
       new MissingConsumingSegmentFinder(tableNameWithType, propertyStore, 
_controllerMetrics,
-          streamConfigs).findAndEmitMetrics(idealState);
+          streamConfigs, idealState).findAndEmitMetrics(idealState);
     }
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 8bc9ea442fb..6ec48830ca7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -87,12 +87,14 @@ public class PinotTableIdealStateBuilder {
    *                                            partition groups.
    *                                          The size of this list is equal 
to the number of partition groups,
    *                                          and is created using the latest 
segment zk metadata.
+   * @param pausedTopicIndices List of inactive topic indices. Index is the 
index of the topic in the streamConfigMaps.
    * @param forceGetOffsetFromStream - details in 
PinotLLCRealtimeSegmentManager.fetchPartitionGroupIdToSmallestOffset
    */
   public static List<PartitionGroupMetadata> 
getPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
-      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList, boolean forceGetOffsetFromStream) {
+      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList, List<Integer> pausedTopicIndices,
+      boolean forceGetOffsetFromStream) {
     PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = new 
PartitionGroupMetadataFetcher(
-        streamConfigs, partitionGroupConsumptionStatusList, 
forceGetOffsetFromStream);
+        streamConfigs, partitionGroupConsumptionStatusList, 
pausedTopicIndices, forceGetOffsetFromStream);
     try {
       
DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
       return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
index efc43246b7e..99bee6f8a7f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -38,6 +39,7 @@ import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
+import org.apache.pinot.spi.config.table.PauseState;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
@@ -67,7 +69,7 @@ public class MissingConsumingSegmentFinder {
   private ControllerMetrics _controllerMetrics;
 
   public MissingConsumingSegmentFinder(String realtimeTableName, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
-      ControllerMetrics controllerMetrics, List<StreamConfig> streamConfigs) {
+      ControllerMetrics controllerMetrics, List<StreamConfig> streamConfigs, 
IdealState idealState) {
     _realtimeTableName = realtimeTableName;
     _controllerMetrics = controllerMetrics;
     _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, 
controllerMetrics);
@@ -81,7 +83,9 @@ public class MissingConsumingSegmentFinder {
       return streamConfig;
     });
     try {
-      PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, 
Collections.emptyList(), false)
+      PauseState pauseState = 
PinotLLCRealtimeSegmentManager.extractTablePauseState(idealState);
+      PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, 
Collections.emptyList(),
+              pauseState == null ? new ArrayList<>() : 
pauseState.getIndexOfInactiveTopics(), false)
           .forEach(metadata -> {
             
_partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), 
metadata.getStartOffset());
           });
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 0d135dd37e3..549cd2c2507 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -51,6 +51,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -377,7 +378,7 @@ public class PinotLLCRealtimeSegmentManager {
     
streamConfigs.forEach(_flushThresholdUpdateManager::clearFlushThresholdUpdater);
     InstancePartitions instancePartitions = 
getConsumingInstancePartitions(tableConfig);
     List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-        getNewPartitionGroupMetadataList(streamConfigs, 
Collections.emptyList());
+        getNewPartitionGroupMetadataList(streamConfigs, 
Collections.emptyList(), idealState);
     int numPartitionGroups = newPartitionGroupMetadataList.size();
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
@@ -781,7 +782,7 @@ public class PinotLLCRealtimeSegmentManager {
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
     String newConsumingSegmentName = null;
-    if (!isTablePaused(idealState)) {
+    if (!isTablePaused(idealState) && !isTopicPaused(idealState, 
committingSegmentName)) {
       LLCSegmentName committingLLCSegment = new 
LLCSegmentName(committingSegmentName);
       int committingSegmentPartitionGroupId = 
committingLLCSegment.getPartitionGroupId();
 
@@ -932,8 +933,7 @@ public class PinotLLCRealtimeSegmentManager {
 
     // Handle offset auto reset
     String nextOffset = committingSegmentDescriptor.getNextOffset();
-    String startOffset = computeStartOffset(
-        nextOffset, streamConfig, newLLCSegmentName.getPartitionGroupId());
+    String startOffset = computeStartOffset(nextOffset, streamConfig, 
newLLCSegmentName.getPartitionGroupId());
 
     LOGGER.info(
         "Creating segment ZK metadata for new CONSUMING segment: {} with start 
offset: {} and creation time: {}",
@@ -972,8 +972,9 @@ public class PinotLLCRealtimeSegmentManager {
     int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold();
     if (timeThreshold <= 0 && offsetThreshold <= 0) {
       LOGGER.warn("Invalid offset auto reset configuration for table: {}, 
topic: {}. "
-              + "timeThreshold: {}, offsetThreshold: {}",
-          streamConfig.getTableNameWithType(), streamConfig.getTopicName(), 
timeThreshold, offsetThreshold);
+              + "timeThreshold: {}, offsetThreshold: {}", 
streamConfig.getTableNameWithType(),
+          streamConfig.getTopicName(),
+          timeThreshold, offsetThreshold);
       return nextOffset;
     }
     String clientId = getTableTopicUniqueClientId(streamConfig);
@@ -990,8 +991,8 @@ public class PinotLLCRealtimeSegmentManager {
       // (CurrentTime - SLA)'s offset > nextOffset.
       // TODO: it is relying on System.currentTimeMillis() which might be 
affected by time drift. If we are able to
       // get nextOffset's time, we should instead check (nextOffset's time + 
SLA)'s offset < latestOffset
-      latestOffset = metadataProvider.fetchStreamPartitionOffset(
-          OffsetCriteria.LARGEST_OFFSET_CRITERIA, STREAM_FETCH_TIMEOUT_MS);
+      latestOffset =
+          
metadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA,
 STREAM_FETCH_TIMEOUT_MS);
       LOGGER.info("Latest offset of topic {} and partition {} is {}", 
streamConfig.getTopicName(), partitionId,
           latestOffset);
       if (timeThreshold > 0) {
@@ -1140,7 +1141,7 @@ public class PinotLLCRealtimeSegmentManager {
       List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList =
           getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
       List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-          getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList);
+          getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList, idealState);
       partitionIds.addAll(newPartitionGroupMetadataList.stream()
           .map(PartitionGroupMetadata::getPartitionGroupId)
           .collect(Collectors.toSet()));
@@ -1155,9 +1156,9 @@ public class PinotLLCRealtimeSegmentManager {
    */
   @VisibleForTesting
   List<PartitionGroupMetadata> 
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
-      List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList) {
-    return 
PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs,
-        currentPartitionGroupConsumptionStatusList, false);
+      List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList, IdealState idealState) {
+    return getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList, idealState,
+        false);
   }
 
   /**
@@ -1167,10 +1168,12 @@ public class PinotLLCRealtimeSegmentManager {
    */
   @VisibleForTesting
   List<PartitionGroupMetadata> 
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
-      List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList,
+      List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList, IdealState idealState,
       boolean forceGetOffsetFromStream) {
+    PauseState pauseState = extractTablePauseState(idealState);
     return 
PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs,
-        currentPartitionGroupConsumptionStatusList, forceGetOffsetFromStream);
+        currentPartitionGroupConsumptionStatusList,
+        pauseState == null ? new ArrayList<>() : 
pauseState.getIndexOfInactiveTopics(), forceGetOffsetFromStream);
   }
 
   /**
@@ -1343,7 +1346,7 @@ public class PinotLLCRealtimeSegmentManager {
               .forEach(streamConfig -> streamConfig.setOffsetCriteria(
                   offsetsHaveToChange ? offsetCriteria : 
OffsetCriteria.SMALLEST_OFFSET_CRITERIA));
           List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-              getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList);
+              getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList, idealState);
           streamConfigs.stream().forEach(streamConfig -> 
streamConfig.setOffsetCriteria(originalOffsetCriteria));
           return ensureAllPartitionsConsuming(tableConfig, streamConfigs, 
idealState, newPartitionGroupMetadataList,
               offsetCriteria);
@@ -1383,7 +1386,8 @@ public class PinotLLCRealtimeSegmentManager {
             "Exceeded max segment completion time for segment " + 
committingSegmentName);
       }
       
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
 committingSegmentName,
-          isTablePaused(idealState) ? null : newSegmentName, 
segmentAssignment, instancePartitionsMap);
+          isTablePaused(idealState) || isTopicPaused(idealState, 
committingSegmentName) ? null : newSegmentName,
+          segmentAssignment, instancePartitionsMap);
       return idealState;
     }, DEFAULT_RETRY_POLICY);
   }
@@ -1399,7 +1403,24 @@ public class PinotLLCRealtimeSegmentManager {
     return 
Boolean.parseBoolean(idealState.getRecord().getSimpleField(IS_TABLE_PAUSED));
   }
 
-  private static PauseState extractTablePauseState(IdealState idealState) {
+  public static boolean isTopicPaused(IdealState idealState, int topicIndex) {
+    PauseState pauseState = extractTablePauseState(idealState);
+    if (pauseState != null) {
+      return pauseState.getIndexOfInactiveTopics().contains(topicIndex);
+    }
+    return false;
+  }
+
+  public static boolean isTopicPaused(IdealState idealState, String 
segmentName) {
+    LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+    if (llcSegmentName != null) {
+      return isTopicPaused(idealState,
+          
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(llcSegmentName.getPartitionGroupId()));
+    }
+    return false;
+  }
+
+  public static PauseState extractTablePauseState(IdealState idealState) {
     String pauseStateStr = 
idealState.getRecord().getSimpleField(PinotLLCRealtimeSegmentManager.PAUSE_STATE);
     try {
       if (pauseStateStr != null) {
@@ -1800,8 +1821,8 @@ public class PinotLLCRealtimeSegmentManager {
       // Temporarily, we are passing a boolean flag to indicate if we want to 
use the current status
       // The kafka implementation of computePartitionGroupMetadata() will 
ignore the current status
       // while the kinesis implementation will use it.
-      List<PartitionGroupMetadata> partitionGroupMetadataList =
-          getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList, true);
+      List<PartitionGroupMetadata> partitionGroupMetadataList = 
getNewPartitionGroupMetadataList(
+          streamConfigs, currentPartitionGroupConsumptionStatusList, 
idealState, true);
       streamConfig.setOffsetCriteria(originalOffsetCriteria);
       for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
         partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), 
metadata.getStartOffset());
@@ -2406,8 +2427,12 @@ public class PinotLLCRealtimeSegmentManager {
   public IdealState updatePauseStateInIdealState(String tableNameWithType, 
boolean pause,
       PauseState.ReasonCode reasonCode, @Nullable String comment) {
     PauseState pauseState =
-        new PauseState(pause, reasonCode, comment, new 
Timestamp(System.currentTimeMillis()).toString());
+        new PauseState(pause, reasonCode, comment, new 
Timestamp(System.currentTimeMillis()).toString(), null);
     IdealState updatedIdealState = HelixHelper.updateIdealState(_helixManager, 
tableNameWithType, idealState -> {
+      PauseState previousPauseState = extractTablePauseState(idealState);
+      if (previousPauseState != null) {
+        
pauseState.setIndexOfInactiveTopics(previousPauseState.getIndexOfInactiveTopics());
+      }
       ZNRecord znRecord = idealState.getRecord();
       znRecord.setSimpleField(PAUSE_STATE, pauseState.toJsonString());
       // maintain for backward compatibility
@@ -2419,6 +2444,48 @@ public class PinotLLCRealtimeSegmentManager {
     return updatedIdealState;
   }
 
+  public PauseState pauseTopicsConsumption(String tableNameWithType, 
List<Integer> indexOfPausedTopics) {
+    IdealState updatedIdealState = HelixHelper.updateIdealState(_helixManager, 
tableNameWithType, idealState -> {
+      PauseState pauseState = extractTablePauseState(idealState);
+      if (pauseState == null) {
+        pauseState = new PauseState(false, 
PauseState.ReasonCode.ADMINISTRATIVE, null,
+            new Timestamp(System.currentTimeMillis()).toString(), 
indexOfPausedTopics);
+      } else {
+        // Union the existing paused topics with the newly paused topics
+        pauseState.setIndexOfInactiveTopics(
+            Stream.concat(pauseState.getIndexOfInactiveTopics().stream(), 
indexOfPausedTopics.stream())
+                .distinct()
+                .collect(Collectors.toList()));
+      }
+      ZNRecord znRecord = idealState.getRecord();
+      znRecord.setSimpleField(PAUSE_STATE, pauseState.toJsonString());
+      LOGGER.info("Set 'pauseState' to {} in the Ideal State for table {}. " + 
"to pause topics with indices {}.",
+          pauseState, tableNameWithType, indexOfPausedTopics);
+      return new IdealState(znRecord);
+    }, RetryPolicies.noDelayRetryPolicy(3));
+    Set<String> consumingSegments = 
findConsumingSegmentsOfTopics(updatedIdealState, indexOfPausedTopics);
+    sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
+    return extractTablePauseState(updatedIdealState);
+  }
+
+  public PauseState resumeTopicsConsumption(String tableNameWithType, 
List<Integer> indexOfPausedTopics) {
+    IdealState updatedIdealState = HelixHelper.updateIdealState(_helixManager, 
tableNameWithType, idealState -> {
+      PauseState pauseState = extractTablePauseState(idealState);
+      if (pauseState == null) {
+        return idealState;
+      }
+      pauseState.getIndexOfInactiveTopics().removeAll(indexOfPausedTopics);
+      ZNRecord znRecord = idealState.getRecord();
+      znRecord.setSimpleField(PAUSE_STATE, pauseState.toJsonString());
+      LOGGER.info("Set 'pauseState' to {} in the Ideal State for table {}. " + 
"to resume topics with indices {}.",
+          pauseState, tableNameWithType, indexOfPausedTopics);
+      return new IdealState(znRecord);
+    }, RetryPolicies.noDelayRetryPolicy(3));
+    _helixResourceManager.invokeControllerPeriodicTask(tableNameWithType, 
Constants.REALTIME_SEGMENT_VALIDATION_MANAGER,
+        new HashMap<>());
+    return extractTablePauseState(updatedIdealState);
+  }
+
   private void sendForceCommitMessageToServers(String tableNameWithType, 
Set<String> consumingSegments) {
     if (!consumingSegments.isEmpty()) {
       LOGGER.info("Sending force commit messages for segments: {} of table: 
{}", consumingSegments, tableNameWithType);
@@ -2446,6 +2513,24 @@ public class PinotLLCRealtimeSegmentManager {
     return consumingSegments;
   }
 
+  private Set<String> findConsumingSegmentsOfTopics(IdealState idealState, 
List<Integer> topicIndices) {
+    Set<String> consumingSegments = new TreeSet<>();
+    idealState.getRecord().getMapFields().forEach((segmentName, 
instanceToStateMap) -> {
+      LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+      if (llcSegmentName != null && topicIndices.contains(
+          
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(llcSegmentName.getPartitionGroupId())))
 {
+        return;
+      }
+      for (String state : instanceToStateMap.values()) {
+        if (state.equals(SegmentStateModel.CONSUMING)) {
+          consumingSegments.add(segmentName);
+          break;
+        }
+      }
+    });
+    return consumingSegments;
+  }
+
   /**
    * Return pause status:
    *   - Information from the 'pauseState' in the table ideal state
@@ -2456,6 +2541,7 @@ public class PinotLLCRealtimeSegmentManager {
     Set<String> consumingSegments = findConsumingSegments(idealState);
     PauseState pauseState = extractTablePauseState(idealState);
     if (pauseState != null) {
+      // TODO: add paused topics information
       return new PauseStatusDetails(pauseState.isPaused(), consumingSegments, 
pauseState.getReasonCode(),
           pauseState.getComment(), pauseState.getTimeInMillis());
     }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 263484c5d9d..a04fd27ab9c 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -131,6 +131,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
   static final int NUM_DOCS = RANDOM.nextInt(Integer.MAX_VALUE) + 1;
   static final long LATEST_OFFSET = PARTITION_OFFSET.getOffset() * 2 + 
NUM_DOCS;
   static final int SEGMENT_SIZE_IN_BYTES = 100000000;
+
   @AfterClass
   public void tearDown()
       throws IOException {
@@ -289,7 +290,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     // committing segment's partitionGroupId no longer in the 
newPartitionGroupMetadataList
     List<PartitionGroupMetadata> partitionGroupMetadataListWithout0 =
-        
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, 
Collections.emptyList());
+        
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, 
Collections.emptyList(),
+            mock(IdealState.class));
     partitionGroupMetadataListWithout0.remove(0);
     segmentManager._partitionGroupMetadataList = 
partitionGroupMetadataListWithout0;
 
@@ -743,7 +745,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
      */
     // 1 reached end of shard.
     List<PartitionGroupMetadata> partitionGroupMetadataListWithout1 =
-        
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, 
Collections.emptyList());
+        
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, 
Collections.emptyList(),
+            mock(IdealState.class));
     partitionGroupMetadataListWithout1.remove(1);
     segmentManager._partitionGroupMetadataList = 
partitionGroupMetadataListWithout1;
     // noop
@@ -1463,7 +1466,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
     assertNull(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
segmentNames.get(4), null).getDownloadUrl());
   }
 
-
   @Test
   public void testDeleteTmpSegmentFiles()
       throws Exception {
@@ -1529,7 +1531,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
         List.of(new PartitionGroupMetadata(0, new LongMsgOffset(234)),
             new PartitionGroupMetadata(1, new LongMsgOffset(345)));
     doReturn(partitionGroupMetadataList).when(segmentManagerSpy)
-        .getNewPartitionGroupMetadataList(streamConfigs, 
partitionGroupConsumptionStatusList);
+        .getNewPartitionGroupMetadataList(streamConfigs, 
partitionGroupConsumptionStatusList, idealState);
     partitionIds = segmentManagerSpy.getPartitionIds(streamConfigs, 
idealState);
     Assert.assertEquals(partitionIds.size(), 2);
   }
@@ -1744,7 +1746,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
   }
 
   @Test
-  public void testSyncCommittingSegments() throws Exception {
+  public void testSyncCommittingSegments()
+      throws Exception {
     // Set up mocks for the resource management infrastructure
     PinotHelixResourceManager pinotHelixResourceManager = 
mock(PinotHelixResourceManager.class);
     HelixManager helixManager = mock(HelixManager.class);
@@ -1767,7 +1770,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
     String committingSegmentsListPath =
         
ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
 
-
     // Create test segments with different states
     String committingSegment1 = "testTable__0__0__20250210T1142Z";
     String committingSegment2 = "testTable__0__1__20250210T1142Z";
@@ -1814,16 +1816,15 @@ public class PinotLLCRealtimeSegmentManagerTest {
     assertEquals(new 
HashSet<>(existingRecord.getListField(COMMITTING_SEGMENTS)),
         new HashSet<>(List.of(committingSegment1, committingSegment2)));
 
-
     // Test 3: Error handling during ZooKeeper operations
     when(zkHelixPropertyStore.set(eq(committingSegmentsListPath), any(), 
anyInt(), eq(AccessOption.PERSISTENT)))
         .thenThrow(new RuntimeException("ZooKeeper operation failed"));
     assertFalse(segmentManager.syncCommittingSegments(realtimeTableName, 
newSegments));
   }
 
-
   
//////////////////////////////////////////////////////////////////////////////////
   // Fake classes
+
   
/////////////////////////////////////////////////////////////////////////////////
 
   private static class FakePinotLLCRealtimeSegmentManager extends 
PinotLLCRealtimeSegmentManager {
@@ -1889,7 +1890,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     public void ensureAllPartitionsConsuming() {
       ensureAllPartitionsConsuming(_tableConfig, _streamConfigs, _idealState,
-          getNewPartitionGroupMetadataList(_streamConfigs, 
Collections.emptyList()), null);
+          getNewPartitionGroupMetadataList(_streamConfigs, 
Collections.emptyList(), mock(IdealState.class)), null);
     }
 
     @Override
@@ -1971,7 +1972,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     @Override
     List<PartitionGroupMetadata> 
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
-        List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList) {
+        List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList, IdealState idealState) {
       if (_partitionGroupMetadataList != null) {
         return _partitionGroupMetadataList;
       } else {
@@ -1982,9 +1983,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     @Override
     List<PartitionGroupMetadata> 
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
-        List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList,
+        List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList, IdealState idealState,
         boolean forceGetOffsetFromStream) {
-      return getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList);
+      return getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList, idealState);
     }
 
     @Override
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java
index 4781bfbdac7..50ae0404854 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.spi.config.table;
 
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.pinot.spi.config.BaseJsonConfig;
 
 
@@ -26,15 +28,19 @@ public class PauseState extends BaseJsonConfig {
   private ReasonCode _reasonCode;
   private String _comment;
   private String _timestamp;
+  // List of inactive topic indices. Index is the index of the topic in the 
streamConfigMaps.
+  private List<Integer> _indexOfInactiveTopics;
 
   public PauseState() {
   }
 
-  public PauseState(boolean paused, ReasonCode reasonCode, String comment, 
String timestamp) {
+  public PauseState(boolean paused, ReasonCode reasonCode, String comment, 
String timestamp,
+      List<Integer> indexOfInactiveTopics) {
     _paused = paused;
     _reasonCode = reasonCode;
     _comment = comment;
     _timestamp = timestamp;
+    setIndexOfInactiveTopics(indexOfInactiveTopics);
   }
 
   public boolean isPaused() {
@@ -53,6 +59,10 @@ public class PauseState extends BaseJsonConfig {
     return _timestamp;
   }
 
+  public List<Integer> getIndexOfInactiveTopics() {
+    return _indexOfInactiveTopics;
+  }
+
   public void setPaused(boolean paused) {
     _paused = paused;
   }
@@ -69,6 +79,10 @@ public class PauseState extends BaseJsonConfig {
     _timestamp = timestamp;
   }
 
+  public void setIndexOfInactiveTopics(List<Integer> indexOfInactiveTopics) {
+    _indexOfInactiveTopics = indexOfInactiveTopics == null ? new ArrayList<>() 
: indexOfInactiveTopics;
+  }
+
   public enum ReasonCode {
     ADMINISTRATIVE, STORAGE_QUOTA_EXCEEDED, RESOURCE_UTILIZATION_LIMIT_EXCEEDED
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index bf05ea02854..698ad472e1a 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -38,14 +38,17 @@ public class PartitionGroupMetadataFetcher implements 
Callable<Boolean> {
   private final List<PartitionGroupConsumptionStatus> 
_partitionGroupConsumptionStatusList;
   private final boolean _forceGetOffsetFromStream;
   private final List<PartitionGroupMetadata> _newPartitionGroupMetadataList = 
new ArrayList<>();
+  private final List<Integer> _pausedTopicIndices;
 
   private Exception _exception;
 
   public PartitionGroupMetadataFetcher(List<StreamConfig> streamConfigs,
-      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList, boolean forceGetOffsetFromStream) {
+      List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList, List<Integer> pausedTopicIndices,
+      boolean forceGetOffsetFromStream) {
     _streamConfigs = streamConfigs;
     _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList;
     _forceGetOffsetFromStream = forceGetOffsetFromStream;
+    _pausedTopicIndices = pausedTopicIndices;
   }
 
   public List<PartitionGroupMetadata> getPartitionGroupMetadataList() {
@@ -100,6 +103,11 @@ public class PartitionGroupMetadataFetcher implements 
Callable<Boolean> {
       throws Exception {
     int numStreams = _streamConfigs.size();
     for (int i = 0; i < numStreams; i++) {
+      if (_pausedTopicIndices.contains(i)) {
+        LOGGER.info("Skipping fetching PartitionGroupMetadata for paused 
topic: {}",
+            _streamConfigs.get(i).getTopicName());
+        continue;
+      }
       StreamConfig streamConfig = _streamConfigs.get(i);
       String topicName = streamConfig.getTopicName();
       String clientId =
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
new file mode 100644
index 00000000000..9fa65254b63
--- /dev/null
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class PartitionGroupMetadataFetcherTest {
+
+  @Test
+  public void testFetchSingleStreamSuccess()
+      throws Exception {
+    // Setup
+    StreamConfig streamConfig = createMockStreamConfig("test-topic", 
"test-table", false);
+    List<StreamConfig> streamConfigs = Collections.singletonList(streamConfig);
+
+    PartitionGroupConsumptionStatus status = 
mock(PartitionGroupConsumptionStatus.class);
+    when(status.getPartitionGroupId()).thenReturn(0);
+    List<PartitionGroupConsumptionStatus> statusList = 
Collections.singletonList(status);
+
+    PartitionGroupMetadata metadata = new PartitionGroupMetadata(0, 
mock(StreamPartitionMsgOffset.class));
+    List<PartitionGroupMetadata> metadataList = 
Collections.singletonList(metadata);
+
+    StreamMetadataProvider metadataProvider = 
mock(StreamMetadataProvider.class);
+    when(metadataProvider.computePartitionGroupMetadata(anyString(), 
any(StreamConfig.class),
+        any(List.class), anyInt(), anyBoolean())).thenReturn(metadataList);
+
+    StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+    
when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider);
+
+    try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider = 
Mockito.mockStatic(
+        StreamConsumerFactoryProvider.class)) {
+      mockedProvider.when(() -> 
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+      PartitionGroupMetadataFetcher fetcher = new 
PartitionGroupMetadataFetcher(
+          streamConfigs, statusList, Collections.emptyList(), false);
+
+      // Execute
+      Boolean result = fetcher.call();
+
+      // Verify
+      Assert.assertTrue(result);
+      Assert.assertEquals(fetcher.getPartitionGroupMetadataList().size(), 1);
+      Assert.assertNull(fetcher.getException());
+    }
+  }
+
+  @Test
+  public void testFetchSingleStreamTransientException()
+      throws Exception {
+    // Setup
+    StreamConfig streamConfig = createMockStreamConfig("test-topic", 
"test-table", false);
+    List<StreamConfig> streamConfigs = Collections.singletonList(streamConfig);
+
+    List<PartitionGroupConsumptionStatus> statusList = Collections.emptyList();
+
+    StreamMetadataProvider metadataProvider = 
mock(StreamMetadataProvider.class);
+    when(metadataProvider.computePartitionGroupMetadata(anyString(), 
any(StreamConfig.class),
+        any(List.class), anyInt(), anyBoolean()))
+        .thenThrow(new TransientConsumerException(new 
RuntimeException("Transient error")));
+
+    StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+    
when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider);
+
+    try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider = 
Mockito.mockStatic(
+        StreamConsumerFactoryProvider.class)) {
+      mockedProvider.when(() -> 
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+      PartitionGroupMetadataFetcher fetcher = new 
PartitionGroupMetadataFetcher(
+          streamConfigs, statusList, Collections.emptyList(), false);
+
+      // Execute
+      Boolean result = fetcher.call();
+
+      // Verify
+      Assert.assertFalse(result);
+      Assert.assertTrue(fetcher.getException() instanceof 
TransientConsumerException);
+    }
+  }
+
+  @Test
+  public void testFetchMultipleStreams()
+      throws Exception {
+    // Setup
+    StreamConfig streamConfig1 = createMockStreamConfig("topic1", 
"test-table", false);
+    StreamConfig streamConfig2 = createMockStreamConfig("topic2", 
"test-table", false);
+    List<StreamConfig> streamConfigs = Arrays.asList(streamConfig1, 
streamConfig2);
+
+    PartitionGroupConsumptionStatus status1 = new 
PartitionGroupConsumptionStatus(0, 0, null, null, "IN_PROGRESS");
+    PartitionGroupConsumptionStatus status2 = new 
PartitionGroupConsumptionStatus(1, 1, null, null, "IN_PROGRESS");
+    List<PartitionGroupConsumptionStatus> statusList = Arrays.asList(status1, 
status2);
+
+    PartitionGroupMetadata mockedMetadata1 = new PartitionGroupMetadata(0, 
mock(StreamPartitionMsgOffset.class));
+    PartitionGroupMetadata mockedMetadata2 = new PartitionGroupMetadata(1, 
mock(StreamPartitionMsgOffset.class));
+
+    StreamMetadataProvider metadataProvider = 
mock(StreamMetadataProvider.class);
+    when(metadataProvider.computePartitionGroupMetadata(anyString(), 
any(StreamConfig.class),
+        any(List.class), anyInt(), anyBoolean()))
+        .thenReturn(Arrays.asList(mockedMetadata1, mockedMetadata2));
+
+    StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+    
when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider);
+
+    try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider = 
Mockito.mockStatic(
+        StreamConsumerFactoryProvider.class)) {
+      mockedProvider.when(() -> 
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+      PartitionGroupMetadataFetcher fetcher = new 
PartitionGroupMetadataFetcher(
+          streamConfigs, statusList, Collections.emptyList(), false);
+
+      // Execute
+      Boolean result = fetcher.call();
+
+      // Verify
+      Assert.assertTrue(result);
+      Assert.assertEquals(fetcher.getPartitionGroupMetadataList().size(), 4);
+      Assert.assertNull(fetcher.getException());
+
+      // Verify the correct partition group IDs: 0, 1, 10000, 10001
+      List<PartitionGroupMetadata> resultMetadata = 
fetcher.getPartitionGroupMetadataList();
+      List<Integer> partitionIds = resultMetadata.stream()
+          .map(PartitionGroupMetadata::getPartitionGroupId)
+          .sorted()
+          .collect(Collectors.toList());
+
+      Assert.assertEquals(partitionIds, Arrays.asList(0, 1, 10000, 10001));
+    }
+  }
+
+  @Test
+  public void testFetchMultipleStreamsWithPause()
+      throws Exception {
+    // Setup
+    StreamConfig streamConfig1 = createMockStreamConfig("topic1", 
"test-table", false);
+    StreamConfig streamConfig2 = createMockStreamConfig("topic2", 
"test-table", false);
+    StreamConfig streamConfig3 = createMockStreamConfig("topic3", 
"test-table", false);
+    List<StreamConfig> streamConfigs = Arrays.asList(streamConfig1, 
streamConfig2, streamConfig3);
+
+    PartitionGroupConsumptionStatus status1 = new 
PartitionGroupConsumptionStatus(0, 0, null, null, "IN_PROGRESS");
+    PartitionGroupConsumptionStatus status2 = new 
PartitionGroupConsumptionStatus(1, 1, null, null, "IN_PROGRESS");
+    List<PartitionGroupConsumptionStatus> statusList = Arrays.asList(status1, 
status2);
+
+    PartitionGroupMetadata mockedMetadata1 = new PartitionGroupMetadata(0, 
mock(StreamPartitionMsgOffset.class));
+    PartitionGroupMetadata mockedMetadata2 = new PartitionGroupMetadata(1, 
mock(StreamPartitionMsgOffset.class));
+
+    StreamMetadataProvider metadataProvider = 
mock(StreamMetadataProvider.class);
+    when(metadataProvider.computePartitionGroupMetadata(anyString(), 
any(StreamConfig.class),
+        any(List.class), anyInt(), anyBoolean()))
+        .thenReturn(Arrays.asList(mockedMetadata1, mockedMetadata2));
+
+    StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+    
when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider);
+
+    try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider = 
Mockito.mockStatic(
+        StreamConsumerFactoryProvider.class)) {
+      mockedProvider.when(() -> 
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+      PartitionGroupMetadataFetcher fetcher = new 
PartitionGroupMetadataFetcher(
+          streamConfigs, statusList, Arrays.asList(1), false);
+
+      // Execute
+      Boolean result = fetcher.call();
+
+      // Verify
+      Assert.assertTrue(result);
+      Assert.assertEquals(fetcher.getPartitionGroupMetadataList().size(), 4);
+      Assert.assertNull(fetcher.getException());
+
+      // Verify the correct partition group IDs
+      List<PartitionGroupMetadata> resultMetadata = 
fetcher.getPartitionGroupMetadataList();
+      List<Integer> partitionIds = resultMetadata.stream()
+          .map(PartitionGroupMetadata::getPartitionGroupId)
+          .sorted()
+          .collect(Collectors.toList());
+
+      Assert.assertEquals(partitionIds, Arrays.asList(0, 1, 20000, 20001));
+    }
+  }
+
+  private StreamConfig createMockStreamConfig(String topicName, String 
tableName, boolean isEphemeral) {
+    StreamConfig streamConfig = mock(StreamConfig.class);
+    when(streamConfig.getTopicName()).thenReturn(topicName);
+    when(streamConfig.getTableNameWithType()).thenReturn(tableName);
+    return streamConfig;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to