This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 52db36c816f Adds ability to pause consumption in batches (#17194)
52db36c816f is described below

commit 52db36c816f91ef8887fddd0beade5d169824296
Author: NOOB <[email protected]>
AuthorDate: Fri Dec 12 09:49:18 2025 +0530

    Adds ability to pause consumption in batches (#17194)
    
    * Adds ability to pause consumption in batches
    
    * Adds tests
    
    * Removes reduntant code
    
    * nit
    
    * Renames ForceCommitBatchConfig to BatchConfig
    
    * Fixes lint
---
 ...orceCommitBatchConfig.java => BatchConfig.java} |   8 +-
 .../api/resources/PinotRealtimeTableResource.java  |  28 +++--
 .../api/resources/PinotTableRestletResource.java   |   6 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  26 +++--
 .../helix/core/rebalance/RebalanceConfig.java      |  14 +--
 .../helix/core/rebalance/TableRebalancer.java      |   6 +-
 ...itBatchConfigTest.java => BatchConfigTest.java} |  12 +--
 .../resources/PinotRealtimeTableResourceTest.java  | 117 +++++++++++++++++++++
 .../PinotLLCRealtimeSegmentManagerTest.java        |  61 +++++++++--
 9 files changed, 235 insertions(+), 43 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfig.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/BatchConfig.java
similarity index 84%
rename from 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfig.java
rename to 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/BatchConfig.java
index 483bd09db60..a5e931888a7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfig.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/BatchConfig.java
@@ -21,7 +21,7 @@ package org.apache.pinot.controller.api.resources;
 import com.google.common.base.Preconditions;
 
 
