This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 03901b9a7a1 [enhancement](group_commit): refactor replay wal code (#29183)
03901b9a7a1 is described below

commit 03901b9a7a1970162fa71fd3892905496ef892c3
Author: huanghaibin <284824...@qq.com>
AuthorDate: Sat Dec 30 12:59:46 2023 +0800

    [enhancement](group_commit): refactor replay wal code (#29183)
---
 be/src/http/action/http_stream.cpp                 |  16 +-
 be/src/http/action/http_stream.h                   |   2 +-
 be/src/olap/wal_dirs_info.cpp                      |   1 +
 be/src/olap/wal_info.cpp                           |  41 ++
 be/src/olap/wal_info.h                             |  38 ++
 be/src/olap/wal_manager.cpp                        |  41 +-
 be/src/olap/wal_manager.h                          |  17 +-
 be/src/olap/wal_table.cpp                          | 428 +++++++++------------
 be/src/olap/wal_table.h                            |  32 +-
 be/src/runtime/group_commit_mgr.cpp                |  26 +-
 be/src/runtime/stream_load/stream_load_context.h   |   1 +
 be/src/vec/exec/format/wal/wal_reader.cpp          |   2 -
 be/src/vec/sink/writer/vwal_writer.cpp             |   2 +-
 be/test/olap/wal_manager_test.cpp                  |   6 +-
 .../apache/doris/analysis/NativeInsertStmt.java    |   2 +-
 gensrc/thrift/FrontendService.thrift               |  14 +-
 16 files changed, 346 insertions(+), 323 deletions(-)

diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index a18b10f491f..15d2a1a18d1 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -256,7 +256,7 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
             } else {
                 LOG(INFO) << "use a portion of data to request fe to obtain 
column information";
                 ctx->is_read_schema = false;
-                ctx->status = _process_put(req, ctx);
+                ctx->status = process_put(req, ctx);
             }
         }
 
@@ -272,7 +272,7 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
         LOG(INFO) << "after all the data has been read and it has not reached 
1M, it will execute "
                   << "here";
         ctx->is_read_schema = false;
-        ctx->status = _process_put(req, ctx);
+        ctx->status = process_put(req, ctx);
     }
     ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time);
 }
@@ -290,11 +290,15 @@ void HttpStreamAction::free_handler_ctx(std::shared_ptr<void> param) {
     ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
 }
 
