This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 687eb44b9756ab90538cfcf009699a96be26eff9
Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com>
AuthorDate: Fri Jul 7 16:18:47 2023 +0800

    [enhancement](merge-on-write) add more version and txn information for mow publish (#21257)
---
 be/src/olap/task/engine_publish_version_task.cpp | 28 ++++++++++++++----
 be/src/olap/txn_manager.cpp                      | 36 ++++++++++++++++++++++++
 be/src/olap/txn_manager.h                        |  5 ++++
 be/test/olap/txn_manager_test.cpp                | 20 +++++++++++++
 4 files changed, 83 insertions(+), 6 deletions(-)

diff --git a/be/src/olap/task/engine_publish_version_task.cpp 
b/be/src/olap/task/engine_publish_version_task.cpp
index 249a7a424c..275f34c36e 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -147,6 +147,13 @@ Status EnginePublishVersionTask::finish() {
             // here and wait pre version publish or lock timeout
             if (tablet->keys_type() == KeysType::UNIQUE_KEYS &&
                 tablet->enable_unique_key_merge_on_write()) {
+                bool first_time_update = false;
+                if 
(StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version(
+                            tablet_info.tablet_id, version.second) < 0) {
+                    first_time_update = true;
+                    
StorageEngine::instance()->txn_manager()->update_tablet_version_txn(
+                            tablet_info.tablet_id, version.second, 
transaction_id);
+                }
                 Version max_version;
                 TabletState tablet_state;
                 {
@@ -156,12 +163,6 @@ Status EnginePublishVersionTask::finish() {
                 }
                 if (tablet_state == TabletState::TABLET_RUNNING &&
                     version.first != max_version.second + 1) {
-                    LOG_EVERY_SECOND(INFO)
-                            << "uniq key with merge-on-write version not 
continuous, "
-                               "current max version="
-                            << max_version.second << ", publish_version=" << 
version.first
-                            << ", tablet_id=" << tablet->tablet_id()
-                            << ", transaction_id=" << 
_publish_version_req.transaction_id;
                     // If a tablet migrates out and back, the previously failed
                     // publish task may retry on the new tablet, so check
                     // whether the version exists. if not exist, then set
@@ -171,6 +172,21 @@ Status EnginePublishVersionTask::finish() {
                         _discontinuous_version_tablets->emplace_back(
                                 partition_id, tablet_info.tablet_id, 
version.first);
                         res = Status::Error<PUBLISH_VERSION_NOT_CONTINUOUS>();
+                        int64_t missed_version = max_version.second + 1;
+                        int64_t missed_txn_id =
+                                
StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version(
+                                        tablet->tablet_id(), missed_version);
+                        auto msg = fmt::format(
+                                "uniq key with merge-on-write version not 
continuous, "
+                                "missed version={}, it's transaction_id={}, 
current publish "
+                                "version={}, tablet_id={}, transaction_id={}",
+                                missed_version, missed_txn_id, version.second, 
tablet->tablet_id(),
+                                _publish_version_req.transaction_id);
+                        if (first_time_update) {
+                            LOG(INFO) << msg;
+                        } else {
+                            LOG_EVERY_SECOND(INFO) << msg;
+                        }
                     }
                     continue;
                 }
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index ebcb287389..dd65059dc3 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -82,6 +82,9 @@ TxnManager::TxnManager(int32_t txn_map_shard_size, int32_t 
txn_shard_size)
     _txn_mutex = new std::shared_mutex[_txn_shard_size];
     _txn_tablet_delta_writer_map = new 
txn_tablet_delta_writer_map_t[_txn_map_shard_size];
     _txn_tablet_delta_writer_map_locks = new 
std::shared_mutex[_txn_map_shard_size];
+    // For debugging
+    _tablet_version_cache =
+            new ShardedLRUCache("TabletVersionCache", 100000, 
LRUCacheType::NUMBER, 32);
 }
 
 // prepare txn should always be allowed because ingest task will be retried
@@ -745,4 +748,37 @@ void TxnManager::clear_txn_tablet_delta_writer(int64_t 
transaction_id) {
     VLOG_CRITICAL << "remove delta writer manager, txn_id=" << transaction_id;
 }
 
+int64_t TxnManager::get_txn_by_tablet_version(int64_t tablet_id, int64_t 
version) { // Returns the txn id cached for (tablet_id, version), or -1 when the lookup finds nothing.
+    char key[16]; // 16-byte cache key: raw bytes of tablet_id followed by raw bytes of version
+    memcpy(key, &tablet_id, sizeof(int64_t));
+    memcpy(key + sizeof(int64_t), &version, sizeof(int64_t));
+    CacheKey cache_key((const char*)&key, sizeof(key));
+
+    auto handle = _tablet_version_cache->lookup(cache_key);
+    if (handle == nullptr) { // nothing stored under this (tablet_id, version) key
+        return -1;
+    }
+    int64_t res = *(int64_t*)_tablet_version_cache->value(handle); // stored value is read back as an int64_t
+    _tablet_version_cache->release(handle); // handle is released once the value has been copied into res
+    return res;
+}
+
+void TxnManager::update_tablet_version_txn(int64_t tablet_id, int64_t version, 
int64_t txn_id) { // Inserts txn_id into _tablet_version_cache under the (tablet_id, version) key.
+    char key[16]; // same key layout as get_txn_by_tablet_version: tablet_id bytes, then version bytes
+    memcpy(key, &tablet_id, sizeof(int64_t));
+    memcpy(key + sizeof(int64_t), &version, sizeof(int64_t));
+    CacheKey cache_key((const char*)&key, sizeof(key));
+
+    int64_t* value = new int64_t; // heap copy of txn_id; the deleter passed to insert() below deletes it
+    *value = txn_id;
+    auto deleter = [](const doris::CacheKey& key, void* value) {
+        int64_t* cache_value = (int64_t*)value;
+        delete cache_value;
+    };
+
+    auto handle = _tablet_version_cache->insert(cache_key, value, 
sizeof(txn_id), deleter,
+                                                CachePriority::NORMAL, 
sizeof(txn_id));
+    _tablet_version_cache->release(handle); // the handle returned by insert() is not used afterwards
+}
+
 } // namespace doris
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index fcebe2d9d1..42476c5acb 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -90,6 +90,7 @@ public:
         delete[] _txn_mutex;
         delete[] _txn_tablet_delta_writer_map;
         delete[] _txn_tablet_delta_writer_map_locks;
+        delete _tablet_version_cache;
     }
 
     // add a txn to manager
@@ -172,6 +173,9 @@ public:
                                        DeleteBitmapPtr delete_bitmap,
                                        const RowsetIdUnorderedSet& rowset_ids);
 
