This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new afcc6170f6c [fix](txn_manager) Add ingested rowsets to unused rowsets when removing txn (#37417) afcc6170f6c is described below commit afcc6170f6ca8ae8553e6a4a4d21000a0cb905ea Author: walter <w41te...@gmail.com> AuthorDate: Wed Jul 10 14:25:44 2024 +0800 [fix](txn_manager) Add ingested rowsets to unused rowsets when removing txn (#37417) Generally speaking, as long as a rowset has a version, it can be considered not to be in a pending state. However, if the rowset was created through ingesting binlogs, it will have a version but should still be considered in a pending state because the ingesting txn has not yet been committed. This PR updates the condition for determining the pending state. If a rowset is COMMITTED, the txn should be allowed to roll back even if a version exists. Cherry-pick #36551 --- be/src/olap/rowset/rowset.cpp | 11 +++++++++- be/src/olap/rowset/rowset.h | 1 + be/src/olap/storage_engine.cpp | 10 +++++++-- be/src/olap/txn_manager.cpp | 7 +++--- be/test/olap/test_data/rowset_meta3.json | 22 +++++++++++++++++++ be/test/olap/txn_manager_test.cpp | 37 ++++++++++++++++++++++++++++++++ 6 files changed, 82 insertions(+), 6 deletions(-) diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp index f4667d3fb63..b5b68f4d38e 100644 --- a/be/src/olap/rowset/rowset.cpp +++ b/be/src/olap/rowset/rowset.cpp @@ -30,7 +30,16 @@ static bvar::Adder<size_t> g_total_rowset_num("doris_total_rowset_num"); Rowset::Rowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr& rowset_meta) : _rowset_meta(rowset_meta), _refs_by_reader(0) { - _is_pending = !_rowset_meta->has_version(); + _is_pending = true; + + // Generally speaking, as long as a rowset has a version, it can be considered not to be in a pending state. 
+ // However, if the rowset was created through ingesting binlogs, it will have a version but should still be + // considered in a pending state because the ingesting txn has not yet been committed. + if (_rowset_meta->has_version() && _rowset_meta->start_version() > 0 && + _rowset_meta->rowset_state() != COMMITTED) { + _is_pending = false; + } + if (_is_pending) { _is_cumulative = false; } else { diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index 6194703176f..87cfe0b0bea 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -163,6 +163,7 @@ public: int64_t newest_write_timestamp() const { return rowset_meta()->newest_write_timestamp(); } bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); } KeysType keys_type() { return _schema->keys_type(); } + RowsetStatePB rowset_meta_state() const { return rowset_meta()->rowset_state(); } // remove all files in this rowset // TODO should we rename the method to remove_files() to be more specific? diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 164f312a8da..05fd873fcc6 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -710,8 +710,14 @@ void StorageEngine::clear_transaction_task(const TTransactionId transaction_id, << ", tablet_uid=" << tablet_info.first.tablet_uid; continue; } - static_cast<void>(StorageEngine::instance()->txn_manager()->delete_txn( - partition_id, tablet, transaction_id)); + Status s = StorageEngine::instance()->txn_manager()->delete_txn(partition_id, tablet, + transaction_id); + if (!s.ok()) { + LOG(WARNING) << "failed to clear transaction. txn_id=" << transaction_id + << ", partition_id=" << partition_id + << ", tablet_id=" << tablet_info.first.tablet_id + << ", status=" << s.to_string(); + } } } LOG(INFO) << "finish to clear transaction task. 
transaction_id=" << transaction_id; diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index ac9367be23b..2ed1ac5674d 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -604,13 +604,14 @@ Status TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id, auto& load_info = load_itr->second; auto& rowset = load_info->rowset; if (rowset != nullptr && meta != nullptr) { - if (rowset->version().first > 0) { + if (!rowset->is_pending()) { return Status::Error<TRANSACTION_ALREADY_COMMITTED>( "could not delete transaction from engine, just remove it from memory not " "delete from disk, because related rowset already published. partition_id: " - "{}, transaction_id: {}, tablet: {}, rowset id: {}, version:{}", + "{}, transaction_id: {}, tablet: {}, rowset id: {}, version: {}, state: {}", key.first, key.second, tablet_info.to_string(), - rowset->rowset_id().to_string(), rowset->version().to_string()); + rowset->rowset_id().to_string(), rowset->version().to_string(), + RowsetStatePB_Name(rowset->rowset_meta_state())); } else { static_cast<void>(RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id())); #ifndef BE_TEST diff --git a/be/test/olap/test_data/rowset_meta3.json b/be/test/olap/test_data/rowset_meta3.json new file mode 100644 index 00000000000..f6048f93950 --- /dev/null +++ b/be/test/olap/test_data/rowset_meta3.json @@ -0,0 +1,22 @@ +{ + "rowset_id": 10002, + "partition_id": 10001, + "tablet_id": 12046, + "tablet_schema_hash": 365187263, + "rowset_type": "BETA_ROWSET", + "rowset_state": "COMMITTED", + "start_version": 0, + "end_version": 1, + "num_rows": 0, + "total_disk_size": 0, + "data_disk_size": 0, + "index_disk_size": 0, + "empty": true, + "creation_time": 1552911435, + "tablet_uid": { + "hi": 10, + "lo": 10 + }, + "num_segments": 1, + "has_variant_type_in_schema": false +} diff --git a/be/test/olap/txn_manager_test.cpp b/be/test/olap/txn_manager_test.cpp index d33570e8a8d..77f1a16eb5b 100644 --- 
a/be/test/olap/txn_manager_test.cpp +++ b/be/test/olap/txn_manager_test.cpp @@ -54,6 +54,7 @@ static StorageEngine* k_engine = nullptr; const std::string rowset_meta_path = "./be/test/olap/test_data/rowset_meta.json"; const std::string rowset_meta_path_2 = "./be/test/olap/test_data/rowset_meta2.json"; +const std::string rowset_meta_path_3 = "./be/test/olap/test_data/rowset_meta3.json"; class TxnManagerTest : public testing::Test { public: @@ -169,6 +170,22 @@ public: EXPECT_EQ(rowset_meta2->rowset_id(), rowset_id); EXPECT_EQ(Status::OK(), RowsetFactory::create_rowset(_schema, rowset_meta_path_2, rowset_meta2, &_rowset_diff_id)); + + // init rowset meta 3 + _json_rowset_meta = ""; + std::ifstream infile3(rowset_meta_path_3); + char buffer3[1024]; + while (!infile3.eof()) { + infile3.getline(buffer3, 1024); + _json_rowset_meta = _json_rowset_meta + buffer3 + "\n"; + } + _json_rowset_meta = _json_rowset_meta.substr(0, _json_rowset_meta.size() - 1); + rowset_id.init(10002); + RowsetMetaSharedPtr rowset_meta3(new RowsetMeta()); + rowset_meta3->init_from_json(_json_rowset_meta); + EXPECT_EQ(rowset_meta3->rowset_id(), rowset_id); + EXPECT_EQ(Status::OK(), RowsetFactory::create_rowset(_schema, rowset_meta_path_3, + rowset_meta3, &_rowset_ingested)); _tablet_uid = TabletUid(10, 10); } @@ -190,6 +207,7 @@ private: RowsetSharedPtr _rowset; RowsetSharedPtr _rowset_same_id; RowsetSharedPtr _rowset_diff_id; + RowsetSharedPtr _rowset_ingested; }; TEST_F(TxnManagerTest, PrepareNewTxn) { @@ -363,4 +381,23 @@ TEST_F(TxnManagerTest, TabletVersionCache) { EXPECT_EQ(tx6, 890); } +TEST_F(TxnManagerTest, DeleteCommittedTxnForIngestingBinlog) { + auto guard = k_engine->pending_local_rowsets().add(_rowset_ingested->rowset_id()); + auto st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid, + load_id, _rowset_ingested, std::move(guard), false); + ASSERT_TRUE(st.ok()) << st; + RowsetMetaSharedPtr rowset_meta(new RowsetMeta()); + st = 
RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset_ingested->rowset_id(), + rowset_meta); + ASSERT_TRUE(st.ok()) << st; + EXPECT_EQ(rowset_meta->rowset_id(), _rowset_ingested->rowset_id()); + st = _txn_mgr->delete_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid); + ASSERT_TRUE(st.ok()) << st; + RowsetMetaSharedPtr rowset_meta2(new RowsetMeta()); + st = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset_ingested->rowset_id(), + rowset_meta2); + ASSERT_FALSE(st.ok()) << st; + EXPECT_FALSE(k_engine->pending_local_rowsets().contains(_rowset_ingested->rowset_id())); +} + } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org