This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b836e86889 Fix job submission time for reload and force commit job 
(#11803)
b836e86889 is described below

commit b836e868895b55d8fbb01d5f95c1164ac458f4be
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Mon Oct 16 11:47:00 2023 -0700

    Fix job submission time for reload and force commit job (#11803)
---
 .../api/resources/PinotRealtimeTableResource.java       |  8 ++++----
 .../api/resources/PinotSegmentRestletResource.java      |  5 ++++-
 .../helix/core/PinotHelixResourceManager.java           | 17 ++++++++++-------
 3 files changed, 18 insertions(+), 12 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index 1c0083494f..404e7553db 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.controller.api.resources;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiKeyAuthDefinition;
@@ -139,8 +138,8 @@ public class PinotRealtimeTableResource {
           + "Please note that this is an asynchronous operation, "
           + "and 200 response does not mean it has actually been done already")
   public Map<String, String> forceCommit(
-      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName)
-      throws JsonProcessingException {
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName) {
+    long startTimeMs = System.currentTimeMillis();
     String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
     validate(tableNameWithType);
     Map<String, String> response = new HashMap<>();
@@ -149,7 +148,8 @@ public class PinotRealtimeTableResource {
       response.put("forceCommitStatus", "SUCCESS");
       try {
         String jobId = UUID.randomUUID().toString();
-        _pinotHelixResourceManager.addNewForceCommitJob(tableNameWithType, 
jobId, consumingSegmentsForceCommitted);
+        _pinotHelixResourceManager.addNewForceCommitJob(tableNameWithType, 
jobId, startTimeMs,
+            consumingSegmentsForceCommitted);
         response.put("jobMetaZKWriteStatus", "SUCCESS");
         response.put("forceCommitJobId", jobId);
       } catch (Exception e) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 5db7d8c1c3..33d4b65b5d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -480,6 +480,7 @@ public class PinotSegmentRestletResource {
       @ApiParam(value = "Name of the segment", required = true) 
@PathParam("segmentName") @Encoded String segmentName,
       @ApiParam(value = "Whether to force server to download segment") 
@QueryParam("forceDownload")
       @DefaultValue("false") boolean forceDownload) {
+    long startTimeMs = System.currentTimeMillis();
     segmentName = URIUtils.decode(segmentName);
     String tableNameWithType = getExistingTable(tableName, segmentName);
     Pair<Integer, String> msgInfo =
@@ -488,6 +489,7 @@ public class PinotSegmentRestletResource {
     if (msgInfo.getLeft() > 0) {
       try {
         if 
(_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, 
segmentName, msgInfo.getRight(),
+            startTimeMs,
             msgInfo.getLeft())) {
           zkJobMetaWriteSuccess = true;
         } else {
@@ -749,6 +751,7 @@ public class PinotSegmentRestletResource {
       @ApiParam(value = "Whether to force server to download segment") 
@QueryParam("forceDownload")
       @DefaultValue("false") boolean forceDownload)
       throws JsonProcessingException {
+    long startTimeMs = System.currentTimeMillis();
     TableType tableTypeFromTableName = 
TableNameBuilder.getTableTypeFromTableName(tableName);
     TableType tableTypeFromRequest = Constants.validateTableType(tableTypeStr);
     // When rawTableName is provided but w/o table type, Pinot tries to reload 
both OFFLINE
@@ -771,7 +774,7 @@ public class PinotSegmentRestletResource {
       perTableMsgData.put(tableNameWithType, tableReloadMeta);
       // Store in ZK
       try {
-        if 
(_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType, 
msgInfo.getRight(),
+        if 
(_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType, 
msgInfo.getRight(), startTimeMs,
             msgInfo.getLeft())) {
           tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS");
         } else {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 81dec9a34e..c3fde8a85d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2028,32 +2028,33 @@ public class PinotHelixResourceManager {
    * @param tableNameWithType Table for which job is to be added
    * @param segmentName Name of the segment being reloaded
    * @param jobId job's UUID
+   * @param jobSubmissionTimeMs time at which the job was submitted
    * @param numMessagesSent number of messages that were sent to servers. 
Saved as metadata
    * @return boolean representing success / failure of the ZK write step
    */
   public boolean addNewReloadSegmentJob(String tableNameWithType, String 
segmentName, String jobId,
-      int numMessagesSent) {
+      long jobSubmissionTimeMs, int numMessagesSent) {
     Map<String, String> jobMetadata = new HashMap<>();
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
     jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, 
tableNameWithType);
     jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.RELOAD_SEGMENT.toString());
-    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(jobSubmissionTimeMs));
     jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, 
Integer.toString(numMessagesSent));
     
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, 
segmentName);
     return addControllerJobToZK(jobId, jobMetadata,
         
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.RELOAD_SEGMENT));
   }
 
-  public boolean addNewForceCommitJob(String tableNameWithType, String jobId, 
Set<String> consumingSegmentsCommitted)
+  public boolean addNewForceCommitJob(String tableNameWithType, String jobId, 
long jobSubmissionTimeMs,
+      Set<String> consumingSegmentsCommitted)
       throws JsonProcessingException {
     Map<String, String> jobMetadata = new HashMap<>();
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
     jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, 
tableNameWithType);
     jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.FORCE_COMMIT.toString());
-    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(jobSubmissionTimeMs));
     
jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST,
         JsonUtils.objectToString(consumingSegmentsCommitted));
-
     return addControllerJobToZK(jobId, jobMetadata,
         
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.FORCE_COMMIT));
   }
@@ -2062,15 +2063,17 @@ public class PinotHelixResourceManager {
    * Adds a new reload segment job metadata into ZK
    * @param tableNameWithType Table for which job is to be added
    * @param jobId job's UUID
+   * @param jobSubmissionTimeMs time at which the job was submitted
    * @param numberOfMessagesSent number of messages that were sent to servers. 
Saved as metadata
    * @return boolean representing success / failure of the ZK write step
    */
-  public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String 
jobId, int numberOfMessagesSent) {
+  public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String 
jobId, long jobSubmissionTimeMs,
+      int numberOfMessagesSent) {
     Map<String, String> jobMetadata = new HashMap<>();
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
     jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, 
tableNameWithType);
     jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.RELOAD_SEGMENT.toString());
-    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(jobSubmissionTimeMs));
     jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, 
Integer.toString(numberOfMessagesSent));
     return addControllerJobToZK(jobId, jobMetadata,
         
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.RELOAD_SEGMENT));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to