This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-api-for-tracking-push-job
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 7a74bd5d6206f5b5d6c8fcbbb0f8d5bc884db1a3
Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Tue Oct 27 15:40:02 2020 -0700

    Add submitJobResult API in PinotTableRestletResource
---
 .../pinot/common/metrics/ControllerGauge.java      | 18 +++++++++++-
 .../api/resources/PinotTableRestletResource.java   | 33 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 0eb1fd6..be4f8da 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -64,7 +64,11 @@ public enum ControllerGauge implements AbstractMetrics.Gauge 
{
   TABLE_STORAGE_QUOTA_UTILIZATION("TableStorageQuotaUtilization", false),
 
   // Percentage of segments we failed to get size for
-  
TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT("TableStorageEstMissingSegmentPercent",
 false);
+  
TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT("TableStorageEstMissingSegmentPercent",
 false),
+
+  JOB_EXECUTION_TIME("JobExecutionTime", false),
+
+  JOB_FAILED("JobFailed", false);
 
   private final String gaugeName;
   private final String unit;
@@ -95,4 +99,16 @@ public enum ControllerGauge implements AbstractMetrics.Gauge 
{
   public boolean isGlobal() {
     return global;
   }
+
+  public static ControllerGauge getGauge(String gaugeName) {
+    if (gaugeName == null || gaugeName.isEmpty()) {
+      throw new RuntimeException("Gauge name is missing");
+    }
+    for (ControllerGauge gauge : ControllerGauge.values()) {
+      if (gauge.getGaugeName().equalsIgnoreCase(gaugeName)) {
+        return gauge;
+      }
+    }
+    throw new RuntimeException("Invalid gauge name: " + gaugeName);
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 7306d04..83d8e8f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -29,6 +30,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
@@ -47,6 +49,7 @@ import javax.ws.rs.core.Response;
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
@@ -558,4 +561,34 @@ public class PinotTableRestletResource {
       throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.NOT_FOUND);
     }
   }
+
+  /**
+   * API to submit the result of jobs like segment creation job, segment push 
job that run in Hadoop or Spark.
+   */
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/job")
+  @ApiOperation(value = "Adds a table", notes = "Adds a table")
+  public SuccessResponse submitJobResult(
+      @ApiParam(value = "Name of the table to submit job result", required = 
true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "Job results") @QueryParam("jobResults") String 
jobResults) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+    Map<String, String> resultMap = parseMapFromPayload(jobResults);
+    try {
+      for (Map.Entry<String, String> entry : resultMap.entrySet()) {
+        String metricName = entry.getKey();
+        String metricValue = entry.getValue();
+        _controllerMetrics
+            .setValueOfTableGauge(offlineTableName, 
ControllerGauge.getGauge(metricName), Long.parseLong(metricValue));
+      }
+      return new SuccessResponse("Successfully submitted job results for 
Table: " + tableName);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.BAD_REQUEST);
+    }
+  }
+
+  private Map<String, String> parseMapFromPayload(String payload) {
+    return 
Splitter.on(';').omitEmptyStrings().trimResults().withKeyValueSeparator('=').split(payload);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to