-public class ForceCommitBatchConfig {
+public class BatchConfig {
   public static final int DEFAULT_BATCH_SIZE = Integer.MAX_VALUE;
   public static final int DEFAULT_STATUS_CHECK_INTERVAL_SEC = 5;
   public static final int DEFAULT_STATUS_CHECK_TIMEOUT_SEC = 180;
@@ -29,20 +29,20 @@ public class ForceCommitBatchConfig {
   private final int _batchStatusCheckIntervalMs;
   private final int _batchStatusCheckTimeoutMs;
 
-  private ForceCommitBatchConfig(int batchSize, int 
batchStatusCheckIntervalMs, int batchStatusCheckTimeoutMs) {
+  private BatchConfig(int batchSize, int batchStatusCheckIntervalMs, int 
batchStatusCheckTimeoutMs) {
     _batchSize = batchSize;
     _batchStatusCheckIntervalMs = batchStatusCheckIntervalMs;
     _batchStatusCheckTimeoutMs = batchStatusCheckTimeoutMs;
   }
 
-  public static ForceCommitBatchConfig of(int batchSize, int 
batchStatusCheckIntervalSec,
+  public static BatchConfig of(int batchSize, int batchStatusCheckIntervalSec,
       int batchStatusCheckTimeoutSec) {
     Preconditions.checkArgument(batchSize > 0, "Batch size should be greater 
than zero");
     Preconditions.checkArgument(batchStatusCheckIntervalSec > 0,
         "Batch status check interval should be greater than zero");
     Preconditions.checkArgument(batchStatusCheckTimeoutSec > 0,
         "Batch status check timeout should be greater than zero");
-    return new ForceCommitBatchConfig(batchSize, batchStatusCheckIntervalSec * 
1000, batchStatusCheckTimeoutSec * 1000);
+    return new BatchConfig(batchSize, batchStatusCheckIntervalSec * 1000, 
batchStatusCheckTimeoutSec * 1000);
   }
 
   public int getBatchSize() {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index bd262a9f995..2e7ad9b880a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -114,13 +114,29 @@ public class PinotRealtimeTableResource {
   public Response pauseConsumption(
       @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
       @ApiParam(value = "Comment on pausing the consumption") 
@QueryParam("comment") String comment,
+      @ApiParam(value = "Max number of consuming segments to commit at once")
+      @QueryParam("batchSize") @DefaultValue(BatchConfig.DEFAULT_BATCH_SIZE + 
"") int batchSize,
+      @ApiParam(value = "How often to check whether the current batch of 
segments have been successfully committed or"
+          + " not")
+      @QueryParam("batchStatusCheckIntervalSec")
+      @DefaultValue(BatchConfig.DEFAULT_STATUS_CHECK_INTERVAL_SEC + "") int 
batchStatusCheckIntervalSec,
+      @ApiParam(value = "Timeout based on which the controller will stop 
checking the forceCommit status of the batch"
+          + " of segments and throw an exception")
+      @QueryParam("batchStatusCheckTimeoutSec")
+      @DefaultValue(BatchConfig.DEFAULT_STATUS_CHECK_TIMEOUT_SEC + "") int 
batchStatusCheckTimeoutSec,
       @Context HttpHeaders headers) {
     tableName = DatabaseUtils.translateTableName(tableName, headers);
     String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
     validateTable(tableNameWithType);
+    BatchConfig batchConfig;
+    try {
+      batchConfig = BatchConfig.of(batchSize, batchStatusCheckIntervalSec, 
batchStatusCheckTimeoutSec);
+    } catch (IllegalArgumentException e) {
+      throw new ControllerApplicationException(LOGGER, "Invalid batch config", 
Response.Status.BAD_REQUEST, e);
+    }
     try {
       return 
Response.ok(_pinotLLCRealtimeSegmentManager.pauseConsumption(tableNameWithType,
-          PauseState.ReasonCode.ADMINISTRATIVE, comment)).build();
+          PauseState.ReasonCode.ADMINISTRATIVE, comment, batchConfig)).build();
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e);
     }
@@ -246,24 +262,24 @@ public class PinotRealtimeTableResource {
       @ApiParam(value = "Comma separated list of consuming segments to be 
committed") @QueryParam("segments")
       String consumingSegments,
       @ApiParam(value = "Max number of consuming segments to commit at once")
-      @QueryParam("batchSize") 
@DefaultValue(ForceCommitBatchConfig.DEFAULT_BATCH_SIZE + "") int batchSize,
+      @QueryParam("batchSize") @DefaultValue(BatchConfig.DEFAULT_BATCH_SIZE + 
"") int batchSize,
       @ApiParam(value = "How often to check whether the current batch of 
segments have been successfully committed or"
           + " not")
       @QueryParam("batchStatusCheckIntervalSec")
-      @DefaultValue(ForceCommitBatchConfig.DEFAULT_STATUS_CHECK_INTERVAL_SEC + 
"") int batchStatusCheckIntervalSec,
+      @DefaultValue(BatchConfig.DEFAULT_STATUS_CHECK_INTERVAL_SEC + "") int 
batchStatusCheckIntervalSec,
       @ApiParam(value = "Timeout based on which the controller will stop 
checking the forceCommit status of the batch"
           + " of segments and throw an exception")
       @QueryParam("batchStatusCheckTimeoutSec")
-      @DefaultValue(ForceCommitBatchConfig.DEFAULT_STATUS_CHECK_TIMEOUT_SEC + 
"") int batchStatusCheckTimeoutSec,
+      @DefaultValue(BatchConfig.DEFAULT_STATUS_CHECK_TIMEOUT_SEC + "") int 
batchStatusCheckTimeoutSec,
       @Context HttpHeaders headers) {
     tableName = DatabaseUtils.translateTableName(tableName, headers);
     if (partitionGroupIds != null && consumingSegments != null) {
       throw new ControllerApplicationException(LOGGER, "Cannot specify both 
partitions and segments to commit",
           Response.Status.BAD_REQUEST);
     }
-    ForceCommitBatchConfig batchConfig;
+    BatchConfig batchConfig;
     try {
-      batchConfig = ForceCommitBatchConfig.of(batchSize, 
batchStatusCheckIntervalSec, batchStatusCheckTimeoutSec);
+      batchConfig = BatchConfig.of(batchSize, batchStatusCheckIntervalSec, 
batchStatusCheckTimeoutSec);
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, "Invalid batch config", 
Response.Status.BAD_REQUEST, e);
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 505216602fb..2175394555b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -785,13 +785,13 @@ public class PinotTableRestletResource {
       @DefaultValue("false")
       @QueryParam("forceCommit") boolean forceCommit,
       @ApiParam(value = "Batch size for force commit operations")
-      @DefaultValue(ForceCommitBatchConfig.DEFAULT_BATCH_SIZE + "")
+      @DefaultValue(BatchConfig.DEFAULT_BATCH_SIZE + "")
       @QueryParam("forceCommitBatchSize") int forceCommitBatchSize,
       @ApiParam(value = "Interval in milliseconds for checking force commit 
batch status")
-      @DefaultValue(ForceCommitBatchConfig.DEFAULT_STATUS_CHECK_INTERVAL_SEC * 
1000 + "")
+      @DefaultValue(BatchConfig.DEFAULT_STATUS_CHECK_INTERVAL_SEC * 1000 + "")
       @QueryParam("forceCommitBatchStatusCheckIntervalMs") int 
forceCommitBatchStatusCheckIntervalMs,
       @ApiParam(value = "Timeout in milliseconds for force commit batch status 
check")
-      @DefaultValue(ForceCommitBatchConfig.DEFAULT_STATUS_CHECK_TIMEOUT_SEC * 
1000 + "")
+      @DefaultValue(BatchConfig.DEFAULT_STATUS_CHECK_TIMEOUT_SEC * 1000 + "")
       @QueryParam("forceCommitBatchStatusCheckTimeoutMs") int 
forceCommitBatchStatusCheckTimeoutMs,
       @Context HttpHeaders headers
       //@formatter:on
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 2f8b6d8b307..9c3a710bfe2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -88,8 +88,8 @@ import 
org.apache.pinot.common.utils.helix.IdealStateSingleCommit;
 import org.apache.pinot.common.utils.http.HttpClient;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
+import org.apache.pinot.controller.api.resources.BatchConfig;
 import org.apache.pinot.controller.api.resources.Constants;
-import org.apache.pinot.controller.api.resources.ForceCommitBatchConfig;
 import org.apache.pinot.controller.api.resources.PauseStatusDetails;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
@@ -2286,17 +2286,17 @@ public class PinotLLCRealtimeSegmentManager {
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String 
partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit, ForceCommitBatchConfig batchConfig) {
+      @Nullable String segmentsToCommit, @Nullable BatchConfig batchConfig) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments =
         filterSegmentsToCommit(allConsumingSegments, 
partitionGroupIdsToCommit, segmentsToCommit);
-    int batchSize = batchConfig.getBatchSize();
-    if (batchSize >= targetConsumingSegments.size()) {
+    if ((batchConfig == null) || (batchConfig.getBatchSize() >= 
targetConsumingSegments.size())) {
       // No need to divide segments in batches.
       sendForceCommitMessageToServers(tableNameWithType, 
targetConsumingSegments);
     } else {
-      List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, 
targetConsumingSegments, batchSize);
+      List<Set<String>> segmentBatchList =
+          getSegmentBatchList(idealState, targetConsumingSegments, 
batchConfig.getBatchSize());
       ExecutorService executor = Executors.newSingleThreadExecutor();
       executor.submit(() -> processBatchesSequentially(segmentBatchList, 
tableNameWithType, batchConfig));
       executor.shutdown();
@@ -2305,7 +2305,7 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   private void processBatchesSequentially(List<Set<String>> segmentBatchList, 
String tableNameWithType,
-      ForceCommitBatchConfig forceCommitBatchConfig) {
+      BatchConfig forceCommitBatchConfig) {
     Set<String> prevBatch = null;
     try {
       for (Set<String> segmentBatchToCommit : segmentBatchList) {
@@ -2322,7 +2322,7 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   public void waitUntilSegmentsForceCommitted(String tableNameWithType, 
Set<String> segmentsToWait,
-      ForceCommitBatchConfig forceCommitBatchConfig)
+      BatchConfig forceCommitBatchConfig)
       throws InterruptedException {
     int batchStatusCheckIntervalMs = 
forceCommitBatchConfig.getBatchStatusCheckIntervalMs();
     int batchStatusCheckTimeoutMs = 
forceCommitBatchConfig.getBatchStatusCheckTimeoutMs();
@@ -2450,9 +2450,19 @@ public class PinotLLCRealtimeSegmentManager {
    */
   public PauseStatusDetails pauseConsumption(String tableNameWithType, 
PauseState.ReasonCode reasonCode,
       @Nullable String comment) {
+    return pauseConsumption(tableNameWithType, reasonCode, comment, null);
+  }
+
+  /**
+   * Pause consumption on a table by
+   *   1) Update PauseState in the table ideal state and
+   *   2) Sending force commit messages to servers
+   */
+  public PauseStatusDetails pauseConsumption(String tableNameWithType, 
PauseState.ReasonCode reasonCode,
+      @Nullable String comment, @Nullable BatchConfig batchConfig) {
     IdealState updatedIdealState = 
updatePauseStateInIdealState(tableNameWithType, true, reasonCode, comment);
     Set<String> consumingSegments = findConsumingSegments(updatedIdealState);
-    sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
+    forceCommit(tableNameWithType, null, null, batchConfig);
     return new PauseStatusDetails(true, consumingSegments, reasonCode, comment 
!= null ? comment
         : "Pause flag is set. Consuming segments are being committed."
             + " Use /pauseStatus endpoint in a few moments to check if all 
consuming segments have been committed.",
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
index 716cb94ad6c..5aa9bb05d90 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
@@ -24,7 +24,7 @@ import com.google.common.base.Preconditions;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import java.util.Objects;
-import org.apache.pinot.controller.api.resources.ForceCommitBatchConfig;
+import org.apache.pinot.controller.api.resources.BatchConfig;
 import org.apache.pinot.spi.utils.Enablement;
 
 
@@ -167,16 +167,16 @@ public class RebalanceConfig {
   private boolean _forceCommit = false;
 
   @JsonProperty("forceCommitBatchSize")
-  @ApiModelProperty(example = ForceCommitBatchConfig.DEFAULT_BATCH_SIZE + "")
-  private int _forceCommitBatchSize = 
ForceCommitBatchConfig.DEFAULT_BATCH_SIZE;
+  @ApiModelProperty(example = BatchConfig.DEFAULT_BATCH_SIZE + "")
+  private int _forceCommitBatchSize = BatchConfig.DEFAULT_BATCH_SIZE;
 
   @JsonProperty("forceCommitBatchStatusCheckIntervalMs")
-  @ApiModelProperty(example = 
ForceCommitBatchConfig.DEFAULT_STATUS_CHECK_INTERVAL_SEC * 1000 + "")
-  private int _forceCommitBatchStatusCheckIntervalMs = 
ForceCommitBatchConfig.DEFAULT_STATUS_CHECK_INTERVAL_SEC * 1000;
+  @ApiModelProperty(example = BatchConfig.DEFAULT_STATUS_CHECK_INTERVAL_SEC * 
1000 + "")
+  private int _forceCommitBatchStatusCheckIntervalMs = 
BatchConfig.DEFAULT_STATUS_CHECK_INTERVAL_SEC * 1000;
 
   @JsonProperty("forceCommitBatchStatusCheckTimeoutMs")
-  @ApiModelProperty(example = 
ForceCommitBatchConfig.DEFAULT_STATUS_CHECK_TIMEOUT_SEC * 1000 + "")
-  private int _forceCommitBatchStatusCheckTimeoutMs = 
ForceCommitBatchConfig.DEFAULT_STATUS_CHECK_TIMEOUT_SEC * 1000;
+  @ApiModelProperty(example = BatchConfig.DEFAULT_STATUS_CHECK_TIMEOUT_SEC * 
1000 + "")
+  private int _forceCommitBatchStatusCheckTimeoutMs = 
BatchConfig.DEFAULT_STATUS_CHECK_TIMEOUT_SEC * 1000;
 
   public boolean isDryRun() {
     return _dryRun;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index d71fa7f4da8..a292e396ce5 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -69,7 +69,7 @@ import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.utils.PauselessConsumptionUtils;
 import org.apache.pinot.common.utils.SegmentUtils;
 import org.apache.pinot.common.utils.config.TierConfigUtils;
-import org.apache.pinot.controller.api.resources.ForceCommitBatchConfig;
+import org.apache.pinot.controller.api.resources.BatchConfig;
 import 
org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.BaseStrictRealtimeSegmentAssignment;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
@@ -2226,8 +2226,8 @@ public class TableRebalancer {
     tableRebalanceLogger.info("Force committing {} consuming segments before 
moving them", segmentsToCommit.size());
     Preconditions.checkState(_pinotLLCRealtimeSegmentManager != null,
         "PinotLLCRealtimeSegmentManager is not initialized");
-    ForceCommitBatchConfig forceCommitBatchConfig =
-        ForceCommitBatchConfig.of(forceCommitBatchSize, 
forceCommitBatchStatusCheckIntervalMs / 1000,
+    BatchConfig forceCommitBatchConfig =
+        BatchConfig.of(forceCommitBatchSize, 
forceCommitBatchStatusCheckIntervalMs / 1000,
             forceCommitBatchStatusCheckTimeoutMs / 1000);
     segmentsToCommit = 
_pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, null,
         StringUtil.join(",", segmentsToCommit.toArray(String[]::new)), 
forceCommitBatchConfig);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfigTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/BatchConfigTest.java
similarity index 78%
rename from 
pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfigTest.java
rename to 
pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/BatchConfigTest.java
index 862f19218f9..b16e1bda0ec 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfigTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/BatchConfigTest.java
@@ -24,26 +24,26 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertThrows;
 
 
-public class ForceCommitBatchConfigTest {
+public class BatchConfigTest {
 
   @Test
   public void testForceCommitBatchConfig() {
-    ForceCommitBatchConfig forceCommitBatchConfig = 
ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+    BatchConfig forceCommitBatchConfig = BatchConfig.of(Integer.MAX_VALUE, 5, 
180);
     assertEquals(forceCommitBatchConfig.getBatchSize(), Integer.MAX_VALUE);
     assertEquals(forceCommitBatchConfig.getBatchStatusCheckIntervalMs(), 5000);
     assertEquals(forceCommitBatchConfig.getBatchStatusCheckTimeoutMs(), 
180000);
 
-    forceCommitBatchConfig = ForceCommitBatchConfig.of(1, 5, 180);
+    forceCommitBatchConfig = BatchConfig.of(1, 5, 180);
     assertEquals(forceCommitBatchConfig.getBatchSize(), 1);
     assertEquals(forceCommitBatchConfig.getBatchStatusCheckIntervalMs(), 5000);
     assertEquals(forceCommitBatchConfig.getBatchStatusCheckTimeoutMs(), 
180000);
 
-    forceCommitBatchConfig = ForceCommitBatchConfig.of(1, 23, 37);
+    forceCommitBatchConfig = BatchConfig.of(1, 23, 37);
     assertEquals(forceCommitBatchConfig.getBatchSize(), 1);
     assertEquals(forceCommitBatchConfig.getBatchStatusCheckIntervalMs(), 
23000);
     assertEquals(forceCommitBatchConfig.getBatchStatusCheckTimeoutMs(), 37000);
 
-    assertThrows(IllegalArgumentException.class, () -> 
ForceCommitBatchConfig.of(0, 5, 180));
-    assertThrows(IllegalArgumentException.class, () -> 
ForceCommitBatchConfig.of(32, 0, 0));
+    assertThrows(IllegalArgumentException.class, () -> BatchConfig.of(0, 5, 
180));
+    assertThrows(IllegalArgumentException.class, () -> BatchConfig.of(32, 0, 
0));
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResourceTest.java
new file mode 100644
index 00000000000..068883ab6d1
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResourceTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import org.apache.helix.model.IdealState;
+import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.spi.config.table.PauseState;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.expectThrows;
+
+
+public class PinotRealtimeTableResourceTest {
+
+  @Test
+  public void testPauseConsumptionWithValidBatchParameters()
+      throws Exception {
+    PinotRealtimeTableResource resource = new PinotRealtimeTableResource();
+    PinotHelixResourceManager helixResourceManager = 
mock(PinotHelixResourceManager.class);
+    PinotLLCRealtimeSegmentManager segmentManager = 
mock(PinotLLCRealtimeSegmentManager.class);
+
+    setField(resource, "_pinotHelixResourceManager", helixResourceManager);
+    setField(resource, "_pinotLLCRealtimeSegmentManager", segmentManager);
+
+    String tableName = "testTable";
+    String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    IdealState idealState = new IdealState(tableNameWithType);
+    idealState.enable(true);
+    
when(helixResourceManager.getTableIdealState(tableNameWithType)).thenReturn(idealState);
+
+    PauseStatusDetails expectedDetails =
+        new PauseStatusDetails(true, Collections.singleton("segment"), 
PauseState.ReasonCode.ADMINISTRATIVE, "comment",
+            "ts");
+    when(segmentManager.pauseConsumption(eq(tableNameWithType), 
eq(PauseState.ReasonCode.ADMINISTRATIVE),
+        eq("comment"), any())).thenReturn(expectedDetails);
+
+    HttpHeaders headers = mock(HttpHeaders.class);
+    Response response = resource.pauseConsumption(tableName, "comment", 4, 7, 
30, headers);
+
+    assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+    PauseStatusDetails actual = (PauseStatusDetails) response.getEntity();
+    assertNotNull(actual);
+    assertEquals(actual.getConsumingSegments(), 
expectedDetails.getConsumingSegments());
+
+    ArgumentCaptor<BatchConfig> configCaptor = 
ArgumentCaptor.forClass(BatchConfig.class);
+    verify(segmentManager).pauseConsumption(eq(tableNameWithType), 
eq(PauseState.ReasonCode.ADMINISTRATIVE),
+        eq("comment"), configCaptor.capture());
+    BatchConfig capturedConfig = configCaptor.getValue();
+    assertNotNull(capturedConfig);
+    assertEquals(capturedConfig.getBatchSize(), 4);
+    assertEquals(capturedConfig.getBatchStatusCheckIntervalMs(), 7000);
+    assertEquals(capturedConfig.getBatchStatusCheckTimeoutMs(), 30000);
+  }
+
+  @Test
+  public void testPauseConsumptionWithInvalidBatchParametersThrowsBadRequest()
+      throws Exception {
+    PinotRealtimeTableResource resource = new PinotRealtimeTableResource();
+    PinotHelixResourceManager helixResourceManager = 
mock(PinotHelixResourceManager.class);
+    PinotLLCRealtimeSegmentManager segmentManager = 
mock(PinotLLCRealtimeSegmentManager.class);
+
+    setField(resource, "_pinotHelixResourceManager", helixResourceManager);
+    setField(resource, "_pinotLLCRealtimeSegmentManager", segmentManager);
+
+    String tableName = "badTable";
+    String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    IdealState idealState = new IdealState(tableNameWithType);
+    idealState.enable(true);
+    
when(helixResourceManager.getTableIdealState(tableNameWithType)).thenReturn(idealState);
+
+    HttpHeaders headers = mock(HttpHeaders.class);
+    ControllerApplicationException exception = 
expectThrows(ControllerApplicationException.class,
+        () -> resource.pauseConsumption(tableName, "comment", 0, 5, 10, 
headers));
+
+    assertEquals(exception.getResponse().getStatus(), 
Response.Status.BAD_REQUEST.getStatusCode());
+    verify(segmentManager, never()).pauseConsumption(any(), any(), any(), 
any());
+  }
+
+  private static void setField(Object target, String fieldName, Object value)
+      throws Exception {
+    Field field = target.getClass().getDeclaredField(fieldName);
+    field.setAccessible(true);
+    field.set(target, value);
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 0506b08b558..d2c26e78f2b 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -64,7 +64,8 @@ import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.api.resources.ForceCommitBatchConfig;
+import org.apache.pinot.controller.api.resources.BatchConfig;
+import org.apache.pinot.controller.api.resources.PauseStatusDetails;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
@@ -73,6 +74,7 @@ import 
org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
+import org.apache.pinot.spi.config.table.PauseState;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -353,7 +355,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     // Provide a segment that is not in CONSUMING state (non-existent); should 
be ignored without exception
     String nonConsumingSegment = "nonExistingSegment";
     Set<String> committed = segmentManager.forceCommit(REALTIME_TABLE_NAME, 
null, nonConsumingSegment,
-        ForceCommitBatchConfig.of(1, 1, 5));
+        BatchConfig.of(1, 1, 5));
     assertTrue(committed.isEmpty(), "Expected no segments to be committed when 
only non-consuming segments provided");
   }
 
@@ -378,7 +380,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 
CURRENT_TIME_MS).getSegmentName();
     segmentManager._idealState.setPartitionState(consumingSegment, "Server_0", 
SegmentStateModel.CONSUMING);
     try {
-      segmentManager.forceCommit(REALTIME_TABLE_NAME, null, consumingSegment, 
ForceCommitBatchConfig.of(1, 1, 5));
+      segmentManager.forceCommit(REALTIME_TABLE_NAME, null, consumingSegment, 
BatchConfig.of(1, 1, 5));
       fail("Expected IllegalStateException for partial upsert table with RF > 
1");
     } catch (IllegalStateException e) {
       assertTrue(e.getMessage().contains("Force commit is not allowed when 
replication > 1 for partial-upsert tables"),
@@ -405,7 +407,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 
CURRENT_TIME_MS).getSegmentName();
     segmentManager._idealState.setPartitionState(consumingSegment, "Server_0", 
SegmentStateModel.CONSUMING);
     try {
-      segmentManager.forceCommit(REALTIME_TABLE_NAME, null, consumingSegment, 
ForceCommitBatchConfig.of(1, 1, 5));
+      segmentManager.forceCommit(REALTIME_TABLE_NAME, null, consumingSegment, 
BatchConfig.of(1, 1, 5));
       fail("Expected IllegalStateException for partial upsert table with RF > 
1");
     } catch (IllegalStateException e) {
       assertTrue(e.getMessage().contains(
@@ -437,7 +439,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     // Force commit should succeed for partial upsert table with RF = 1 (no 
exception thrown)
     try {
-      segmentManager.forceCommit(REALTIME_TABLE_NAME, null, consumingSegment, 
ForceCommitBatchConfig.of(1, 1, 5));
+      segmentManager.forceCommit(REALTIME_TABLE_NAME, null, consumingSegment, 
BatchConfig.of(1, 1, 5));
       // If we reach here without exception, test passes
     } catch (IllegalStateException e) {
       fail("Should not throw exception for partial upsert table with RF = 1, 
but got: " + e.getMessage());
@@ -461,7 +463,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     try {
       Set<String> committed = segmentManager.forceCommit(REALTIME_TABLE_NAME, 
null, consumingSegment,
-          ForceCommitBatchConfig.of(1, 1, 5));
+          BatchConfig.of(1, 1, 5));
       assertFalse(committed.isEmpty(), "Expected segments to be committed");
       // If we reach here without exception, test passes
     } catch (IllegalStateException e) {
@@ -469,6 +471,53 @@ public class PinotLLCRealtimeSegmentManagerTest {
     }
   }
 
+  @Test
+  public void testForceCommitWithNullBatchConfigUsesSingleBatch() {
+    FakePinotLLCRealtimeSegmentManager segmentManager = new 
FakePinotLLCRealtimeSegmentManager();
+    segmentManager._numReplicas = 1;
+    segmentManager.makeTableConfig();
+    segmentManager._numInstances = 2;
+    segmentManager.makeConsumingInstancePartitions();
+    segmentManager._numPartitions = 1;
+    segmentManager.setUpNewTable();
+
+    String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 
CURRENT_TIME_MS).getSegmentName();
+    segmentManager._idealState.setPartitionState(consumingSegment, "Server_0", 
SegmentStateModel.CONSUMING);
+
+    Set<String> committedSegments = 
segmentManager.forceCommit(REALTIME_TABLE_NAME, null, null, null);
+    assertFalse(committedSegments.isEmpty(), "Expected consuming segments to 
be committed with null batch config");
+    assertTrue(committedSegments.contains(consumingSegment),
+        "Returned segments should include the consuming segment present in 
ideal state");
+  }
+
+  @Test
+  public void testPauseConsumptionPassesBatchConfigToForceCommit() {
+    FakePinotLLCRealtimeSegmentManager segmentManager = spy(new 
FakePinotLLCRealtimeSegmentManager());
+    String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+
+    IdealState pausedIdealState = new IdealState(tableNameWithType);
+    String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 
CURRENT_TIME_MS).getSegmentName();
+    pausedIdealState.setPartitionState(consumingSegment, "Server_0", 
SegmentStateModel.CONSUMING);
+
+    
doReturn(pausedIdealState).when(segmentManager).updatePauseStateInIdealState(eq(tableNameWithType),
 eq(true),
+        eq(PauseState.ReasonCode.ADMINISTRATIVE), any());
+
+    BatchConfig batchConfig = BatchConfig.of(2, 3, 10);
+    final BatchConfig[] capturedConfig = new BatchConfig[1];
+    doAnswer(invocation -> {
+      capturedConfig[0] = invocation.getArgument(3);
+      return Collections.singleton(consumingSegment);
+    }).when(segmentManager).forceCommit(eq(tableNameWithType), isNull(), 
isNull(), any());
+
+    PauseStatusDetails pauseStatusDetails =
+        segmentManager.pauseConsumption(tableNameWithType, 
PauseState.ReasonCode.ADMINISTRATIVE, "comment",
+            batchConfig);
+
+    assertEquals(capturedConfig[0], batchConfig, "pauseConsumption should 
forward the supplied batch config");
+    assertEquals(pauseStatusDetails.getConsumingSegments(), 
Collections.singleton(consumingSegment),
+        "pauseConsumption should include consuming segments from the updated 
ideal state");
+  }
+
 
   @Test
   public void testCommitSegmentWithOffsetAutoResetOnOffset()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to