+    int64_t get_txn_by_tablet_version(int64_t tablet_id, int64_t version);
+    void update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t 
txn_id);
+
 private:
     using TxnKey = std::pair<int64_t, int64_t>; // partition_id, 
transaction_id;
 
@@ -232,6 +236,7 @@ private:
     std::shared_mutex* _txn_mutex;
 
     txn_tablet_delta_writer_map_t* _txn_tablet_delta_writer_map;
+    ShardedLRUCache* _tablet_version_cache;
     std::shared_mutex* _txn_tablet_delta_writer_map_locks;
     DISALLOW_COPY_AND_ASSIGN(TxnManager);
 }; // TxnManager
diff --git a/be/test/olap/txn_manager_test.cpp 
b/be/test/olap/txn_manager_test.cpp
index 146836cdc2..e19bdf9990 100644
--- a/be/test/olap/txn_manager_test.cpp
+++ b/be/test/olap/txn_manager_test.cpp
@@ -332,4 +332,24 @@ TEST_F(TxnManagerTest, DeleteCommittedTxn) {
     EXPECT_TRUE(status != Status::OK());
 }
 
+TEST_F(TxnManagerTest, TabletVersionCache) { // Checks that update_tablet_version_txn entries are returned by get_txn_by_tablet_version.
+    std::unique_ptr<TxnManager> txn_mgr = std::make_unique<TxnManager>(64, 
1024);
+    txn_mgr->update_tablet_version_txn(123, 100, 456); // seed version 100 for tablet 123
+    txn_mgr->update_tablet_version_txn(124, 100, 567); // seed version 100 for tablet 124
+    int64_t tx1 = txn_mgr->get_txn_by_tablet_version(123, 100);
+    EXPECT_EQ(tx1, 456);
+    int64_t tx2 = txn_mgr->get_txn_by_tablet_version(124, 100);
+    EXPECT_EQ(tx2, 567);
+    int64_t tx3 = txn_mgr->get_txn_by_tablet_version(124, 101);
+    EXPECT_EQ(tx3, -1); // (124, 101) has not been inserted yet, so -1 is expected
+    txn_mgr->update_tablet_version_txn(123, 101, 888); // add version 101 for tablet 123
+    txn_mgr->update_tablet_version_txn(124, 101, 890); // add version 101 for tablet 124
+    int64_t tx4 = txn_mgr->get_txn_by_tablet_version(123, 100);
+    EXPECT_EQ(tx4, 456); // the earlier (123, 100) entry is still readable after the new inserts
+    int64_t tx5 = txn_mgr->get_txn_by_tablet_version(123, 101);
+    EXPECT_EQ(tx5, 888);
+    int64_t tx6 = txn_mgr->get_txn_by_tablet_version(124, 101);
+    EXPECT_EQ(tx6, 890);
+}
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to