This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit eea9b56f69f6619b35797fc2d7b742fb5ef31f58
Author: meiyi <myime...@gmail.com>
AuthorDate: Tue Mar 5 18:55:03 2024 +0800

    [fix](group commit) handle group commit create plan error (#31757)
---
 be/src/olap/wal/wal_table.cpp                      | 38 ++++++++--------------
 be/src/olap/wal/wal_table.h                        |  2 +-
 be/src/runtime/group_commit_mgr.cpp                | 22 +++++--------
 be/src/runtime/group_commit_mgr.h                  |  2 +-
 be/test/olap/wal/wal_manager_test.cpp              |  2 --
 .../apache/doris/service/FrontendServiceImpl.java  |  5 ++-
 6 files changed, 27 insertions(+), 44 deletions(-)

diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp
index 0389c6fed21..14e3779748c 100644
--- a/be/src/olap/wal/wal_table.cpp
+++ b/be/src/olap/wal/wal_table.cpp
@@ -45,10 +45,6 @@ WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id)
 }
 WalTable::~WalTable() {}
 
-#ifdef BE_TEST
-Status k_stream_load_exec_status;
-#endif
-
 void WalTable::add_wal(int64_t wal_id, std::string wal) {
     std::lock_guard<std::mutex> lock(_replay_wal_lock);
     LOG(INFO) << "add replay wal=" << wal;
@@ -94,19 +90,18 @@ Status WalTable::_relay_wal_one_by_one() {
     for (auto wal_info : _replaying_queue) {
         wal_info->add_retry_num();
         auto st = _replay_wal_internal(wal_info->get_wal_path());
-        if (!st.ok()) {
-            doris::wal_fail << 1;
-            LOG(WARNING) << "failed to replay wal=" << wal_info->get_wal_path()
-                         << ", st=" << st.to_string();
-            if (!st.is<ErrorCode::NOT_FOUND>() && !st.is<ErrorCode::DATA_QUALITY_ERROR>()) {
-                need_retry_wals.push_back(wal_info);
-            } else {
-                need_delete_wals.push_back(wal_info);
-            }
-        } else {
+        auto msg = st.msg();
+        if (st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() || st.is<ErrorCode::NOT_FOUND>() ||
+            st.is<ErrorCode::DATA_QUALITY_ERROR>() ||
+            msg.find("LabelAlreadyUsedException") != msg.npos) {
             LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path()
                       << ", st=" << st.to_string();
             need_delete_wals.push_back(wal_info);
+        } else {
+            doris::wal_fail << 1;
+            LOG(WARNING) << "failed to replay wal=" << wal_info->get_wal_path()
+                         << ", st=" << st.to_string();
+            need_retry_wals.push_back(wal_info);
         }
     }
     {
@@ -200,7 +195,7 @@ Status WalTable::_replay_wal_internal(const std::string& wal) {
         [[maybe_unused]] auto st = _try_abort_txn(_db_id, label);
     }
 #endif
-    return _replay_one_txn_with_stremaload(wal_id, wal, label);
+    return _replay_one_wal_with_streamload(wal_id, wal, label);
 }
 
 Status WalTable::_construct_sql_str(const std::string& wal, const std::string& label,
@@ -241,6 +236,7 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
     ctx->sql_str = sql_str;
     ctx->wal_id = wal_id;
     ctx->label = label;
+    ctx->need_commit_self = false;
     ctx->auth.token = "relay_wal"; // this is a fake, fe not check it now
     ctx->auth.user = "admin";
     ctx->group_commit = false;
@@ -263,19 +259,13 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
     return st;
 }
 
-Status WalTable::_replay_one_txn_with_stremaload(int64_t wal_id, const std::string& wal,
+Status WalTable::_replay_one_wal_with_streamload(int64_t wal_id, const std::string& wal,
                                                  const std::string& label) {
-    bool success = false;
 #ifndef BE_TEST
-    auto st = _handle_stream_load(wal_id, wal, label);
-    auto msg = st.msg();
-    success = st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() ||
-              msg.find("LabelAlreadyUsedException") != msg.npos;
+    return _handle_stream_load(wal_id, wal, label);
 #else
-    success = k_stream_load_exec_status.ok();
-    auto st = Status::OK();
+    return Status::OK();
 #endif
-    return success ? Status::OK() : st;
 }
 
 void WalTable::stop() {
diff --git a/be/src/olap/wal/wal_table.h b/be/src/olap/wal/wal_table.h
index f6ed3d865b9..0e781cd9153 100644
--- a/be/src/olap/wal/wal_table.h
+++ b/be/src/olap/wal/wal_table.h
@@ -50,7 +50,7 @@ private:
     Status _get_column_info(int64_t db_id, int64_t tb_id,
                             std::map<int64_t, std::string>& column_info_map);
 
-    Status _replay_one_txn_with_stremaload(int64_t wal_id, const std::string& wal,
+    Status _replay_one_wal_with_streamload(int64_t wal_id, const std::string& wal,
                                            const std::string& label);
     Status _handle_stream_load(int64_t wal_id, const std::string& wal, const std::string& label);
     Status _construct_sql_str(const std::string& wal, const std::string& label,
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 1e231055cff..887dd856bb4 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -209,13 +209,16 @@ Status GroupCommitTable::get_first_block_load_queue(
                         "schema version not match, maybe a schema change is in process. Please "
                         "retry this load manually.");
             }
-            if (!_need_plan_fragment) {
-                _need_plan_fragment = true;
+            if (!_is_creating_plan_fragment) {
+                _is_creating_plan_fragment = true;
                 RETURN_IF_ERROR(_thread_pool->submit_func([&] {
                     auto st = _create_group_commit_load(load_block_queue, be_exe_version);
                     if (!st.ok()) {
-                        LOG(WARNING) << "fail to create block queue,st=" << st.to_string();
+                        LOG(WARNING) << "create group commit load error, st=" << st.to_string();
                         load_block_queue.reset();
+                        std::unique_lock l(_lock);
+                        _is_creating_plan_fragment = false;
+                        _cv.notify_all();
                     }
                 }));
             }
@@ -241,13 +244,6 @@ Status GroupCommitTable::get_first_block_load_queue(
 Status GroupCommitTable::_create_group_commit_load(
         std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version) {
     Status st = Status::OK();
-    std::unique_ptr<int, std::function<void(int*)>> finish_plan_func((int*)0x01, [&](int*) {
-        if (!st.ok()) {
-            std::unique_lock l(_lock);
-            _need_plan_fragment = false;
-            _cv.notify_all();
-        }
-    });
     TStreamLoadPutRequest request;
     UniqueId load_id = UniqueId::gen_uid();
     TUniqueId tload_id;
@@ -281,13 +277,13 @@ Status GroupCommitTable::_create_group_commit_load(
             10000L);
     if (!st.ok()) {
         LOG(WARNING) << "create group commit load rpc error, st=" << st.to_string();
+        return st;
     }
-    RETURN_IF_ERROR(st);
     st = Status::create<false>(result.status);
     if (!st.ok()) {
         LOG(WARNING) << "create group commit load error, st=" << st.to_string();
+        return st;
     }
-    RETURN_IF_ERROR(st);
     auto schema_version = result.base_schema_version;
     auto is_pipeline = result.__isset.pipeline_params;
     auto& params = result.params;
@@ -326,7 +322,7 @@ Status GroupCommitTable::_create_group_commit_load(
                     be_exe_version));
         }
         _load_block_queues.emplace(instance_id, load_block_queue);
-        _need_plan_fragment = false;
+        _is_creating_plan_fragment = false;
         _cv.notify_all();
     }
     st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params,
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index 5357ba208f7..dc135ea1063 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -156,7 +156,7 @@ private:
     std::condition_variable _cv;
     // fragment_instance_id to load_block_queue
     std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> _load_block_queues;
-    bool _need_plan_fragment = false;
+    bool _is_creating_plan_fragment = false;
 };
 
 class GroupCommitMgr {
diff --git a/be/test/olap/wal/wal_manager_test.cpp b/be/test/olap/wal/wal_manager_test.cpp
index d264cb621c3..2dc59cec9d9 100644
--- a/be/test/olap/wal/wal_manager_test.cpp
+++ b/be/test/olap/wal/wal_manager_test.cpp
@@ -44,7 +44,6 @@
 namespace doris {
 
 extern TLoadTxnBeginResult k_stream_load_begin_result;
-extern Status k_stream_load_exec_status;
 
 ExecEnv* _env = nullptr;
 std::filesystem::path wal_dir = std::filesystem::current_path().string() + "/wal_test";
@@ -103,7 +102,6 @@ public:
 
 TEST_F(WalManagerTest, recovery_normal) {
     _env->wal_mgr()->wal_limit_test_bytes = 1099511627776;
-    k_stream_load_exec_status = Status::OK();
 
     std::string db_id = "1";
     int64_t tb_1_id = 1;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 850686116e2..1316425df4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1687,12 +1687,11 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         try {
             loadTxnRollbackImpl(request);
         } catch (MetaNotFoundException e) {
-            String msg = "failed to rollback txn" + request.getTxnId();
-            LOG.warn(msg, e);
+            LOG.warn("failed to rollback txn, id: {}, label: {}", request.getTxnId(), request.getLabel(), e);
             status.setStatusCode(TStatusCode.NOT_FOUND);
             status.addToErrorMsgs(e.getMessage());
         } catch (UserException e) {
-            LOG.warn("failed to rollback txn {}: {}", request.getTxnId(), e.getMessage());
+            LOG.warn("failed to rollback txn, id: {}, label: {}", request.getTxnId(), request.getLabel(), e);
             status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
             status.addToErrorMsgs(e.getMessage());
         } catch (Throwable e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to