morningman commented on code in PR #36862:
URL: https://github.com/apache/doris/pull/36862#discussion_r1706547317
##########
fe/fe-common/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -606,6 +606,10 @@ public class Config extends ConfigBase {
     @ConfField(description = {"Yarn 配置文件的路径", "Yarn config path"})
     public static String yarn_config_dir = System.getenv("DORIS_HOME") + "/lib/yarn-config";
 
+    @ConfField(mutable = true, masterOnly = true, description = {"Bucket load 的默认超时时间,单位是秒。",
+            "Default timeout for bucket load job, in seconds."})
+    public static int bucket_load_default_timeout_second = 86400; // 1 day

Review Comment:
   ingestion_load

##########
fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java:
##########
@@ -637,4 +653,138 @@ private String getAllHeaders(HttpServletRequest request) {
         }
         return headers.toString();
     }
+
+    @RequestMapping(path = "/api/ingestion_load/{" + DB_KEY + "}/_create", method = RequestMethod.POST)

Review Comment:
   Add catalog level: `/api/ingestion_load/{catalog}/{" + DB_KEY + "}/_create`. For now we can only support the internal catalog. Same for other APIs.

##########
fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java:
##########
@@ -637,4 +653,138 @@ private String getAllHeaders(HttpServletRequest request) {
         }
         return headers.toString();
     }
+
+    @RequestMapping(path = "/api/ingestion_load/{" + DB_KEY + "}/_create", method = RequestMethod.POST)
+    public Object createIngestionLoad(HttpServletRequest request, HttpServletResponse response,
+            @PathVariable(value = DB_KEY) String db) {
+        if (needRedirect(request.getScheme())) {
+            return redirectToHttps(request);
+        }
+
+        executeCheckPassword(request, response);
+
+        String fullDbName = getFullDbName(db);
+
+        Map<String, Object> resultMap = new HashMap<>();
+
+        try {
+
+            String body = HttpUtils.getBody(request);
+            JsonMapper mapper = JsonMapper.builder().build();
+            JsonNode jsonNode = mapper.reader().readTree(body);
+
+            String label = jsonNode.get("label").asText();
+            Map<String, List<String>> tableToPartition = mapper.reader()
+                    .readValue(jsonNode.get("tableToPartition").traverse(),
+                            new TypeReference<Map<String, List<String>>>() {
+                            });
+            List<String> tableNames = new LinkedList<>(tableToPartition.keySet());
+            for (String tableName : tableNames) {
+                checkTblAuth(ConnectContext.get().getCurrentUserIdentity(), fullDbName, tableName, PrivPredicate.LOAD);
+            }
+
+            Map<String, String> properties = new HashMap<>();
+            if (jsonNode.hasNonNull("properties")) {
+                properties = mapper.readValue(jsonNode.get("properties").traverse(),
+                        new TypeReference<HashMap<String, String>>() {
+                        });
+            }
+
+            executeCreateAndStartIngestionLoad(fullDbName, label, tableNames, properties, tableToPartition, resultMap,
+                    ConnectContext.get().getCurrentUserIdentity());
+
+        } catch (Exception e) {
+            LOG.error("create spark load job failed, err: {}", e.getMessage());

Review Comment:
   Use warn level. And add some identifier to the log, like the db or something, so that we can find out which load this log belongs to.

##########
fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java:
##########
@@ -637,4 +653,138 @@ private String getAllHeaders(HttpServletRequest request) {
         }
         return headers.toString();
     }
+
+    @RequestMapping(path = "/api/ingestion_load/{" + DB_KEY + "}/_create", method = RequestMethod.POST)

Review Comment:
   Add a comment to show an example of the request body, in JSON format. Same for other APIs.
##########
fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java:
##########
@@ -637,4 +653,138 @@ private String getAllHeaders(HttpServletRequest request) {
(same hunk as above, ending with)
+        } catch (Exception e) {
+            LOG.error("create spark load job failed, err: {}", e.getMessage());

Review Comment:
   Same suggestion for the other logs.
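   For the logging comments in this thread, here is a minimal standalone sketch of the suggested shape: warn level plus an identifier (the database) so a failed ingestion load can be traced. The class and sample values are illustrative; in LoadAction itself this would simply reuse the existing LOG and the fullDbName local, which is already in scope in the catch block.

```java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// Illustrative only: parameterized warn-level log carrying a db identifier.
public class IngestionLoadLogExample {
    private static final Logger LOG = LogManager.getLogger(IngestionLoadLogExample.class);

    public static void main(String[] args) {
        String fullDbName = "test_db"; // illustrative value
        try {
            throw new IllegalStateException("label already used"); // stand-in failure
        } catch (Exception e) {
            // warn instead of error, with the db in the message so the load can be identified
            LOG.warn("create ingestion load job failed, db: {}, err: {}", fullDbName, e.getMessage());
        }
    }
}
```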