-Status HttpStreamAction::_process_put(HttpRequest* http_req,
-                                      std::shared_ptr<StreamLoadContext> ctx) {
+Status HttpStreamAction::process_put(HttpRequest* http_req,
+                                     std::shared_ptr<StreamLoadContext> ctx) {
     TStreamLoadPutRequest request;
     set_request_auth(&request, ctx->auth);
-    request.__set_load_sql(http_req->header(HTTP_SQL));
+    if (http_req != nullptr) {
+        request.__set_load_sql(http_req->header(HTTP_SQL));
+    } else {
+        request.__set_load_sql(ctx->sql_str);
+    }
     request.__set_loadId(ctx->id.to_thrift());
     request.__set_label(ctx->label);
     if (ctx->group_commit) {
@@ -330,7 +334,7 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
     ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
     ctx->label = ctx->put_result.params.import_label;
     ctx->put_result.params.__set_wal_id(ctx->wal_id);
-    if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
+    if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == 
"async_mode") {
         if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
             size_t content_length = 
std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
             if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
diff --git a/be/src/http/action/http_stream.h b/be/src/http/action/http_stream.h
index d4140a118d6..90a8b48fb2b 100644
--- a/be/src/http/action/http_stream.h
+++ b/be/src/http/action/http_stream.h
@@ -43,11 +43,11 @@ public:
 
     void on_chunk_data(HttpRequest* req) override;
     void free_handler_ctx(std::shared_ptr<void> ctx) override;
+    Status process_put(HttpRequest* http_req, 
std::shared_ptr<StreamLoadContext> ctx);
 
 private:
     Status _on_header(HttpRequest* http_req, 
std::shared_ptr<StreamLoadContext> ctx);
     Status _handle(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx);
-    Status _process_put(HttpRequest* http_req, 
std::shared_ptr<StreamLoadContext> ctx);
     void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, 
const std::string& str);
     Status _handle_group_commit(HttpRequest* http_req, 
std::shared_ptr<StreamLoadContext> ctx);
 
diff --git a/be/src/olap/wal_dirs_info.cpp b/be/src/olap/wal_dirs_info.cpp
index 340d896a8c6..1b7216b0724 100644
--- a/be/src/olap/wal_dirs_info.cpp
+++ b/be/src/olap/wal_dirs_info.cpp
@@ -169,6 +169,7 @@ size_t WalDirsInfo::get_max_available_size() {
 
 Status WalDirsInfo::update_wal_dir_limit(std::string wal_dir, size_t limit) {
     for (const auto& wal_dir_info : _wal_dirs_info_vec) {
+        LOG(INFO) << "wal_dir_info:" << wal_dir_info->get_wal_dir();
         if (wal_dir_info->get_wal_dir() == wal_dir) {
             return wal_dir_info->update_wal_dir_limit(limit);
         }
diff --git a/be/src/olap/wal_info.cpp b/be/src/olap/wal_info.cpp
new file mode 100644
index 00000000000..d93593cfaf0
--- /dev/null
+++ b/be/src/olap/wal_info.cpp
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/wal_info.h"
+namespace doris {
+WalInfo::WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, 
int64_t start_time_ms)
+        : _wal_id(wal_id),
+          _wal_path(wal_path),
+          _retry_num(retry_num),
+          _start_time_ms(start_time_ms) {}
+WalInfo::~WalInfo() {}
+int64_t WalInfo::get_wal_id() {
+    return _wal_id;
+}
+std::string WalInfo::get_wal_path() {
+    return _wal_path;
+}
+int64_t WalInfo::get_retry_num() {
+    return _retry_num;
+}
+int64_t WalInfo::get_start_time_ms() {
+    return _start_time_ms;
+}
+void WalInfo::add_retry_num() {
+    _retry_num++;
+}
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal_info.h b/be/src/olap/wal_info.h
new file mode 100644
index 00000000000..0383ac68f2c
--- /dev/null
+++ b/be/src/olap/wal_info.h
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+#include "runtime/exec_env.h"
+
+namespace doris {
+class WalInfo {
+public:
+    WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t 
start_time_ms);
+    ~WalInfo();
+    int64_t get_wal_id();
+    int64_t get_retry_num();
+    int64_t get_start_time_ms();
+    std::string get_wal_path();
+    void add_retry_num();
+
+private:
+    int64_t _wal_id;
+    std::string _wal_path;
+    int64_t _retry_num;
+    int64_t _start_time_ms;
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp
index 0fe59cdf9ca..a9c35794698 100644
--- a/be/src/olap/wal_manager.cpp
+++ b/be/src/olap/wal_manager.cpp
@@ -82,6 +82,9 @@ Status WalManager::init() {
     RETURN_IF_ERROR(_init_wal_dirs_conf());
     RETURN_IF_ERROR(_init_wal_dirs());
     RETURN_IF_ERROR(_init_wal_dirs_info());
+    for (auto wal_dir : _wal_dirs) {
+        RETURN_IF_ERROR(scan_wals(wal_dir));
+    }
     return Thread::create(
             "WalMgr", "replay_wal", [this]() { 
static_cast<void>(this->replay()); },
             &_replay_thread);
@@ -112,7 +115,7 @@ Status WalManager::_init_wal_dirs_conf() {
 Status WalManager::_init_wal_dirs() {
     bool exists = false;
     for (auto wal_dir : _wal_dirs) {
-        std::string tmp_dir = wal_dir + "/tmp";
+        std::string tmp_dir = wal_dir + "/" + tmp;
         LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir;
         RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir, 
&exists));
         if (!exists) {
@@ -122,7 +125,6 @@ Status WalManager::_init_wal_dirs() {
         if (!exists) {
             
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_dir));
         }
-        RETURN_IF_ERROR(scan_wals(wal_dir));
     }
     return Status::OK();
 }
@@ -164,15 +166,15 @@ Status WalManager::_init_wal_dirs_info() {
             &_update_wal_dirs_info_thread);
 }
 
-void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, 
WAL_STATUS wal_status) {
+void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, 
WalStatus wal_status) {
     std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
     LOG(INFO) << "add wal queue "
               << ",table_id:" << table_id << ",wal_id:" << wal_id << 
",status:" << wal_status;
     auto it = _wal_status_queues.find(table_id);
     if (it == _wal_status_queues.end()) {
-        std::unordered_map<int64_t, WAL_STATUS> tmp;
-        tmp.emplace(wal_id, wal_status);
-        _wal_status_queues.emplace(table_id, tmp);
+        std::unordered_map<int64_t, WalStatus> tmp_map;
+        tmp_map.emplace(wal_id, wal_status);
+        _wal_status_queues.emplace(table_id, tmp_map);
     } else {
         it->second.emplace(wal_id, wal_status);
     }
@@ -305,12 +307,12 @@ Status WalManager::scan_wals(const std::string& wal_path) {
         LOG(WARNING) << "Failed list files for dir=" << wal_path << ", st=" << 
st.to_string();
         return st;
     }
-    for (const auto& db_id : dbs) {
-        if (db_id.is_file) {
+    for (const auto& database_id : dbs) {
+        if (database_id.is_file || database_id.file_name == tmp) {
             continue;
         }
         std::vector<io::FileInfo> tables;
-        auto db_path = wal_path + "/" + db_id.file_name;
+        auto db_path = wal_path + "/" + database_id.file_name;
         st = io::global_local_filesystem()->list(db_path, false, &tables, 
&exists);
         if (!st.ok()) {
             LOG(WARNING) << "Failed list files for dir=" << db_path << ", st=" 
<< st.to_string();
@@ -342,20 +344,16 @@ Status WalManager::scan_wals(const std::string& wal_path) {
                         int64_t wal_id =
                                 std::strtoll(wal.file_name.substr(0, 
pos).c_str(), NULL, 10);
                         _wal_path_map.emplace(wal_id, wal_file);
+                        int64_t db_id = 
std::strtoll(database_id.file_name.c_str(), NULL, 10);
                         int64_t tb_id = 
std::strtoll(table_id.file_name.c_str(), NULL, 10);
-                        add_wal_status_queue(tb_id, wal_id, 
WalManager::WAL_STATUS::REPLAY);
+                        add_wal_status_queue(tb_id, wal_id, 
WalManager::WalStatus::REPLAY);
+                        RETURN_IF_ERROR(add_recover_wal(db_id, tb_id, wal_id, 
wal_file));
                     } catch (const std::invalid_argument& e) {
                         return Status::InvalidArgument("Invalid format, {}", 
e.what());
                     }
                 }
             }
-            st = add_recover_wal(std::stoll(db_id.file_name), 
std::stoll(table_id.file_name), res);
             count += res.size();
-            if (!st.ok()) {
-                LOG(WARNING) << "Failed add replay wal, db=" << db_id.file_name
-                             << ", table=" << table_id.file_name << ", st=" << 
st.to_string();
-                return st;
-            }
         }
     }
     LOG(INFO) << "Finish list all wals, size:" << count;
@@ -396,7 +394,8 @@ Status WalManager::replay() {
     return Status::OK();
 }
 
-Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, 
std::vector<std::string> wals) {
+Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t 
wal_id,
+                                   std::string wal) {
     std::lock_guard<std::shared_mutex> wrlock(_lock);
     std::shared_ptr<WalTable> table_ptr;
     auto it = _table_map.find(table_id);
@@ -406,12 +405,10 @@ Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, std::vector<
     } else {
         table_ptr = it->second;
     }
-    table_ptr->add_wals(wals);
+    table_ptr->add_wal(wal_id, wal);
 #ifndef BE_TEST
-    for (auto wal : wals) {
-        RETURN_IF_ERROR(update_wal_dir_limit(_get_base_wal_path(wal)));
-        RETURN_IF_ERROR(update_wal_dir_used(_get_base_wal_path(wal)));
-    }
+    RETURN_IF_ERROR(update_wal_dir_limit(_get_base_wal_path(wal)));
+    RETURN_IF_ERROR(update_wal_dir_used(_get_base_wal_path(wal)));
 #endif
     return Status::OK();
 }
diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h
index 8d330a64bb9..4b42beaf455 100644
--- a/be/src/olap/wal_manager.h
+++ b/be/src/olap/wal_manager.h
@@ -47,7 +47,7 @@ class WalManager {
     ENABLE_FACTORY_CREATOR(WalManager);
 
 public:
-    enum WAL_STATUS {
+    enum WalStatus {
         PREPARE = 0,
         REPLAY,
         CREATE,
@@ -56,6 +56,7 @@ public:
 public:
     WalManager(ExecEnv* exec_env, const std::string& wal_dir);
     ~WalManager();
+    // used for wal
     Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated = 0);
     Status init();
     Status scan_wals(const std::string& wal_path);
@@ -64,13 +65,13 @@ public:
     Status create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>& 
wal_writer);
     Status scan();
     size_t get_wal_table_size(int64_t table_id);
-    Status add_recover_wal(int64_t db_id, int64_t table_id, 
std::vector<std::string> wals);
+    Status add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, 
std::string wal);
     Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const 
std::string& label,
                         std::string& base_path);
     Status get_wal_path(int64_t wal_id, std::string& wal_path);
     Status get_wal_status_queue_size(const PGetWalQueueSizeRequest* request,
                                      PGetWalQueueSizeResponse* response);
-    void add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS 
wal_status);
+    void add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus 
wal_status);
     Status erase_wal_status_queue(int64_t table_id, int64_t wal_id);
     void print_wal_status_queue();
     void stop();
