This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 69524ccf98f [improve](group commit) Group commit support commit by 
data size (#29428)
69524ccf98f is described below

commit 69524ccf98f57877440d8e09d878fb76f3e011ad
Author: meiyi <myime...@gmail.com>
AuthorDate: Tue Jan 2 23:20:23 2024 +0800

    [improve](group commit) Group commit support commit by data size (#29428)
---
 be/src/runtime/group_commit_mgr.cpp                | 41 ++++++++------
 be/src/runtime/group_commit_mgr.h                  | 64 +++++++++++++---------
 .../apache/doris/analysis/NativeInsertStmt.java    |  2 -
 .../apache/doris/service/FrontendServiceImpl.java  |  2 +
 gensrc/thrift/FrontendService.thrift               |  1 +
 5 files changed, 66 insertions(+), 44 deletions(-)

diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index 0c3589edbe0..2971138d5b6 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -36,11 +36,11 @@ Status LoadBlockQueue::add_block(RuntimeState* 
runtime_state,
     while (!runtime_state->is_cancelled() && status.ok() &&
            _all_block_queues_bytes->load(std::memory_order_relaxed) >
                    config::group_commit_queue_mem_limit) {
-        _put_cond.wait_for(
-                l, 
std::chrono::milliseconds(LoadBlockQueue::MAX_BLOCK_QUEUE_ADD_WAIT_TIME));
+        _put_cond.wait_for(l,
+                           
std::chrono::milliseconds(LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIME));
         auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
                 std::chrono::steady_clock::now() - start);
-        if (duration.count() > LoadBlockQueue::WAL_MEM_BACK_PRESSURE_TIME_OUT) 
{
+        if (duration.count() > LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIMEOUT) 
{
             return Status::TimedOut(
                     "Wal memory back pressure wait too much time! Load block 
queue txn id: {}, "
                     "label: {}, instance id: {}",
@@ -60,8 +60,14 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
                 return st;
             }
         }
+        _data_bytes += block->bytes();
         _all_block_queues_bytes->fetch_add(block->bytes(), 
std::memory_order_relaxed);
     }
+    if (_data_bytes >= _group_commit_data_bytes) {
+        VLOG_DEBUG << "group commit meets commit condition for data size, 
label=" << label
+                   << ", instance_id=" << load_instance_id << ", data_bytes=" 
<< _data_bytes;
+        _need_commit = true;
+    }
     _get_cond.notify_all();
     return Status::OK();
 }
@@ -71,25 +77,25 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
     *find_block = false;
     *eos = false;
     std::unique_lock l(mutex);
-    if (!need_commit) {
+    if (!_need_commit) {
         auto left_milliseconds =
                 _group_commit_interval_ms - 
std::chrono::duration_cast<std::chrono::milliseconds>(
                                                     
std::chrono::steady_clock::now() - _start_time)
                                                     .count();
         if (left_milliseconds <= 0) {
-            need_commit = true;
+            _need_commit = true;
         }
     }
     while (!runtime_state->is_cancelled() && status.ok() && 
_block_queue.empty() &&
-           (!need_commit || (need_commit && !_load_ids.empty()))) {
+           (!_need_commit || (_need_commit && !_load_ids.empty()))) {
         auto left_milliseconds = _group_commit_interval_ms;
         auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
                                 std::chrono::steady_clock::now() - _start_time)
                                 .count();
-        if (!need_commit) {
+        if (!_need_commit) {
             left_milliseconds = _group_commit_interval_ms - duration;
             if (left_milliseconds <= 0) {
-                need_commit = true;
+                _need_commit = true;
                 break;
             }
         } else {
@@ -120,7 +126,7 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
         _block_queue.pop_front();
         _all_block_queues_bytes->fetch_sub(block->bytes(), 
std::memory_order_relaxed);
     }
-    if (_block_queue.empty() && need_commit && _load_ids.empty()) {
+    if (_block_queue.empty() && _need_commit && _load_ids.empty()) {
         *eos = true;
     } else {
         *eos = false;
@@ -139,7 +145,7 @@ void LoadBlockQueue::remove_load_id(const UniqueId& 
load_id) {
 
 Status LoadBlockQueue::add_load_id(const UniqueId& load_id) {
     std::unique_lock l(mutex);
-    if (need_commit) {
+    if (_need_commit) {
         return Status::InternalError("block queue is set need commit, id=" +
                                      load_instance_id.to_string());
     }
@@ -175,7 +181,7 @@ Status GroupCommitTable::get_first_block_load_queue(
         for (int i = 0; i < 3; i++) {
             bool is_schema_version_match = true;
             for (auto it = _load_block_queues.begin(); it != 
_load_block_queues.end(); ++it) {
-                if (!it->second->need_commit) {
+                if (!it->second->need_commit()) {
                     if (base_schema_version == it->second->schema_version) {
                         if (it->second->add_load_id(load_id).ok()) {
                             load_block_queue = it->second;
@@ -282,7 +288,8 @@ Status GroupCommitTable::_create_group_commit_load(
     {
         load_block_queue = std::make_shared<LoadBlockQueue>(
                 instance_id, label, txn_id, schema_version, 
_all_block_queues_bytes,
-                result.wait_internal_group_commit_finish, 
result.group_commit_interval_ms);
+                result.wait_internal_group_commit_finish, 
result.group_commit_interval_ms,
+                result.group_commit_data_bytes);
         std::unique_lock l(_lock);
         _load_block_queues.emplace(instance_id, load_block_queue);
         _need_plan_fragment = false;
@@ -377,7 +384,7 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t 
db_id, int64_t table_
     if (status.ok() && st.ok() &&
         (result_status.ok() || 
result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) {
         RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(
-                txn_id, load_block_queue->block_queue_pre_allocated.load()));
+                txn_id, load_block_queue->block_queue_pre_allocated()));
         RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(table_id, 
txn_id));
     } else {
         std::string wal_path;
@@ -485,7 +492,7 @@ Status LoadBlockQueue::create_wal(int64_t db_id, int64_t 
tb_id, int64_t wal_id,
                                   const std::string& import_label, WalManager* 
wal_manager,
                                   std::vector<TSlotDescriptor>& slot_desc, int 
be_exe_version) {
     RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->add_wal_path(db_id, 
tb_id, wal_id,
-                                                                    
import_label, wal_base_path));
+                                                                    
import_label, _wal_base_path));
     _v_wal_writer = std::make_shared<vectorized::VWalWriter>(
             tb_id, wal_id, import_label, wal_manager, slot_desc, 
be_exe_version);
     return _v_wal_writer->init();
@@ -502,17 +509,17 @@ bool LoadBlockQueue::has_enough_wal_disk_space(size_t 
pre_allocated) {
     auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
     size_t available_bytes = 0;
     {
-        Status st = wal_mgr->get_wal_dir_available_size(wal_base_path, 
&available_bytes);
+        Status st = wal_mgr->get_wal_dir_available_size(_wal_base_path, 
&available_bytes);
         if (!st.ok()) {
             LOG(WARNING) << "get wal disk available size filed!";
         }
     }
     if (pre_allocated < available_bytes) {
-        Status st = wal_mgr->update_wal_dir_pre_allocated(wal_base_path, 
pre_allocated, true);
+        Status st = wal_mgr->update_wal_dir_pre_allocated(_wal_base_path, 
pre_allocated, true);
         if (!st.ok()) {
             LOG(WARNING) << "update wal dir pre_allocated failed, reason: " << 
st.to_string();
         }
-        block_queue_pre_allocated.fetch_add(pre_allocated);
+        _block_queue_pre_allocated.fetch_add(pre_allocated);
         return true;
     } else {
         return false;
diff --git a/be/src/runtime/group_commit_mgr.h 
b/be/src/runtime/group_commit_mgr.h
index 125256535fe..49bdd5f87a7 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -45,15 +45,17 @@ public:
     LoadBlockQueue(const UniqueId& load_instance_id, std::string& label, 
int64_t txn_id,
                    int64_t schema_version,
                    std::shared_ptr<std::atomic_size_t> all_block_queues_bytes,
-                   bool wait_internal_group_commit_finish, int64_t 
group_commit_interval_ms)
+                   bool wait_internal_group_commit_finish, int64_t 
group_commit_interval_ms,
+                   int64_t group_commit_data_bytes)
             : load_instance_id(load_instance_id),
               label(label),
               txn_id(txn_id),
               schema_version(schema_version),
               
wait_internal_group_commit_finish(wait_internal_group_commit_finish),
+              _group_commit_interval_ms(group_commit_interval_ms),
               _start_time(std::chrono::steady_clock::now()),
-              _all_block_queues_bytes(all_block_queues_bytes),
-              _group_commit_interval_ms(group_commit_interval_ms) {};
+              _group_commit_data_bytes(group_commit_data_bytes),
+              _all_block_queues_bytes(all_block_queues_bytes) {};
 
     Status add_block(RuntimeState* runtime_state, 
std::shared_ptr<vectorized::Block> block,
                      bool write_wal);
@@ -62,44 +64,54 @@ public:
     Status add_load_id(const UniqueId& load_id);
     void remove_load_id(const UniqueId& load_id);
     void cancel(const Status& st);
+    bool need_commit() { return _need_commit; }
+
     Status create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const 
std::string& import_label,
                       WalManager* wal_manager, std::vector<TSlotDescriptor>& 
slot_desc,
                       int be_exe_version);
     Status close_wal();
     bool has_enough_wal_disk_space(size_t pre_allocated);
+    size_t block_queue_pre_allocated() { return 
_block_queue_pre_allocated.load(); }
 
-    // 1s
-    static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000;
-    // 120s
-    static constexpr size_t WAL_MEM_BACK_PRESSURE_TIME_OUT = 120000;
     UniqueId load_instance_id;
     std::string label;
     int64_t txn_id;
     int64_t schema_version;
-    bool need_commit = false;
     bool wait_internal_group_commit_finish = false;
+
+    // the execute status of this internal group commit
     std::mutex mutex;
-    bool process_finish = false;
     std::condition_variable internal_group_commit_finish_cv;
+    bool process_finish = false;
     Status status = Status::OK();
-    std::string wal_base_path;
-    std::atomic_size_t block_queue_pre_allocated = 0;
 
 private:
     void _cancel_without_lock(const Status& st);
-    std::chrono::steady_clock::time_point _start_time;
 
-    std::condition_variable _put_cond;
-    std::condition_variable _get_cond;
     // the set of load ids of all blocks in this queue
     std::set<UniqueId> _load_ids;
     std::list<std::shared_ptr<vectorized::Block>> _block_queue;
 
-    // memory consumption of all tables' load block queues, used for back 
pressure.
-    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
-    // group commit interval in ms, can be changed by 'ALTER TABLE my_table 
SET ("group_commit_interval_ms"="1000");'
-    int64_t _group_commit_interval_ms;
+    // wal
+    std::string _wal_base_path;
     std::shared_ptr<vectorized::VWalWriter> _v_wal_writer;
+    std::atomic_size_t _block_queue_pre_allocated = 0;
+
+    // commit
+    bool _need_commit = false;
+    // commit by time interval, can be changed by 'ALTER TABLE my_table SET 
("group_commit_interval_ms"="1000");'
+    int64_t _group_commit_interval_ms;
+    std::chrono::steady_clock::time_point _start_time;
+    // commit by data size
+    int64_t _group_commit_data_bytes;
+    int64_t _data_bytes = 0;
+
+    // memory back pressure, memory consumption of all tables' load block 
queues
+    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
+    std::condition_variable _put_cond;
+    std::condition_variable _get_cond;
+    static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIME = 1000;      // 1s
+    static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIMEOUT = 120000; // 120s
 };
 
 class GroupCommitTable {
@@ -108,9 +120,9 @@ public:
                      int64_t table_id, std::shared_ptr<std::atomic_size_t> 
all_block_queue_bytes)
             : _exec_env(exec_env),
               _thread_pool(thread_pool),
+              _all_block_queues_bytes(all_block_queue_bytes),
               _db_id(db_id),
-              _table_id(table_id),
-              _all_block_queues_bytes(all_block_queue_bytes) {};
+              _table_id(table_id) {};
     Status get_first_block_load_queue(int64_t table_id, int64_t 
base_schema_version,
                                       const UniqueId& load_id,
                                       std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
@@ -131,15 +143,17 @@ private:
 
     ExecEnv* _exec_env = nullptr;
     ThreadPool* _thread_pool = nullptr;
+    // memory consumption of all tables' load block queues, used for memory 
back pressure.
+    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
+
     int64_t _db_id;
     int64_t _table_id;
+
     std::mutex _lock;
     std::condition_variable _cv;
     // fragment_instance_id to load_block_queue
     std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> 
_load_block_queues;
     bool _need_plan_fragment = false;
-    // memory consumption of all tables' load block queues, used for back 
pressure.
-    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
 };
 
 class GroupCommitMgr {
@@ -159,13 +173,13 @@ public:
 
 private:
     ExecEnv* _exec_env = nullptr;
+    std::unique_ptr<doris::ThreadPool> _thread_pool;
+    // memory consumption of all tables' load block queues, used for memory 
back pressure.
+    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
 
     std::mutex _lock;
     // TODO remove table when unused
     std::unordered_map<int64_t, std::shared_ptr<GroupCommitTable>> _table_map;
-    std::unique_ptr<doris::ThreadPool> _thread_pool;
-    // memory consumption of all tables' load block queues, used for back 
pressure.
-    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
 };
 
 } // namespace doris
\ No newline at end of file
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 576ebba9b0e..176833b865c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -69,7 +69,6 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.protobuf.ByteString;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -150,7 +149,6 @@ public class NativeInsertStmt extends InsertStmt {
     private boolean isGroupCommit = false;
     private int baseSchemaVersion = -1;
     private TUniqueId loadId = null;
-    private ByteString execPlanFragmentParamsBytes = null;
     private long tableId = -1;
     public boolean isGroupCommitStreamLoadSql = false;
     private GroupCommitPlanner groupCommitPlanner;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 136084f5f1c..cf42cb921f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1983,6 +1983,8 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             result.setTableId(parsedStmt.getTargetTable().getId());
             result.setBaseSchemaVersion(((OlapTable) 
parsedStmt.getTargetTable()).getBaseSchemaVersion());
             result.setGroupCommitIntervalMs(((OlapTable) 
parsedStmt.getTargetTable()).getGroupCommitIntervalMs());
+            // TODO get from table property
+            result.setGroupCommitDataBytes(134217728L);
             
result.setWaitInternalGroupCommitFinish(Config.wait_internal_group_commit_finish);
         } catch (UserException e) {
             LOG.warn("exec sql error", e);
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 209947c8b78..75e67722113 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -670,6 +670,7 @@ struct TStreamLoadPutResult {
     6: optional i64 table_id
     7: optional bool wait_internal_group_commit_finish = false
     8: optional i64 group_commit_interval_ms
+    9: optional i64 group_commit_data_bytes
 }
 
 struct TStreamLoadMultiTablePutResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to