This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit cbffdbb8bfef54bb7201c1cbca8c2a216c73520f
Author: huanghaibin <284824...@qq.com>
AuthorDate: Thu Jan 11 19:56:47 2024 +0800

    [bug](group_commit) fix relay wal problem on materialized-view (#29848)
---
 be/src/olap/wal/wal_table.cpp                                      | 1 +
 be/src/runtime/group_commit_mgr.cpp                                | 4 ++--
 .../src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java    | 2 +-
 .../src/main/java/org/apache/doris/httpv2/rest/LoadAction.java     | 7 ++++++-
 4 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp
index 54500273daa..a6d7a4054c8 100644
--- a/be/src/olap/wal/wal_table.cpp
+++ b/be/src/olap/wal/wal_table.cpp
@@ -234,6 +234,7 @@ Status WalTable::_construct_sql_str(const std::string& wal, const std::string& l
             auto it = column_info_map.find(column_id);
             if (it != column_info_map.end()) {
                 ss_name << "`" << it->second << "`,";
+                column_info_map.erase(column_id);
             }
         } catch (const std::invalid_argument& e) {
             return Status::InvalidArgument("Invalid format, {}", e.what());
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 6d2ec020e7b..692f7c6846b 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -61,6 +61,8 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
     if (block->rows() > 0) {
         if (!config::group_commit_wait_replay_wal_finish) {
             _block_queue.push_back(block);
+            _data_bytes += block->bytes();
+            _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
         } else {
             LOG(INFO) << "skip adding block to queue on txn " << txn_id;
         }
@@ -71,8 +73,6 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
                 return st;
             }
         }
-        _data_bytes += block->bytes();
-        _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
     }
     if (_data_bytes >= _group_commit_data_bytes) {
         VLOG_DEBUG << "group commit meets commit condition for data size, label=" << label
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index d40020fd57e..eef902dea82 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -607,7 +607,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         List<Long> aliveBeIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
         long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold;
         while (true) {
-            LOG.info("wai for wal queue size to be empty");
+            LOG.info("wait for wal queue size to be empty");
             boolean walFinished = Env.getCurrentEnv().getGroupCommitManager()
                     .isPreviousWalFinished(tableId, aliveBeIds);
             if (walFinished) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 787f77f1da1..2b4f20a57cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -333,7 +333,12 @@ public class LoadAction extends RestBaseController {
                 .setEnableRoundRobin(true)
                 .needLoadAvailable().build();
         policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();
-        List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+        List<Long> backendIds;
+        if (groupCommit) {
+            backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1);
+        } else {
+            backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+        }
         if (backendIds.isEmpty()) {
             throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to