This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new b836e86889 Fix job submission time for reload and force commit job (#11803) b836e86889 is described below commit b836e868895b55d8fbb01d5f95c1164ac458f4be Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Mon Oct 16 11:47:00 2023 -0700 Fix job submission time for reload and force commit job (#11803) --- .../api/resources/PinotRealtimeTableResource.java | 8 ++++---- .../api/resources/PinotSegmentRestletResource.java | 5 ++++- .../helix/core/PinotHelixResourceManager.java | 17 ++++++++++------- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java index 1c0083494f..404e7553db 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.controller.api.resources; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import io.swagger.annotations.Api; import io.swagger.annotations.ApiKeyAuthDefinition; @@ -139,8 +138,8 @@ public class PinotRealtimeTableResource { + "Please note that this is an asynchronous operation, " + "and 200 response does not mean it has actually been done already") public Map<String, String> forceCommit( - @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) - throws JsonProcessingException { + @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) { + long startTimeMs = System.currentTimeMillis(); String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName); validate(tableNameWithType); Map<String, String> response = new HashMap<>(); @@ -149,7 +148,8 @@ public class PinotRealtimeTableResource { response.put("forceCommitStatus", "SUCCESS"); try { String jobId = UUID.randomUUID().toString(); - _pinotHelixResourceManager.addNewForceCommitJob(tableNameWithType, jobId, consumingSegmentsForceCommitted); + _pinotHelixResourceManager.addNewForceCommitJob(tableNameWithType, jobId, startTimeMs, + consumingSegmentsForceCommitted); response.put("jobMetaZKWriteStatus", "SUCCESS"); response.put("forceCommitJobId", jobId); } catch (Exception e) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java index 5db7d8c1c3..33d4b65b5d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java @@ -480,6 +480,7 @@ public class PinotSegmentRestletResource { @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName, @ApiParam(value = "Whether to force server to download segment") @QueryParam("forceDownload") @DefaultValue("false") boolean forceDownload) { + long startTimeMs = System.currentTimeMillis(); segmentName = URIUtils.decode(segmentName); String tableNameWithType = getExistingTable(tableName, segmentName); Pair<Integer, String> msgInfo = @@ -488,6 +489,7 @@ public class PinotSegmentRestletResource { if (msgInfo.getLeft() > 0) { try { if (_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(), + startTimeMs, msgInfo.getLeft())) { zkJobMetaWriteSuccess = true; } else { @@ -749,6 +751,7 @@ public class PinotSegmentRestletResource { @ApiParam(value 
= "Whether to force server to download segment") @QueryParam("forceDownload") @DefaultValue("false") boolean forceDownload) throws JsonProcessingException { + long startTimeMs = System.currentTimeMillis(); TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName); TableType tableTypeFromRequest = Constants.validateTableType(tableTypeStr); // When rawTableName is provided but w/o table type, Pinot tries to reload both OFFLINE @@ -771,7 +774,7 @@ public class PinotSegmentRestletResource { perTableMsgData.put(tableNameWithType, tableReloadMeta); // Store in ZK try { - if (_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType, msgInfo.getRight(), + if (_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType, msgInfo.getRight(), startTimeMs, msgInfo.getLeft())) { tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS"); } else { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 81dec9a34e..c3fde8a85d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2028,32 +2028,33 @@ public class PinotHelixResourceManager { * @param tableNameWithType Table for which job is to be added * @param segmentName Name of the segment being reloaded * @param jobId job's UUID + * @param jobSubmissionTimeMs time at which the job was submitted * @param numMessagesSent number of messages that were sent to servers. 
Saved as metadata * @return boolean representing success / failure of the ZK write step */ public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentName, String jobId, - int numMessagesSent) { + long jobSubmissionTimeMs, int numMessagesSent) { Map<String, String> jobMetadata = new HashMap<>(); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString()); - jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis())); + jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs)); jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numMessagesSent)); jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName); return addControllerJobToZK(jobId, jobMetadata, ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.RELOAD_SEGMENT)); } - public boolean addNewForceCommitJob(String tableNameWithType, String jobId, Set<String> consumingSegmentsCommitted) + public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long jobSubmissionTimeMs, + Set<String> consumingSegmentsCommitted) throws JsonProcessingException { Map<String, String> jobMetadata = new HashMap<>(); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.FORCE_COMMIT.toString()); - jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis())); + jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs)); 
jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST, JsonUtils.objectToString(consumingSegmentsCommitted)); - return addControllerJobToZK(jobId, jobMetadata, ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.FORCE_COMMIT)); } @@ -2062,15 +2063,17 @@ public class PinotHelixResourceManager { * Adds a new reload segment job metadata into ZK * @param tableNameWithType Table for which job is to be added * @param jobId job's UUID + * @param jobSubmissionTimeMs time at which the job was submitted * @param numberOfMessagesSent number of messages that were sent to servers. Saved as metadata * @return boolean representing success / failure of the ZK write step */ - public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String jobId, int numberOfMessagesSent) { + public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String jobId, long jobSubmissionTimeMs, + int numberOfMessagesSent) { Map<String, String> jobMetadata = new HashMap<>(); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString()); - jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis())); + jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs)); jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numberOfMessagesSent)); return addControllerJobToZK(jobId, jobMetadata, ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.RELOAD_SEGMENT)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org