dataroaring commented on code in PR #21621: URL: https://github.com/apache/doris/pull/21621#discussion_r1273444231
########## be/src/http/action/http_load.cpp: ########## @@ -0,0 +1,572 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "http/action/http_load.h" + +// use string iequal +#include <event2/buffer.h> +#include <event2/http.h> +#include <gen_cpp/FrontendService.h> +#include <gen_cpp/FrontendService_types.h> +#include <gen_cpp/HeartbeatService_types.h> +#include <gen_cpp/PaloInternalService_types.h> +#include <gen_cpp/PlanNodes_types.h> +#include <gen_cpp/Types_types.h> +#include <stdint.h> +#include <stdlib.h> +#include <sys/time.h> +#include <thrift/protocol/TDebugProtocol.h> +#include <time.h> + +#include <future> +#include <map> +#include <sstream> +#include <stdexcept> +#include <utility> + +#include "common/config.h" +#include "common/consts.h" +#include "common/logging.h" +#include "common/status.h" +#include "common/utils.h" +#include "http/http_channel.h" +#include "http/http_common.h" +#include "http/http_headers.h" +#include "http/http_request.h" +#include "http/utils.h" +#include "io/fs/stream_load_pipe.h" +#include "olap/storage_engine.h" +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/load_path_mgr.h" +#include "runtime/message_body_sink.h" +#include 
"runtime/stream_load/new_load_stream_mgr.h" +#include "runtime/stream_load/stream_load_context.h" +#include "runtime/stream_load/stream_load_executor.h" +#include "runtime/stream_load/stream_load_recorder.h" +#include "util/byte_buffer.h" +#include "util/doris_metrics.h" +#include "util/metrics.h" +#include "util/string_util.h" +#include "util/thrift_rpc_helper.h" +#include "util/time.h" +#include "util/uid_util.h" +#include "util/url_coding.h" + +namespace doris { +using namespace ErrorCode; + +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_load_requests_total, MetricUnit::REQUESTS); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_load_duration_ms, MetricUnit::MILLISECONDS); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(http_load_current_processing, MetricUnit::REQUESTS); + +void HttpLoadAction::_parse_format(const std::string& format_str, Review Comment: duplicate with code in stream_load.cpp we should refactor it. BTW, please add ut. ########## fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java: ########## @@ -1785,6 +1797,89 @@ public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequ return result; } + @Override + public THttpLoadPutParams httpLoadPut(THttpLoadPutRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + LOG.debug("receive http load put request: {}, backend: {}", request, clientAddr); + THttpLoadPutParams params = null; + try { + params = httpLoadPutImpl(request); + } catch (UserException e) { + LOG.warn("failed to get stream load plan: {}", e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + } + return params; + } + + private THttpLoadPutParams httpLoadPutImpl(THttpLoadPutRequest request) throws UserException { + LOG.info("receive http load put request"); + THttpLoadPutParams params = new THttpLoadPutParams(); + String loadSql = request.getLoadSql(); + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + 
ctx.setQueryId(request.getLoadId()); + ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER); + ctx.setCurrentUserIdentity(UserIdentity.ROOT); + ctx.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser()); + ctx.setThreadLocalInfo(); + + SqlScanner input = new SqlScanner(new StringReader(loadSql), ctx.getSessionVariable().getSqlMode()); + SqlParser parser = new SqlParser(input); + try { + StatementBase parsedStmt = SqlParserUtils.getFirstStmt(parser); + if (parsedStmt instanceof InsertStmt) { + InsertStmt insertStmt = (InsertStmt) parsedStmt; + + SelectStmt selectStmt = (SelectStmt) insertStmt.getQueryStmt(); + + List<TableRef> tableRefList = selectStmt.getTableRefs(); + TableValuedFunctionRef tableValuedFunctionRef = (TableValuedFunctionRef) tableRefList.get(0); + // case-insensitive + Map<String, String> paramMap = new CaseInsensitiveMap(tableValuedFunctionRef.getParams()); + + params.setDb(insertStmt.getDbName()); + params.setTable(insertStmt.getTbl()); + params.setLabel(insertStmt.getLabel()); + // setting params + params.setColumnSeparator(paramMap.get("column_separator")); + params.setLineDelimiter(paramMap.get("line_delimiter")); + params.setMaxFilterRatio(Double.valueOf(paramMap.getOrDefault("max_filter_ratio", "0"))); + params.setWhere(paramMap.get("where")); + params.setPartitions(paramMap.get("partitions")); + params.setTemporaryPartitions(paramMap.get("temporary_partitions")); + params.setColumns(paramMap.get("columns")); + params.setFormat(paramMap.get("format")); + params.setExecMemLimit(paramMap.get("exec_mem_limit")); + params.setStrictMode(Boolean.valueOf(paramMap.getOrDefault("strict_mode", "false"))); + // params.setMergeType((TMergeType) paramMap.get("merge_type")); + params.setTwoPhaseCommit(Boolean.valueOf(paramMap.getOrDefault("two_phase_commit", "false"))); + params.setEnableProfile(Boolean.valueOf(paramMap.getOrDefault("enable_profile", "false"))); + params.setCompressType(paramMap.get("compress_type")); + 
params.setReadJsonByLine(Boolean.valueOf(paramMap.getOrDefault("read_json_by_line", "false"))); + params.setTimeout(paramMap.get("timeout")); + params.setComment(paramMap.get("comment")); + params.setNegative(Boolean.valueOf(paramMap.getOrDefault("negative", "false"))); + params.setJsonpaths(paramMap.get("jsonpaths")); + params.setJsonRoot(paramMap.get("json_root")); + params.setStripOuterArray(Boolean.valueOf(paramMap.getOrDefault("strip_outer_array", "false"))); + params.setNumAsString(Boolean.valueOf(paramMap.getOrDefault("num_as_string", "false"))); + params.setFuzzyParse(Boolean.valueOf(paramMap.getOrDefault("fuzzy_parse", "false"))); + params.setSequenceCol(paramMap.get("sequence_col")); + params.setSendBatchParallelism(Integer.valueOf(paramMap.getOrDefault("send_batch_parallelism", "0"))); + params.setLoadToSingleTablet(Boolean.valueOf(paramMap.getOrDefault("load_to_single_tablet", "false"))); + params.setDeleteCondition(paramMap.get("delete_condition")); + params.setHiddenColumns(paramMap.get("hidden_columns")); + params.setTrimDoubleQuotes(Boolean.valueOf(paramMap.getOrDefault("trim_double_quotes", "false"))); + params.setSkipLines(Integer.valueOf(paramMap.getOrDefault("skip_lines", "0"))); + params.setPartialColumns(Boolean.valueOf(paramMap.getOrDefault("partial_columns", "false"))); Review Comment: We should write this code much more cleanly. 
########## gensrc/thrift/FrontendService.thrift: ########## @@ -621,6 +621,59 @@ struct TStreamLoadMultiTablePutResult { 3: optional list<PaloInternalService.TPipelineFragmentParams> pipeline_params } +// HttpLoad request, used to load a streaming to engine +struct THttpLoadPutRequest { + 1: required Types.TUniqueId loadId + 2: optional string load_sql // insert into sql used by http load +} + +struct THttpLoadPutResult { + 1: required Status.TStatus status + 2: required i64 txn_id + 3: optional i64 total_rows + 4: optional i64 loaded_rows + 5: optional i64 filtered_rows + 6: optional i64 unselected_rows +} + +// httptvf properties +struct THttpLoadPutParams { + 1: required string db + 2: required string table + 3: optional string label + 4: optional string column_separator + 5: optional string line_delimiter + 6: optional double max_filter_ratio + 7: optional string where + 8: optional string partitions + 9: optional string temporary_partitions + 10: optional string columns + 11: required string format + 12: optional string exec_mem_limit + 13: optional bool strict_mode + 14: optional string merge_type + 15: optional bool two_phase_commit + 16: optional bool enable_profile + 17: optional string compress_type + 18: optional bool read_json_by_line + 19: optional string timeout + 20: optional string comment + 21: optional bool negative + 22: optional string jsonpaths + 23: optional string json_root + 24: optional bool strip_outer_array + 25: optional bool num_as_string + 26: optional bool fuzzy_parse + 27: optional string sequence_col + 28: optional i32 send_batch_parallelism + 29: optional bool load_to_single_tablet + 30: optional string delete_condition // delete + 31: optional string hidden_columns + 32: optional bool trim_double_quotes // trim double quotes for csv + 33: optional i32 skip_lines // csv skip line num, only used when csv header_type is not set. Review Comment: we should classify these options, like scanner_options, etc. 
@TangSiyang2001 what is your opinion? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org