This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 142fb45c82 Allow periodic tasks to run with properties via the POST 
API (#14915)
142fb45c82 is described below

commit 142fb45c8295d1192d34b7caca307593c46557e9
Author: 9aman <35227405+9a...@users.noreply.github.com>
AuthorDate: Wed Apr 2 19:43:50 2025 +0530

    Allow periodic tasks to run with properties via the POST API (#14915)
---
 ...PinotControllerPeriodicTaskRestletResource.java | 39 ++++++++++++++++++++++
 .../RealtimeSegmentValidationManager.java          | 23 +++++++++++--
 2 files changed, 60 insertions(+), 2 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
index 906356fba9..ac92b5ba56 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
@@ -26,8 +26,10 @@ import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
 import java.util.List;
+import java.util.Map;
 import javax.inject.Inject;
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
@@ -103,6 +105,43 @@ public class PinotControllerPeriodicTaskRestletResource {
         .build();
   }
 
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/run")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.EXECUTE_TASK)
+  @ApiOperation(value = "Run periodic task against table with custom 
properties. If table name is missing, task will "
+      + "run against all tables.")
+  public Response runPeriodicTaskWithProperties(
+      @ApiParam(value = "Periodic task name", required = true) 
@QueryParam("taskname") String periodicTaskName,
+      @ApiParam(value = "Name of the table") @QueryParam("tableName") String 
tableName,
+      @ApiParam(value = "OFFLINE | REALTIME") @QueryParam("type") String 
tableType,
+      @ApiParam(value = "Task properties") Map<String, String> taskProperties,
+      @Context HttpHeaders headers) {
+
+    if (!_periodicTaskScheduler.hasTask(periodicTaskName)) {
+      throw new WebApplicationException("Periodic task '" + periodicTaskName + 
"' not found.",
+          Response.Status.NOT_FOUND);
+    }
+
+    if (tableName != null) {
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+      List<String> matchingTableNamesWithType =
+          
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, 
tableName,
+              Constants.validateTableType(tableType), LOGGER);
+
+      if (matchingTableNamesWithType.size() > 1) {
+        throw new WebApplicationException(
+            "More than one table matches Table '" + tableName + "'. Matching 
names: " + matchingTableNamesWithType);
+      }
+
+      tableName = matchingTableNamesWithType.get(0);
+    }
+
+    return Response.ok()
+        
.entity(_pinotHelixResourceManager.invokeControllerPeriodicTask(tableName, 
periodicTaskName, taskProperties))
+        .build();
+  }
+
   @GET
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/names")
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 2fcb669335..9fa236ecde 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -65,6 +66,7 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
   private final boolean _segmentAutoResetOnErrorAtValidation;
 
   public static final String OFFSET_CRITERIA = "offsetCriteria";
+  public static final String RUN_SEGMENT_LEVEL_VALIDATION = 
"runSegmentLevelValidation";
 
   public RealtimeSegmentValidationManager(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
       LeadControllerManager leadControllerManager, 
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
@@ -89,8 +91,7 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
     Context context = new Context();
     // Run segment level validation only if certain time has passed after 
previous run
     long currentTimeMs = System.currentTimeMillis();
-    if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - 
_lastSegmentLevelValidationRunTimeMs)
-        >= _segmentLevelValidationIntervalInSeconds) {
+    if (shouldRunSegmentValidation(periodicTaskProperties, currentTimeMs)) {
       LOGGER.info("Run segment-level validation");
       context._runSegmentLevelValidation = true;
       _lastSegmentLevelValidationRunTimeMs = currentTimeMs;
@@ -242,6 +243,24 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
     }
   }
 
+  private boolean shouldRunSegmentValidation(Properties 
periodicTaskProperties, long currentTimeMs) {
+    boolean runValidation = Optional.ofNullable(
+            periodicTaskProperties.getProperty(RUN_SEGMENT_LEVEL_VALIDATION))
+        .map(value -> {
+          try {
+            return Boolean.parseBoolean(value);
+          } catch (Exception e) {
+            return false;
+          }
+        })
+        .orElse(false);
+
+    boolean timeThresholdMet = TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - 
_lastSegmentLevelValidationRunTimeMs)
+        >= _segmentLevelValidationIntervalInSeconds;
+
+    return runValidation || timeThresholdMet;
+  }
+
   @Override
   protected void nonLeaderCleanup(List<String> tableNamesWithType) {
     for (String tableNameWithType : tableNamesWithType) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to