This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ecca80cc8 Force commit consuming segments (#9197)
5ecca80cc8 is described below

commit 5ecca80cc8ac763d5c3ee256ea9504f60c581ecd
Author: Sajjad Moradi <moradi.saj...@gmail.com>
AuthorDate: Mon Aug 29 11:43:30 2022 -0700

    Force commit consuming segments (#9197)
---
 .../api/resources/PinotRealtimeTableResource.java    | 20 ++++++++++++++++++++
 .../realtime/PinotLLCRealtimeSegmentManager.java     |  9 +++++++++
 2 files changed, 29 insertions(+)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index ba03a6cc5c..7bf4ac4235 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -90,6 +90,26 @@ public class PinotRealtimeTableResource {
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/forceCommit")
+  @ApiOperation(value = "Force commit the current consuming segments",
+      notes = "Force commit the current segments in consuming state and restart consumption. "
+          + "This should be used after schema/table config changes. "
+          + "Please note that this is an asynchronous operation, "
+          + "and 200 response does not mean it has actually been done already")
+  public Response forceCommit(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
+    String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    validate(tableNameWithType);
+    try {
+      _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType);
+      return Response.ok().build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+
   @GET
   @Path("/tables/{tableName}/pauseStatus")
   @Produces(MediaType.APPLICATION_JSON)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index fa4f3063e0..5cec6b8c4b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1420,6 +1420,15 @@ public class PinotLLCRealtimeSegmentManager {
     }
   }
 
+  /**
+   * Force commit the current segments in consuming state and restart consumption
+   */
+  public void forceCommit(String tableNameWithType) {
+    IdealState idealState = getIdealState(tableNameWithType);
+    Set<String> consumingSegments = findConsumingSegments(idealState);
+    sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
+  }
+
   /**
    * Pause consumption on a table by
    *   1) setting "isTablePaused" in ideal states to true and


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to