This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-api-for-tracking-push-job
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit b0da181bee979e5e109f139c095b0e31e576c1d5
Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Tue Oct 27 15:40:02 2020 -0700

    Add submitJobResult API in PinotTableRestletResource
---
 .../pinot/common/metrics/ControllerGauge.java      | 18 ++++++++++-
 .../api/resources/PinotTableRestletResource.java   | 35 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 0eb1fd6..5e597d9 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -64,7 +64,11 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
   TABLE_STORAGE_QUOTA_UTILIZATION("TableStorageQuotaUtilization", false),
 
   // Percentage of segments we failed to get size for
-  TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT("TableStorageEstMissingSegmentPercent", false);
+  TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT("TableStorageEstMissingSegmentPercent", false),
+
+  JOB_EXECUTION_TIME("JobExecutionTime", false),
+
+  JOB_FAILED("JobFailed", false);
 
   private final String gaugeName;
   private final String unit;
@@ -95,4 +99,16 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
   public boolean isGlobal() {
     return global;
   }
+
+  public static ControllerGauge getGauge(String gaugeName) { // looks up a gauge constant by its gauge name, ignoring case
+    if (gaugeName == null || gaugeName.isEmpty()) { // a null or empty name never matches anything
+      return null;
+    }
+    for (ControllerGauge gauge : ControllerGauge.values()) {
+      if (gauge.getGaugeName().equalsIgnoreCase(gaugeName)) {
+        return gauge; // first constant whose name matches wins
+      }
+    }
+    return null; // no constant carries that name; callers must handle null
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 7306d04..1af457a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -29,6 +30,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
@@ -47,6 +49,7 @@ import javax.ws.rs.core.Response;
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
@@ -558,4 +561,36 @@ public class PinotTableRestletResource {
       throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND);
     }
   }
+
+  /**
+   * API to submit the status of jobs such as segment creation or segment push jobs that run in Hadoop or Spark,
+   * and emit every "key=value" entry of the ';'-separated status whose key names a ControllerGauge as a table gauge.
+   */
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/jobStatus")
+  @ApiOperation(value = "Submit job status for a Pinot table", notes = "Submit job status for a Pinot table")
+  public SuccessResponse submitJobStatus(
+      @ApiParam(value = "Name of the table to submit job status", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "Job status") @QueryParam("jobStatus") String jobStatus) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName); // gauges below are recorded under this OFFLINE-typed name
+    Map<String, String> statusMap = parseMapFromPayload(jobStatus); // NOTE(review): jobStatus is not marked required; presumably null when the query param is absent - confirm the parser tolerates that
+    LOGGER.info("Received job status for Table: {}. Details: {}", tableName, jobStatus);
+
+    for (Map.Entry<String, String> entry : statusMap.entrySet()) {
+      String metricName = entry.getKey();
+      String metricValue = entry.getValue();
+      ControllerGauge controllerGauge = ControllerGauge.getGauge(metricName);
+      if (controllerGauge != null) { // entries whose key matches no gauge name are skipped
+        _controllerMetrics
+            .setValueOfTableGauge(offlineTableName, ControllerGauge.getGauge(metricName), Long.parseLong(metricValue)); // NOTE(review): Long.parseLong throws NumberFormatException on a non-numeric value and nothing here catches it; the gauge is also looked up a second time instead of reusing controllerGauge
+      }
+    }
+    return new SuccessResponse("Successfully submitted job status for Table: " + tableName);
+  }
+
+  private Map<String, String> parseMapFromPayload(String payload) { // turns a "k1=v1;k2=v2" style payload into a key/value map
+    return Splitter.on(';').omitEmptyStrings().trimResults().withKeyValueSeparator('=').split(payload); // entries are split on ';', each entry into key and value on '='
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to