Cai-Yao commented on code in PR #16940: URL: https://github.com/apache/doris/pull/16940#discussion_r1112628496
########## be/src/http/action/stream_load_with_sql.cpp: ########## @@ -0,0 +1,417 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "http/action/stream_load_with_sql.h" + +#include <deque> +#include <future> +#include <sstream> + +// use string iequal +#include <event2/buffer.h> +#include <event2/bufferevent.h> +#include <event2/http.h> +#include <rapidjson/prettywriter.h> +#include <thrift/protocol/TDebugProtocol.h> + +#include "common/config.h" +#include "common/consts.h" +#include "common/logging.h" +#include "common/status.h" +#include "common/utils.h" +#include "gen_cpp/FrontendService.h" +#include "gen_cpp/FrontendService_types.h" +#include "gen_cpp/HeartbeatService_types.h" +#include "http/http_channel.h" +#include "http/http_common.h" +#include "http/http_headers.h" +#include "http/http_request.h" +#include "http/http_response.h" +#include "http/utils.h" +#include "io/fs/stream_load_pipe.h" +#include "olap/storage_engine.h" +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/export_task_mgr.h" +#include "runtime/fragment_mgr.h" +#include "runtime/load_path_mgr.h" +#include "runtime/plan_fragment_executor.h" +#include "runtime/stream_load/new_load_stream_mgr.h" +#include "runtime/stream_load/stream_load_context.h" +#include "runtime/stream_load/stream_load_executor.h" +#include "runtime/stream_load/stream_load_recorder.h" +#include "util/byte_buffer.h" +#include "util/debug_util.h" +#include "util/doris_metrics.h" +#include "util/json_util.h" +#include "util/metrics.h" +#include "util/string_util.h" +#include "util/thrift_rpc_helper.h" +#include "util/time.h" +#include "util/uid_util.h" + +namespace doris { +using namespace ErrorCode; + +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_requests_total, MetricUnit::REQUESTS); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_duration_ms, MetricUnit::MILLISECONDS); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_current_processing, + MetricUnit::REQUESTS); + +#ifdef BE_TEST +TStreamLoadPutResult k_stream_load_put_result; +#endif + +static bool is_format_support_streaming(TFileFormatType::type format) { + switch (format) { + case TFileFormatType::FORMAT_CSV_PLAIN: + case TFileFormatType::FORMAT_CSV_BZ2: + case TFileFormatType::FORMAT_CSV_DEFLATE: + case TFileFormatType::FORMAT_CSV_GZ: + case TFileFormatType::FORMAT_CSV_LZ4FRAME: + case TFileFormatType::FORMAT_CSV_LZO: + case TFileFormatType::FORMAT_CSV_LZOP: + case TFileFormatType::FORMAT_JSON: + return true; + default: + return false; + } +} + +StreamLoadWithSqlAction::StreamLoadWithSqlAction(ExecEnv* exec_env) : _exec_env(exec_env) { + _stream_load_with_sql_entity = + DorisMetrics::instance()->metric_registry()->register_entity("stream_load_with_sql"); + INT_COUNTER_METRIC_REGISTER(_stream_load_with_sql_entity, + streaming_load_with_sql_requests_total); + INT_COUNTER_METRIC_REGISTER(_stream_load_with_sql_entity, streaming_load_with_sql_duration_ms); + INT_GAUGE_METRIC_REGISTER(_stream_load_with_sql_entity, + streaming_load_with_sql_current_processing); +} + +StreamLoadWithSqlAction::~StreamLoadWithSqlAction() { + DorisMetrics::instance()->metric_registry()->deregister_entity(_stream_load_with_sql_entity); +} + +void StreamLoadWithSqlAction::handle(HttpRequest* req) { + StreamLoadContext* ctx = (StreamLoadContext*)req->handler_ctx(); + if (ctx == nullptr) { + return; + } + + // status already set to fail + if (ctx->status.ok()) { + ctx->status = _handle(ctx); + if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) { + LOG(WARNING) << "handle streaming load failed, id=" << ctx->id + << ", errmsg=" << ctx->status; + } + } + ctx->load_cost_millis = UnixMillis() - ctx->start_millis; + + if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) { + if (ctx->need_rollback) { + _exec_env->stream_load_executor()->rollback_txn(ctx); + ctx->need_rollback = false; + } + if (ctx->body_sink.get() != nullptr) { + ctx->body_sink->cancel(ctx->status.to_string()); + } + } + + if (!ctx->status.ok()) { + auto str = std::string(ctx->to_json()); + // add new line at end + str = str + '\n'; + HttpChannel::send_reply(req, str); + return; + } + + // query stream load status + // put request + TStreamLoadWithLoadStatusRequest request; + TStreamLoadWithLoadStatusResult result; + request.__set_loadId(ctx->id.to_thrift()); + TNetworkAddress master_addr = _exec_env->master_info()->network_address; + while (true) { + ThriftRpcHelper::rpc<FrontendServiceClient>( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->StreamLoadWithLoadStatus(result, request); + }); + Status stream_load_status(result.status); + if (stream_load_status.ok()) { + ctx->txn_id = result.txn_id; + ctx->number_total_rows = result.total_rows; + ctx->number_loaded_rows = result.loaded_rows; + ctx->number_filtered_rows = result.filtered_rows; + ctx->number_unselected_rows = result.unselected_rows; + break; + } + } + + auto str = std::string(ctx->to_json()); + // add new line at end + str = str + '\n'; + HttpChannel::send_reply(req, str); +#ifndef BE_TEST + if (config::enable_stream_load_record) { + str = ctx->prepare_stream_load_record(str); + _save_stream_load_record(ctx, str); + } +#endif + // update statistics + streaming_load_with_sql_requests_total->increment(1); + streaming_load_with_sql_duration_ms->increment(ctx->load_cost_millis); + streaming_load_with_sql_current_processing->increment(-1); +} + +Status StreamLoadWithSqlAction::_handle(StreamLoadContext* ctx) { + if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) { + LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes + << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id; + return Status::InternalError("receive body don't equal with body bytes"); + } + + RETURN_IF_ERROR(ctx->body_sink->finish()); + // ctx->future.wait_for(std::chrono::seconds(config::max_fragment_start_wait_time_seconds)); + // if (!ctx->future.valid()) { + // return Status::TimedOut("data receive timeout"); + // } + RETURN_IF_ERROR(ctx->future.get()); + + if (ctx->two_phase_commit) { + int64_t pre_commit_start_time = MonotonicNanos(); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx)); + ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time; + } else { + // If put file success we need commit this load + int64_t commit_and_publish_start_time = MonotonicNanos(); + // RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx)); + ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time; + } + while (!ctx->is_stream_load_put_success) { + } + return ctx->status; +} + +int StreamLoadWithSqlAction::on_header(HttpRequest* req) { + streaming_load_with_sql_current_processing->increment(1); + + StreamLoadContext* ctx = new StreamLoadContext(_exec_env); + ctx->ref(); + req->set_handler_ctx(ctx); + + ctx->load_type = TLoadType::MANUL_LOAD; + ctx->load_src_type = TLoadSourceType::RAW; + + ctx->db = req->param(HTTP_DB_KEY); + ctx->table = req->param(HTTP_TABLE_KEY); + ctx->label = req->header(HTTP_LABEL_KEY); + if (ctx->label.empty()) { + ctx->label = generate_uuid_string(); + } + + ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ? true : false; + + LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db + << ", tbl=" << ctx->table; + + auto st = _on_header(req, ctx); + if (!st.ok()) { + ctx->status = std::move(st); + if (ctx->need_rollback) { + _exec_env->stream_load_executor()->rollback_txn(ctx); + ctx->need_rollback = false; + } + if (ctx->body_sink.get() != nullptr) { + ctx->body_sink->cancel(ctx->status.to_string()); + } + auto str = ctx->to_json(); + // add new line at end + str = str + '\n'; + HttpChannel::send_reply(req, str); + streaming_load_with_sql_current_processing->increment(-1); +#ifndef BE_TEST + if (config::enable_stream_load_record) { + str = ctx->prepare_stream_load_record(str); + _save_stream_load_record(ctx, str); + } +#endif + return -1; + } + return 0; +} + +Status StreamLoadWithSqlAction::_on_header(HttpRequest* http_req, StreamLoadContext* ctx) { + // auth information + if (!parse_basic_auth(*http_req, &ctx->auth)) { + LOG(WARNING) << "parse basic authorization failed." << ctx->brief(); + return Status::InternalError("no valid Basic authorization"); + } + // default csv + ctx->format = TFileFormatType::FORMAT_CSV_PLAIN; + + if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) { + return Status::InternalError("unknown data format, format={}", + http_req->header(HTTP_FORMAT_KEY)); + } + + // check content length + ctx->body_bytes = 0; + size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024; + +#ifndef BE_TEST + evhttp_connection_set_max_body_size( + evhttp_request_get_connection(http_req->get_evhttp_request()), csv_max_body_bytes); +#endif + + // begin transaction + int64_t begin_txn_start_time = MonotonicNanos(); + // RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx)); Review Comment: > begin_txn on fe -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org