This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 69524ccf98f [improve](group commit) Group commit support commit by data size (#29428)
69524ccf98f is described below

commit 69524ccf98f57877440d8e09d878fb76f3e011ad
Author: meiyi <myime...@gmail.com>
AuthorDate: Tue Jan 2 23:20:23 2024 +0800

    [improve](group commit) Group commit support commit by data size (#29428)
---
 be/src/runtime/group_commit_mgr.cpp                | 41 ++++++++------
 be/src/runtime/group_commit_mgr.h                  | 64 +++++++++++++---------
 .../apache/doris/analysis/NativeInsertStmt.java    |  2 -
 .../apache/doris/service/FrontendServiceImpl.java  |  2 +
 gensrc/thrift/FrontendService.thrift               |  1 +
 5 files changed, 66 insertions(+), 44 deletions(-)

diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 0c3589edbe0..2971138d5b6 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -36,11 +36,11 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
         while (!runtime_state->is_cancelled() && status.ok() &&
                _all_block_queues_bytes->load(std::memory_order_relaxed) >
                        config::group_commit_queue_mem_limit) {
-            _put_cond.wait_for(
-                    l, std::chrono::milliseconds(LoadBlockQueue::MAX_BLOCK_QUEUE_ADD_WAIT_TIME));
+            _put_cond.wait_for(l,
+                               std::chrono::milliseconds(LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIME));
             auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
                     std::chrono::steady_clock::now() - start);
-            if (duration.count() > LoadBlockQueue::WAL_MEM_BACK_PRESSURE_TIME_OUT) {
+            if (duration.count() > LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIMEOUT) {
                 return Status::TimedOut(
                         "Wal memory back pressure wait too much time! Load block queue txn id: {}, "
                         "label: {}, instance id: {}",
@@ -60,8 +60,14 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
                 return st;
             }
         }
+        _data_bytes += block->bytes();
         _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
     }
+    if (_data_bytes >= _group_commit_data_bytes) {
+        VLOG_DEBUG << "group commit meets commit condition for data size, label=" << label
+                   << ", instance_id=" << load_instance_id << ", data_bytes=" << _data_bytes;
+        _need_commit = true;
+    }
     _get_cond.notify_all();
     return Status::OK();
 }
@@ -71,25 +77,25 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
     *find_block = false;
     *eos = false;
     std::unique_lock l(mutex);
-    if (!need_commit) {
+    if (!_need_commit) {
         auto left_milliseconds = _group_commit_interval_ms -
                                  std::chrono::duration_cast<std::chrono::milliseconds>(
                                          std::chrono::steady_clock::now() - _start_time)
                                          .count();
         if (left_milliseconds <= 0) {
-            need_commit = true;
+            _need_commit = true;
         }
     }
     while (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty() &&
-           (!need_commit || (need_commit && !_load_ids.empty()))) {
+           (!_need_commit || (_need_commit && !_load_ids.empty()))) {
         auto left_milliseconds = _group_commit_interval_ms;
         auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
                                 std::chrono::steady_clock::now() - _start_time)
                                 .count();
-        if (!need_commit) {
+        if (!_need_commit) {
             left_milliseconds = _group_commit_interval_ms - duration;
             if (left_milliseconds <= 0) {
-                need_commit = true;
+                _need_commit = true;
                 break;
             }
         } else {
@@ -120,7 +126,7 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
         _block_queue.pop_front();
         _all_block_queues_bytes->fetch_sub(block->bytes(), std::memory_order_relaxed);
     }
-    if (_block_queue.empty() && need_commit && _load_ids.empty()) {
+    if (_block_queue.empty() && _need_commit && _load_ids.empty()) {
         *eos = true;
     } else {
         *eos = false;
@@ -139,7 +145,7 @@ void LoadBlockQueue::remove_load_id(const UniqueId& load_id) {
 
 Status LoadBlockQueue::add_load_id(const UniqueId& load_id) {
     std::unique_lock l(mutex);
-    if (need_commit) {
+    if (_need_commit) {
         return Status::InternalError("block queue is set need commit, id=" +
                                      load_instance_id.to_string());
     }
@@ -175,7 +181,7 @@ Status GroupCommitTable::get_first_block_load_queue(
     for (int i = 0; i < 3; i++) {
         bool is_schema_version_match = true;
         for (auto it = _load_block_queues.begin(); it != _load_block_queues.end(); ++it) {
-            if (!it->second->need_commit) {
+            if (!it->second->need_commit()) {
                 if (base_schema_version == it->second->schema_version) {
                     if (it->second->add_load_id(load_id).ok()) {
                         load_block_queue = it->second;
@@ -282,7 +288,8 @@ Status GroupCommitTable::_create_group_commit_load(
     {
         load_block_queue = std::make_shared<LoadBlockQueue>(
                 instance_id, label, txn_id, schema_version, _all_block_queues_bytes,
-                result.wait_internal_group_commit_finish, result.group_commit_interval_ms);
+                result.wait_internal_group_commit_finish, result.group_commit_interval_ms,
+                result.group_commit_data_bytes);
         std::unique_lock l(_lock);
         _load_block_queues.emplace(instance_id, load_block_queue);
         _need_plan_fragment = false;
@@ -377,7 +384,7 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
     if (status.ok() && st.ok() &&
         (result_status.ok() || result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) {
         RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(
-                txn_id, load_block_queue->block_queue_pre_allocated.load()));
+                txn_id, load_block_queue->block_queue_pre_allocated()));
         RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(table_id, txn_id));
     } else {
         std::string wal_path;
@@ -485,7 +492,7 @@ Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
                                   const std::string& import_label, WalManager* wal_manager,
                                   std::vector<TSlotDescriptor>& slot_desc, int be_exe_version) {
     RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->add_wal_path(db_id, tb_id, wal_id,
-                                                                    import_label, wal_base_path));
+                                                                    import_label, _wal_base_path));
     _v_wal_writer = std::make_shared<vectorized::VWalWriter>(
             tb_id, wal_id, import_label, wal_manager, slot_desc, be_exe_version);
     return _v_wal_writer->init();
@@ -502,17 +509,17 @@ bool LoadBlockQueue::has_enough_wal_disk_space(size_t pre_allocated) {
     auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
     size_t available_bytes = 0;
     {
-        Status st = wal_mgr->get_wal_dir_available_size(wal_base_path, &available_bytes);
+        Status st = wal_mgr->get_wal_dir_available_size(_wal_base_path, &available_bytes);
         if (!st.ok()) {
             LOG(WARNING) << "get wal disk available size filed!";
         }
     }
     if (pre_allocated < available_bytes) {
-        Status st = wal_mgr->update_wal_dir_pre_allocated(wal_base_path, pre_allocated, true);
+        Status st = wal_mgr->update_wal_dir_pre_allocated(_wal_base_path, pre_allocated, true);
         if (!st.ok()) {
             LOG(WARNING) << "update wal dir pre_allocated failed, reason: " << st.to_string();
         }
-        block_queue_pre_allocated.fetch_add(pre_allocated);
+        _block_queue_pre_allocated.fetch_add(pre_allocated);
         return true;
     } else {
         return false;
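
In summary, the BE change above adds a second commit trigger: a load block queue now asks to be committed either when its group commit interval elapses (checked in get_block) or as soon as the bytes accumulated in the queue reach a threshold (checked in add_block). The following is a minimal standalone sketch of the combined policy; it is illustrative only, and CommitPolicy and its member names are not part of the Doris code.

    #include <chrono>
    #include <cstdint>

    // Sketch: commit when either the time interval elapses or the
    // accumulated data size crosses the configured threshold.
    class CommitPolicy {
    public:
        CommitPolicy(int64_t interval_ms, int64_t data_bytes_limit)
                : _interval_ms(interval_ms),
                  _data_bytes_limit(data_bytes_limit),
                  _start_time(std::chrono::steady_clock::now()) {}

        // Producer side: called for every appended block; mirrors the
        // size check added to add_block().
        bool add_bytes(int64_t bytes) {
            _data_bytes += bytes;
            if (_data_bytes >= _data_bytes_limit) {
                _need_commit = true;
            }
            return _need_commit;
        }

        // Consumer side: mirrors the interval check in get_block().
        bool check_interval() {
            auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                      std::chrono::steady_clock::now() - _start_time)
                                      .count();
            if (elapsed_ms >= _interval_ms) {
                _need_commit = true;
            }
            return _need_commit;
        }

    private:
        bool _need_commit = false;
        int64_t _interval_ms;
        int64_t _data_bytes_limit;
        int64_t _data_bytes = 0;
        std::chrono::steady_clock::time_point _start_time;
    };

Because the size check runs on the producer path, a heavy load can trigger a commit immediately instead of waiting out the remaining interval.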
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index 125256535fe..49bdd5f87a7 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -45,15 +45,17 @@ public:
     LoadBlockQueue(const UniqueId& load_instance_id, std::string& label, int64_t txn_id,
                    int64_t schema_version,
                    std::shared_ptr<std::atomic_size_t> all_block_queues_bytes,
-                   bool wait_internal_group_commit_finish, int64_t group_commit_interval_ms)
+                   bool wait_internal_group_commit_finish, int64_t group_commit_interval_ms,
+                   int64_t group_commit_data_bytes)
             : load_instance_id(load_instance_id),
               label(label),
               txn_id(txn_id),
               schema_version(schema_version),
               wait_internal_group_commit_finish(wait_internal_group_commit_finish),
+              _group_commit_interval_ms(group_commit_interval_ms),
               _start_time(std::chrono::steady_clock::now()),
-              _all_block_queues_bytes(all_block_queues_bytes),
-              _group_commit_interval_ms(group_commit_interval_ms) {};
+              _group_commit_data_bytes(group_commit_data_bytes),
+              _all_block_queues_bytes(all_block_queues_bytes) {};
 
     Status add_block(RuntimeState* runtime_state, std::shared_ptr<vectorized::Block> block,
                      bool write_wal);
@@ -62,44 +64,54 @@ public:
     Status add_load_id(const UniqueId& load_id);
     void remove_load_id(const UniqueId& load_id);
     void cancel(const Status& st);
+    bool need_commit() { return _need_commit; }
+
     Status create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label,
                       WalManager* wal_manager, std::vector<TSlotDescriptor>& slot_desc,
                       int be_exe_version);
     Status close_wal();
     bool has_enough_wal_disk_space(size_t pre_allocated);
+    size_t block_queue_pre_allocated() { return _block_queue_pre_allocated.load(); }
 
-    // 1s
-    static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000;
-    // 120s
-    static constexpr size_t WAL_MEM_BACK_PRESSURE_TIME_OUT = 120000;
     UniqueId load_instance_id;
     std::string label;
     int64_t txn_id;
     int64_t schema_version;
-    bool need_commit = false;
     bool wait_internal_group_commit_finish = false;
+
+    // the execute status of this internal group commit
     std::mutex mutex;
-    bool process_finish = false;
     std::condition_variable internal_group_commit_finish_cv;
+    bool process_finish = false;
     Status status = Status::OK();
-    std::string wal_base_path;
-    std::atomic_size_t block_queue_pre_allocated = 0;
 
 private:
     void _cancel_without_lock(const Status& st);
 
-    std::chrono::steady_clock::time_point _start_time;
-    std::condition_variable _put_cond;
-    std::condition_variable _get_cond;
     // the set of load ids of all blocks in this queue
     std::set<UniqueId> _load_ids;
     std::list<std::shared_ptr<vectorized::Block>> _block_queue;
-    // memory consumption of all tables' load block queues, used for back pressure.
-    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
-    // group commit interval in ms, can be changed by 'ALTER TABLE my_table SET ("group_commit_interval_ms"="1000");'
-    int64_t _group_commit_interval_ms;
+
+    // wal
+    std::string _wal_base_path;
     std::shared_ptr<vectorized::VWalWriter> _v_wal_writer;
+    std::atomic_size_t _block_queue_pre_allocated = 0;
+
+    // commit
+    bool _need_commit = false;
+    // commit by time interval, can be changed by 'ALTER TABLE my_table SET ("group_commit_interval_ms"="1000");'
+    int64_t _group_commit_interval_ms;
+    std::chrono::steady_clock::time_point _start_time;
+    // commit by data size
+    int64_t _group_commit_data_bytes;
+    int64_t _data_bytes = 0;
+
+    // memory back pressure, memory consumption of all tables' load block queues
+    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
+    std::condition_variable _put_cond;
+    std::condition_variable _get_cond;
+    static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIME = 1000;      // 1s
+    static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIMEOUT = 120000; // 120s
 };
 
 class GroupCommitTable {
@@ -108,9 +120,9 @@ public:
                      int64_t table_id, std::shared_ptr<std::atomic_size_t> all_block_queue_bytes)
             : _exec_env(exec_env),
               _thread_pool(thread_pool),
+              _all_block_queues_bytes(all_block_queue_bytes),
               _db_id(db_id),
-              _table_id(table_id),
-              _all_block_queues_bytes(all_block_queue_bytes) {};
+              _table_id(table_id) {};
     Status get_first_block_load_queue(int64_t table_id, int64_t base_schema_version,
                                       const UniqueId& load_id,
                                       std::shared_ptr<LoadBlockQueue>& load_block_queue,
@@ -131,15 +143,17 @@ private:
 
     ExecEnv* _exec_env = nullptr;
     ThreadPool* _thread_pool = nullptr;
+    // memory consumption of all tables' load block queues, used for memory back pressure.
+    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
+
     int64_t _db_id;
     int64_t _table_id;
+
     std::mutex _lock;
     std::condition_variable _cv;
     // fragment_instance_id to load_block_queue
     std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> _load_block_queues;
     bool _need_plan_fragment = false;
-    // memory consumption of all tables' load block queues, used for back pressure.
-    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
 };
 
 class GroupCommitMgr {
@@ -159,13 +173,13 @@ public:
 
 private:
     ExecEnv* _exec_env = nullptr;
+    std::unique_ptr<doris::ThreadPool> _thread_pool;
+    // memory consumption of all tables' load block queues, used for memory back pressure.
+    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
 
     std::mutex _lock;
     // TODO remove table when unused
     std::unordered_map<int64_t, std::shared_ptr<GroupCommitTable>> _table_map;
-    std::unique_ptr<doris::ThreadPool> _thread_pool;
-    // memory consumption of all tables' load block queues, used for back pressure.
-    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
 };
 
 } // namespace doris
\ No newline at end of file
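
The header reshuffle also renames the back-pressure constants to match how add_block() uses them: the producer re-checks the global queue memory limit every MEM_BACK_PRESSURE_WAIT_TIME (1 s) and fails with Status::TimedOut once MEM_BACK_PRESSURE_WAIT_TIMEOUT (120 s) has elapsed in total. A minimal sketch of that periodic-wakeup wait with an overall timeout, again with illustrative names rather than the actual Doris signatures:

    #include <chrono>
    #include <condition_variable>
    #include <functional>
    #include <mutex>

    // Wake every wait_ms to re-check the predicate; give up once
    // timeout_ms has elapsed in total. Returns false on timeout,
    // which the caller would map to Status::TimedOut.
    bool wait_until_under_limit(std::mutex& mu, std::condition_variable& cv,
                                const std::function<bool()>& over_limit,
                                int64_t wait_ms, int64_t timeout_ms) {
        std::unique_lock<std::mutex> l(mu);
        auto start = std::chrono::steady_clock::now();
        while (over_limit()) {
            cv.wait_for(l, std::chrono::milliseconds(wait_ms));
            auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
                                   std::chrono::steady_clock::now() - start)
                                   .count();
            if (elapsed > timeout_ms) {
                return false;
            }
        }
        return true;
    }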
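
Note on the FE change above: the threshold is hard-coded for now, and 134217728 bytes is 128 * 1024 * 1024, i.e. 128 MB. Per the in-code TODO, the value is expected to become a table property, presumably configurable in the same way as group_commit_interval_ms.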
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 209947c8b78..75e67722113 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -670,6 +670,7 @@ struct TStreamLoadPutResult {
     6: optional i64 table_id
     7: optional bool wait_internal_group_commit_finish = false
     8: optional i64 group_commit_interval_ms
+    9: optional i64 group_commit_data_bytes
 }
 
 struct TStreamLoadMultiTablePutResult {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org