@@ -80,6 +81,7 @@ public:
     void erase_wal_column_index(int64_t wal_id);
     Status get_wal_column_index(int64_t wal_id, std::vector<size_t>& 
column_index);
 
+    // used for limit
     Status update_wal_dir_limit(const std::string& wal_dir, size_t limit = -1);
     Status update_wal_dir_used(const std::string& wal_dir, size_t used = -1);
     Status update_wal_dir_pre_allocated(const std::string& wal_dir, size_t 
pre_allocated,
@@ -88,6 +90,7 @@ public:
     size_t get_max_available_size();
 
 private:
+    // used for limit
     Status _init_wal_dirs_conf();
     Status _init_wal_dirs();
     Status _init_wal_dirs_info();
@@ -99,11 +102,13 @@ public:
     // used for be ut
     size_t wal_limit_test_bytes;
 
+    const std::string tmp = "tmp";
+
 private:
+    //used for wal
     ExecEnv* _exec_env = nullptr;
     std::shared_mutex _lock;
     scoped_refptr<Thread> _replay_thread;
-    scoped_refptr<Thread> _update_wal_dirs_info_thread;
     CountDownLatch _stop_background_threads_latch;
     std::map<int64_t, std::shared_ptr<WalTable>> _table_map;
     std::vector<std::string> _wal_dirs;
@@ -111,11 +116,13 @@ private:
     std::shared_mutex _wal_status_lock;
     std::unordered_map<int64_t, std::string> _wal_path_map;
     std::unordered_map<int64_t, std::shared_ptr<WalWriter>> 
_wal_id_to_writer_map;
-    std::unordered_map<int64_t, std::unordered_map<int64_t, WAL_STATUS>> 
_wal_status_queues;
+    std::unordered_map<int64_t, std::unordered_map<int64_t, WalStatus>> 
_wal_status_queues;
     std::atomic<bool> _stop;
     std::shared_mutex _wal_column_id_map_lock;
     std::unordered_map<int64_t, std::vector<size_t>&> _wal_column_id_map;
     std::unique_ptr<doris::ThreadPool> _thread_pool;
+    // used for limit
+    scoped_refptr<Thread> _update_wal_dirs_info_thread;
     std::unique_ptr<WalDirsInfo> _wal_dirs_info;
 };
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal_table.cpp b/be/src/olap/wal_table.cpp
index 7f98c410b07..a1c9a12f3e1 100644
--- a/be/src/olap/wal_table.cpp
+++ b/be/src/olap/wal_table.cpp
@@ -17,19 +17,16 @@
 
 #include "olap/wal_table.h"
 
-#include <event2/bufferevent.h>
-#include <event2/event.h>
-#include <event2/event_struct.h>
-#include <event2/http.h>
 #include <thrift/protocol/TDebugProtocol.h>
 
-#include "evhttp.h"
+#include "http/action/http_stream.h"
 #include "http/action/stream_load.h"
 #include "http/ev_http_server.h"
 #include "http/http_common.h"
 #include "http/http_headers.h"
 #include "http/utils.h"
 #include "io/fs/local_file_system.h"
+#include "io/fs/stream_load_pipe.h"
 #include "olap/wal_manager.h"
 #include "runtime/client_cache.h"
 #include "runtime/fragment_mgr.h"
@@ -41,125 +38,149 @@
 namespace doris {
 
 WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id)
-        : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), 
_stop(false) {}
+        : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), 
_stop(false) {
+    _http_stream_action = std::make_shared<HttpStreamAction>(exec_env);
+}
 WalTable::~WalTable() {}
 
 #ifdef BE_TEST
-std::string k_request_line;
+Status k_stream_load_exec_status;
 #endif
 
