This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 142fb45c82 Allow periodic tasks to run with properties via the POST API (#14915) 142fb45c82 is described below commit 142fb45c8295d1192d34b7caca307593c46557e9 Author: 9aman <35227405+9a...@users.noreply.github.com> AuthorDate: Wed Apr 2 19:43:50 2025 +0530 Allow periodic tasks to run with properties via the POST API (#14915) --- ...PinotControllerPeriodicTaskRestletResource.java | 39 ++++++++++++++++++++++ .../RealtimeSegmentValidationManager.java | 23 +++++++++++-- 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java index 906356fba9..ac92b5ba56 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java @@ -26,8 +26,10 @@ import io.swagger.annotations.Authorization; import io.swagger.annotations.SecurityDefinition; import io.swagger.annotations.SwaggerDefinition; import java.util.List; +import java.util.Map; import javax.inject.Inject; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; @@ -103,6 +105,43 @@ public class PinotControllerPeriodicTaskRestletResource { .build(); } + @POST + @Produces(MediaType.APPLICATION_JSON) + @Path("/run") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.EXECUTE_TASK) + @ApiOperation(value = "Run periodic task against table with custom properties. If table name is missing, task will " + + "run against all tables.") + public Response runPeriodicTaskWithProperties( + @ApiParam(value = "Periodic task name", required = true) @QueryParam("taskname") String periodicTaskName, + @ApiParam(value = "Name of the table") @QueryParam("tableName") String tableName, + @ApiParam(value = "OFFLINE | REALTIME") @QueryParam("type") String tableType, + @ApiParam(value = "Task properties") Map<String, String> taskProperties, + @Context HttpHeaders headers) { + + if (!_periodicTaskScheduler.hasTask(periodicTaskName)) { + throw new WebApplicationException("Periodic task '" + periodicTaskName + "' not found.", + Response.Status.NOT_FOUND); + } + + if (tableName != null) { + tableName = DatabaseUtils.translateTableName(tableName, headers); + List<String> matchingTableNamesWithType = + ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, + Constants.validateTableType(tableType), LOGGER); + + if (matchingTableNamesWithType.size() > 1) { + throw new WebApplicationException( + "More than one table matches Table '" + tableName + "'. Matching names: " + matchingTableNamesWithType); + } + + tableName = matchingTableNamesWithType.get(0); + } + + return Response.ok() + .entity(_pinotHelixResourceManager.invokeControllerPeriodicTask(tableName, periodicTaskName, taskProperties)) + .build(); + } + @GET @Produces(MediaType.APPLICATION_JSON) @Path("/names") diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index 2fcb669335..9fa236ecde 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; @@ -65,6 +66,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea private final boolean _segmentAutoResetOnErrorAtValidation; public static final String OFFSET_CRITERIA = "offsetCriteria"; + public static final String RUN_SEGMENT_LEVEL_VALIDATION = "runSegmentLevelValidation"; public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, @@ -89,8 +91,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea Context context = new Context(); // Run segment level validation only if certain time has passed after previous run long currentTimeMs = System.currentTimeMillis(); - if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastSegmentLevelValidationRunTimeMs) - >= _segmentLevelValidationIntervalInSeconds) { + if (shouldRunSegmentValidation(periodicTaskProperties, currentTimeMs)) { LOGGER.info("Run segment-level validation"); context._runSegmentLevelValidation = true; _lastSegmentLevelValidationRunTimeMs = currentTimeMs; @@ -242,6 +243,24 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea } } + private boolean shouldRunSegmentValidation(Properties periodicTaskProperties, long currentTimeMs) { + boolean runValidation = Optional.ofNullable( + periodicTaskProperties.getProperty(RUN_SEGMENT_LEVEL_VALIDATION)) + .map(value -> { + try { + return Boolean.parseBoolean(value); + } catch (Exception e) { + return false; + } + }) + .orElse(false); + + boolean timeThresholdMet = TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastSegmentLevelValidationRunTimeMs) + >= _segmentLevelValidationIntervalInSeconds; + + return runValidation || timeThresholdMet; + } + @Override protected void nonLeaderCleanup(List<String> tableNamesWithType) { for (String tableNameWithType : tableNamesWithType) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org