xiaokang commented on code in PR #46235: URL: https://github.com/apache/doris/pull/46235#discussion_r1900317860
########## be/src/runtime/fragment_mgr.cpp: ########## @@ -228,6 +228,100 @@ static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries return result; } +inline uint32_t get_map_id(const TUniqueId& query_id, size_t capacity) { + uint32_t value = HashUtil::hash(&query_id.lo, 8, 0); + value = HashUtil::hash(&query_id.hi, 8, value); + return value % capacity; +} + +inline uint32_t get_map_id(std::pair<TUniqueId, int> key, size_t capacity) { + uint32_t value = HashUtil::hash(&key.first.lo, 8, 0); + value = HashUtil::hash(&key.first.hi, 8, value); + return value % capacity; +} + +template <typename Key, typename Value, typename ValueType> +ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() { + _internal_map.resize(config::num_query_ctx_map_partitions); + for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) { + _internal_map[i] = {std::make_unique<std::shared_mutex>(), + phmap::flat_hash_map<Key, Value>()}; + } +} + +template <typename Key, typename Value, typename ValueType> +Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) { + auto id = get_map_id(query_id, _internal_map.size()); + { + std::shared_lock lock(*_internal_map[id].first); + auto& map = _internal_map[id].second; + auto search = map.find(query_id); + if (search != map.end()) { + return search->second; + } + return std::shared_ptr<ValueType>(nullptr); + } +} + +template <typename Key, typename Value, typename ValueType> +Status ConcurrentContextMap<Key, Value, ValueType>::apply_if_not_exists( + const Key& query_id, std::shared_ptr<ValueType>& query_ctx, ApplyFunction&& function) { + auto id = get_map_id(query_id, _internal_map.size()); + { + std::shared_lock lock(*_internal_map[id].first); + auto& map = _internal_map[id].second; + auto search = map.find(query_id); + if (search != map.end()) { + query_ctx = search->second.lock(); + } + if (!query_ctx) { + return function(map); + } + return Status::OK(); + } +} + +template <typename Key, typename Value, typename ValueType> +void ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) { + auto id = get_map_id(query_id, _internal_map.size()); + { + std::shared_lock lock(*_internal_map[id].first); + auto& map = _internal_map[id].second; + map.erase(query_id); + } +} + +template <typename Key, typename Value, typename ValueType> +void ConcurrentContextMap<Key, Value, ValueType>::insert(const Key& query_id, + std::shared_ptr<ValueType> query_ctx) { + auto id = get_map_id(query_id, _internal_map.size()); + { + std::shared_lock lock(*_internal_map[id].first); Review Comment: Is read lock enough for insert? ########## be/src/runtime/fragment_mgr.cpp: ########## @@ -228,6 +228,100 @@ static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries return result; } +inline uint32_t get_map_id(const TUniqueId& query_id, size_t capacity) { + uint32_t value = HashUtil::hash(&query_id.lo, 8, 0); + value = HashUtil::hash(&query_id.hi, 8, value); + return value % capacity; +} + +inline uint32_t get_map_id(std::pair<TUniqueId, int> key, size_t capacity) { + uint32_t value = HashUtil::hash(&key.first.lo, 8, 0); + value = HashUtil::hash(&key.first.hi, 8, value); + return value % capacity; +} + +template <typename Key, typename Value, typename ValueType> +ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() { + _internal_map.resize(config::num_query_ctx_map_partitions); + for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) { + _internal_map[i] = {std::make_unique<std::shared_mutex>(), + phmap::flat_hash_map<Key, Value>()}; + } +} + +template <typename Key, typename Value, typename ValueType> +Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) { + auto id = get_map_id(query_id, _internal_map.size()); + { + std::shared_lock lock(*_internal_map[id].first); + auto& map = _internal_map[id].second; + auto search = map.find(query_id); + if (search != map.end()) { + return search->second; + } + return std::shared_ptr<ValueType>(nullptr); + } +} + +template <typename Key, typename Value, typename ValueType> +Status ConcurrentContextMap<Key, Value, ValueType>::apply_if_not_exists( + const Key& query_id, std::shared_ptr<ValueType>& query_ctx, ApplyFunction&& function) { + auto id = get_map_id(query_id, _internal_map.size()); + { + std::shared_lock lock(*_internal_map[id].first); + auto& map = _internal_map[id].second; + auto search = map.find(query_id); + if (search != map.end()) { + query_ctx = search->second.lock(); + } + if (!query_ctx) { + return function(map); + } + return Status::OK(); + } +} + +template <typename Key, typename Value, typename ValueType> +void ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) { + auto id = get_map_id(query_id, _internal_map.size()); + { + std::shared_lock lock(*_internal_map[id].first); + auto& map = _internal_map[id].second; + map.erase(query_id); + } +} + +template <typename Key, typename Value, typename ValueType> +void ConcurrentContextMap<Key, Value, ValueType>::insert(const Key& query_id, + std::shared_ptr<ValueType> query_ctx) { + auto id = get_map_id(query_id, _internal_map.size()); + { + std::shared_lock lock(*_internal_map[id].first); + auto& map = _internal_map[id].second; + map.insert({query_id, query_ctx}); + } +} + +template <typename Key, typename Value, typename ValueType> +void ConcurrentContextMap<Key, Value, ValueType>::clear() { + for (auto& pair : _internal_map) { + std::shared_lock lock(*pair.first); Review Comment: Is read lock enough for clear? ########## be/src/runtime/fragment_mgr.cpp: ########## @@ -228,6 +228,100 @@ static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries return result; } +inline uint32_t get_map_id(const TUniqueId& query_id, size_t capacity) { + uint32_t value = HashUtil::hash(&query_id.lo, 8, 0); + value = HashUtil::hash(&query_id.hi, 8, value); + return value % capacity; +} + +inline uint32_t get_map_id(std::pair<TUniqueId, int> key, size_t capacity) { + uint32_t value = HashUtil::hash(&key.first.lo, 8, 0); + value = HashUtil::hash(&key.first.hi, 8, value); + return value % capacity; +} + +template <typename Key, typename Value, typename ValueType> +ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() { + _internal_map.resize(config::num_query_ctx_map_partitions); + for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) { + _internal_map[i] = {std::make_unique<std::shared_mutex>(), + phmap::flat_hash_map<Key, Value>()}; + } +} + +template <typename Key, typename Value, typename ValueType> +Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) { + auto id = get_map_id(query_id, _internal_map.size()); + { + std::shared_lock lock(*_internal_map[id].first); + auto& map = _internal_map[id].second; + auto search = map.find(query_id); + if (search != map.end()) { + return search->second; + } + return std::shared_ptr<ValueType>(nullptr); + } +} + +template <typename Key, typename Value, typename ValueType> +Status ConcurrentContextMap<Key, Value, ValueType>::apply_if_not_exists( + const Key& query_id, std::shared_ptr<ValueType>& query_ctx, ApplyFunction&& function) { + auto id = get_map_id(query_id, _internal_map.size()); + { + std::shared_lock lock(*_internal_map[id].first); + auto& map = _internal_map[id].second; + auto search = map.find(query_id); + if (search != map.end()) { + query_ctx = search->second.lock(); + } + if (!query_ctx) { + return function(map); + } + return Status::OK(); + } +} + +template <typename Key, typename Value, typename ValueType> +void ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) { + auto id = get_map_id(query_id, _internal_map.size()); + { + std::shared_lock lock(*_internal_map[id].first); Review Comment: Is read lock enough for erase? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org