This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 52db36c816f Adds ability to pause consumption in batches (#17194)
52db36c816f is described below
commit 52db36c816f91ef8887fddd0beade5d169824296
Author: NOOB <[email protected]>
AuthorDate: Fri Dec 12 09:49:18 2025 +0530
Adds ability to pause consumption in batches (#17194)
* Adds ability to pause consumption in batches
* Adds tests
* Removes reduntant code
* nit
* Renames ForceCommitBatchConfig to BatchConfig
* Fixes lint
---
...orceCommitBatchConfig.java => BatchConfig.java} | 8 +-
.../api/resources/PinotRealtimeTableResource.java | 28 +++--
.../api/resources/PinotTableRestletResource.java | 6 +-
.../realtime/PinotLLCRealtimeSegmentManager.java | 26 +++--
.../helix/core/rebalance/RebalanceConfig.java | 14 +--
.../helix/core/rebalance/TableRebalancer.java | 6 +-
...itBatchConfigTest.java => BatchConfigTest.java} | 12 +--
.../resources/PinotRealtimeTableResourceTest.java | 117 +++++++++++++++++++++
.../PinotLLCRealtimeSegmentManagerTest.java | 61 +++++++++--
9 files changed, 235 insertions(+), 43 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfig.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/BatchConfig.java
similarity index 84%
rename from
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfig.java
rename to
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/BatchConfig.java
index 483bd09db60..a5e931888a7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfig.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/BatchConfig.java
@@ -21,7 +21,7 @@ package org.apache.pinot.controller.api.resources;
import com.google.common.base.Preconditions;
-public class ForceCommitBatchConfig {
+public class BatchConfig {
public static final int DEFAULT_BATCH_SIZE = Integer.MAX_VALUE;
public static final int DEFAULT_STATUS_CHECK_INTERVAL_SEC = 5;
public static final int DEFAULT_STATUS_CHECK_TIMEOUT_SEC = 180;
@@ -29,20 +29,20 @@ public class ForceCommitBatchConfig {
private final int _batchStatusCheckIntervalMs;
private final int _batchStatusCheckTimeoutMs;
- private ForceCommitBatchConfig(int batchSize, int
batchStatusCheckIntervalMs, int batchStatusCheckTimeoutMs) {
+ private BatchConfig(int batchSize, int batchStatusCheckIntervalMs, int
batchStatusCheckTimeoutMs) {
_batchSize = batchSize;
_batchStatusCheckIntervalMs = batchStatusCheckIntervalMs;
_batchStatusCheckTimeoutMs = batchStatusCheckTimeoutMs;
}
- public static ForceCommitBatchConfig of(int batchSize, int
batchStatusCheckIntervalSec,
+ public static BatchConfig of(int batchSize, int batchStatusCheckIntervalSec,
int batchStatusCheckTimeoutSec) {
Preconditions.checkArgument(batchSize > 0, "Batch size should be greater
than zero");
Preconditions.checkArgument(batchStatusCheckIntervalSec > 0,
"Batch status check interval should be greater than zero");
Preconditions.checkArgument(batchStatusCheckTimeoutSec > 0,
"Batch status check timeout should be greater than zero");
- return new ForceCommitBatchConfig(batchSize, batchStatusCheckIntervalSec *
1000, batchStatusCheckTimeoutSec * 1000);
+ return new BatchConfig(batchSize, batchStatusCheckIntervalSec * 1000,
batchStatusCheckTimeoutSec * 1000);
}
public int getBatchSize() {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index bd262a9f995..2e7ad9b880a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -114,13 +114,29 @@ public class PinotRealtimeTableResource {
public Response pauseConsumption(
@ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
@ApiParam(value = "Comment on pausing the consumption")
@QueryParam("comment") String comment,
+ @ApiParam(value = "Max number of consuming segments to commit at once")
+ @QueryParam("batchSize") @DefaultValue(BatchConfig.DEFAULT_BATCH_SIZE +
"") int batchSize,
+ @ApiParam(value = "How often to check whether the current batch of
segments have been successfully committed or"
+ + " not")
+ @QueryParam("batchStatusCheckIntervalSec")
+ @DefaultValue(BatchConfig.DEFAULT_STATUS_CHECK_INTERVAL_SEC + "") int
batchStatusCheckIntervalSec,
+ @ApiParam(value = "Timeout based on which the controller will stop
checking the forceCommit status of the batch"
+ + " of segments and throw an exception")
+ @QueryParam("batchStatusCheckTimeoutSec")
+ @DefaultValue(BatchConfig.DEFAULT_STATUS_CHECK_TIMEOUT_SEC + "") int
batchStatusCheckTimeoutSec,
@Context HttpHeaders headers) {
tableName = DatabaseUtils.translateTableName(tableName, headers);
String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
validateTable(tableNameWithType);
+ BatchConfig batchConfig;
+ try {
+ batchConfig = BatchConfig.of(batchSize, batchStatusCheckIntervalSec,
batchStatusCheckTimeoutSec);
+ } catch (IllegalArgumentException e) {
+ throw new ControllerApplicationException(LOGGER, "Invalid batch config",
Response.Status.BAD_REQUEST, e);
+ }
try {
return
Response.ok(_pinotLLCRealtimeSegmentManager.pauseConsumption(tableNameWithType,
- PauseState.ReasonCode.ADMINISTRATIVE, comment)).build();
+ PauseState.ReasonCode.ADMINISTRATIVE, comment, batchConfig)).build();
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
}
@@ -246,24 +262,24 @@ public class PinotRealtimeTableResource {
@ApiParam(value = "Comma separated list of consuming segments to be
committed") @QueryParam("segments")
String consumingSegments,
@ApiParam(value = "Max number of consuming segments to commit at once")
- @QueryParam("batchSize")
@DefaultValue(ForceCommitBatchConfig.DEFAULT_BATCH_SIZE + "") int batchSize,
+ @QueryParam("batchSize") @DefaultValue(BatchConfig.DEFAULT_BATCH_SIZE +
"") int batchSize,
@ApiParam(value = "How often to check whether the current batch of
segments have been successfully committed or"
+ " not")
@QueryParam("batchStatusCheckIntervalSec")
- @DefaultValue(ForceCommitBatchConfig.DEFAULT_STATUS_CHECK_INTERVAL_SEC +
"") int batchStatusCheckIntervalSec,
+ @DefaultValue(BatchConfig.DEFAULT_STATUS_CHECK_INTERVAL_SEC + "") int
batchStatusCheckIntervalSec,
@ApiParam(value = "Timeout based on which the controller will stop
checking the forceCommit status of the batch"
+ " of segments and throw an exception")
@QueryParam("batchStatusCheckTimeoutSec")
- @DefaultValue(ForceCommitBatchConfig.DEFAULT_STATUS_CHECK_TIMEOUT_SEC +
"") int batchStatusCheckTimeoutSec,
+ @DefaultValue(BatchConfig.DEFAULT_STATUS_CHECK_TIMEOUT_SEC + "") int
batchStatusCheckTimeoutSec,
@Context HttpHeaders headers) {
tableName = DatabaseUtils.translateTableName(tableName, headers);
if (partitionGroupIds != null && consumingSegments != null) {
throw new ControllerApplicationException(LOGGER, "Cannot specify both
partitions and segments to commit",
Response.Status.BAD_REQUEST);
}
- ForceCommitBatchConfig batchConfig;
+ BatchConfig batchConfig;
try {
- batchConfig = ForceCommitBatchConfig.of(batchSize,
batchStatusCheckIntervalSec, batchStatusCheckTimeoutSec);
+ batchConfig = BatchConfig.of(batchSize, batchStatusCheckIntervalSec,
batchStatusCheckTimeoutSec);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, "Invalid batch config",
Response.Status.BAD_REQUEST, e);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 505216602fb..2175394555b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -785,13 +785,13 @@ public class PinotTableRestletResource {
@DefaultValue("false")
@QueryParam("forceCommit") boolean forceCommit,
@ApiParam(value = "Batch size for force commit operations")
- @DefaultValue(ForceCommitBatchConfig.DEFAULT_BATCH_SIZE + "")
+ @DefaultValue(BatchConfig.DEFAULT_BATCH_SIZE + "")
@QueryParam("forceCommitBatchSize") int forceCommitBatchSize,
@ApiParam(value = "Interval in milliseconds for checking force commit
batch status")
- @DefaultValue(ForceCommitBatchConfig.DEFAULT_STATUS_CHECK_INTERVAL_SEC *
1000 + "")
+ @DefaultValue(BatchConfig.DEFAULT_STATUS_CHECK_INTERVAL_SEC * 1000 + "")
@QueryParam("forceCommitBatchStatusCheckIntervalMs") int
forceCommitBatchStatusCheckIntervalMs,
@ApiParam(value = "Timeout in milliseconds for force commit batch status
check")
- @DefaultValue(ForceCommitBatchConfig.DEFAULT_STATUS_CHECK_TIMEOUT_SEC *
1000 + "")
+ @DefaultValue(BatchConfig.DEFAULT_STATUS_CHECK_TIMEOUT_SEC * 1000 + "")
@QueryParam("forceCommitBatchStatusCheckTimeoutMs") int
forceCommitBatchStatusCheckTimeoutMs,
@Context HttpHeaders headers
//@formatter:on
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 2f8b6d8b307..9c3a710bfe2 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -88,8 +88,8 @@ import
org.apache.pinot.common.utils.helix.IdealStateSingleCommit;
import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
+import org.apache.pinot.controller.api.resources.BatchConfig;
import org.apache.pinot.controller.api.resources.Constants;
-import org.apache.pinot.controller.api.resources.ForceCommitBatchConfig;
import org.apache.pinot.controller.api.resources.PauseStatusDetails;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
@@ -2286,17 +2286,17 @@ public class PinotLLCRealtimeSegmentManager {
* @return the set of consuming segments for which commit was initiated
*/
public Set<String> forceCommit(String tableNameWithType, @Nullable String
partitionGroupIdsToCommit,
- @Nullable String segmentsToCommit, ForceCommitBatchConfig batchConfig) {
+ @Nullable String segmentsToCommit, @Nullable BatchConfig batchConfig) {
IdealState idealState = getIdealState(tableNameWithType);
Set<String> allConsumingSegments = findConsumingSegments(idealState);
Set<String> targetConsumingSegments =
filterSegmentsToCommit(allConsumingSegments,
partitionGroupIdsToCommit, segmentsToCommit);
- int batchSize = batchConfig.getBatchSize();
- if (batchSize >= targetConsumingSegments.size()) {
+ if ((batchConfig == null) || (batchConfig.getBatchSize() >=
targetConsumingSegments.size())) {
// No need to divide segments in batches.
sendForceCommitMessageToServers(tableNameWithType,
targetConsumingSegments);
} else {
- List<Set<String>> segmentBatchList = getSegmentBatchList(idealState,
targetConsumingSegments, batchSize);
+ List<Set<String>> segmentBatchList =
+ getSegmentBatchList(idealState, targetConsumingSegments,
batchConfig.getBatchSize());
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> processBatchesSequentially(segmentBatchList,
tableNameWithType, batchConfig));
executor.shutdown();
@@ -2305,7 +2305,7 @@ public class PinotLLCRealtimeSegmentManager {
}
private void processBatchesSequentially(List<Set<String>> segmentBatchList,
String tableNameWithType,
- ForceCommitBatchConfig forceCommitBatchConfig) {
+ BatchConfig forceCommitBatchConfig) {
Set<String> prevBatch = null;
try {
for (Set<String> segmentBatchToCommit : segmentBatchList) {
@@ -2322,7 +2322,7 @@ public class PinotLLCRealtimeSegmentManager {
}
public void waitUntilSegmentsForceCommitted(String tableNameWithType,
Set<String> segmentsToWait,
- ForceCommitBatchConfig forceCommitBatchConfig)
+ BatchConfig forceCommitBatchConfig)
throws InterruptedException {
int batchStatusCheckIntervalMs =
forceCommitBatchConfig.getBatchStatusCheckIntervalMs();
int batchStatusCheckTimeoutMs =
forceCommitBatchConfig.getBatchStatusCheckTimeoutMs();
@@ -2450,9 +2450,19 @@ public class PinotLLCRealtimeSegmentManager {
*/
public PauseStatusDetails pauseConsumption(String tableNameWithType,
PauseState.ReasonCode reasonCode,
@Nullable String comment) {
+ return pauseConsumption(tableNameWithType, reasonCode, comment, null);
+ }
+
+ /**
+ * Pause consumption on a table by
+ * 1) Update PauseState in the table ideal state and
+ * 2) Sending force commit messages to servers
+ */
+ public PauseStatusDetails pauseConsumption(String tableNameWithType,
PauseState.ReasonCode reasonCode,
+ @Nullable String comment, @Nullable BatchConfig batchConfig) {
IdealState updatedIdealState =
updatePauseStateInIdealState(tableNameWithType, true, reasonCode, comment);
Set<String> consumingSegments = findConsumingSegments(updatedIdealState);
- sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
+ forceCommit(tableNameWithType, null, null, batchConfig);
return new PauseStatusDetails(true, consumingSegments, reasonCode, comment
!= null ? comment
: "Pause flag is set. Consuming segments are being committed."
+ " Use /pauseStatus endpoint in a few moments to check if all
consuming segments have been committed.",
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
index 716cb94ad6c..5aa9bb05d90 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
@@ -24,7 +24,7 @@ import com.google.common.base.Preconditions;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.util.Objects;
-import org.apache.pinot.controller.api.resources.ForceCommitBatchConfig;
+import org.apache.pinot.controller.api.resources.BatchConfig;
import org.apache.pinot.spi.utils.Enablement;
@@ -167,16 +167,16 @@ public class RebalanceConfig {
private boolean _forceCommit = false;
@JsonProperty("forceCommitBatchSize")
- @ApiModelProperty(example = ForceCommitBatchConfig.DEFAULT_BATCH_SIZE + "")
- private int _forceCommitBatchSize =
ForceCommitBatchConfig.DEFAULT_BATCH_SIZE;
+ @ApiModelProperty(example = BatchConfig.DEFAULT_BATCH_SIZE + "")
+ private int _forceCommitBatchSize = BatchConfig.DEFAULT_BATCH_SIZE;
@JsonProperty("forceCommitBatchStatusCheckIntervalMs")
- @ApiModelProperty(example =
ForceCommitBatchConfig.DEFAULT_STATUS_CHECK_INTERVAL_SEC * 1000 + "")
- private int _forceCommitBatchStatusCheckIntervalMs =
ForceCommitBatchConfig.DEFAULT_STATUS_CHECK_INTERVAL_SEC * 1000;
+ @ApiModelProperty(example = BatchConfig.DEFAULT_STATUS_CHECK_INTERVAL_SEC *
1000 + "")
+ private int _forceCommitBatchStatusCheckIntervalMs =
BatchConfig.DEFAULT_STATUS_CHECK_INTERVAL_SEC * 1000;
@JsonProperty("forceCommitBatchStatusCheckTimeoutMs")
- @ApiModelProperty(example =
ForceCommitBatchConfig.DEFAULT_STATUS_CHECK_TIMEOUT_SEC * 1000 + "")
- private int _forceCommitBatchStatusCheckTimeoutMs =
ForceCommitBatchConfig.DEFAULT_STATUS_CHECK_TIMEOUT_SEC * 1000;
+ @ApiModelProperty(example = BatchConfig.DEFAULT_STATUS_CHECK_TIMEOUT_SEC *
1000 + "")
+ private int _forceCommitBatchStatusCheckTimeoutMs =
BatchConfig.DEFAULT_STATUS_CHECK_TIMEOUT_SEC * 1000;
public boolean isDryRun() {
return _dryRun;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index d71fa7f4da8..a292e396ce5 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -69,7 +69,7 @@ import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.utils.PauselessConsumptionUtils;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.common.utils.config.TierConfigUtils;
-import org.apache.pinot.controller.api.resources.ForceCommitBatchConfig;
+import org.apache.pinot.controller.api.resources.BatchConfig;
import
org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
import
org.apache.pinot.controller.helix.core.assignment.segment.BaseStrictRealtimeSegmentAssignment;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
@@ -2226,8 +2226,8 @@ public class TableRebalancer {
tableRebalanceLogger.info("Force committing {} consuming segments before
moving them", segmentsToCommit.size());
Preconditions.checkState(_pinotLLCRealtimeSegmentManager != null,
"PinotLLCRealtimeSegmentManager is not initialized");
- ForceCommitBatchConfig forceCommitBatchConfig =
- ForceCommitBatchConfig.of(forceCommitBatchSize,
forceCommitBatchStatusCheckIntervalMs / 1000,
+ BatchConfig forceCommitBatchConfig =
+ BatchConfig.of(forceCommitBatchSize,
forceCommitBatchStatusCheckIntervalMs / 1000,
forceCommitBatchStatusCheckTimeoutMs / 1000);
segmentsToCommit =
_pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, null,
StringUtil.join(",", segmentsToCommit.toArray(String[]::new)),
forceCommitBatchConfig);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfigTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/BatchConfigTest.java
similarity index 78%
rename from
pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfigTest.java
rename to
pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/BatchConfigTest.java
index 862f19218f9..b16e1bda0ec 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfigTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/BatchConfigTest.java
@@ -24,26 +24,26 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertThrows;
-public class ForceCommitBatchConfigTest {
+public class BatchConfigTest {
@Test
public void testForceCommitBatchConfig() {
- ForceCommitBatchConfig forceCommitBatchConfig =
ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+ BatchConfig forceCommitBatchConfig = BatchConfig.of(Integer.MAX_VALUE, 5,
180);
assertEquals(forceCommitBatchConfig.getBatchSize(), Integer.MAX_VALUE);
assertEquals(forceCommitBatchConfig.getBatchStatusCheckIntervalMs(), 5000);
assertEquals(forceCommitBatchConfig.getBatchStatusCheckTimeoutMs(),
180000);
- forceCommitBatchConfig = ForceCommitBatchConfig.of(1, 5, 180);
+ forceCommitBatchConfig = BatchConfig.of(1, 5, 180);
assertEquals(forceCommitBatchConfig.getBatchSize(), 1);
assertEquals(forceCommitBatchConfig.getBatchStatusCheckIntervalMs(), 5000);
assertEquals(forceCommitBatchConfig.getBatchStatusCheckTimeoutMs(),
180000);
- forceCommitBatchConfig = ForceCommitBatchConfig.of(1, 23, 37);
+ forceCommitBatchConfig = BatchConfig.of(1, 23, 37);
assertEquals(forceCommitBatchConfig.getBatchSize(), 1);
assertEquals(forceCommitBatchConfig.getBatchStatusCheckIntervalMs(),
23000);
assertEquals(forceCommitBatchConfig.getBatchStatusCheckTimeoutMs(), 37000);
- assertThrows(IllegalArgumentException.class, () ->
ForceCommitBatchConfig.of(0, 5, 180));
- assertThrows(IllegalArgumentException.class, () ->
ForceCommitBatchConfig.of(32, 0, 0));
+ assertThrows(IllegalArgumentException.class, () -> BatchConfig.of(0, 5,
180));
+ assertThrows(IllegalArgumentException.class, () -> BatchConfig.of(32, 0,
0));
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResourceTest.java
new file mode 100644
index 00000000000..068883ab6d1
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResourceTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import org.apache.helix.model.IdealState;
+import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.spi.config.table.PauseState;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.expectThrows;
+
+
+public class PinotRealtimeTableResourceTest {
+
+ @Test
+ public void testPauseConsumptionWithValidBatchParameters()
+ throws Exception {
+ PinotRealtimeTableResource resource = new PinotRealtimeTableResource();
+ PinotHelixResourceManager helixResourceManager =
mock(PinotHelixResourceManager.class);
+ PinotLLCRealtimeSegmentManager segmentManager =
mock(PinotLLCRealtimeSegmentManager.class);
+
+ setField(resource, "_pinotHelixResourceManager", helixResourceManager);
+ setField(resource, "_pinotLLCRealtimeSegmentManager", segmentManager);
+
+ String tableName = "testTable";
+ String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+ IdealState idealState = new IdealState(tableNameWithType);
+ idealState.enable(true);
+
when(helixResourceManager.getTableIdealState(tableNameWithType)).thenReturn(idealState);
+
+ PauseStatusDetails expectedDetails =
+ new PauseStatusDetails(true, Collections.singleton("segment"),
PauseState.ReasonCode.ADMINISTRATIVE, "comment",
+ "ts");
+ when(segmentManager.pauseConsumption(eq(tableNameWithType),
eq(PauseState.ReasonCode.ADMINISTRATIVE),
+ eq("comment"), any())).thenReturn(expectedDetails);
+
+ HttpHeaders headers = mock(HttpHeaders.class);
+ Response response = resource.pauseConsumption(tableName, "comment", 4, 7,
30, headers);
+
+ assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+ PauseStatusDetails actual = (PauseStatusDetails) response.getEntity();
+ assertNotNull(actual);
+ assertEquals(actual.getConsumingSegments(),
expectedDetails.getConsumingSegments());
+
+ ArgumentCaptor<BatchConfig> configCaptor =
ArgumentCaptor.forClass(BatchConfig.class);
+ verify(segmentManager).pauseConsumption(eq(tableNameWithType),
eq(PauseState.ReasonCode.ADMINISTRATIVE),
+ eq("comment"), configCaptor.capture());
+ BatchConfig capturedConfig = configCaptor.getValue();
+ assertNotNull(capturedConfig);
+ assertEquals(capturedConfig.getBatchSize(), 4);
+ assertEquals(capturedConfig.getBatchStatusCheckIntervalMs(), 7000);
+ assertEquals(capturedConfig.getBatchStatusCheckTimeoutMs(), 30000);
+ }
+
+ @Test
+ public void testPauseConsumptionWithInvalidBatchParametersThrowsBadRequest()
+ throws Exception {
+ PinotRealtimeTableResource resource = new PinotRealtimeTableResource();
+ PinotHelixResourceManager helixResourceManager =
mock(PinotHelixResourceManager.class);
+ PinotLLCRealtimeSegmentManager segmentManager =
mock(PinotLLCRealtimeSegmentManager.class);
+
+ setField(resource, "_pinotHelixResourceManager", helixResourceManager);
+ setField(resource, "_pinotLLCRealtimeSegmentManager", segmentManager);
+
+ String tableName = "badTable";
+ String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+ IdealState idealState = new IdealState(tableNameWithType);
+ idealState.enable(true);
+
when(helixResourceManager.getTableIdealState(tableNameWithType)).thenReturn(idealState);
+
+ HttpHeaders headers = mock(HttpHeaders.class);
+ ControllerApplicationException exception =
expectThrows(ControllerApplicationException.class,
+ () -> resource.pauseConsumption(tableName, "comment", 0, 5, 10,
headers));
+
+ assertEquals(exception.getResponse().getStatus(),
Response.Status.BAD_REQUEST.getStatusCode());
+ verify(segmentManager, never()).pauseConsumption(any(), any(), any(),
any());
+ }
+
+ private static void setField(Object target, String fieldName, Object value)
+ throws Exception {
+ Field field = target.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 0506b08b558..d2c26e78f2b 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -64,7 +64,8 @@ import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.api.resources.ForceCommitBatchConfig;
+import org.apache.pinot.controller.api.resources.BatchConfig;
+import org.apache.pinot.controller.api.resources.PauseStatusDetails;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
@@ -73,6 +74,7 @@ import
org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
+import org.apache.pinot.spi.config.table.PauseState;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -353,7 +355,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Provide a segment that is not in CONSUMING state (non-existent); should
be ignored without exception
String nonConsumingSegment = "nonExistingSegment";
Set<String> committed = segmentManager.forceCommit(REALTIME_TABLE_NAME,
null, nonConsumingSegment,
- ForceCommitBatchConfig.of(1, 1, 5));
+ BatchConfig.of(1, 1, 5));
assertTrue(committed.isEmpty(), "Expected no segments to be committed when
only non-consuming segments provided");
}
@@ -378,7 +380,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
segmentManager._idealState.setPartitionState(consumingSegment, "Server_0",
SegmentStateModel.CONSUMING);
try {
- segmentManager.forceCommit(REALTIME_TABLE_NAME, null, consumingSegment,
ForceCommitBatchConfig.of(1, 1, 5));
+ segmentManager.forceCommit(REALTIME_TABLE_NAME, null, consumingSegment,
BatchConfig.of(1, 1, 5));
fail("Expected IllegalStateException for partial upsert table with RF >
1");
} catch (IllegalStateException e) {
assertTrue(e.getMessage().contains("Force commit is not allowed when
replication > 1 for partial-upsert tables"),
@@ -405,7 +407,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
segmentManager._idealState.setPartitionState(consumingSegment, "Server_0",
SegmentStateModel.CONSUMING);
try {
- segmentManager.forceCommit(REALTIME_TABLE_NAME, null, consumingSegment,
ForceCommitBatchConfig.of(1, 1, 5));
+ segmentManager.forceCommit(REALTIME_TABLE_NAME, null, consumingSegment,
BatchConfig.of(1, 1, 5));
fail("Expected IllegalStateException for partial upsert table with RF >
1");
} catch (IllegalStateException e) {
assertTrue(e.getMessage().contains(
@@ -437,7 +439,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Force commit should succeed for partial upsert table with RF = 1 (no
exception thrown)
try {
- segmentManager.forceCommit(REALTIME_TABLE_NAME, null, consumingSegment,
ForceCommitBatchConfig.of(1, 1, 5));
+ segmentManager.forceCommit(REALTIME_TABLE_NAME, null, consumingSegment,
BatchConfig.of(1, 1, 5));
// If we reach here without exception, test passes
} catch (IllegalStateException e) {
fail("Should not throw exception for partial upsert table with RF = 1,
but got: " + e.getMessage());
@@ -461,7 +463,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
try {
Set<String> committed = segmentManager.forceCommit(REALTIME_TABLE_NAME,
null, consumingSegment,
- ForceCommitBatchConfig.of(1, 1, 5));
+ BatchConfig.of(1, 1, 5));
assertFalse(committed.isEmpty(), "Expected segments to be committed");
// If we reach here without exception, test passes
} catch (IllegalStateException e) {
@@ -469,6 +471,53 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
}
+ @Test
+ public void testForceCommitWithNullBatchConfigUsesSingleBatch() {
+ FakePinotLLCRealtimeSegmentManager segmentManager = new
FakePinotLLCRealtimeSegmentManager();
+ segmentManager._numReplicas = 1;
+ segmentManager.makeTableConfig();
+ segmentManager._numInstances = 2;
+ segmentManager.makeConsumingInstancePartitions();
+ segmentManager._numPartitions = 1;
+ segmentManager.setUpNewTable();
+
+ String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
+ segmentManager._idealState.setPartitionState(consumingSegment, "Server_0",
SegmentStateModel.CONSUMING);
+
+ Set<String> committedSegments =
segmentManager.forceCommit(REALTIME_TABLE_NAME, null, null, null);
+ assertFalse(committedSegments.isEmpty(), "Expected consuming segments to
be committed with null batch config");
+ assertTrue(committedSegments.contains(consumingSegment),
+ "Returned segments should include the consuming segment present in
ideal state");
+ }
+
+ @Test
+ public void testPauseConsumptionPassesBatchConfigToForceCommit() {
+ FakePinotLLCRealtimeSegmentManager segmentManager = spy(new
FakePinotLLCRealtimeSegmentManager());
+ String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+
+ IdealState pausedIdealState = new IdealState(tableNameWithType);
+ String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
+ pausedIdealState.setPartitionState(consumingSegment, "Server_0",
SegmentStateModel.CONSUMING);
+
+
doReturn(pausedIdealState).when(segmentManager).updatePauseStateInIdealState(eq(tableNameWithType),
eq(true),
+ eq(PauseState.ReasonCode.ADMINISTRATIVE), any());
+
+ BatchConfig batchConfig = BatchConfig.of(2, 3, 10);
+ final BatchConfig[] capturedConfig = new BatchConfig[1];
+ doAnswer(invocation -> {
+ capturedConfig[0] = invocation.getArgument(3);
+ return Collections.singleton(consumingSegment);
+ }).when(segmentManager).forceCommit(eq(tableNameWithType), isNull(),
isNull(), any());
+
+ PauseStatusDetails pauseStatusDetails =
+ segmentManager.pauseConsumption(tableNameWithType,
PauseState.ReasonCode.ADMINISTRATIVE, "comment",
+ batchConfig);
+
+ assertEquals(capturedConfig[0], batchConfig, "pauseConsumption should
forward the supplied batch config");
+ assertEquals(pauseStatusDetails.getConsumingSegments(),
Collections.singleton(consumingSegment),
+ "pauseConsumption should include consuming segments from the updated
ideal state");
+ }
+
@Test
public void testCommitSegmentWithOffsetAutoResetOnOffset()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]