yiguolei commented on code in PR #16940:
URL: https://github.com/apache/doris/pull/16940#discussion_r1112624733


##########
be/src/http/action/stream_load_with_sql.cpp:
##########
@@ -0,0 +1,417 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/stream_load_with_sql.h"
+
+#include <chrono>
+#include <deque>
+#include <future>
+#include <sstream>
+#include <thread>
+
+// use string iequal
+#include <event2/buffer.h>
+#include <event2/bufferevent.h>
+#include <event2/http.h>
+#include <rapidjson/prettywriter.h>
+#include <thrift/protocol/TDebugProtocol.h>
+
+#include "common/config.h"
+#include "common/consts.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "common/utils.h"
+#include "gen_cpp/FrontendService.h"
+#include "gen_cpp/FrontendService_types.h"
+#include "gen_cpp/HeartbeatService_types.h"
+#include "http/http_channel.h"
+#include "http/http_common.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/http_response.h"
+#include "http/utils.h"
+#include "io/fs/stream_load_pipe.h"
+#include "olap/storage_engine.h"
+#include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/export_task_mgr.h"
+#include "runtime/fragment_mgr.h"
+#include "runtime/load_path_mgr.h"
+#include "runtime/plan_fragment_executor.h"
+#include "runtime/stream_load/new_load_stream_mgr.h"
+#include "runtime/stream_load/stream_load_context.h"
+#include "runtime/stream_load/stream_load_executor.h"
+#include "runtime/stream_load/stream_load_recorder.h"
+#include "util/byte_buffer.h"
+#include "util/debug_util.h"
+#include "util/doris_metrics.h"
+#include "util/json_util.h"
+#include "util/metrics.h"
+#include "util/string_util.h"
+#include "util/thrift_rpc_helper.h"
+#include "util/time.h"
+#include "util/uid_util.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+// Metrics for the stream-load-with-SQL HTTP action; registered under the
+// "stream_load_with_sql" entity in the StreamLoadWithSqlAction constructor.
+// Number of requests that reached the statistics update at the end of handle().
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_requests_total, MetricUnit::REQUESTS);
+// Accumulated ctx->load_cost_millis of those requests.
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_duration_ms, MetricUnit::MILLISECONDS);
+// Incremented in on_header() and decremented when a request finishes.
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_current_processing,
+                                   MetricUnit::REQUESTS);
+
+#ifdef BE_TEST
+// Test-only global; presumably read/injected by unit tests — confirm against the test sources.
+TStreamLoadPutResult k_stream_load_put_result;
+#endif
+
+// Reports whether `format` is one of the formats this action treats as streamable:
+// plain CSV, the compressed CSV variants (bz2, deflate, gz, lz4 frame, lzo, lzop) and JSON.
+// Any other format value yields false.
+static bool is_format_support_streaming(TFileFormatType::type format) {
+    const bool is_csv_family = format == TFileFormatType::FORMAT_CSV_PLAIN ||
+                               format == TFileFormatType::FORMAT_CSV_BZ2 ||
+                               format == TFileFormatType::FORMAT_CSV_DEFLATE ||
+                               format == TFileFormatType::FORMAT_CSV_GZ ||
+                               format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
+                               format == TFileFormatType::FORMAT_CSV_LZO ||
+                               format == TFileFormatType::FORMAT_CSV_LZOP;
+    return is_csv_family || format == TFileFormatType::FORMAT_JSON;
+}
+
+// Creates the action bound to `exec_env` and registers its three metrics under a
+// dedicated "stream_load_with_sql" metric entity (deregistered in the destructor).
+StreamLoadWithSqlAction::StreamLoadWithSqlAction(ExecEnv* exec_env) : _exec_env(exec_env) {
+    auto registry = DorisMetrics::instance()->metric_registry();
+    _stream_load_with_sql_entity = registry->register_entity("stream_load_with_sql");
+
+    INT_COUNTER_METRIC_REGISTER(_stream_load_with_sql_entity, streaming_load_with_sql_requests_total);
+    INT_COUNTER_METRIC_REGISTER(_stream_load_with_sql_entity, streaming_load_with_sql_duration_ms);
+    INT_GAUGE_METRIC_REGISTER(_stream_load_with_sql_entity, streaming_load_with_sql_current_processing);
+}
+
+// Removes the metric entity registered by the constructor.
+StreamLoadWithSqlAction::~StreamLoadWithSqlAction() {
+    auto registry = DorisMetrics::instance()->metric_registry();
+    registry->deregister_entity(_stream_load_with_sql_entity);
+}
+
+// Polls the FE master for the status of the load identified by ctx->id until the FE reports
+// success, then copies the txn id and row counters from the FE result into ctx.
+// Polls are paced and bounded by ctx->timeout_second (or a fallback when it is not positive),
+// so an unreachable FE or a load that never completes cannot pin the HTTP thread forever.
+// Returns OK on success, or TimedOut once the deadline passes.
+static Status wait_for_load_status_from_fe(ExecEnv* exec_env, StreamLoadContext* ctx) {
+    // Pause between two consecutive status RPCs.
+    constexpr int64_t kPollIntervalMs = 100;
+    // Upper bound on the wait when the request did not carry a positive timeout.
+    constexpr int64_t kDefaultWaitSeconds = 600;
+
+    const int64_t wait_seconds =
+            ctx->timeout_second > 0 ? static_cast<int64_t>(ctx->timeout_second) : kDefaultWaitSeconds;
+    const int64_t deadline_ms = UnixMillis() + wait_seconds * 1000;
+
+    TStreamLoadWithLoadStatusRequest request;
+    request.__set_loadId(ctx->id.to_thrift());
+    const TNetworkAddress master_addr = exec_env->master_info()->network_address;
+    while (true) {
+        // Fresh result per attempt so a failed RPC never leaves stale fields behind.
+        TStreamLoadWithLoadStatusResult result;
+        Status rpc_status = ThriftRpcHelper::rpc<FrontendServiceClient>(
+                master_addr.hostname, master_addr.port,
+                [&request, &result](FrontendServiceConnection& client) {
+                    client->StreamLoadWithLoadStatus(result, request);
+                });
+        if (rpc_status.ok()) {
+            Status stream_load_status(result.status);
+            if (stream_load_status.ok()) {
+                ctx->txn_id = result.txn_id;
+                ctx->number_total_rows = result.total_rows;
+                ctx->number_loaded_rows = result.loaded_rows;
+                ctx->number_filtered_rows = result.filtered_rows;
+                ctx->number_unselected_rows = result.unselected_rows;
+                return Status::OK();
+            }
+        } else {
+            LOG(WARNING) << "fetch stream load status from FE failed, id=" << ctx->id
+                         << ", errmsg=" << rpc_status;
+        }
+        if (UnixMillis() >= deadline_ms) {
+            LOG(WARNING) << "wait for stream load status from FE timed out, id=" << ctx->id;
+            return Status::TimedOut("wait for stream load status from FE timeout");
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(kPollIntervalMs));
+    }
+}
+
+// Finishes a stream-load-with-SQL request: completes the local part via _handle(), rolls back /
+// cancels on failure, otherwise waits for the FE to report the final result, then replies with
+// the context as JSON. Statistics (including the in-flight gauge incremented in on_header) and
+// the optional stream load record are updated on every path, success or failure.
+void StreamLoadWithSqlAction::handle(HttpRequest* req) {
+    auto* ctx = static_cast<StreamLoadContext*>(req->handler_ctx());
+    if (ctx == nullptr) {
+        return;
+    }
+
+    // status already set to fail
+    if (ctx->status.ok()) {
+        ctx->status = _handle(ctx);
+        if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+            LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
+                         << ", errmsg=" << ctx->status;
+        }
+    }
+    ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
+
+    if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+        if (ctx->need_rollback) {
+            _exec_env->stream_load_executor()->rollback_txn(ctx);
+            ctx->need_rollback = false;
+        }
+        if (ctx->body_sink.get() != nullptr) {
+            ctx->body_sink->cancel(ctx->status.to_string());
+        }
+    }
+
+    // The final txn id and row counters are only known to the FE; query them after the
+    // local part succeeded. A failure here (e.g. timeout) is reported in the reply below.
+    if (ctx->status.ok()) {
+        ctx->status = wait_for_load_status_from_fe(_exec_env, ctx);
+    }
+
+    auto str = std::string(ctx->to_json());
+    // add new line at end
+    str = str + '\n';
+    HttpChannel::send_reply(req, str);
+#ifndef BE_TEST
+    if (config::enable_stream_load_record) {
+        str = ctx->prepare_stream_load_record(str);
+        _save_stream_load_record(ctx, str);
+    }
+#endif
+    // update statistics; done on failure too so the in-flight gauge never leaks
+    streaming_load_with_sql_requests_total->increment(1);
+    streaming_load_with_sql_duration_ms->increment(ctx->load_cost_millis);
+    streaming_load_with_sql_current_processing->increment(-1);
+}
+
+// Completes the local part of the load once the whole body has been received:
+// validates the received size, finishes the body pipe, waits for the execution future,
+// pre-commits in two-phase mode, and then waits (bounded) for the put step to report success.
+// Returns ctx->status on success, the first error encountered, or TimedOut if the put
+// step never reports success before the deadline.
+Status StreamLoadWithSqlAction::_handle(StreamLoadContext* ctx) {
+    if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
+        LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
+                     << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
+        return Status::InternalError("receive body don't equal with body bytes");
+    }
+
+    RETURN_IF_ERROR(ctx->body_sink->finish());
+    // ctx->future.wait_for(std::chrono::seconds(config::max_fragment_start_wait_time_seconds));
+    // if (!ctx->future.valid()) {
+    //     return Status::TimedOut("data receive timeout");
+    // }
+    RETURN_IF_ERROR(ctx->future.get());
+
+    if (ctx->two_phase_commit) {
+        int64_t pre_commit_start_time = MonotonicNanos();
+        RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx));
+        ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
+    } else {
+        // If put file success we need commit this load
+        int64_t commit_and_publish_start_time = MonotonicNanos();
+        // TODO: the commit is currently disabled in this path, so the cost below measures nothing.
+        // RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx));
+        ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
+    }
+
+    // Wait for the put step instead of busy-spinning: pause between checks and give up
+    // after ctx->timeout_second (or a fallback when it is not positive).
+    constexpr int64_t kPollIntervalMs = 10;
+    constexpr int64_t kDefaultWaitSeconds = 600;
+    const int64_t wait_seconds =
+            ctx->timeout_second > 0 ? static_cast<int64_t>(ctx->timeout_second) : kDefaultWaitSeconds;
+    const int64_t deadline_ms = UnixMillis() + wait_seconds * 1000;
+    // NOTE(review): this flag is presumably set by another thread; unless it is a
+    // std::atomic<bool>, reading it here is a data race — a promise/condition variable
+    // would be the cleaner hand-off. Confirm its declaration in StreamLoadContext.
+    while (!ctx->is_stream_load_put_success) {
+        if (UnixMillis() >= deadline_ms) {
+            LOG(WARNING) << "wait for stream load put timed out, id=" << ctx->id;
+            return Status::TimedOut("wait for stream load put timeout");
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(kPollIntervalMs));
+    }
+    return ctx->status;
+}
+
+// Called when the request headers arrive. Creates and attaches the StreamLoadContext,
+// fills db/table/label/two-phase-commit from the request, then runs _on_header().
+// Returns 0 to continue receiving the body. On failure it rolls back / cancels as needed,
+// replies with the context as JSON, decrements the in-flight gauge, optionally saves the
+// stream load record, and returns -1.
+int StreamLoadWithSqlAction::on_header(HttpRequest* req) {
+    streaming_load_with_sql_current_processing->increment(1);
+
+    auto* ctx = new StreamLoadContext(_exec_env);
+    ctx->ref();
+    req->set_handler_ctx(ctx);
+
+    ctx->load_type = TLoadType::MANUL_LOAD;
+    ctx->load_src_type = TLoadSourceType::RAW;
+
+    ctx->db = req->param(HTTP_DB_KEY);
+    ctx->table = req->param(HTTP_TABLE_KEY);
+    ctx->label = req->header(HTTP_LABEL_KEY);
+    if (ctx->label.empty()) {
+        // No label supplied by the client: generate a unique one.
+        ctx->label = generate_uuid_string();
+    }
+    ctx->two_phase_commit = (req->header(HTTP_TWO_PHASE_COMMIT) == "true");
+
+    LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db
+              << ", tbl=" << ctx->table;
+
+    Status header_status = _on_header(req, ctx);
+    if (header_status.ok()) {
+        return 0;
+    }
+
+    // Header processing failed: undo what was started and answer immediately.
+    ctx->status = std::move(header_status);
+    if (ctx->need_rollback) {
+        _exec_env->stream_load_executor()->rollback_txn(ctx);
+        ctx->need_rollback = false;
+    }
+    if (ctx->body_sink.get() != nullptr) {
+        ctx->body_sink->cancel(ctx->status.to_string());
+    }
+    auto reply = ctx->to_json();
+    // add new line at end
+    reply += '\n';
+    HttpChannel::send_reply(req, reply);
+    streaming_load_with_sql_current_processing->increment(-1);
+#ifndef BE_TEST
+    if (config::enable_stream_load_record) {
+        reply = ctx->prepare_stream_load_record(reply);
+        _save_stream_load_record(ctx, reply);
+    }
+#endif
+    return -1;
+}
+
+Status StreamLoadWithSqlAction::_on_header(HttpRequest* http_req, 
StreamLoadContext* ctx) {
+    // auth information
+    if (!parse_basic_auth(*http_req, &ctx->auth)) {
+        LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
+        return Status::InternalError("no valid Basic authorization");
+    }
+    // default csv
+    ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;
+
+    if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) {
+        return Status::InternalError("unknown data format, format={}",
+                                     http_req->header(HTTP_FORMAT_KEY));
+    }
+
+    // check content length
+    ctx->body_bytes = 0;
+    size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
+
+#ifndef BE_TEST
+    evhttp_connection_set_max_body_size(
+            evhttp_request_get_connection(http_req->get_evhttp_request()), 
csv_max_body_bytes);
+#endif
+
+    // begin transaction
+    int64_t begin_txn_start_time = MonotonicNanos();
+    // RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx));
+    ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time;
+

Review Comment:
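   Some code was lost here. The original stream load action also parses and validates the following, which this version is missing: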
   // check content length
       ctx->body_bytes = 0;
       size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
       size_t json_max_body_bytes = config::streaming_load_json_max_mb * 1024 * 
1024;
       bool read_json_by_line = false;
       if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
           if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
               read_json_by_line = true;
           }
       }
       if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
           ctx->body_bytes = 
std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
           // json max body size
           if ((ctx->format == TFileFormatType::FORMAT_JSON) &&
               (ctx->body_bytes > json_max_body_bytes) && !read_json_by_line) {
               return Status::InternalError(
                       "The size of this batch exceed the max size [{}]  of 
json type data "
                       " data [ {} ]. Split the file, or use 
'read_json_by_line'",
                       json_max_body_bytes, ctx->body_bytes);
           }
           // csv max body size
           else if (ctx->body_bytes > csv_max_body_bytes) {
               LOG(WARNING) << "body exceed max size." << ctx->brief();
               return Status::InternalError("body exceed max size: {}, data: 
{}", csv_max_body_bytes,
                                            ctx->body_bytes);
           }
       } else {
   #ifndef BE_TEST
           evhttp_connection_set_max_body_size(
                   
evhttp_request_get_connection(http_req->get_evhttp_request()), 
csv_max_body_bytes);
   #endif
       }
   
       if (!http_req->header(HTTP_TIMEOUT).empty()) {
           try {
               ctx->timeout_second = std::stoi(http_req->header(HTTP_TIMEOUT));
           } catch (const std::invalid_argument& e) {
               return Status::InvalidArgument("Invalid timeout format");
           }
       }



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to