This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch add-api-for-tracking-push-job in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit b0da181bee979e5e109f139c095b0e31e576c1d5
Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Tue Oct 27 15:40:02 2020 -0700

    Add submitJobStatus API in PinotTableRestletResource
---
 .../pinot/common/metrics/ControllerGauge.java      | 26 +++++++-
 .../api/resources/PinotTableRestletResource.java   | 67 ++++++++++++++++++++++
 2 files changed, 92 insertions(+), 1 deletion(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 0eb1fd6..5e597d9 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -64,7 +64,13 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
   TABLE_STORAGE_QUOTA_UTILIZATION("TableStorageQuotaUtilization", false),
 
   // Percentage of segments we failed to get size for
-  TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT("TableStorageEstMissingSegmentPercent", false);
+  TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT("TableStorageEstMissingSegmentPercent", false),
+
+  // Execution time reported by a job through the job status API (NOTE(review): time unit is set by the caller)
+  JOB_EXECUTION_TIME("JobExecutionTime", false),
+
+  // Failure indicator reported by a job through the job status API
+  JOB_FAILED("JobFailed", false);
 
   private final String gaugeName;
   private final String unit;
@@ -95,4 +101,22 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
   public boolean isGlobal() {
     return global;
   }
+
+  /**
+   * Looks up a gauge by its gauge name, ignoring case.
+   *
+   * @param gaugeName the name to compare against {@link #getGaugeName()} of every gauge; may be null
+   * @return the first matching gauge, or null if the name is null, empty, or matches no gauge
+   */
+  public static ControllerGauge getGauge(String gaugeName) {
+    if (gaugeName == null || gaugeName.isEmpty()) {
+      return null;
+    }
+    for (ControllerGauge gauge : values()) {
+      if (gauge.getGaugeName().equalsIgnoreCase(gaugeName)) {
+        return gauge;
+      }
+    }
+    return null;
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 7306d04..1af457a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -29,6 +30,8 @@ import java.io.IOException;
 import java.util.Collections;
+import java.util.EnumMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
@@ -47,6 +50,7 @@ import javax.ws.rs.core.Response;
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
@@ -558,4 +562,67 @@ public class PinotTableRestletResource {
       throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND);
     }
   }
+
+  /**
+   * Accepts the status reported by a job, such as a segment creation or segment push job running in Hadoop or
+   * Spark, and emits the recognized fields as table gauges.
+   *
+   * <p>The status is a ';'-separated list of 'name=value' pairs. Names are matched case-insensitively against
+   * {@link ControllerGauge} names and unrecognized names are ignored; values must be parseable as longs.
+   *
+   * @param tableName name of the table; gauges are recorded under its offline table name
+   * @param jobStatus the ';'-separated 'name=value' pairs reported by the job
+   * @return a success response once all recognized gauges have been set
+   * @throws ControllerApplicationException with BAD_REQUEST if jobStatus is missing or malformed
+   */
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/jobStatus")
+  @ApiOperation(value = "Submit job status for a Pinot table", notes = "Submit job status for a Pinot table")
+  public SuccessResponse submitJobStatus(
+      @ApiParam(value = "Name of the table to submit job status", required = true)
+      @PathParam("tableName") String tableName,
+      @ApiParam(value = "Job status", required = true) @QueryParam("jobStatus") String jobStatus) {
+    if (jobStatus == null) {
+      throw new ControllerApplicationException(LOGGER, "Missing required query parameter: jobStatus",
+          Response.Status.BAD_REQUEST);
+    }
+    LOGGER.info("Received job status for Table: {}. Details: {}", tableName, jobStatus);
+
+    // Resolve and parse every entry before touching any gauge, so a malformed payload is rejected as a whole
+    // instead of leaving the table's job metrics partially updated.
+    Map<ControllerGauge, Long> gaugeValues = new EnumMap<>(ControllerGauge.class);
+    try {
+      for (Map.Entry<String, String> entry : parseMapFromPayload(jobStatus).entrySet()) {
+        ControllerGauge controllerGauge = ControllerGauge.getGauge(entry.getKey());
+        // Fields that do not correspond to a known gauge are ignored.
+        if (controllerGauge != null) {
+          gaugeValues.put(controllerGauge, Long.parseLong(entry.getValue()));
+        }
+      }
+    } catch (IllegalArgumentException e) {
+      // Raised by the splitter for entries without '=' or with duplicate keys, and by Long.parseLong (as
+      // NumberFormatException) for non-numeric values. Both are client errors, not server errors.
+      throw new ControllerApplicationException(LOGGER,
+          "Invalid job status for Table: " + tableName + ". Reason: " + e.getMessage(), Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName =
+        TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(tableName));
+    for (Map.Entry<ControllerGauge, Long> gaugeValue : gaugeValues.entrySet()) {
+      _controllerMetrics.setValueOfTableGauge(offlineTableName, gaugeValue.getKey(), gaugeValue.getValue());
+    }
+    return new SuccessResponse("Successfully submitted job status for Table: " + tableName);
+  }
+
+  /**
+   * Parses a ';'-separated list of 'key=value' pairs into a map, trimming whitespace and skipping empty entries.
+   *
+   * @param payload the text to parse; must not be null
+   * @return the parsed key/value pairs
+   * @throws IllegalArgumentException if an entry has no '=' separator or a key appears more than once
+   */
+  private Map<String, String> parseMapFromPayload(String payload) {
+    return Splitter.on(';').omitEmptyStrings().trimResults().withKeyValueSeparator('=').split(payload);
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org