-bool retry = false;
-
-void WalTable::add_wals(std::vector<std::string> wals) {
+void WalTable::add_wal(int64_t wal_id, std::string wal) {
     std::lock_guard<std::mutex> lock(_replay_wal_lock);
-    for (const auto& wal : wals) {
-        LOG(INFO) << "add replay wal " << wal;
-        _replay_wal_map.emplace(wal, replay_wal_info {0, UnixMillis(), false});
-    }
+    LOG(INFO) << "add replay wal " << wal;
+    auto wal_info = std::make_shared<WalInfo>(wal_id, wal, 0, UnixMillis());
+    _replay_wal_map.emplace(wal, wal_info);
 }
-Status WalTable::replay_wals() {
+void WalTable::pick_relay_wals() {
+    std::lock_guard<std::mutex> lock(_replay_wal_lock);
     std::vector<std::string> need_replay_wals;
     std::vector<std::string> need_erase_wals;
-    {
-        std::lock_guard<std::mutex> lock(_replay_wal_lock);
-        if (_replay_wal_map.empty()) {
-            return Status::OK();
-        }
-        VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" << 
_table_id
-                   << ", wal size=" << _replay_wal_map.size();
-        for (auto& [wal, info] : _replay_wal_map) {
-            auto& [retry_num, start_ts, replaying] = info;
-            if (replaying) {
-                LOG(INFO) << wal << " is replaying, skip this round";
-                return Status::OK();
-            }
-            if (retry_num >= config::group_commit_replay_wal_retry_num) {
-                LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", 
table=" << _table_id
-                             << ", wal=" << wal
-                             << ", retry_num=" << 
config::group_commit_replay_wal_retry_num;
-                std::string rename_path = _get_tmp_path(wal);
-                LOG(INFO) << "rename wal from " << wal << " to " << 
rename_path;
-                std::rename(wal.c_str(), rename_path.c_str());
-                need_erase_wals.push_back(wal);
-                continue;
-            }
-            if (_need_replay(info)) {
-                need_replay_wals.push_back(wal);
+    for (auto it = _replay_wal_map.begin(); it != _replay_wal_map.end(); it++) 
{
+        auto wal_info = it->second;
+        if (wal_info->get_retry_num() >= 
config::group_commit_replay_wal_retry_num) {
+            LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", 
table=" << _table_id
+                         << ", wal=" << it->first << ", retry_num=" << 
wal_info->get_retry_num();
+            auto st = _rename_to_tmp_path(it->first);
+            if (!st.ok()) {
+                LOG(WARNING) << "rename " << it->first << " fail"
+                             << ",st:" << st.to_string();
             }
+            need_erase_wals.push_back(it->first);
+            continue;
         }
-        std::sort(need_replay_wals.begin(), need_replay_wals.end());
-        for (const auto& wal : need_erase_wals) {
-            if (_replay_wal_map.erase(wal)) {
-                LOG(INFO) << "erase wal " << wal << " from _replay_wal_map";
-            } else {
-                LOG(WARNING) << "fail to erase wal " << wal << " from 
_replay_wal_map";
-            }
+        if (_need_replay(wal_info)) {
+            need_replay_wals.push_back(it->first);
         }
     }
+    for (const auto& wal : need_erase_wals) {
+        _replay_wal_map.erase(wal);
+    }
+    std::sort(need_replay_wals.begin(), need_replay_wals.end());
     for (const auto& wal : need_replay_wals) {
+        _replaying_queue.emplace_back(_replay_wal_map[wal]);
+        _replay_wal_map.erase(wal);
+    }
+}
+
+Status WalTable::relay_wal_one_by_one() {
+    std::vector<std::shared_ptr<WalInfo>> need_retry_wals;
+    std::vector<std::shared_ptr<WalInfo>> need_delete_wals;
+    while (!_replaying_queue.empty()) {
+        std::shared_ptr<WalInfo> wal_info = nullptr;
         {
             std::lock_guard<std::mutex> lock(_replay_wal_lock);
-            if (_stop.load()) {
-                break;
+            wal_info = _replaying_queue.front();
+            _replaying_queue.pop_front();
+        }
+        wal_info->add_retry_num();
+        auto st = _replay_wal_internal(wal_info->get_wal_path());
+        if (!st.ok()) {
+            LOG(WARNING) << "failed replay wal, db=" << _db_id << ", table=" 
<< _table_id
+                         << ", wal=" << wal_info->get_wal_path() << ", st=" << 
st.to_string();
+            if (!st.is<ErrorCode::NOT_FOUND>()) {
+                need_retry_wals.push_back(wal_info);
             } else {
-                auto it = _replay_wal_map.find(wal);
-                if (it != _replay_wal_map.end()) {
-                    auto& [retry_num, start_time, replaying] = it->second;
-                    replaying = true;
-                }
+                need_delete_wals.push_back(wal_info);
             }
+        } else {
+            need_delete_wals.push_back(wal_info);
+        }
+        VLOG_NOTICE << "replay wal, db=" << _db_id << ", table=" << _table_id
+                    << ", wal=" << wal_info->get_wal_path() << ", st=" << 
st.to_string();
+    }
+    {
+        std::lock_guard<std::mutex> lock(_replay_wal_lock);
+        for (auto retry_wal_info : need_retry_wals) {
+            _replay_wal_map.emplace(retry_wal_info->get_wal_path(), 
retry_wal_info);
         }
-        auto st = _replay_wal_internal(wal);
+    }
+    for (auto delete_wal_info : need_delete_wals) {
+        auto st = _delete_wal(delete_wal_info->get_wal_id());
         if (!st.ok()) {
-            std::lock_guard<std::mutex> lock(_replay_wal_lock);
-            auto it = _replay_wal_map.find(wal);
-            if (it != _replay_wal_map.end()) {
-                auto& [retry_num, start_time, replaying] = it->second;
-                replaying = false;
-            }
-            LOG(WARNING) << "failed replay wal, drop this round, db=" << _db_id
-                         << ", table=" << _table_id << ", wal=" << wal << ", 
st=" << st.to_string();
-            break;
+            LOG(WARNING) << "fail to delete wal " << 
delete_wal_info->get_wal_path();
         }
-        VLOG_NOTICE << "replay wal, db=" << _db_id << ", table=" << _table_id 
<< ", label=" << wal
-                    << ", st=" << st.to_string();
     }
     return Status::OK();
 }
-
-std::string WalTable::_get_tmp_path(const std::string wal) {
-    std::vector<std::string> path_element;
-    doris::vectorized::WalReader::string_split(wal, "/", path_element);
-    std::stringstream ss;
-    int index = 0;
-    while (index < path_element.size() - 3) {
-        ss << path_element[index] << "/";
-        index++;
+Status WalTable::replay_wals() {
+    {
+        std::lock_guard<std::mutex> lock(_replay_wal_lock);
+        if (_replay_wal_map.empty()) {
+            LOG(INFO) << "_replay_wal_map is empty,skip relaying";
+            return Status::OK();
+        }
+        if (!_replaying_queue.empty()) {
+            LOG(INFO) << "_replaying_queue is not empty,skip relaying";
+            return Status::OK();
+        }
     }
-    ss << "tmp/";
-    while (index < path_element.size()) {
-        if (index != path_element.size() - 1) {
-            ss << path_element[index] << "_";
-        } else {
-            ss << path_element[index];
+    VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" << 
_table_id
+               << ", wal size=" << _replay_wal_map.size();
+    pick_relay_wals();
+    RETURN_IF_ERROR(relay_wal_one_by_one());
+    return Status::OK();
+}
+
+Status WalTable::_rename_to_tmp_path(const std::string wal) {
+    io::Path wal_path = wal;
+    std::list<std::string> path_element;
+    for (int i = 0; i < 3; ++i) {
+        if (!wal_path.has_parent_path()) {
+            return Status::InternalError("parent path is not enough when 
rename " + wal);
         }
-        index++;
+        path_element.push_front(wal_path.filename().string());
+        wal_path = wal_path.parent_path();
     }
-    return ss.str();
+    wal_path.append(_exec_env->wal_mgr()->tmp);
+    for (auto path : path_element) {
+        wal_path.append(path);
+    }
+    bool exists = false;
+    
RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(), 
&exists));
+    if (!exists) {
+        
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path()));
+    }
+    auto res = std::rename(wal.c_str(), wal_path.string().c_str());
+    if (res < 0) {
+        return Status::InternalError("rename fail on path " + wal);
+    }
+    LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string();
+    return Status::OK();
 }
 
-bool WalTable::_need_replay(const doris::WalTable::replay_wal_info& info) {
+bool WalTable::_need_replay(std::shared_ptr<WalInfo> wal_info) {
 #ifndef BE_TEST
-    auto& [retry_num, start_ts, replaying] = info;
-    auto replay_interval =
-            pow(2, retry_num) * 
config::group_commit_replay_wal_retry_interval_seconds * 1000;
-    return UnixMillis() - start_ts >= replay_interval;
+    auto replay_interval = pow(2, wal_info->get_retry_num()) *
+                           
config::group_commit_replay_wal_retry_interval_seconds * 1000;
+    return UnixMillis() - wal_info->get_start_time_ms() >= replay_interval;
 #else
     return true;
 #endif
 }
 
-Status WalTable::_abort_txn(int64_t db_id, int64_t wal_id) {
+Status WalTable::_try_abort_txn(int64_t db_id, int64_t wal_id) {
     TLoadTxnRollbackRequest request;
     request.__set_auth_code(0); // this is a fake, fe not check it now
     request.__set_db_id(db_id);
@@ -181,47 +202,23 @@ Status WalTable::_abort_txn(int64_t db_id, int64_t wal_id) {
 
 Status WalTable::_replay_wal_internal(const std::string& wal) {
     LOG(INFO) << "Start replay wal for db=" << _db_id << ", table=" << 
_table_id << ", wal=" << wal;
-    // start a new stream load
-    {
-        std::lock_guard<std::mutex> lock(_replay_wal_lock);
-        auto it = _replay_wal_map.find(wal);
-        if (it != _replay_wal_map.end()) {
-            auto& [retry_num, start_time, replaying] = it->second;
-            ++retry_num;
-            replaying = true;
-        } else {
-            LOG(WARNING) << "can not find wal in stream load replay map. db=" 
<< _db_id
-                         << ", table=" << _table_id << ", wal=" << wal;
-            return Status::OK();
-        }
-    }
     std::shared_ptr<std::pair<int64_t, std::string>> pair = nullptr;
-    RETURN_IF_ERROR(_get_wal_info(wal, pair));
+    RETURN_IF_ERROR(_parse_wal_path(wal, pair));
     auto wal_id = pair->first;
     auto label = pair->second;
 #ifndef BE_TEST
-    auto st = _abort_txn(_db_id, wal_id);
+    auto st = _try_abort_txn(_db_id, wal_id);
     if (!st.ok()) {
         LOG(WARNING) << "abort txn " << wal_id << " fail";
     }
-    auto get_st = _get_column_info(_db_id, _table_id);
-    if (!get_st.ok()) {
-        if (get_st.is<ErrorCode::NOT_FOUND>()) {
-            {
-                std::lock_guard<std::mutex> lock(_replay_wal_lock);
-                _replay_wal_map.erase(wal);
-            }
-            RETURN_IF_ERROR(_delete_wal(wal_id));
-        }
-        return get_st;
-    }
+    RETURN_IF_ERROR(_get_column_info(_db_id, _table_id));
 #endif
-    RETURN_IF_ERROR(_send_request(wal_id, wal, label));
+    RETURN_IF_ERROR(_replay_one_txn_with_stremaload(wal_id, wal, label));
     return Status::OK();
 }
 
-Status WalTable::_get_wal_info(const std::string& wal,
-                               std::shared_ptr<std::pair<int64_t, 
std::string>>& pair) {
+Status WalTable::_parse_wal_path(const std::string& wal,
+                                 std::shared_ptr<std::pair<int64_t, 
std::string>>& pair) {
     std::vector<std::string> path_element;
     doris::vectorized::WalReader::string_split(wal, "/", path_element);
     auto pos = path_element[path_element.size() - 1].find("_");
@@ -236,171 +233,104 @@ Status WalTable::_get_wal_info(const std::string& wal,
     return Status::OK();
 }
 
-void http_request_done(struct evhttp_request* req, void* arg) {
-    std::stringstream out;
-    std::string status;
-    std::string msg;
-    std::string wal_id;
-    size_t len = 0;
-    if (req != nullptr) {
-        auto input = evhttp_request_get_input_buffer(req);
-        char* request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
-        while (request_line != nullptr) {
-            std::string s(request_line);
-            out << request_line;
-            request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
-        }
-        auto out_str = out.str();
-        LOG(INFO) << "replay wal out_str:" << out_str;
-        rapidjson::Document doc;
-        if (!out_str.empty()) {
-            doc.Parse(out.str().c_str());
-            status = std::string(doc["Status"].GetString());
-            msg = std::string(doc["Message"].GetString());
-            LOG(INFO) << "replay wal status:" << status << ",msg:" << msg;
-            if (status.find("Fail") != status.npos) {
-                if (msg.find("Label") != msg.npos &&
-                    msg.find("has already been used") != msg.npos) {
-                    retry = false;
-                } else {
-                    retry = true;
-                }
-            } else {
-                retry = false;
-            }
-        } else {
-            retry = true;
-        }
-    } else {
-        LOG(WARNING) << "req is null";
-    }
-
-    if (arg != nullptr) {
-        event_base_loopbreak((struct event_base*)arg);
-    } else {
-        LOG(WARNING) << "arg is null";
-    }
-}
-
-Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const 
std::string& label) {
-#ifndef BE_TEST
-    struct event_base* base = nullptr;
-    struct evhttp_connection* conn = nullptr;
-    struct evhttp_request* req = nullptr;
-    retry = false;
-    event_init();
-    base = event_base_new();
-    conn = evhttp_connection_new("127.0.0.1", doris::config::webserver_port);
-    evhttp_connection_set_base(conn, base);
-    req = evhttp_request_new(http_request_done, base);
-    evhttp_add_header(req->output_headers, HTTP_LABEL_KEY.c_str(), 
label.c_str());
-    evhttp_add_header(req->output_headers, HTTP_AUTH_CODE.c_str(), 
std::to_string(wal_id).c_str());
-    evhttp_add_header(req->output_headers, HTTP_WAL_ID_KY.c_str(), 
std::to_string(wal_id).c_str());
+Status WalTable::_construct_sql_str(const std::string& wal, const std::string& 
label,
+                                    std::string& sql_str, std::vector<size_t>& 
index_vector) {
     std::string columns;
     RETURN_IF_ERROR(_read_wal_header(wal, columns));
     std::vector<std::string> column_id_element;
     doris::vectorized::WalReader::string_split(columns, ",", 
column_id_element);
-    std::vector<size_t> index_vector;
     std::stringstream ss_name;
     std::stringstream ss_id;
     int index_raw = 0;
     for (auto column_id_str : column_id_element) {
         try {
             int64_t column_id = std::strtoll(column_id_str.c_str(), NULL, 10);
-            auto it = _column_id_name_map.find(column_id);
-            auto it2 = _column_id_index_map.find(column_id);
-            if (it != _column_id_name_map.end() && it2 != 
_column_id_index_map.end()) {
-                ss_name << "`" << it->second << "`,";
-                ss_id << "c" << 
std::to_string(_column_id_index_map[column_id]) << ",";
+            auto it = _column_id_info_map.find(column_id);
+            if (it != _column_id_info_map.end()) {
+                ss_name << "`" << it->second->first << "`,";
+                ss_id << "c" << std::to_string(it->second->second) << ",";
                 index_vector.emplace_back(index_raw);
-                _column_id_name_map.erase(column_id);
-                _column_id_index_map.erase(column_id);
             }
             index_raw++;
         } catch (const std::invalid_argument& e) {
             return Status::InvalidArgument("Invalid format, {}", e.what());
         }
     }
-    _exec_env->wal_mgr()->add_wal_column_index(wal_id, index_vector);
     auto name = ss_name.str().substr(0, ss_name.str().size() - 1);
     auto id = ss_id.str().substr(0, ss_id.str().size() - 1);
     std::stringstream ss;
     ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL 
" << label << " ("
        << name << ") select " << id << " from http_stream(\"format\" = 
\"wal\", \"table_id\" = \""
        << std::to_string(_table_id) << "\")";
-    evhttp_add_header(req->output_headers, HTTP_SQL.c_str(), ss.str().c_str());
-    evbuffer* output = evhttp_request_get_output_buffer(req);
-    evbuffer_add_printf(output, "replay wal %s", 
std::to_string(wal_id).c_str());
-
-    evhttp_make_request(conn, req, EVHTTP_REQ_PUT, "/api/_http_stream");
-    evhttp_connection_set_timeout(req->evcon, 300);
-
-    event_base_dispatch(base);
-    evhttp_connection_free(conn);
-    event_base_free(base);
+    sql_str = ss.str().data();
+    return Status::OK();
+}
+Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
+                                     const std::string& label) {
+    std::string sql_str;
+    std::vector<size_t> index_vector;
+    RETURN_IF_ERROR(_construct_sql_str(wal, label, sql_str, index_vector));
+    _exec_env->wal_mgr()->add_wal_column_index(wal_id, index_vector);
+    std::shared_ptr<StreamLoadContext> ctx = 
std::make_shared<StreamLoadContext>(_exec_env);
+    ctx->sql_str = sql_str;
+    ctx->wal_id = wal_id;
+    ctx->auth.auth_code = wal_id;
+    ctx->label = label;
+    auto st = _http_stream_action->process_put(nullptr, ctx);
+    if (st.ok()) {
+        // wait stream load finish
+        RETURN_IF_ERROR(ctx->future.get());
+        if (ctx->status.ok()) {
+            auto commit_st = 
_exec_env->stream_load_executor()->commit_txn(ctx.get());
+            st = commit_st;
+        } else if (!ctx->status.ok()) {
+            LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
+                         << ", errmsg=" << ctx->status;
+            _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+            st = ctx->status;
+        }
+    }
+    _exec_env->wal_mgr()->erase_wal_column_index(wal_id);
+    LOG(INFO) << "relay wal id=" << wal_id << ",st=" << st.to_string();
+    return st;
+}
 
+Status WalTable::_replay_one_txn_with_stremaload(int64_t wal_id, const 
std::string& wal,
+                                                 const std::string& label) {
+    bool success = false;
+#ifndef BE_TEST
+    auto st = _handle_stream_load(wal_id, wal, label);
+    auto msg = st.msg();
+    success = st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() ||
+              msg.find("LabelAlreadyUsedException") != msg.npos;
 #else
-    std::stringstream out;
-    out << k_request_line;
-    auto out_str = out.str();
-    rapidjson::Document doc;
-    doc.Parse(out_str.c_str());
-    auto status = std::string(doc["Status"].GetString());
-    if (status.find("Fail") != status.npos) {
-        retry = true;
-    } else {
-        retry = false;
-    }
+    success = k_stream_load_exec_status.ok();
 #endif
-    if (retry) {
-        LOG(INFO) << "fail to replay wal =" << wal;
-        std::lock_guard<std::mutex> lock(_replay_wal_lock);
-        auto it = _replay_wal_map.find(wal);
-        if (it != _replay_wal_map.end()) {
-            auto& [retry_num, start_time, replaying] = it->second;
-            replaying = false;
-        } else {
-            _replay_wal_map.emplace(wal, replay_wal_info {0, UnixMillis(), 
false});
-        }
-    } else {
+    if (success) {
         LOG(INFO) << "success to replay wal =" << wal;
-        RETURN_IF_ERROR(_delete_wal(wal_id));
-        std::lock_guard<std::mutex> lock(_replay_wal_lock);
-        if (_replay_wal_map.erase(wal)) {
-            LOG(INFO) << "erase " << wal << " from _replay_wal_map";
-        } else {
-            LOG(WARNING) << "fail to erase " << wal << " from _replay_wal_map";
-        }
+    } else {
+        LOG(INFO) << "fail to replay wal =" << wal;
+        return Status::InternalError("fail to replay wal =" + wal);
     }
-    _exec_env->wal_mgr()->erase_wal_column_index(wal_id);
     return Status::OK();
 }
 
 void WalTable::stop() {
-    bool done = true;
     do {
         {
             std::lock_guard<std::mutex> lock(_replay_wal_lock);
             if (!this->_stop.load()) {
                 this->_stop.store(true);
             }
-            auto it = _replay_wal_map.begin();
-            for (; it != _replay_wal_map.end(); it++) {
-                auto& [retry_num, start_time, replaying] = it->second;
-                if (replaying) {
-                    break;
-                }
-            }
-            if (it != _replay_wal_map.end()) {
-                done = false;
-            } else {
-                done = true;
+            if (_replay_wal_map.empty() && _replaying_queue.empty()) {
+                break;
             }
-        }
-        if (!done) {
+            LOG(INFO) << "stopping wal_table,wait for relay wal task done, now 
"
+                      << _replay_wal_map.size() << " wals wait to replay, "
+                      << _replaying_queue.size() << " wals are replaying";
             std::this_thread::sleep_for(std::chrono::milliseconds(1000));
         }
-    } while (!done);
+    } while (true);
 }
 
 size_t WalTable::size() {
@@ -429,13 +359,13 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t 
tb_id) {
         }
         std::vector<TColumnInfo> column_element = result.columns;
         int64_t column_index = 1;
-        _column_id_name_map.clear();
-        _column_id_index_map.clear();
+        _column_id_info_map.clear();
         for (auto column : column_element) {
-            auto column_name = column.columnName;
-            auto column_id = column.columnId;
-            _column_id_name_map.emplace(column_id, column_name);
-            _column_id_index_map.emplace(column_id, column_index);
+            auto column_name = column.column_name;
+            auto column_id = column.column_id;
+            std::shared_ptr<ColumnInfo> column_pair =
+                    std::make_shared<ColumnInfo>(std::make_pair(column_name, 
column_index));
+            _column_id_info_map.emplace(column_id, column_pair);
             column_index++;
         }
     }
diff --git a/be/src/olap/wal_table.h b/be/src/olap/wal_table.h
index e3d66d577a2..251e8d51a61 100644
--- a/be/src/olap/wal_table.h
+++ b/be/src/olap/wal_table.h
@@ -24,6 +24,8 @@
 #include "gen_cpp/FrontendService.h"
 #include "gen_cpp/FrontendService_types.h"
 #include "gen_cpp/HeartbeatService_types.h"
+#include "http/action/http_stream.h"
+#include "olap/wal_info.h"
 #include "runtime/exec_env.h"
 #include "runtime/stream_load/stream_load_context.h"
 namespace doris {
@@ -32,25 +34,32 @@ public:
     WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id);
     ~WalTable();
     // used when be start and there are wals need to do recovery
-    void add_wals(std::vector<std::string> wals);
+    void add_wal(int64_t wal_id, std::string wal);
+    void pick_relay_wals();
+    Status relay_wal_one_by_one();
     Status replay_wals();
     size_t size();
     void stop();
 
 public:
-    // <retry_num, start_time_ms, is_doing_replay>
-    using replay_wal_info = std::tuple<int64_t, int64_t, bool>;
+    // <column_name, column_index>
+    using ColumnInfo = std::pair<std::string, int64_t>;
 
 private:
-    Status _get_wal_info(const std::string& wal, 
std::shared_ptr<std::pair<int64_t, std::string>>&);
-    std::string _get_tmp_path(const std::string wal);
-    Status _send_request(int64_t wal_id, const std::string& wal, const 
std::string& label);
-    Status _abort_txn(int64_t db_id, int64_t wal_id);
+    Status _parse_wal_path(const std::string& wal,
+                           std::shared_ptr<std::pair<int64_t, std::string>>&);
+    Status _rename_to_tmp_path(const std::string wal);
+    Status _replay_one_txn_with_stremaload(int64_t wal_id, const std::string& 
wal,
+                                           const std::string& label);
+    Status _try_abort_txn(int64_t db_id, int64_t wal_id);
     Status _get_column_info(int64_t db_id, int64_t tb_id);
     Status _read_wal_header(const std::string& wal, std::string& columns);
-    bool _need_replay(const replay_wal_info& info);
+    bool _need_replay(std::shared_ptr<WalInfo>);
     Status _replay_wal_internal(const std::string& wal);
     Status _delete_wal(int64_t wal_id);
+    Status _construct_sql_str(const std::string& wal, const std::string& label,
+                              std::string& sql_str, std::vector<size_t>& 
index_vector);
+    Status _handle_stream_load(int64_t wal_id, const std::string& wal, const 
std::string& label);
 
 private:
     ExecEnv* _exec_env;
@@ -60,9 +69,10 @@ private:
     std::string _split = "_";
     mutable std::mutex _replay_wal_lock;
     // key is wal_id
-    std::map<std::string, replay_wal_info> _replay_wal_map;
+    std::map<std::string, std::shared_ptr<WalInfo>> _replay_wal_map;
+    std::list<std::shared_ptr<WalInfo>> _replaying_queue;
     std::atomic<bool> _stop;
-    std::map<int64_t, std::string> _column_id_name_map;
-    std::map<int64_t, int64_t> _column_id_index_map;
+    std::map<int64_t, std::shared_ptr<ColumnInfo>> _column_id_info_map;
+    std::shared_ptr<HttpStreamAction> _http_stream_action;
 };
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index 28df650f13c..acb40ae6c78 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -51,7 +51,11 @@ Status 
LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block, bool
     if (block->rows() > 0) {
         _block_queue.push_back(block);
         if (write_wal) {
-            RETURN_IF_ERROR(_v_wal_writer->write_wal(block.get()));
+            auto st = _v_wal_writer->write_wal(block.get());
+            if (!st.ok()) {
+                _cancel_without_lock(st);
+                return st;
+            }
         }
         _all_block_queues_bytes->fetch_add(block->bytes(), 
std::memory_order_relaxed);
     }
@@ -278,11 +282,17 @@ Status GroupCommitTable::_create_group_commit_load(
         _load_block_queues.emplace(instance_id, load_block_queue);
         _need_plan_fragment = false;
         _exec_env->wal_mgr()->add_wal_status_queue(_table_id, txn_id,
-                                                   
WalManager::WAL_STATUS::PREPARE);
+                                                   
WalManager::WalStatus::PREPARE);
         //create wal
-        RETURN_IF_ERROR(
-                load_block_queue->create_wal(_db_id, _table_id, txn_id, label, 
_exec_env->wal_mgr(),
-                                             params.desc_tbl.slotDescriptors, 
be_exe_version));
+        if (!is_pipeline) {
+            RETURN_IF_ERROR(load_block_queue->create_wal(
+                    _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
+                    params.desc_tbl.slotDescriptors, be_exe_version));
+        } else {
+            RETURN_IF_ERROR(load_block_queue->create_wal(
+                    _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
+                    pipeline_params.desc_tbl.slotDescriptors, be_exe_version));
+        }
         _cv.notify_all();
     }
     st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, 
params,
@@ -367,10 +377,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t 
db_id, int64_t table_
     } else {
         std::string wal_path;
         RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path));
-        RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(db_id, table_id,
-                                                              
std::vector<std::string> {wal_path}));
-        _exec_env->wal_mgr()->add_wal_status_queue(table_id, txn_id,
-                                                   
WalManager::WAL_STATUS::REPLAY);
+        RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(db_id, table_id, 
txn_id, wal_path));
+        _exec_env->wal_mgr()->add_wal_status_queue(table_id, txn_id, 
WalManager::WalStatus::REPLAY);
     }
     std::stringstream ss;
     ss << "finish group commit, db_id=" << db_id << ", table_id=" << table_id 
<< ", label=" << label
diff --git a/be/src/runtime/stream_load/stream_load_context.h 
b/be/src/runtime/stream_load/stream_load_context.h
index b530242743e..e57996af9e1 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -139,6 +139,7 @@ public:
     int64_t table_id = -1;
     int64_t schema_version = -1;
     std::string label;
+    std::string sql_str;
     // optional
     std::string sub_label;
     double max_filter_ratio = 0.0;
diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp 
b/be/src/vec/exec/format/wal/wal_reader.cpp
index 01846acc04a..32fd66cc309 100644
--- a/be/src/vec/exec/format/wal/wal_reader.cpp
+++ b/be/src/vec/exec/format/wal/wal_reader.cpp
@@ -88,8 +88,6 @@ void WalReader::string_split(const std::string& str, const 
std::string& splits,
 Status WalReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
                               std::unordered_set<std::string>* missing_cols) {
     RETURN_IF_ERROR(_wal_reader->read_header(_version, _col_ids));
-    std::vector<std::string> col_element;
-    string_split(_col_ids, ",", col_element);
     
RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_column_index(_wal_id, 
_column_index));
     return Status::OK();
 }
diff --git a/be/src/vec/sink/writer/vwal_writer.cpp 
b/be/src/vec/sink/writer/vwal_writer.cpp
index 2dc945a2a2f..2ab37c340a0 100644
--- a/be/src/vec/sink/writer/vwal_writer.cpp
+++ b/be/src/vec/sink/writer/vwal_writer.cpp
@@ -56,7 +56,7 @@ VWalWriter::~VWalWriter() {}
 
 Status VWalWriter::init() {
     RETURN_IF_ERROR(_wal_manager->create_wal_writer(_wal_id, _wal_writer));
-    _wal_manager->add_wal_status_queue(_tb_id, _wal_id, 
WalManager::WAL_STATUS::CREATE);
+    _wal_manager->add_wal_status_queue(_tb_id, _wal_id, 
WalManager::WalStatus::CREATE);
     std::stringstream ss;
     for (auto slot_desc : _slot_descs) {
         if (slot_desc.col_unique_id < 0) {
diff --git a/be/test/olap/wal_manager_test.cpp 
b/be/test/olap/wal_manager_test.cpp
index 93d8636eb22..c5c216f12a7 100644
--- a/be/test/olap/wal_manager_test.cpp
+++ b/be/test/olap/wal_manager_test.cpp
@@ -44,8 +44,7 @@
 namespace doris {
 
 extern TLoadTxnBeginResult k_stream_load_begin_result;
-extern Status k_stream_load_plan_status;
-extern std::string k_request_line;
+extern Status k_stream_load_exec_status;
 
 ExecEnv* _env = nullptr;
 std::string wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
@@ -67,7 +66,6 @@ public:
         _env->_store_paths = {StorePath(std::filesystem::current_path(), 0)};
         _env->_wal_manager = WalManager::create_shared(_env, wal_dir);
         k_stream_load_begin_result = TLoadTxnBeginResult();
-        k_stream_load_plan_status = Status::OK();
     }
     void TearDown() override {
         
static_cast<void>(io::global_local_filesystem()->delete_directory(wal_dir));
@@ -88,7 +86,7 @@ public:
 
 TEST_F(WalManagerTest, recovery_normal) {
     _env->wal_mgr()->wal_limit_test_bytes = 1099511627776;
-    k_request_line = "{\"Status\": \"Success\",    \"Message\": \"Test\"}";
+    k_stream_load_exec_status = Status::OK();
 
     std::string db_id = "1";
     int64_t tb_1_id = 1;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 0fcd5050fc8..da82c4406c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -1110,7 +1110,7 @@ public class NativeInsertStmt extends InsertStmt {
     }
 
     public void analyzeGroupCommit(Analyzer analyzer) throws AnalysisException 
{
-        if (isGroupCommitStreamLoadSql && (targetTable instanceof OlapTable)
+        if (isGroupCommitStreamLoadSql && targetTable != null && (targetTable 
instanceof OlapTable)
                 && !((OlapTable) 
targetTable).getTableProperty().getUseSchemaLightChange()) {
             throw new AnalysisException(
                     "table light_schema_change is false, can't do http_stream 
with group commit mode");
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index d672597a0db..209947c8b78 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -688,16 +688,6 @@ struct TStreamLoadWithLoadStatusResult {
     6: optional i64 unselected_rows
 }
 
-struct TCheckWalRequest {
-    1: optional i64 wal_id
-    2: optional i64 db_id
-}
-
-struct TCheckWalResult {
-    1: optional Status.TStatus status
-    2: optional bool need_recovery
-}
-
 struct TKafkaRLTaskProgress {
     1: required map<i32,i64> partitionCmtOffset
 }
@@ -1303,8 +1293,8 @@ struct TGetBackendMetaResult {
 }
 
 struct TColumnInfo {
-  1: optional string columnName
-  2: optional i64 columnId
+  1: optional string column_name
+  2: optional i64 column_id
 }
 
 struct TGetColumnInfoRequest {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to