This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch add-api-for-tracking-push-job in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 7a74bd5d6206f5b5d6c8fcbbb0f8d5bc884db1a3 Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> AuthorDate: Tue Oct 27 15:40:02 2020 -0700 Add submitJobResult API in PinotTableRestletResource --- .../pinot/common/metrics/ControllerGauge.java | 18 +++++++++++- .../api/resources/PinotTableRestletResource.java | 33 ++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index 0eb1fd6..be4f8da 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -64,7 +64,11 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { TABLE_STORAGE_QUOTA_UTILIZATION("TableStorageQuotaUtilization", false), // Percentage of segments we failed to get size for - TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT("TableStorageEstMissingSegmentPercent", false); + TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT("TableStorageEstMissingSegmentPercent", false), + + JOB_EXECUTION_TIME("JobExecutionTime", false), + + JOB_FAILED("JobFailed", false); private final String gaugeName; private final String unit; @@ -95,4 +99,16 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { public boolean isGlobal() { return global; } + + public static ControllerGauge getGauge(String gaugeName) { + if (gaugeName == null || gaugeName.isEmpty()) { + throw new RuntimeException("Gauge name is missing"); + } + for (ControllerGauge gauge : ControllerGauge.values()) { + if (gauge.getGaugeName().equalsIgnoreCase(gaugeName)) { + return gauge; + } + } + throw new RuntimeException("Invalid gauge name: " + gaugeName); + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 7306d04..83d8e8f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; @@ -29,6 +30,7 @@ import java.io.IOException; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -47,6 +49,7 @@ import javax.ws.rs.core.Response; import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; import org.apache.pinot.common.exception.TableNotFoundException; +import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.controller.ControllerConf; @@ -558,4 +561,34 @@ public class PinotTableRestletResource { throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND); } } + + /** + * API to submit the result of jobs like segment creation job, segment push job that run in Hadoop or Spark. + */ + @POST + @Produces(MediaType.APPLICATION_JSON) + @Path("/tables/{tableName}/job") + @ApiOperation(value = "Adds a table", notes = "Adds a table") + public SuccessResponse submitJobResult( + @ApiParam(value = "Name of the table to submit job result", required = true) @PathParam("tableName") String tableName, + @ApiParam(value = "Job results") @QueryParam("jobResults") String jobResults) { + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName); + Map<String, String> resultMap = parseMapFromPayload(jobResults); + try { + for (Map.Entry<String, String> entry : resultMap.entrySet()) { + String metricName = entry.getKey(); + String metricValue = entry.getValue(); + _controllerMetrics + .setValueOfTableGauge(offlineTableName, ControllerGauge.getGauge(metricName), Long.parseLong(metricValue)); + } + return new SuccessResponse("Successfully submitted job results for Table: " + tableName); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST); + } + } + + private Map<String, String> parseMapFromPayload(String payload) { + return Splitter.on(';').omitEmptyStrings().trimResults().withKeyValueSeparator('=').split(payload); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org