This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new a5703ef [Performance] Support sharding txn_map_lock into more small map locks to make good performance for txn manage task (#3222) a5703ef is described below commit a5703ef114ee5be2657481f6393e2bca054fd48f Author: caiconghui <55968745+caicong...@users.noreply.github.com> AuthorDate: Thu Apr 9 09:35:15 2020 -0500 [Performance] Support sharding txn_map_lock into more small map locks to make good performance for txn manage task (#3222) This PR improves the performance of txn management tasks. When there are many txns in a BE, the single txn_map_lock together with the additional _txn_locks may cause poor performance. This change removes the additional _txn_locks and splits the txn_map_lock into many small locks. --- be/src/common/config.h | 12 +- be/src/olap/storage_engine.cpp | 2 +- be/src/olap/tablet_manager.cpp | 1 + be/src/olap/tablet_manager.h | 2 +- be/src/olap/txn_manager.cpp | 236 ++++++++++++++------------ be/src/olap/txn_manager.h | 75 ++++++-- be/test/olap/olap_snapshot_converter_test.cpp | 1 - be/test/olap/tablet_mgr_test.cpp | 6 +- be/test/olap/txn_manager_test.cpp | 48 +++--- 9 files changed, 231 insertions(+), 152 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 93a0bff..9b06918 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -491,9 +491,9 @@ namespace config { // brpc config, 200M CONF_Int64(brpc_max_body_size, "209715200") - // max number of txns in txn manager + // max number of txns for every txn_partition_map in txn manager // this is a self protection to avoid too many txns saving in manager - CONF_mInt64(max_runnings_transactions, "2000"); + CONF_mInt64(max_runnings_transactions_per_txn_map, "100"); // tablet_map_lock shard size, the value is 2^n, n=0,1,2,3,4 // this is a an enhancement for better performance to manage tablet @@ -501,6 +501,14 @@ namespace config { CONF_String(plugin_path, "${DORIS_HOME}/plugin") + // txn_map_lock shard 
size, the value is 2^n, n=0,1,2,3,4 + // this is a an enhancement for better performance to manage txn + CONF_Int32(txn_map_shard_size, "128"); + + // txn_lock shard size, the value is 2^n, n=0,1,2,3,4 + // this is a an enhancement for better performance to publish txn + CONF_Int32(txn_shard_size, "1024") + } // namespace config } // namespace doris diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 16b9025..a8c8967 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -114,7 +114,7 @@ StorageEngine::StorageEngine(const EngineOptions& options) _is_all_cluster_id_exist(true), _index_stream_lru_cache(NULL), _tablet_manager(new TabletManager(config::tablet_map_shard_size)), - _txn_manager(new TxnManager()), + _txn_manager(new TxnManager(config::txn_map_shard_size, config::txn_shard_size)), _rowset_id_generator(new UniqueRowsetIdGenerator(options.backend_uid)), _memtable_flush_executor(nullptr), _block_manager(nullptr), diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 793fd45..c31ec5f 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -70,6 +70,7 @@ TabletManager::TabletManager(int32_t tablet_map_lock_shard_size) : _tablet_map_lock_shard_size(tablet_map_lock_shard_size), _last_update_stat_ms(0) { DCHECK_GT(_tablet_map_lock_shard_size, 0); + DCHECK_EQ(_tablet_map_lock_shard_size & (tablet_map_lock_shard_size - 1), 0); _tablet_map_lock_array = new RWMutex[_tablet_map_lock_shard_size]; _tablet_map_array = new tablet_map_t[_tablet_map_lock_shard_size]; } diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 263b14b..b47be43 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -193,7 +193,7 @@ private: // tablet_id -> TabletInstances typedef std::unordered_map<int64_t, TableInstances> tablet_map_t; - int32_t _tablet_map_lock_shard_size; + const int32_t _tablet_map_lock_shard_size; // 
_tablet_map_lock_array[i] protect _tablet_map_array[i], i=0,1,2...,and i < _tablet_map_lock_shard_size RWMutex *_tablet_map_lock_array; tablet_map_t *_tablet_map_array; diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index c098c5b..8fdec4d 100755 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "olap/storage_engine.h" +#include "txn_manager.h" #include <signal.h> @@ -36,6 +36,7 @@ #include "olap/base_compaction.h" #include "olap/cumulative_compaction.h" #include "olap/lru_cache.h" +#include "olap/storage_engine.h" #include "olap/tablet_meta.h" #include "olap/tablet_meta_manager.h" #include "olap/push_handler.h" @@ -70,10 +71,17 @@ using std::vector; namespace doris { -TxnManager::TxnManager() { - for (int i = 0; i < _txn_lock_num; ++i) { - _txn_locks[i] = std::make_shared<RWMutex>(); - } +TxnManager::TxnManager(int32_t txn_map_shard_size, int32_t txn_shard_size) + : _txn_map_shard_size(txn_map_shard_size), + _txn_shard_size(txn_shard_size) { + DCHECK_GT(_txn_map_shard_size, 0); + DCHECK_GT(_txn_shard_size, 0); + DCHECK_EQ(_txn_map_shard_size & (_txn_map_shard_size - 1), 0); + DCHECK_EQ(_txn_shard_size & (_txn_shard_size - 1), 0); + _txn_map_locks = new RWMutex[_txn_map_shard_size]; + _txn_tablet_maps = new txn_tablet_map_t[_txn_map_shard_size]; + _txn_partition_maps = new txn_partition_map_t[_txn_map_shard_size]; + _txn_mutex = new Mutex[_txn_shard_size]; } OLAPStatus TxnManager::prepare_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, TTransactionId transaction_id, @@ -111,12 +119,12 @@ OLAPStatus TxnManager::prepare_txn( TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid, const PUniqueId& load_id) { - pair<int64_t, int64_t> key(partition_id, transaction_id); + TxnKey key(partition_id, transaction_id); TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid); - WriteLock 
wrlock(_get_txn_lock(transaction_id)); - WriteLock txn_wrlock(&_txn_map_lock); - auto it = _txn_tablet_map.find(key); - if (it != _txn_tablet_map.end()) { + WriteLock txn_wrlock(&_get_txn_map_lock(transaction_id)); + txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); + auto it = txn_tablet_map.find(key); + if (it != txn_tablet_map.end()) { auto load_itr = it->second.find(tablet_info); if (load_itr != it->second.end()) { // found load for txn,tablet @@ -137,8 +145,9 @@ OLAPStatus TxnManager::prepare_txn( // check if there are too many transactions on running. // if yes, reject the request. - if (_txn_partition_map.size() > config::max_runnings_transactions) { - LOG(WARNING) << "too many transactions: " << _txn_tablet_map.size() << ", limit: " << config::max_runnings_transactions; + txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); + if (txn_partition_map.size() > config::max_runnings_transactions_per_txn_map) { + LOG(WARNING) << "too many transactions: " << txn_tablet_map.size() << ", limit: " << config::max_runnings_transactions_per_txn_map; return OLAP_ERR_TOO_MANY_TRANSACTIONS; } @@ -146,7 +155,7 @@ OLAPStatus TxnManager::prepare_txn( // case 1: user start a new txn, rowset_ptr = null // case 2: loading txn from meta env TabletTxnInfo load_info(load_id, nullptr); - _txn_tablet_map[key][tablet_info] = load_info; + txn_tablet_map[key][tablet_info] = load_info; _insert_txn_partition_map_unlocked(transaction_id, partition_id); VLOG(3) << "add transaction to engine successfully." 
@@ -175,12 +184,13 @@ OLAPStatus TxnManager::commit_txn( << ", tablet: " << tablet_info.to_string(); return OLAP_ERR_ROWSET_INVALID; } - WriteLock wrlock(_get_txn_lock(transaction_id)); + { // get tx - ReadLock rdlock(&_txn_map_lock); - auto it = _txn_tablet_map.find(key); - if (it != _txn_tablet_map.end()) { + ReadLock rdlock(&_get_txn_map_lock(transaction_id)); + txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); + auto it = txn_tablet_map.find(key); + if (it != txn_tablet_map.end()) { auto load_itr = it->second.find(tablet_info); if (load_itr != it->second.end()) { // found load for txn,tablet @@ -232,9 +242,10 @@ OLAPStatus TxnManager::commit_txn( } { - WriteLock wrlock(&_txn_map_lock); + WriteLock wrlock(&_get_txn_map_lock(transaction_id)); TabletTxnInfo load_info(load_id, rowset_ptr); - _txn_tablet_map[key][tablet_info] = load_info; + txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); + txn_tablet_map[key][tablet_info] = load_info; _insert_txn_partition_map_unlocked(transaction_id, partition_id); LOG(INFO) << "commit transaction to engine successfully." 
<< " partition_id: " << key.first @@ -253,11 +264,12 @@ OLAPStatus TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, TT pair<int64_t, int64_t> key(partition_id, transaction_id); TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid); RowsetSharedPtr rowset_ptr = nullptr; - WriteLock wrlock(_get_txn_lock(transaction_id)); + MutexLock txn_lock(&_get_txn_lock(transaction_id)); { - ReadLock rlock(&_txn_map_lock); - auto it = _txn_tablet_map.find(key); - if (it != _txn_tablet_map.end()) { + ReadLock rlock(&_get_txn_map_lock(transaction_id)); + txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); + auto it = txn_tablet_map.find(key); + if (it != txn_tablet_map.end()) { auto load_itr = it->second.find(tablet_info); if (load_itr != it->second.end()) { // found load for txn,tablet @@ -287,9 +299,10 @@ OLAPStatus TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, TT return OLAP_ERR_TRANSACTION_NOT_EXIST; } { - WriteLock wrlock(&_txn_map_lock); - auto it = _txn_tablet_map.find(key); - if (it != _txn_tablet_map.end()) { + WriteLock wrlock(&_get_txn_map_lock(transaction_id)); + txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); + auto it = txn_tablet_map.find(key); + if (it != txn_tablet_map.end()) { it->second.erase(tablet_info); LOG(INFO) << "publish txn successfully." 
<< " partition_id: " << key.first @@ -297,7 +310,7 @@ OLAPStatus TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, TT << ", tablet: " << tablet_info.to_string() << ", rowsetid: " << rowset_ptr->rowset_id(); if (it->second.empty()) { - _txn_tablet_map.erase(it); + txn_tablet_map.erase(it); _clear_txn_partition_map_unlocked(transaction_id, partition_id); } } @@ -313,10 +326,10 @@ OLAPStatus TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId tr TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid) { pair<int64_t, int64_t> key(partition_id, transaction_id); TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid); - WriteLock wrlock(_get_txn_lock(transaction_id)); - WriteLock txn_wrlock(&_txn_map_lock); - auto it = _txn_tablet_map.find(key); - if (it != _txn_tablet_map.end()) { + WriteLock wrlock(&_get_txn_map_lock(transaction_id)); + txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); + auto it = txn_tablet_map.find(key); + if (it != txn_tablet_map.end()) { auto load_itr = it->second.find(tablet_info); if (load_itr != it->second.end()) { // found load for txn,tablet @@ -334,7 +347,7 @@ OLAPStatus TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId tr << ", transaction_id: " << key.second << ", tablet: " << tablet_info.to_string(); if (it->second.empty()) { - _txn_tablet_map.erase(it); + txn_tablet_map.erase(it); _clear_txn_partition_map_unlocked(transaction_id, partition_id); } } @@ -347,10 +360,10 @@ OLAPStatus TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id, TTr TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid) { pair<int64_t, int64_t> key(partition_id, transaction_id); TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid); - WriteLock wrlock(_get_txn_lock(transaction_id)); - WriteLock txn_wrlock(&_txn_map_lock); - auto it = _txn_tablet_map.find(key); - if (it == _txn_tablet_map.end()) { + WriteLock txn_wrlock(&_get_txn_map_lock(transaction_id)); 
+ txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); + auto it = txn_tablet_map.find(key); + if (it == txn_tablet_map.end()) { return OLAP_ERR_TRANSACTION_NOT_EXIST; } auto load_itr = it->second.find(tablet_info); @@ -384,7 +397,7 @@ OLAPStatus TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id, TTr } it->second.erase(tablet_info); if (it->second.empty()) { - _txn_tablet_map.erase(it); + txn_tablet_map.erase(it); _clear_txn_partition_map_unlocked(transaction_id, partition_id); } return OLAP_SUCCESS; @@ -398,15 +411,18 @@ void TxnManager::get_tablet_related_txns(TTabletId tablet_id, SchemaHash schema_ } TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid); - ReadLock txn_rdlock(&_txn_map_lock); - for (auto& it : _txn_tablet_map) { - if (it.second.find(tablet_info) != it.second.end()) { - *partition_id = it.first.first; - transaction_ids->insert(it.first.second); - VLOG(3) << "find transaction on tablet." - << "partition_id: " << it.first.first - << ", transaction_id: " << it.first.second - << ", tablet: " << tablet_info.to_string(); + for (int32_t i = 0; i < _txn_map_shard_size; i++) { + ReadLock txn_rdlock(&_txn_map_locks[i]); + txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i]; + for (auto& it : txn_tablet_map) { + if (it.second.find(tablet_info) != it.second.end()) { + *partition_id = it.first.first; + transaction_ids->insert(it.first.second); + VLOG(3) << "find transaction on tablet." 
+ << "partition_id: " << it.first.first + << ", transaction_id: " << it.first.second + << ", tablet: " << tablet_info.to_string(); + } } } } @@ -415,33 +431,36 @@ void TxnManager::get_tablet_related_txns(TTabletId tablet_id, SchemaHash schema_ // maybe lock error, because not get txn lock before remove from meta void TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid) { TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid); - WriteLock txn_wrlock(&_txn_map_lock); - for (auto it = _txn_tablet_map.begin(); it != _txn_tablet_map.end();) { - auto load_itr = it->second.find(tablet_info); - if (load_itr != it->second.end()) { - TabletTxnInfo& load_info = load_itr->second; - if (load_info.rowset != nullptr && meta != nullptr) { - LOG(INFO) << " delete transaction from engine " - << ", tablet: " << tablet_info.to_string() - << ", rowset id: " << load_info.rowset->rowset_id(); - RowsetMetaManager::remove(meta, tablet_uid, - load_info.rowset->rowset_id()); - } - LOG(INFO) << "remove tablet related txn." - << " partition_id: " << it->first.first - << ", transaction_id: " << it->first.second - << ", tablet: " << tablet_info.to_string() << ", rowset: " - << (load_info.rowset != nullptr + for (int32_t i = 0; i < _txn_map_shard_size; i++) { + WriteLock txn_wrlock(&_txn_map_locks[i]); + txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i]; + for (auto it = txn_tablet_map.begin(); it != txn_tablet_map.end();) { + auto load_itr = it->second.find(tablet_info); + if (load_itr != it->second.end()) { + TabletTxnInfo& load_info = load_itr->second; + if (load_info.rowset != nullptr && meta != nullptr) { + LOG(INFO) << " delete transaction from engine " + << ", tablet: " << tablet_info.to_string() + << ", rowset id: " << load_info.rowset->rowset_id(); + RowsetMetaManager::remove(meta, tablet_uid, + load_info.rowset->rowset_id()); + } + LOG(INFO) << "remove tablet related txn." 
+ << " partition_id: " << it->first.first + << ", transaction_id: " << it->first.second + << ", tablet: " << tablet_info.to_string() << ", rowset: " + << (load_info.rowset != nullptr ? load_info.rowset->rowset_id().to_string() : "0"); - it->second.erase(tablet_info); - } - if (it->second.empty()) { - _clear_txn_partition_map_unlocked(it->first.second, - it->first.first); - it = _txn_tablet_map.erase(it); - } else { - ++it; + it->second.erase(tablet_info); + } + if (it->second.empty()) { + _clear_txn_partition_map_unlocked(it->first.second, + it->first.first); + it = txn_tablet_map.erase(it); + } else { + ++it; + } } } } @@ -451,10 +470,10 @@ void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id, std::map<TabletInfo, RowsetSharedPtr>* tablet_infos) { // get tablets in this transaction pair<int64_t, int64_t> key(partition_id, transaction_id); - ReadLock rdlock(_get_txn_lock(transaction_id)); - ReadLock txn_rdlock(&_txn_map_lock); - auto it = _txn_tablet_map.find(key); - if (it == _txn_tablet_map.end()) { + ReadLock txn_rdlock(&_get_txn_map_lock(transaction_id)); + txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); + auto it = txn_tablet_map.find(key); + if (it == txn_tablet_map.end()) { VLOG(3) << "could not find tablet for" << " partition_id=" << partition_id << ", transaction_id=" << transaction_id; @@ -472,10 +491,12 @@ void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id, } void TxnManager::get_all_related_tablets(std::set<TabletInfo>* tablet_infos) { - ReadLock txn_rdlock(&_txn_map_lock); - for (auto& it : _txn_tablet_map) { - for (auto& tablet_load_it : it.second) { - tablet_infos->emplace(tablet_load_it.first); + for (int32_t i = 0; i < _txn_map_shard_size; i++) { + ReadLock txn_rdlock(&_txn_map_locks[i]); + for (auto& it : _txn_tablet_maps[i]) { + for (auto& tablet_load_it : it.second) { + tablet_infos->emplace(tablet_load_it.first); + } } } } @@ -484,10 +505,10 @@ bool 
TxnManager::has_txn(TPartitionId partition_id, TTransactionId transaction_i TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid) { pair<int64_t, int64_t> key(partition_id, transaction_id); TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid); - ReadLock rdlock(_get_txn_lock(transaction_id)); - ReadLock txn_rdlock(&_txn_map_lock); - auto it = _txn_tablet_map.find(key); - bool found = it != _txn_tablet_map.end() + ReadLock txn_rdlock(&_get_txn_map_lock(transaction_id)); + txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); + auto it = txn_tablet_map.find(key); + bool found = it != txn_tablet_map.end() && it->second.find(tablet_info) != it->second.end(); return found; @@ -496,18 +517,20 @@ bool TxnManager::has_txn(TPartitionId partition_id, TTransactionId transaction_i void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map) { int64_t now = UnixSeconds(); // traverse the txn map, and get all expired txns - ReadLock txn_rdlock(&_txn_map_lock); - for (auto& it : _txn_tablet_map) { - auto txn_id = it.first.second; - for (auto& t_map : it.second) { - double diff = difftime(now, t_map.second.creation_time); - if (diff >= config::pending_data_expire_time_sec) { - (*expire_txn_map)[t_map.first].push_back(txn_id); - if (VLOG_IS_ON(3)) { - VLOG(3) << "find expired txn." - << " tablet=" << t_map.first.to_string() - << " transaction_id=" << txn_id - << " exist_sec=" << diff; + for (int32_t i = 0; i < _txn_map_shard_size; i++) { + ReadLock txn_rdlock(&_txn_map_locks[i]); + for (auto& it : _txn_tablet_maps[i]) { + auto txn_id = it.first.second; + for (auto& t_map : it.second) { + double diff = difftime(now, t_map.second.creation_time); + if (diff >= config::pending_data_expire_time_sec) { + (*expire_txn_map)[t_map.first].push_back(txn_id); + if (VLOG_IS_ON(3)) { + VLOG(3) << "find expired txn." 
+ << " tablet=" << t_map.first.to_string() + << " transaction_id=" << txn_id + << " exist_sec=" << diff; + } } } } @@ -515,9 +538,10 @@ void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>> } void TxnManager::get_partition_ids(const TTransactionId transaction_id, std::vector<TPartitionId>* partition_ids) { - ReadLock txn_rdlock(&_txn_map_lock); - auto it = _txn_partition_map.find(transaction_id); - if (it != _txn_partition_map.end()) { + ReadLock txn_rdlock(&_get_txn_map_lock(transaction_id)); + txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); + auto it = txn_partition_map.find(transaction_id); + if (it != txn_partition_map.end()) { for (int64_t partition_id : it->second) { partition_ids->push_back(partition_id); } @@ -525,19 +549,21 @@ void TxnManager::get_partition_ids(const TTransactionId transaction_id, std::vec } void TxnManager::_insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) { - auto find = _txn_partition_map.find(transaction_id); - if (find == _txn_partition_map.end()) { - _txn_partition_map[transaction_id] = std::unordered_set<int64_t>(); + txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); + auto find = txn_partition_map.find(transaction_id); + if (find == txn_partition_map.end()) { + txn_partition_map[transaction_id] = std::unordered_set<int64_t>(); } - _txn_partition_map[transaction_id].insert(partition_id); + txn_partition_map[transaction_id].insert(partition_id); } void TxnManager::_clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) { - auto it = _txn_partition_map.find(transaction_id); - if (it != _txn_partition_map.end()) { + txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); + auto it = txn_partition_map.find(transaction_id); + if (it != txn_partition_map.end()) { it->second.erase(partition_id); if (it->second.empty()) { - _txn_partition_map.erase(it); + 
txn_partition_map.erase(it); } } } diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index 96736d0..75ad152 100755 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -64,15 +64,17 @@ struct TabletTxnInfo { TabletTxnInfo() {} }; + // txn manager is used to manage mapping between tablet and txns class TxnManager { public: - TxnManager(); + TxnManager(int32_t txn_map_shard_size, int32_t txn_shard_size); ~TxnManager() { - _txn_tablet_map.clear(); - _txn_partition_map.clear(); - _txn_locks.clear(); + delete [] _txn_tablet_maps; + delete [] _txn_partition_maps; + delete [] _txn_map_locks; + delete [] _txn_mutex; } OLAPStatus prepare_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, TTransactionId transaction_id, @@ -140,9 +142,35 @@ public: void get_partition_ids(const TTransactionId transaction_id, std::vector<TPartitionId>* partition_ids); private: - RWMutex* _get_txn_lock(TTransactionId txn_id) { - return _txn_locks[txn_id % _txn_lock_num].get(); - } + + using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id; + + // implement TxnKey hash function to support TxnKey as a key for unordered_map + struct TxnKeyHash { + template<typename T, typename U> + size_t operator()(const std::pair<T, U> &e) const { + return std::hash<T>()(e.first) ^ std::hash<U>()(e.second); + } + }; + + // implement TxnKey equal function to support TxnKey as a key for unordered_map + struct TxnKeyEqual { + template <class T, typename U> + bool operator()(const std::pair<T, U> &l, const std::pair<T, U> &r) const{ + return l.first == r.first && l.second == r.second; + } + }; + + typedef std::unordered_map<TxnKey, std::map<TabletInfo, TabletTxnInfo>, TxnKeyHash, TxnKeyEqual> txn_tablet_map_t; + typedef std::unordered_map<int64_t, std::unordered_set<int64_t>> txn_partition_map_t; + + inline RWMutex& _get_txn_map_lock(TTransactionId transactionId); + + inline txn_tablet_map_t& _get_txn_tablet_map(TTransactionId transactionId); + + inline 
txn_partition_map_t& _get_txn_partition_map(TTransactionId transactionId); + + inline Mutex& _get_txn_lock(TTransactionId transactionId); // insert or remove (transaction_id, partition_id) from _txn_partition_map // get _txn_map_lock before calling @@ -150,20 +178,39 @@ private: void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); private: - RWMutex _txn_map_lock; - using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id; - std::map<TxnKey, std::map<TabletInfo, TabletTxnInfo>> _txn_tablet_map; + const int32_t _txn_map_shard_size; + + const int32_t _txn_shard_size; + + // _txn_map_locks[i] protect _txn_tablet_maps[i], i=0,1,2...,and i < _txn_map_shard_size + txn_tablet_map_t *_txn_tablet_maps; // transaction_id -> corresponding partition ids // This is mainly for the clear txn task received from FE, which may only has transaction id, // so we need this map to find out which partitions are corresponding to a transaction id. - // This map should be constructed/deconstructed/modified alongside with '_txn_tablet_map' - std::unordered_map<int64_t, std::unordered_set<int64_t>> _txn_partition_map; + // The _txn_partition_maps[i] should be constructed/deconstructed/modified alongside with '_txn_tablet_maps[i]' + txn_partition_map_t *_txn_partition_maps; - const int32_t _txn_lock_num = 100; - std::map<int32_t, std::shared_ptr<RWMutex>> _txn_locks; + RWMutex *_txn_map_locks; + Mutex *_txn_mutex; DISALLOW_COPY_AND_ASSIGN(TxnManager); }; // TxnManager +inline RWMutex& TxnManager::_get_txn_map_lock(TTransactionId transactionId) { + return _txn_map_locks[transactionId & (_txn_map_shard_size - 1)]; +} + +inline TxnManager::txn_tablet_map_t& TxnManager::_get_txn_tablet_map(TTransactionId transactionId) { + return _txn_tablet_maps[transactionId & (_txn_map_shard_size - 1)]; +} + +inline TxnManager::txn_partition_map_t& TxnManager::_get_txn_partition_map(TTransactionId transactionId) { + return _txn_partition_maps[transactionId & 
(_txn_map_shard_size - 1)]; +} + +inline Mutex& TxnManager::_get_txn_lock(TTransactionId transactionId) { + return _txn_mutex[transactionId & (_txn_shard_size - 1)]; +} + } #endif // DORIS_BE_SRC_OLAP_TXN_MANAGER_H diff --git a/be/test/olap/olap_snapshot_converter_test.cpp b/be/test/olap/olap_snapshot_converter_test.cpp index 3bcabf6..f554499 100644 --- a/be/test/olap/olap_snapshot_converter_test.cpp +++ b/be/test/olap/olap_snapshot_converter_test.cpp @@ -107,7 +107,6 @@ private: DataDir* _data_dir; OlapMeta* _meta; std::string _json_rowset_meta; - TxnManager _txn_mgr; std::string _engine_data_path; std::string _meta_path; int64_t _tablet_id; diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp index 44e8593..d78fe60 100644 --- a/be/test/olap/tablet_mgr_test.cpp +++ b/be/test/olap/tablet_mgr_test.cpp @@ -78,7 +78,7 @@ public: + "/" + std::to_string(0) + "/" + std::to_string(_tablet_id) + "/" + std::to_string(_schema_hash); - _tablet_mgr = new TabletManager(1); + _tablet_mgr.reset(new TabletManager(1)); } virtual void TearDown() { @@ -86,18 +86,16 @@ public: if (boost::filesystem::exists(_engine_data_path)) { ASSERT_TRUE(boost::filesystem::remove_all(_engine_data_path)); } - delete _tablet_mgr; } private: DataDir* _data_dir; std::string _json_rowset_meta; - TxnManager _txn_mgr; std::string _engine_data_path; int64_t _tablet_id; int32_t _schema_hash; string _tablet_data_path; - TabletManager* _tablet_mgr; + std::unique_ptr<TabletManager> _tablet_mgr; }; TEST_F(TabletMgrTest, CreateTablet) { diff --git a/be/test/olap/txn_manager_test.cpp b/be/test/olap/txn_manager_test.cpp index 4f25d0b..7725810 100644 --- a/be/test/olap/txn_manager_test.cpp +++ b/be/test/olap/txn_manager_test.cpp @@ -93,8 +93,8 @@ public: } virtual void SetUp() { - config::max_runnings_transactions = 1000; - + config::max_runnings_transactions_per_txn_map = 500; + _txn_mgr.reset(new TxnManager(64, 1024)); std::vector<StorePath> paths; 
paths.emplace_back("_engine_data_path", -1); EngineOptions options; @@ -162,7 +162,7 @@ public: private: OlapMeta* _meta; std::string _json_rowset_meta; - TxnManager _txn_mgr; + std::unique_ptr<TxnManager> _txn_mgr; TPartitionId partition_id = 1123; TTransactionId transaction_id = 111; TTabletId tablet_id = 222; @@ -176,7 +176,7 @@ private: }; TEST_F(TxnManagerTest, PrepareNewTxn) { - OLAPStatus status = _txn_mgr.prepare_txn(partition_id, transaction_id, + OLAPStatus status = _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid, load_id); ASSERT_TRUE(status == OLAP_SUCCESS); } @@ -185,9 +185,9 @@ TEST_F(TxnManagerTest, PrepareNewTxn) { // 2. commit txn // 3. should be success TEST_F(TxnManagerTest, CommitTxnWithPrepare) { - OLAPStatus status = _txn_mgr.prepare_txn(partition_id, transaction_id, + OLAPStatus status = _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid, load_id); - _txn_mgr.commit_txn(_meta, partition_id, transaction_id, + _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false); ASSERT_TRUE(status == OLAP_SUCCESS); RowsetMetaSharedPtr rowset_meta(new AlphaRowsetMeta()); @@ -199,7 +199,7 @@ TEST_F(TxnManagerTest, CommitTxnWithPrepare) { // 1. commit without prepare // 2. should success TEST_F(TxnManagerTest, CommitTxnWithNoPrepare) { - OLAPStatus status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id, + OLAPStatus status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false); ASSERT_TRUE(status == OLAP_SUCCESS); } @@ -207,10 +207,10 @@ TEST_F(TxnManagerTest, CommitTxnWithNoPrepare) { // 1. commit twice with different rowset id // 2. 
should failed TEST_F(TxnManagerTest, CommitTxnTwiceWithDiffRowsetId) { - OLAPStatus status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id, + OLAPStatus status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false); ASSERT_TRUE(status == OLAP_SUCCESS); - status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id, + status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset_diff_id, false); ASSERT_TRUE(status != OLAP_SUCCESS); } @@ -218,30 +218,30 @@ TEST_F(TxnManagerTest, CommitTxnTwiceWithDiffRowsetId) { // 1. commit twice with same rowset id // 2. should success TEST_F(TxnManagerTest, CommitTxnTwiceWithSameRowsetId) { - OLAPStatus status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id, + OLAPStatus status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false); ASSERT_TRUE(status == OLAP_SUCCESS); - status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id, + status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset_same_id, false); ASSERT_TRUE(status == OLAP_SUCCESS); } // 1. prepare twice should be success TEST_F(TxnManagerTest, PrepareNewTxnTwice) { - OLAPStatus status = _txn_mgr.prepare_txn(partition_id, transaction_id, + OLAPStatus status = _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid, load_id); ASSERT_TRUE(status == OLAP_SUCCESS); - status = _txn_mgr.prepare_txn(partition_id, transaction_id, + status = _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid, load_id); ASSERT_TRUE(status == OLAP_SUCCESS); } // 1. 
txn could be rollbacked if it is not committed TEST_F(TxnManagerTest, RollbackNotCommittedTxn) { - OLAPStatus status = _txn_mgr.prepare_txn(partition_id, transaction_id, + OLAPStatus status = _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid, load_id); ASSERT_TRUE(status == OLAP_SUCCESS); - status = _txn_mgr.rollback_txn(partition_id, transaction_id, + status = _txn_mgr->rollback_txn(partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid); ASSERT_TRUE(status == OLAP_SUCCESS); RowsetMetaSharedPtr rowset_meta(new AlphaRowsetMeta()); @@ -251,10 +251,10 @@ TEST_F(TxnManagerTest, RollbackNotCommittedTxn) { // 1. txn could not be rollbacked if it is committed TEST_F(TxnManagerTest, RollbackCommittedTxn) { - OLAPStatus status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id, + OLAPStatus status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false); ASSERT_TRUE(status == OLAP_SUCCESS); - status = _txn_mgr.rollback_txn(partition_id, transaction_id, + status = _txn_mgr->rollback_txn(partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid); ASSERT_FALSE(status == OLAP_SUCCESS); RowsetMetaSharedPtr rowset_meta(new AlphaRowsetMeta()); @@ -265,12 +265,12 @@ TEST_F(TxnManagerTest, RollbackCommittedTxn) { // 1. 
publish version success TEST_F(TxnManagerTest, PublishVersionSuccessful) { - OLAPStatus status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id, + OLAPStatus status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false); ASSERT_TRUE(status == OLAP_SUCCESS); Version new_version(10,11); VersionHash new_versionhash = 123; - status = _txn_mgr.publish_txn(_meta, partition_id, transaction_id, + status = _txn_mgr->publish_txn(_meta, partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid, new_version, new_versionhash); ASSERT_TRUE(status == OLAP_SUCCESS); @@ -286,28 +286,28 @@ TEST_F(TxnManagerTest, PublishVersionSuccessful) { TEST_F(TxnManagerTest, PublishNotExistedTxn) { Version new_version(10,11); VersionHash new_versionhash = 123; - OLAPStatus status = _txn_mgr.publish_txn(_meta, partition_id, transaction_id, + OLAPStatus status = _txn_mgr->publish_txn(_meta, partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid, new_version, new_versionhash); ASSERT_TRUE(status != OLAP_SUCCESS); } TEST_F(TxnManagerTest, DeletePreparedTxn) { - OLAPStatus status = _txn_mgr.prepare_txn(partition_id, transaction_id, + OLAPStatus status = _txn_mgr->prepare_txn(partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid, load_id); ASSERT_TRUE(status == OLAP_SUCCESS); - status = _txn_mgr.delete_txn(_meta, partition_id, transaction_id, + status = _txn_mgr->delete_txn(_meta, partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid); ASSERT_TRUE(status == OLAP_SUCCESS); } TEST_F(TxnManagerTest, DeleteCommittedTxn) { - OLAPStatus status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id, + OLAPStatus status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false); ASSERT_TRUE(status == OLAP_SUCCESS); RowsetMetaSharedPtr rowset_meta(new AlphaRowsetMeta()); status = 
RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _alpha_rowset->rowset_id(), rowset_meta); ASSERT_TRUE(status == OLAP_SUCCESS); - status = _txn_mgr.delete_txn(_meta, partition_id, transaction_id, + status = _txn_mgr->delete_txn(_meta, partition_id, transaction_id, tablet_id, schema_hash, _tablet_uid); ASSERT_TRUE(status == OLAP_SUCCESS); RowsetMetaSharedPtr rowset_meta2(new AlphaRowsetMeta()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org