This is an automated email from the ASF dual-hosted git repository. sajjad pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 5ecca80cc8 Force commit consuming segments (#9197) 5ecca80cc8 is described below commit 5ecca80cc8ac763d5c3ee256ea9504f60c581ecd Author: Sajjad Moradi <moradi.saj...@gmail.com> AuthorDate: Mon Aug 29 11:43:30 2022 -0700 Force commit consuming segments (#9197) --- .../api/resources/PinotRealtimeTableResource.java | 20 ++++++++++++++++++++ .../realtime/PinotLLCRealtimeSegmentManager.java | 9 +++++++++ 2 files changed, 29 insertions(+) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java index ba03a6cc5c..7bf4ac4235 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java @@ -90,6 +90,26 @@ public class PinotRealtimeTableResource { } } + @POST + @Path("/tables/{tableName}/forceCommit") + @ApiOperation(value = "Force commit the current consuming segments", + notes = "Force commit the current segments in consuming state and restart consumption. " + + "This should be used after schema/table config changes. " + + "Please note that this is an asynchronous operation, " + + "and 200 response does not mean it has actually been done already") + public Response forceCommit( + @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) { + String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); + validate(tableNameWithType); + try { + _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType); + return Response.ok().build(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + @GET @Path("/tables/{tableName}/pauseStatus") @Produces(MediaType.APPLICATION_JSON) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index fa4f3063e0..5cec6b8c4b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -1420,6 +1420,15 @@ public class PinotLLCRealtimeSegmentManager { } } + /** + * Force commit the current segments in consuming state and restart consumption + */ + public void forceCommit(String tableNameWithType) { + IdealState idealState = getIdealState(tableNameWithType); + Set<String> consumingSegments = findConsumingSegments(idealState); + sendForceCommitMessageToServers(tableNameWithType, consumingSegments); + } + /** * Pause consumption on a table by * 1) setting "isTablePaused" in ideal states to true and --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org