This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 687eb44b9756ab90538cfcf009699a96be26eff9 Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com> AuthorDate: Fri Jul 7 16:18:47 2023 +0800 [enhancement](merge-on-write) add more version and txn information for mow publish (#21257) --- be/src/olap/task/engine_publish_version_task.cpp | 28 ++++++++++++++---- be/src/olap/txn_manager.cpp | 36 ++++++++++++++++++++++++ be/src/olap/txn_manager.h | 5 ++++ be/test/olap/txn_manager_test.cpp | 20 +++++++++++++ 4 files changed, 83 insertions(+), 6 deletions(-) diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 249a7a424c..275f34c36e 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -147,6 +147,13 @@ Status EnginePublishVersionTask::finish() { // here and wait pre version publish or lock timeout if (tablet->keys_type() == KeysType::UNIQUE_KEYS && tablet->enable_unique_key_merge_on_write()) { + bool first_time_update = false; + if (StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version( + tablet_info.tablet_id, version.second) < 0) { + first_time_update = true; + StorageEngine::instance()->txn_manager()->update_tablet_version_txn( + tablet_info.tablet_id, version.second, transaction_id); + } Version max_version; TabletState tablet_state; { @@ -156,12 +163,6 @@ Status EnginePublishVersionTask::finish() { } if (tablet_state == TabletState::TABLET_RUNNING && version.first != max_version.second + 1) { - LOG_EVERY_SECOND(INFO) - << "uniq key with merge-on-write version not continuous, " - "current max version=" - << max_version.second << ", publish_version=" << version.first - << ", tablet_id=" << tablet->tablet_id() - << ", transaction_id=" << _publish_version_req.transaction_id; // If a tablet migrates out and back, the previously failed // publish task may retry on the new tablet, so check // whether the version exists. 
if not exist, then set @@ -171,6 +172,21 @@ Status EnginePublishVersionTask::finish() { _discontinuous_version_tablets->emplace_back( partition_id, tablet_info.tablet_id, version.first); res = Status::Error<PUBLISH_VERSION_NOT_CONTINUOUS>(); + int64_t missed_version = max_version.second + 1; + int64_t missed_txn_id = + StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version( + tablet->tablet_id(), missed_version); + auto msg = fmt::format( + "uniq key with merge-on-write version not continuous, " + "missed version={}, it's transaction_id={}, current publish " + "version={}, tablet_id={}, transaction_id={}", + missed_version, missed_txn_id, version.second, tablet->tablet_id(), + _publish_version_req.transaction_id); + if (first_time_update) { + LOG(INFO) << msg; + } else { + LOG_EVERY_SECOND(INFO) << msg; + } } continue; } diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index ebcb287389..dd65059dc3 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -82,6 +82,9 @@ TxnManager::TxnManager(int32_t txn_map_shard_size, int32_t txn_shard_size) _txn_mutex = new std::shared_mutex[_txn_shard_size]; _txn_tablet_delta_writer_map = new txn_tablet_delta_writer_map_t[_txn_map_shard_size]; _txn_tablet_delta_writer_map_locks = new std::shared_mutex[_txn_map_shard_size]; + // For debugging + _tablet_version_cache = + new ShardedLRUCache("TabletVersionCache", 100000, LRUCacheType::NUMBER, 32); } // prepare txn should always be allowed because ingest task will be retried @@ -745,4 +748,37 @@ void TxnManager::clear_txn_tablet_delta_writer(int64_t transaction_id) { VLOG_CRITICAL << "remove delta writer manager, txn_id=" << transaction_id; } +int64_t TxnManager::get_txn_by_tablet_version(int64_t tablet_id, int64_t version) { + char key[16]; + memcpy(key, &tablet_id, sizeof(int64_t)); + memcpy(key + sizeof(int64_t), &version, sizeof(int64_t)); + CacheKey cache_key((const char*)&key, sizeof(key)); + + auto handle = 
_tablet_version_cache->lookup(cache_key); + if (handle == nullptr) { + return -1; + } + int64_t res = *(int64_t*)_tablet_version_cache->value(handle); + _tablet_version_cache->release(handle); + return res; +} + +void TxnManager::update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id) { + char key[16]; + memcpy(key, &tablet_id, sizeof(int64_t)); + memcpy(key + sizeof(int64_t), &version, sizeof(int64_t)); + CacheKey cache_key((const char*)&key, sizeof(key)); + + int64_t* value = new int64_t; + *value = txn_id; + auto deleter = [](const doris::CacheKey& key, void* value) { + int64_t* cache_value = (int64_t*)value; + delete cache_value; + }; + + auto handle = _tablet_version_cache->insert(cache_key, value, sizeof(txn_id), deleter, + CachePriority::NORMAL, sizeof(txn_id)); + _tablet_version_cache->release(handle); +} + } // namespace doris diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index fcebe2d9d1..42476c5acb 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -90,6 +90,7 @@ public: delete[] _txn_mutex; delete[] _txn_tablet_delta_writer_map; delete[] _txn_tablet_delta_writer_map_locks; + delete _tablet_version_cache; } // add a txn to manager @@ -172,6 +173,9 @@ public: DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids); + int64_t get_txn_by_tablet_version(int64_t tablet_id, int64_t version); + void update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id); + private: using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id; @@ -232,6 +236,7 @@ private: std::shared_mutex* _txn_mutex; txn_tablet_delta_writer_map_t* _txn_tablet_delta_writer_map; + ShardedLRUCache* _tablet_version_cache; std::shared_mutex* _txn_tablet_delta_writer_map_locks; DISALLOW_COPY_AND_ASSIGN(TxnManager); }; // TxnManager diff --git a/be/test/olap/txn_manager_test.cpp b/be/test/olap/txn_manager_test.cpp index 146836cdc2..e19bdf9990 100644 --- 
a/be/test/olap/txn_manager_test.cpp
+++ b/be/test/olap/txn_manager_test.cpp
@@ -332,4 +332,24 @@ TEST_F(TxnManagerTest, DeleteCommittedTxn) {
     EXPECT_TRUE(status != Status::OK());
 }
 
+TEST_F(TxnManagerTest, TabletVersionCache) {
+    std::unique_ptr<TxnManager> txn_mgr = std::make_unique<TxnManager>(64, 1024);
+    txn_mgr->update_tablet_version_txn(123, 100, 456);
+    txn_mgr->update_tablet_version_txn(124, 100, 567);
+    int64_t tx1 = txn_mgr->get_txn_by_tablet_version(123, 100);
+    EXPECT_EQ(tx1, 456);
+    int64_t tx2 = txn_mgr->get_txn_by_tablet_version(124, 100);
+    EXPECT_EQ(tx2, 567);
+    int64_t tx3 = txn_mgr->get_txn_by_tablet_version(124, 101);
+    EXPECT_EQ(tx3, -1);
+    txn_mgr->update_tablet_version_txn(123, 101, 888);
+    txn_mgr->update_tablet_version_txn(124, 101, 890);
+    int64_t tx4 = txn_mgr->get_txn_by_tablet_version(123, 100);
+    EXPECT_EQ(tx4, 456);
+    int64_t tx5 = txn_mgr->get_txn_by_tablet_version(123, 101);
+    EXPECT_EQ(tx5, 888);
+    int64_t tx6 = txn_mgr->get_txn_by_tablet_version(124, 101);
+    EXPECT_EQ(tx6, 890);
+}
+
 } // namespace doris

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org