github-actions[bot] commented on code in PR #43536:
URL: https://github.com/apache/doris/pull/43536#discussion_r1834765657


##########
be/src/http/action/stream_load.cpp:
##########
@@ -873,4 +908,470 @@ Status 
StreamLoadAction::_handle_group_commit(HttpRequest* req,
     return Status::OK();
 }
 
+void StreamLoadAction::_httpstream_handle(HttpRequest* req,
+                                          std::shared_ptr<StreamLoadContext> 
ctx) {
+    // status already set to fail
+    if (ctx->status.ok()) {
+        ctx->status = _http_stream_handle(req, ctx);
+        if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+            LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
+                         << ", errmsg=" << ctx->status;
+        }
+    }
+    ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
+
+    if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+        if (ctx->body_sink != nullptr) {
+            ctx->body_sink->cancel(ctx->status.to_string());
+        }
+    }
+
+    if (!ctx->status.ok()) {
+        auto str = std::string(ctx->to_json());
+        // add new line at end
+        str = str + '\n';
+        HttpChannel::send_reply(req, str);
+        return;
+    }
+    auto str = std::string(ctx->to_json());
+    // add new line at end
+    str = str + '\n';
+    HttpChannel::send_reply(req, str);
+    if (config::enable_stream_load_record) {
+        str = ctx->prepare_stream_load_record(str);
+        _save_stream_load_record(ctx, str);
+    }
+    // update statistics
+    streaming_load_requests_total->increment(1);
+    streaming_load_duration_ms->increment(ctx->load_cost_millis);
+}
+
+Status StreamLoadAction::_http_stream_handle(HttpRequest* http_req,
+                                             
std::shared_ptr<StreamLoadContext> ctx) {
+    if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
+        LOG(WARNING) << "recevie body don't equal with body bytes, 
body_bytes=" << ctx->body_bytes
+                     << ", receive_bytes=" << ctx->receive_bytes << ", id=" << 
ctx->id;
+        return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't 
equal with body bytes");
+    }
+    RETURN_IF_ERROR(ctx->body_sink->finish());
+
+    // wait stream load finish
+    RETURN_IF_ERROR(ctx->future.get());
+
+    if (ctx->group_commit) {
+        LOG(INFO) << "skip commit because this is group commit, pipe_id=" << 
ctx->id.to_string();
+        return Status::OK();
+    }
+
+    if (ctx->two_phase_commit) {
+        int64_t pre_commit_start_time = MonotonicNanos();
+        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
+        ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - 
pre_commit_start_time;
+    } else {
+        // If put file success we need commit this load
+        int64_t commit_and_publish_start_time = MonotonicNanos();
+        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
+        ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - 
commit_and_publish_start_time;
+    }
+    return Status::OK();
+}
+
+int StreamLoadAction::_httpstream_on_header(HttpRequest* req,
+                                            std::shared_ptr<StreamLoadContext> 
ctx) {
+    req->set_handler_ctx(ctx);
+
+    ctx->load_type = TLoadType::MANUL_LOAD;
+    ctx->load_src_type = TLoadSourceType::RAW;
+    ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
+    Status st = _handle_group_commit(req, ctx);
+
+    LOG(INFO) << "new income streaming load request." << ctx->brief() << " sql 
: " << ctx->sql_str
+              << ", group_commit=" << ctx->group_commit;
+    if (st.ok()) {
+        st = _http_stream_on_header(req, ctx);
+    }
+    if (!st.ok()) {
+        ctx->status = std::move(st);
+        if (ctx->body_sink != nullptr) {
+            ctx->body_sink->cancel(ctx->status.to_string());
+        }
+        auto str = ctx->to_json();
+        // add new line at end
+        str = str + '\n';
+        HttpChannel::send_reply(req, str);
+        if (config::enable_stream_load_record) {
+            str = ctx->prepare_stream_load_record(str);
+            _save_stream_load_record(ctx, str);
+        }
+        return -1;
+    }
+    return 0;
+}
+
+Status StreamLoadAction::_http_stream_on_header(HttpRequest* http_req,
+                                                
std::shared_ptr<StreamLoadContext> ctx) {
+    // auth information
+    if (!parse_basic_auth(*http_req, &ctx->auth)) {
+        LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
+        return Status::NotAuthorized("no valid Basic authorization");
+    }
+
+    // TODO(zs) : need Need to request an FE to obtain information such as 
format
+    // check content length
+    ctx->body_bytes = 0;
+    size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
+    if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
+        try {
+            ctx->body_bytes = 
std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
+        } catch (const std::exception& e) {
+            return Status::InvalidArgument("invalid HTTP header 
CONTENT_LENGTH={}: {}",
+                                           
http_req->header(HttpHeaders::CONTENT_LENGTH), e.what());
+        }
+        // csv max body size
+        if (ctx->body_bytes > csv_max_body_bytes) {
+            LOG(WARNING) << "body exceed max size." << ctx->brief();
+            return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
+                    "body size {} exceed BE's conf `streaming_load_max_mb` {}. 
increase it if you "
+                    "are sure this load is reasonable",
+                    ctx->body_bytes, csv_max_body_bytes);
+        }
+    }
+
+    auto pipe = std::make_shared<io::StreamLoadPipe>(
+            io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* 
min_chunk_size */,
+            ctx->body_bytes /* total_length */);
+    ctx->body_sink = pipe;
+    ctx->pipe = pipe;
+
+    RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx));
+
+    // Here, transactions are set from fe's NativeInsertStmt.
+    // TODO(zs) : How to support two_phase_commit
+
+    return Status::OK();
+}
+
+void StreamLoadAction::_httpstream_on_chunk_data(HttpRequest* req,
+                                                 
std::shared_ptr<StreamLoadContext> ctx) {
+    if (!req->header(HTTP_WAL_ID_KY).empty()) {
+        ctx->wal_id = std::stoll(req->header(HTTP_WAL_ID_KY));
+    }
+    struct evhttp_request* ev_req = req->get_evhttp_request();
+    auto evbuf = evhttp_request_get_input_buffer(ev_req);
+
+    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
+
+    int64_t start_read_data_time = MonotonicNanos();
+    Status st = ctx->allocate_schema_buffer();
+    if (!st.ok()) {
+        ctx->status = st;
+        return;
+    }
+    while (evbuffer_get_length(evbuf) > 0) {
+        ByteBufferPtr bb;
+        st = ByteBuffer::allocate(128 * 1024, &bb);
+        if (!st.ok()) {
+            ctx->status = st;
+            return;
+        }
+        auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
+        bb->pos = remove_bytes;
+        bb->flip();
+        st = ctx->body_sink->append(bb);
+        // schema_buffer stores 1M of data for parsing column information
+        // need to determine whether to cache for the first time
+        if (ctx->is_read_schema) {
+            if (ctx->schema_buffer()->pos + remove_bytes < 
config::stream_tvf_buffer_size) {
+                ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
+            } else {
+                LOG(INFO) << "use a portion of data to request fe to obtain 
column information";
+                ctx->is_read_schema = false;
+                ctx->status = httpstream_process_put(req, ctx);
+            }
+        }
+        if (!st.ok()) {
+            LOG(WARNING) << "append body content failed. errmsg=" << st << ", 
" << ctx->brief();
+            ctx->status = st;
+            return;
+        }
+        ctx->receive_bytes += remove_bytes;
+    }
+    // after all the data has been read and it has not reached 1M, it will 
execute here
+    if (ctx->is_read_schema) {
+        LOG(INFO) << "after all the data has been read and it has not reached 
1M, it will execute "
+                  << "here";
+        ctx->is_read_schema = false;
+        ctx->status = httpstream_process_put(req, ctx);
+    }
+    ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time);
+}
+
+Status StreamLoadAction::httpstream_process_put(HttpRequest* http_req,
+                                                
std::shared_ptr<StreamLoadContext> ctx) {
+    TStreamLoadPutRequest request;
+    if (http_req != nullptr) {
+        request.__set_load_sql(ctx->sql_str);
+        if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) {
+            bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), 
"true");
+            request.__set_memtable_on_sink_node(value);
+        }
+    } else {
+        request.__set_token(ctx->auth.token);
+        request.__set_load_sql(ctx->sql_str);
+        ctx->auth.token = "";
+    }
+    set_request_auth(&request, ctx->auth);
+    request.__set_loadId(ctx->id.to_thrift());
+    request.__set_label(ctx->label);
+    if (ctx->group_commit) {
+        if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
+            
request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
+        } else {
+            // used for wait_internal_group_commit_finish
+            request.__set_group_commit_mode("sync_mode");
+        }
+    }
+    if (_exec_env->master_info()->__isset.backend_id) {
+        request.__set_backend_id(_exec_env->master_info()->backend_id);
+    } else {
+        LOG(WARNING) << "_exec_env->master_info not set backend_id";
+    }
+
+    // plan this load
+    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    int64_t stream_load_put_start_time = MonotonicNanos();
+    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
+            master_addr.hostname, master_addr.port,
+            [&request, ctx](FrontendServiceConnection& client) {
+                client->streamLoadPut(ctx->put_result, request);
+            }));
+    ctx->stream_load_put_cost_nanos = MonotonicNanos() - 
stream_load_put_start_time;
+    Status plan_status(Status::create(ctx->put_result.status));
+    if (!plan_status.ok()) {
+        LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status 
<< ctx->brief();
+        return plan_status;
+    }
+    if (config::is_cloud_mode() && ctx->two_phase_commit && 
ctx->is_mow_table()) {
+        return Status::NotSupported("http stream 2pc is unsupported for mow 
table");
+    }
+    ctx->db = ctx->put_result.pipeline_params.db_name;
+    ctx->table = ctx->put_result.pipeline_params.table_name;
+    ctx->txn_id = ctx->put_result.pipeline_params.txn_conf.txn_id;
+    ctx->label = ctx->put_result.pipeline_params.import_label;
+    ctx->put_result.pipeline_params.__set_wal_id(ctx->wal_id);
+    if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == 
"async_mode") {
+        // FIXME find a way to avoid chunked stream load write large WALs
+        size_t content_length = 0;
+        if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
+            try {
+                content_length = 
std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
+            } catch (const std::exception& e) {
+                return Status::InvalidArgument("invalid HTTP header 
CONTENT_LENGTH={}: {}",
+                                               
http_req->header(HttpHeaders::CONTENT_LENGTH),
+                                               e.what());
+            }
+            if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
+                ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
+                ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
+                content_length *= 3;
+            }
+        }
+        ctx->put_result.pipeline_params.__set_content_length(content_length);
+    }
+
+    return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
+}
+
+Status StreamLoadAction::_construct_sql_from_req(HttpRequest* req,
+                                                 
std::shared_ptr<StreamLoadContext> ctx) {
+    string db_name;
+    string table_name;
+    url_decode(req->param(HTTP_DB_KEY), &db_name);
+    url_decode(req->param(HTTP_TABLE_KEY), &table_name);
+
+    string& sql = ctx->sql_str;
+
+    sql = "INSERT INTO ";
+
+    sql += db_name + "." + table_name + " ";
+
+    sql += "SELECT * ";
+
+    sql += "FROM http_stream(";
+
+    if (!req->header(HTTP_ENCLOSE).empty() && 
!req->header(HTTP_ENCLOSE).empty()) {
+        const auto& enclose_str = req->header(HTTP_ENCLOSE);
+        if (enclose_str.length() != 1) {
+            return Status::InvalidArgument("enclose must be single-char, 
actually is {}",
+                                           enclose_str);
+        }
+    }
+    if (!req->header(HTTP_ESCAPE).empty() && 
!req->header(HTTP_ESCAPE).empty()) {
+        const auto& escape_str = req->header(HTTP_ESCAPE);
+        if (escape_str.length() != 1) {
+            return Status::InvalidArgument("escape must be single-char, 
actually is {}",
+                                           escape_str);
+        }
+    }
+    if (!req->header(HTTP_PARTITIONS).empty()) {
+        if (!req->header(HTTP_TEMP_PARTITIONS).empty()) {
+            return Status::InvalidArgument(
+                    "Can not specify both partitions and temporary 
partitions");
+        }
+    }
+    if (!req->header(HTTP_TEMP_PARTITIONS).empty()) {
+        if (!req->header(HTTP_PARTITIONS).empty()) {
+            return Status::InvalidArgument(
+                    "Can not specify both partitions and temporary 
partitions");
+        }

Review Comment:
   warning: function '_construct_sql_from_req' exceeds recommended 
size/complexity thresholds [readability-function-size]
   ```cpp
   Status StreamLoadAction::_construct_sql_from_req(HttpRequest* req,
                            ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/http/action/stream_load.cpp:1229:** 186 lines including whitespace 
and comments (threshold 80)
   ```cpp
   Status StreamLoadAction::_construct_sql_from_req(HttpRequest* req,
                            ^
   ```
   
   </details>
   



##########
be/src/http/action/stream_load.cpp:
##########
@@ -873,4 +908,470 @@ Status 
StreamLoadAction::_handle_group_commit(HttpRequest* req,
     return Status::OK();
 }
 
+void StreamLoadAction::_httpstream_handle(HttpRequest* req,
+                                          std::shared_ptr<StreamLoadContext> 
ctx) {
+    // status already set to fail
+    if (ctx->status.ok()) {
+        ctx->status = _http_stream_handle(req, ctx);
+        if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+            LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
+                         << ", errmsg=" << ctx->status;
+        }
+    }
+    ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
+
+    if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+        if (ctx->body_sink != nullptr) {
+            ctx->body_sink->cancel(ctx->status.to_string());
+        }
+    }
+
+    if (!ctx->status.ok()) {
+        auto str = std::string(ctx->to_json());
+        // add new line at end
+        str = str + '\n';
+        HttpChannel::send_reply(req, str);
+        return;
+    }
+    auto str = std::string(ctx->to_json());
+    // add new line at end
+    str = str + '\n';
+    HttpChannel::send_reply(req, str);
+    if (config::enable_stream_load_record) {
+        str = ctx->prepare_stream_load_record(str);
+        _save_stream_load_record(ctx, str);
+    }
+    // update statistics
+    streaming_load_requests_total->increment(1);
+    streaming_load_duration_ms->increment(ctx->load_cost_millis);
+}
+
+Status StreamLoadAction::_http_stream_handle(HttpRequest* http_req,
+                                             
std::shared_ptr<StreamLoadContext> ctx) {
+    if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
+        LOG(WARNING) << "recevie body don't equal with body bytes, 
body_bytes=" << ctx->body_bytes
+                     << ", receive_bytes=" << ctx->receive_bytes << ", id=" << 
ctx->id;
+        return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't 
equal with body bytes");
+    }
+    RETURN_IF_ERROR(ctx->body_sink->finish());
+
+    // wait stream load finish
+    RETURN_IF_ERROR(ctx->future.get());
+
+    if (ctx->group_commit) {
+        LOG(INFO) << "skip commit because this is group commit, pipe_id=" << 
ctx->id.to_string();
+        return Status::OK();
+    }
+
+    if (ctx->two_phase_commit) {
+        int64_t pre_commit_start_time = MonotonicNanos();
+        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
+        ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - 
pre_commit_start_time;
+    } else {
+        // If put file success we need commit this load
+        int64_t commit_and_publish_start_time = MonotonicNanos();
+        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
+        ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - 
commit_and_publish_start_time;
+    }
+    return Status::OK();
+}
+
+int StreamLoadAction::_httpstream_on_header(HttpRequest* req,
+                                            std::shared_ptr<StreamLoadContext> 
ctx) {
+    req->set_handler_ctx(ctx);
+
+    ctx->load_type = TLoadType::MANUL_LOAD;
+    ctx->load_src_type = TLoadSourceType::RAW;
+    ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
+    Status st = _handle_group_commit(req, ctx);
+
+    LOG(INFO) << "new income streaming load request." << ctx->brief() << " sql 
: " << ctx->sql_str
+              << ", group_commit=" << ctx->group_commit;
+    if (st.ok()) {
+        st = _http_stream_on_header(req, ctx);
+    }
+    if (!st.ok()) {
+        ctx->status = std::move(st);
+        if (ctx->body_sink != nullptr) {
+            ctx->body_sink->cancel(ctx->status.to_string());
+        }
+        auto str = ctx->to_json();
+        // add new line at end
+        str = str + '\n';
+        HttpChannel::send_reply(req, str);
+        if (config::enable_stream_load_record) {
+            str = ctx->prepare_stream_load_record(str);
+            _save_stream_load_record(ctx, str);
+        }
+        return -1;
+    }
+    return 0;
+}
+
+Status StreamLoadAction::_http_stream_on_header(HttpRequest* http_req,
+                                                
std::shared_ptr<StreamLoadContext> ctx) {
+    // auth information
+    if (!parse_basic_auth(*http_req, &ctx->auth)) {
+        LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
+        return Status::NotAuthorized("no valid Basic authorization");
+    }
+
+    // TODO(zs) : need Need to request an FE to obtain information such as 
format
+    // check content length
+    ctx->body_bytes = 0;
+    size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
+    if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
+        try {
+            ctx->body_bytes = 
std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
+        } catch (const std::exception& e) {
+            return Status::InvalidArgument("invalid HTTP header 
CONTENT_LENGTH={}: {}",
+                                           
http_req->header(HttpHeaders::CONTENT_LENGTH), e.what());
+        }
+        // csv max body size
+        if (ctx->body_bytes > csv_max_body_bytes) {
+            LOG(WARNING) << "body exceed max size." << ctx->brief();
+            return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
+                    "body size {} exceed BE's conf `streaming_load_max_mb` {}. 
increase it if you "
+                    "are sure this load is reasonable",
+                    ctx->body_bytes, csv_max_body_bytes);
+        }
+    }
+
+    auto pipe = std::make_shared<io::StreamLoadPipe>(
+            io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* 
min_chunk_size */,
+            ctx->body_bytes /* total_length */);
+    ctx->body_sink = pipe;
+    ctx->pipe = pipe;
+
+    RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx));
+
+    // Here, transactions are set from fe's NativeInsertStmt.
+    // TODO(zs) : How to support two_phase_commit
+
+    return Status::OK();
+}
+
+void StreamLoadAction::_httpstream_on_chunk_data(HttpRequest* req,
+                                                 
std::shared_ptr<StreamLoadContext> ctx) {
+    if (!req->header(HTTP_WAL_ID_KY).empty()) {
+        ctx->wal_id = std::stoll(req->header(HTTP_WAL_ID_KY));
+    }
+    struct evhttp_request* ev_req = req->get_evhttp_request();
+    auto evbuf = evhttp_request_get_input_buffer(ev_req);
+
+    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
+
+    int64_t start_read_data_time = MonotonicNanos();
+    Status st = ctx->allocate_schema_buffer();
+    if (!st.ok()) {
+        ctx->status = st;
+        return;
+    }
+    while (evbuffer_get_length(evbuf) > 0) {
+        ByteBufferPtr bb;
+        st = ByteBuffer::allocate(128 * 1024, &bb);
+        if (!st.ok()) {
+            ctx->status = st;
+            return;
+        }
+        auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
+        bb->pos = remove_bytes;
+        bb->flip();
+        st = ctx->body_sink->append(bb);
+        // schema_buffer stores 1M of data for parsing column information
+        // need to determine whether to cache for the first time
+        if (ctx->is_read_schema) {
+            if (ctx->schema_buffer()->pos + remove_bytes < 
config::stream_tvf_buffer_size) {
+                ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
+            } else {
+                LOG(INFO) << "use a portion of data to request fe to obtain 
column information";
+                ctx->is_read_schema = false;
+                ctx->status = httpstream_process_put(req, ctx);
+            }
+        }
+        if (!st.ok()) {
+            LOG(WARNING) << "append body content failed. errmsg=" << st << ", 
" << ctx->brief();
+            ctx->status = st;
+            return;
+        }
+        ctx->receive_bytes += remove_bytes;
+    }
+    // after all the data has been read and it has not reached 1M, it will 
execute here
+    if (ctx->is_read_schema) {
+        LOG(INFO) << "after all the data has been read and it has not reached 
1M, it will execute "
+                  << "here";
+        ctx->is_read_schema = false;
+        ctx->status = httpstream_process_put(req, ctx);
+    }
+    ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time);
+}
+
+Status StreamLoadAction::httpstream_process_put(HttpRequest* http_req,
+                                                
std::shared_ptr<StreamLoadContext> ctx) {
+    TStreamLoadPutRequest request;
+    if (http_req != nullptr) {
+        request.__set_load_sql(ctx->sql_str);
+        if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) {
+            bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), 
"true");
+            request.__set_memtable_on_sink_node(value);
+        }
+    } else {
+        request.__set_token(ctx->auth.token);
+        request.__set_load_sql(ctx->sql_str);
+        ctx->auth.token = "";
+    }
+    set_request_auth(&request, ctx->auth);
+    request.__set_loadId(ctx->id.to_thrift());
+    request.__set_label(ctx->label);
+    if (ctx->group_commit) {
+        if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
+            
request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
+        } else {
+            // used for wait_internal_group_commit_finish
+            request.__set_group_commit_mode("sync_mode");
+        }
+    }
+    if (_exec_env->master_info()->__isset.backend_id) {
+        request.__set_backend_id(_exec_env->master_info()->backend_id);
+    } else {
+        LOG(WARNING) << "_exec_env->master_info not set backend_id";
+    }
+
+    // plan this load
+    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    int64_t stream_load_put_start_time = MonotonicNanos();
+    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
+            master_addr.hostname, master_addr.port,
+            [&request, ctx](FrontendServiceConnection& client) {
+                client->streamLoadPut(ctx->put_result, request);
+            }));
+    ctx->stream_load_put_cost_nanos = MonotonicNanos() - 
stream_load_put_start_time;
+    Status plan_status(Status::create(ctx->put_result.status));
+    if (!plan_status.ok()) {
+        LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status 
<< ctx->brief();
+        return plan_status;
+    }
+    if (config::is_cloud_mode() && ctx->two_phase_commit && 
ctx->is_mow_table()) {
+        return Status::NotSupported("http stream 2pc is unsupported for mow 
table");
+    }
+    ctx->db = ctx->put_result.pipeline_params.db_name;
+    ctx->table = ctx->put_result.pipeline_params.table_name;
+    ctx->txn_id = ctx->put_result.pipeline_params.txn_conf.txn_id;
+    ctx->label = ctx->put_result.pipeline_params.import_label;
+    ctx->put_result.pipeline_params.__set_wal_id(ctx->wal_id);
+    if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == 
"async_mode") {
+        // FIXME find a way to avoid chunked stream load write large WALs
+        size_t content_length = 0;
+        if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
+            try {
+                content_length = 
std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
+            } catch (const std::exception& e) {
+                return Status::InvalidArgument("invalid HTTP header 
CONTENT_LENGTH={}: {}",
+                                               
http_req->header(HttpHeaders::CONTENT_LENGTH),
+                                               e.what());
+            }
+            if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
+                ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
+                ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
+                content_length *= 3;
+            }
+        }
+        ctx->put_result.pipeline_params.__set_content_length(content_length);
+    }
+
+    return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
+}
+
+Status StreamLoadAction::_construct_sql_from_req(HttpRequest* req,
+                                                 
std::shared_ptr<StreamLoadContext> ctx) {
+    string db_name;
+    string table_name;
+    url_decode(req->param(HTTP_DB_KEY), &db_name);
+    url_decode(req->param(HTTP_TABLE_KEY), &table_name);
+
+    string& sql = ctx->sql_str;
+
+    sql = "INSERT INTO ";
+
+    sql += db_name + "." + table_name + " ";
+
+    sql += "SELECT * ";
+
+    sql += "FROM http_stream(";
+
+    if (!req->header(HTTP_ENCLOSE).empty() && 
!req->header(HTTP_ENCLOSE).empty()) {
+        const auto& enclose_str = req->header(HTTP_ENCLOSE);
+        if (enclose_str.length() != 1) {
+            return Status::InvalidArgument("enclose must be single-char, 
actually is {}",
+                                           enclose_str);
+        }
+    }
+    if (!req->header(HTTP_ESCAPE).empty() && 
!req->header(HTTP_ESCAPE).empty()) {
+        const auto& escape_str = req->header(HTTP_ESCAPE);
+        if (escape_str.length() != 1) {
+            return Status::InvalidArgument("escape must be single-char, 
actually is {}",
+                                           escape_str);
+        }
+    }
+    if (!req->header(HTTP_PARTITIONS).empty()) {
+        if (!req->header(HTTP_TEMP_PARTITIONS).empty()) {
+            return Status::InvalidArgument(
+                    "Can not specify both partitions and temporary 
partitions");
+        }
+    }
+    if (!req->header(HTTP_TEMP_PARTITIONS).empty()) {
+        if (!req->header(HTTP_PARTITIONS).empty()) {
+            return Status::InvalidArgument(
+                    "Can not specify both partitions and temporary 
partitions");
+        }

Review Comment:
   warning: function '_construct_sql_from_req' has cognitive complexity of 102 
(threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   Status StreamLoadAction::_construct_sql_from_req(HttpRequest* req,
                            ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/http/action/stream_load.cpp:1246:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       if (!req->header(HTTP_ENCLOSE).empty() && !req->header(HTTP_ENCLOSE).empty()) {
       ^
   ```
   **be/src/http/action/stream_load.cpp:1246:** +1
   ```cpp
       if (!req->header(HTTP_ENCLOSE).empty() && !req->header(HTTP_ENCLOSE).empty()) {
                                              ^
   ```
   **be/src/http/action/stream_load.cpp:1248:** +2, including nesting penalty 
of 1, nesting level increased to 2
   ```cpp
           if (enclose_str.length() != 1) {
           ^
   ```
   **be/src/http/action/stream_load.cpp:1253:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       if (!req->header(HTTP_ESCAPE).empty() && !req->header(HTTP_ESCAPE).empty()) {
       ^
   ```
   **be/src/http/action/stream_load.cpp:1253:** +1
   ```cpp
       if (!req->header(HTTP_ESCAPE).empty() && !req->header(HTTP_ESCAPE).empty()) {
                                             ^
   ```
   **be/src/http/action/stream_load.cpp:1255:** +2, including nesting penalty 
of 1, nesting level increased to 2
   ```cpp
           if (escape_str.length() != 1) {
           ^
   ```
   **be/src/http/action/stream_load.cpp:1260:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       if (!req->header(HTTP_PARTITIONS).empty()) {
       ^
   ```
   **be/src/http/action/stream_load.cpp:1261:** +2, including nesting penalty 
of 1, nesting level increased to 2
   ```cpp
           if (!req->header(HTTP_TEMP_PARTITIONS).empty()) {
           ^
   ```
   **be/src/http/action/stream_load.cpp:1266:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       if (!req->header(HTTP_TEMP_PARTITIONS).empty()) {
       ^
   ```
   **be/src/http/action/stream_load.cpp:1267:** +2, including nesting penalty 
of 1, nesting level increased to 2
   ```cpp
           if (!req->header(HTTP_PARTITIONS).empty()) {
           ^
   ```
   **be/src/http/action/stream_load.cpp:1273:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       if (!req->header(HTTP_SEND_BATCH_PARALLELISM).empty()) {
       ^
   ```
   **be/src/http/action/stream_load.cpp:1276:** +2, including nesting penalty 
of 1, nesting level increased to 2
   ```cpp
           } catch (const std::invalid_argument& e) {
             ^
   ```
   **be/src/http/action/stream_load.cpp:1279:** +2, including nesting penalty 
of 1, nesting level increased to 2
   ```cpp
           } catch (const std::out_of_range& e) {
             ^
   ```
   **be/src/http/action/stream_load.cpp:1288:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       if (!req->header(HTTP_MERGE_TYPE).empty()) {
       ^
   ```
   **be/src/http/action/stream_load.cpp:1291:** +2, including nesting penalty 
of 1, nesting level increased to 2
   ```cpp
           if (iter != merge_type_map.end()) {
           ^
   ```
   **be/src/http/action/stream_load.cpp:1293:** +1, nesting level increased to 2
   ```cpp
           } else {
             ^
   ```
   **be/src/http/action/stream_load.cpp:1296:** +2, including nesting penalty 
of 1, nesting level increased to 2
   ```cpp
           if (merge_type == TMergeType::MERGE && req->header(HTTP_DELETE_CONDITION).empty()) {
           ^
   ```
   **be/src/http/action/stream_load.cpp:1298:** +1, nesting level increased to 2
   ```cpp
           } else if (merge_type != TMergeType::MERGE && !req->header(HTTP_DELETE_CONDITION).empty()) {
                  ^
   ```
   **be/src/http/action/stream_load.cpp:1304:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       if (!req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty()) {
       ^
   ```
   **be/src/http/action/stream_load.cpp:1311:** +2, including nesting penalty 
of 1, nesting level increased to 2
   ```cpp
           if (iter != unique_key_update_mode_map.end()) {
           ^
   ```
   **be/src/http/action/stream_load.cpp:1313:** +3, including nesting penalty 
of 2, nesting level increased to 3
   ```cpp
               if (unique_key_update_mode == TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS) {
               ^
   ```
   **be/src/http/action/stream_load.cpp:1315:** +4, including nesting penalty 
of 3, nesting level increased to 4
   ```cpp
                   if (ctx->format != TFileFormatType::FORMAT_JSON) {
                   ^
   ```
   **be/src/http/action/stream_load.cpp:1320:** +4, including nesting penalty 
of 3, nesting level increased to 4
   ```cpp
                   if (!req->header(HTTP_FUZZY_PARSE).empty() &&
                   ^
   ```
   **be/src/http/action/stream_load.cpp:1320:** +1
   ```cpp
                   if (!req->header(HTTP_FUZZY_PARSE).empty() &&
                                                              ^
   ```
   **be/src/http/action/stream_load.cpp:1325:** +4, including nesting penalty 
of 3, nesting level increased to 4
   ```cpp
                   if (!req->header(HTTP_COLUMNS).empty()) {
                   ^
   ```
   **be/src/http/action/stream_load.cpp:1329:** +4, including nesting penalty 
of 3, nesting level increased to 4
   ```cpp
                   if (!req->header(HTTP_JSONPATHS).empty()) {
                   ^
   ```
   **be/src/http/action/stream_load.cpp:1333:** +4, including nesting penalty 
of 3, nesting level increased to 4
   ```cpp
                   if (!req->header(HTTP_HIDDEN_COLUMNS).empty()) {
                   ^
   ```
   **be/src/http/action/stream_load.cpp:1338:** +4, including nesting penalty 
of 3, nesting level increased to 4
   ```cpp
                   if (!req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) {
                   ^
   ```
   **be/src/http/action/stream_load.cpp:1343:** +4, including nesting penalty 
of 3, nesting level increased to 4
   ```cpp
                   if (!req->header(HTTP_MERGE_TYPE).empty()) {
                   ^
   ```
   **be/src/http/action/stream_load.cpp:1348:** +4, including nesting penalty 
of 3, nesting level increased to 4
   ```cpp
                   if (!req->header(HTTP_WHERE).empty()) {
                   ^
   ```
   **be/src/http/action/stream_load.cpp:1354:** +1, nesting level increased to 2
   ```cpp
           } else {
             ^
   ```
   **be/src/http/action/stream_load.cpp:1362:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       if (!req->header(HTTP_FORMAT_KEY).empty()) {
       ^
   ```
   **be/src/http/action/stream_load.cpp:1365:** +1, nesting level increased to 1
   ```cpp
       } else {
         ^
   ```
   **be/src/http/action/stream_load.cpp:1374:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_COLUMNS)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1375:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_COLUMN_SEPARATOR)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1376:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_LINE_DELIMITER)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1377:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_ENCLOSE)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1378:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_ESCAPE)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1379:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_PARTITIONS)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1380:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_TEMP_PARTITIONS)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1381:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_NEGATIVE)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1382:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_STRICT_MODE)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1383:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_TIME_ZONE)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1384:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_EXEC_MEM_LIMIT)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1385:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_JSONPATHS)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1386:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_JSONROOT)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1387:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_STRIP_OUTER_ARRAY)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1388:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_NUM_AS_STRING)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1389:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_FUZZY_PARSE)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1390:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_READ_JSON_BY_LINE)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1391:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1392:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_SEND_BATCH_PARALLELISM)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1393:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_LOAD_TO_SINGLE_TABLET)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1394:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_MERGE_TYPE)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1395:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_DELETE_CONDITION)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1396:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_MAX_FILTER_RATIO)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1397:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_HIDDEN_COLUMNS)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1398:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_TRIM_DOUBLE_QUOTES)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1399:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_SKIP_LINES)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1400:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_ENABLE_PROFILE)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1401:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_UNIQUE_KEY_UPDATE_MODE)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1402:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_MEMTABLE_ON_SINKNODE)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1403:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_LOAD_STREAM_PER_NODE)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1404:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_GROUP_COMMIT)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1405:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_CLOUD_CLUSTER)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1406:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       __ADD_IF_EXIST(HTTP_UNIQUE_KEY_UPDATE_MODE)
       ^
   ```
   **be/src/http/action/stream_load.cpp:1370:** expanded from macro 
'__ADD_IF_EXIST'
   ```cpp
       if (!req->header(PROPERTIY_KEY).empty()) {                               
                      \
       ^
   ```
   **be/src/http/action/stream_load.cpp:1411:** +1, including nesting penalty 
of 0, nesting level increased to 1
   ```cpp
       if (!req->header(HTTP_WHERE).empty()) {
       ^
   ```
   
   </details>
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to