This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new afcc6170f6c [fix](txn_manager) Add ingested rowsets to unused rowsets when removing txn (#37417)
afcc6170f6c is described below

commit afcc6170f6ca8ae8553e6a4a4d21000a0cb905ea
Author: walter <w41te...@gmail.com>
AuthorDate: Wed Jul 10 14:25:44 2024 +0800

    [fix](txn_manager) Add ingested rowsets to unused rowsets when removing txn (#37417)
    
    Generally speaking, as long as a rowset has a version, it can be
    considered no longer pending. However, if the rowset was created
    through ingesting binlogs, it already has a version but should still
    be considered pending, because the ingesting txn has not yet been
    published.
    
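    This PR updates the condition that determines the pending state: a
    rowset is now treated as not pending only if it has a version, its
    start version is greater than 0, and its state is not COMMITTED.
    Consequently, the txn of a COMMITTED rowset can be rolled back even
    if a version already exists.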
    
    Cherry-pick #36551
---
 be/src/olap/rowset/rowset.cpp            | 11 +++++++++-
 be/src/olap/rowset/rowset.h              |  1 +
 be/src/olap/storage_engine.cpp           | 10 +++++++--
 be/src/olap/txn_manager.cpp              |  7 +++---
 be/test/olap/test_data/rowset_meta3.json | 22 +++++++++++++++++++
 be/test/olap/txn_manager_test.cpp        | 37 ++++++++++++++++++++++++++++++++
 6 files changed, 82 insertions(+), 6 deletions(-)

diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp
index f4667d3fb63..b5b68f4d38e 100644
--- a/be/src/olap/rowset/rowset.cpp
+++ b/be/src/olap/rowset/rowset.cpp
@@ -30,7 +30,16 @@ static bvar::Adder<size_t> 
g_total_rowset_num("doris_total_rowset_num");
 
 Rowset::Rowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr& 
rowset_meta)
         : _rowset_meta(rowset_meta), _refs_by_reader(0) {
-    _is_pending = !_rowset_meta->has_version();
+    _is_pending = true;
+
+    // Generally speaking, as long as a rowset has a version, it can be 
considered not to be in a pending state.
+    // However, if the rowset was created through ingesting binlogs, it will 
have a version but should still be
+    // considered in a pending state because the ingesting txn has not yet 
been committed.
+    if (_rowset_meta->has_version() && _rowset_meta->start_version() > 0 &&
+        _rowset_meta->rowset_state() != COMMITTED) {
+        _is_pending = false;
+    }
+
     if (_is_pending) {
         _is_cumulative = false;
     } else {
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 6194703176f..87cfe0b0bea 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -163,6 +163,7 @@ public:
     int64_t newest_write_timestamp() const { return 
rowset_meta()->newest_write_timestamp(); }
     bool is_segments_overlapping() const { return 
rowset_meta()->is_segments_overlapping(); }
     KeysType keys_type() { return _schema->keys_type(); }
+    RowsetStatePB rowset_meta_state() const { return 
rowset_meta()->rowset_state(); }
 
     // remove all files in this rowset
     // TODO should we rename the method to remove_files() to be more specific?
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 164f312a8da..05fd873fcc6 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -710,8 +710,14 @@ void StorageEngine::clear_transaction_task(const 
TTransactionId transaction_id,
                           << ", tablet_uid=" << tablet_info.first.tablet_uid;
                 continue;
             }
-            
static_cast<void>(StorageEngine::instance()->txn_manager()->delete_txn(
-                    partition_id, tablet, transaction_id));
+            Status s = 
StorageEngine::instance()->txn_manager()->delete_txn(partition_id, tablet,
+                                                                            
transaction_id);
+            if (!s.ok()) {
+                LOG(WARNING) << "failed to clear transaction. txn_id=" << 
transaction_id
+                             << ", partition_id=" << partition_id
+                             << ", tablet_id=" << tablet_info.first.tablet_id
+                             << ", status=" << s.to_string();
+            }
         }
     }
     LOG(INFO) << "finish to clear transaction task. transaction_id=" << 
transaction_id;
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index ac9367be23b..2ed1ac5674d 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -604,13 +604,14 @@ Status TxnManager::delete_txn(OlapMeta* meta, 
TPartitionId partition_id,
         auto& load_info = load_itr->second;
         auto& rowset = load_info->rowset;
         if (rowset != nullptr && meta != nullptr) {
-            if (rowset->version().first > 0) {
+            if (!rowset->is_pending()) {
                 return Status::Error<TRANSACTION_ALREADY_COMMITTED>(
                         "could not delete transaction from engine, just remove 
it from memory not "
                         "delete from disk, because related rowset already 
published. partition_id: "
-                        "{}, transaction_id: {}, tablet: {}, rowset id: {}, 
version:{}",
+                        "{}, transaction_id: {}, tablet: {}, rowset id: {}, 
version: {}, state: {}",
                         key.first, key.second, tablet_info.to_string(),
-                        rowset->rowset_id().to_string(), 
rowset->version().to_string());
+                        rowset->rowset_id().to_string(), 
rowset->version().to_string(),
+                        RowsetStatePB_Name(rowset->rowset_meta_state()));
             } else {
                 static_cast<void>(RowsetMetaManager::remove(meta, tablet_uid, 
rowset->rowset_id()));
 #ifndef BE_TEST
diff --git a/be/test/olap/test_data/rowset_meta3.json 
b/be/test/olap/test_data/rowset_meta3.json
new file mode 100644
index 00000000000..f6048f93950
--- /dev/null
+++ b/be/test/olap/test_data/rowset_meta3.json
@@ -0,0 +1,22 @@
+{
+    "rowset_id": 10002,
+    "partition_id": 10001,
+    "tablet_id": 12046,
+    "tablet_schema_hash": 365187263,
+    "rowset_type": "BETA_ROWSET",
+    "rowset_state": "COMMITTED",
+    "start_version": 0,
+    "end_version": 1,
+    "num_rows": 0,
+    "total_disk_size": 0,
+    "data_disk_size": 0,
+    "index_disk_size": 0,
+    "empty": true,
+    "creation_time": 1552911435,
+    "tablet_uid": {
+        "hi": 10,
+        "lo": 10
+    },
+    "num_segments": 1,
+    "has_variant_type_in_schema": false
+}
diff --git a/be/test/olap/txn_manager_test.cpp 
b/be/test/olap/txn_manager_test.cpp
index d33570e8a8d..77f1a16eb5b 100644
--- a/be/test/olap/txn_manager_test.cpp
+++ b/be/test/olap/txn_manager_test.cpp
@@ -54,6 +54,7 @@ static StorageEngine* k_engine = nullptr;
 
 const std::string rowset_meta_path = 
"./be/test/olap/test_data/rowset_meta.json";
 const std::string rowset_meta_path_2 = 
"./be/test/olap/test_data/rowset_meta2.json";
+const std::string rowset_meta_path_3 = 
"./be/test/olap/test_data/rowset_meta3.json";
 
 class TxnManagerTest : public testing::Test {
 public:
@@ -169,6 +170,22 @@ public:
         EXPECT_EQ(rowset_meta2->rowset_id(), rowset_id);
         EXPECT_EQ(Status::OK(), RowsetFactory::create_rowset(_schema, 
rowset_meta_path_2,
                                                              rowset_meta2, 
&_rowset_diff_id));
+
+        // init rowset meta 3
+        _json_rowset_meta = "";
+        std::ifstream infile3(rowset_meta_path_3);
+        char buffer3[1024];
+        while (!infile3.eof()) {
+            infile3.getline(buffer3, 1024);
+            _json_rowset_meta = _json_rowset_meta + buffer3 + "\n";
+        }
+        _json_rowset_meta = _json_rowset_meta.substr(0, 
_json_rowset_meta.size() - 1);
+        rowset_id.init(10002);
+        RowsetMetaSharedPtr rowset_meta3(new RowsetMeta());
+        rowset_meta3->init_from_json(_json_rowset_meta);
+        EXPECT_EQ(rowset_meta3->rowset_id(), rowset_id);
+        EXPECT_EQ(Status::OK(), RowsetFactory::create_rowset(_schema, 
rowset_meta_path_3,
+                                                             rowset_meta3, 
&_rowset_ingested));
         _tablet_uid = TabletUid(10, 10);
     }
 
@@ -190,6 +207,7 @@ private:
     RowsetSharedPtr _rowset;
     RowsetSharedPtr _rowset_same_id;
     RowsetSharedPtr _rowset_diff_id;
+    RowsetSharedPtr _rowset_ingested;
 };
 
 TEST_F(TxnManagerTest, PrepareNewTxn) {
@@ -363,4 +381,23 @@ TEST_F(TxnManagerTest, TabletVersionCache) {
     EXPECT_EQ(tx6, 890);
 }
 
+TEST_F(TxnManagerTest, DeleteCommittedTxnForIngestingBinlog) {
+    auto guard = 
k_engine->pending_local_rowsets().add(_rowset_ingested->rowset_id());
+    auto st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, 
tablet_id, _tablet_uid,
+                                   load_id, _rowset_ingested, 
std::move(guard), false);
+    ASSERT_TRUE(st.ok()) << st;
+    RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
+    st = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, 
_rowset_ingested->rowset_id(),
+                                            rowset_meta);
+    ASSERT_TRUE(st.ok()) << st;
+    EXPECT_EQ(rowset_meta->rowset_id(), _rowset_ingested->rowset_id());
+    st = _txn_mgr->delete_txn(_meta, partition_id, transaction_id, tablet_id, 
_tablet_uid);
+    ASSERT_TRUE(st.ok()) << st;
+    RowsetMetaSharedPtr rowset_meta2(new RowsetMeta());
+    st = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, 
_rowset_ingested->rowset_id(),
+                                            rowset_meta2);
+    ASSERT_FALSE(st.ok()) << st;
+    
EXPECT_FALSE(k_engine->pending_local_rowsets().contains(_rowset_ingested->rowset_id()));
+}
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to