This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b57369103c6 [fix](Cooldown) Enhance calculate logic of _has_data_to_cooldown (#30244) b57369103c6 is described below commit b57369103c65a9186acc673560c2dd4a8c4cf5af Author: AlexYue <yj976240...@gmail.com> AuthorDate: Wed Jan 24 13:51:44 2024 +0800 [fix](Cooldown) Enhance calculate logic of _has_data_to_cooldown (#30244) --- be/src/olap/tablet.cpp | 15 ++++- be/test/olap/tablet_cooldown_test.cpp | 112 +++++++++++++++++++++++++--------- 2 files changed, 97 insertions(+), 30 deletions(-) diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 8c7934ae7d9..e6445b5f5b6 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2228,8 +2228,21 @@ bool Tablet::_has_data_to_cooldown() { int64_t min_local_version = std::numeric_limits<int64_t>::max(); RowsetSharedPtr rowset; std::shared_lock meta_rlock(_meta_lock); + // Ususally once the tablet has done cooldown successfully then the first + // rowset would always be remote rowset + bool has_cooldowned = false; + for (const auto& [_, rs] : _rs_version_map) { + if (!rs->is_local()) { + has_cooldowned = true; + break; + } + } for (auto& [v, rs] : _rs_version_map) { - if (rs->is_local() && v.first < min_local_version && rs->data_disk_size() > 0) { + auto predicate = rs->is_local() && v.first < min_local_version; + if (!has_cooldowned) { + predicate = predicate && (rs->data_disk_size() > 0); + } + if (predicate) { // this is a local rowset and has data min_local_version = v.first; rowset = rs; diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp index c5ee17870db..87b3c610dd6 100644 --- a/be/test/olap/tablet_cooldown_test.cpp +++ b/be/test/olap/tablet_cooldown_test.cpp @@ -219,9 +219,6 @@ public: ASSERT_TRUE(st.ok()) << st; st = io::global_local_filesystem()->create_directory(config::storage_root_path); ASSERT_TRUE(st.ok()) << st; - EXPECT_TRUE(io::global_local_filesystem() - ->create_directory(get_remote_path(remote_tablet_path(kTabletId))) - .ok()); std::vector<StorePath> paths {{config::storage_root_path, -1}}; @@ -308,28 +305,10 @@ static TDescriptorTable create_descriptor_tablet_with_sequence_col() { return desc_tbl_builder.desc_tbl(); } -void createTablet(TabletSharedPtr* tablet, int64_t replica_id, int32_t schema_hash, - int64_t tablet_id, int64_t txn_id, int64_t partition_id) { - // create tablet - std::unique_ptr<RuntimeProfile> profile; - profile = std::make_unique<RuntimeProfile>("CreateTablet"); - TCreateTabletReq request; - create_tablet_request_with_sequence_col(tablet_id, schema_hash, &request); - request.__set_replica_id(replica_id); - Status st = engine_ref->create_tablet(request, profile.get()); - ASSERT_EQ(Status::OK(), st); - - TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col(); - ObjectPool obj_pool; - DescriptorTbl* desc_tbl = nullptr; - static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl)); - TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); - auto param = std::make_shared<OlapTableSchemaParam>(); - - // write data - PUniqueId load_id; - load_id.set_hi(0); - load_id.set_lo(0); +static void write_rowset(TabletSharedPtr* tablet, PUniqueId load_id, int64_t replica_id, + int32_t schema_hash, int64_t tablet_id, int64_t txn_id, + int64_t partition_id, TupleDescriptor* tuple_desc, bool with_data = true) { + auto profile = std::make_unique<RuntimeProfile>("LoadChannels"); WriteRequest write_req; write_req.tablet_id = tablet_id; @@ -340,9 +319,8 @@ void createTablet(TabletSharedPtr* tablet, int64_t replica_id, int32_t schema_ha write_req.tuple_desc = tuple_desc; write_req.slots = &(tuple_desc->slots()); write_req.is_high_priority = false; - write_req.table_schema_param = param; + write_req.table_schema_param = std::make_shared<OlapTableSchemaParam>(); - profile = std::make_unique<RuntimeProfile>("LoadChannels"); auto delta_writer = std::make_unique<DeltaWriter>(*engine_ref, &write_req, profile.get(), TUniqueId {}); ASSERT_NE(delta_writer, nullptr); @@ -353,9 +331,10 @@ void createTablet(TabletSharedPtr* tablet, int64_t replica_id, int32_t schema_ha slot_desc->get_data_type_ptr(), slot_desc->col_name())); } - + Status st; auto columns = block.mutate_columns(); - { + + if (with_data) { int8_t c1 = 123; columns[0]->insert_data((const char*)&c1, sizeof(c1)); @@ -400,6 +379,42 @@ void createTablet(TabletSharedPtr* tablet, int64_t replica_id, int32_t schema_ha st = (*tablet)->add_inc_rowset(rowset); ASSERT_EQ(Status::OK(), st); } +} + +void createTablet(TabletSharedPtr* tablet, int64_t replica_id, int32_t schema_hash, + int64_t tablet_id, int64_t txn_id, int64_t partition_id, bool with_data = true) { + EXPECT_TRUE(io::global_local_filesystem() + ->delete_directory(get_remote_path(remote_tablet_path(tablet_id))) + .ok()); + EXPECT_TRUE(io::global_local_filesystem() + ->create_directory(get_remote_path(remote_tablet_path(tablet_id))) + .ok()); + // create tablet + std::unique_ptr<RuntimeProfile> profile; + profile = std::make_unique<RuntimeProfile>("CreateTablet"); + TCreateTabletReq request; + create_tablet_request_with_sequence_col(tablet_id, schema_hash, &request); + request.__set_replica_id(replica_id); + Status st = engine_ref->create_tablet(request, profile.get()); + ASSERT_EQ(Status::OK(), st); + if (!with_data) { + *tablet = engine_ref->tablet_manager()->get_tablet(tablet_id); + return; + } + + TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col(); + ObjectPool obj_pool; + DescriptorTbl* desc_tbl = nullptr; + static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl)); + TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); + + PUniqueId load_id; + load_id.set_hi(0); + load_id.set_lo(0); + + write_rowset(tablet, std::move(load_id), replica_id, schema_hash, tablet_id, txn_id, + partition_id, tuple_desc); + EXPECT_EQ(1, (*tablet)->num_rows()); } @@ -428,4 +443,43 @@ TEST_F(TabletCooldownTest, normal) { ASSERT_EQ(segments.size(), 1); } +TEST_F(TabletCooldownTest, cooldown_data) { + TabletSharedPtr tablet1; + createTablet(&tablet1, kReplicaId + 1, kSchemaHash + 1, kTabletId + 1, kTxnId + 1, + kPartitionId + 1, false); + // test cooldown + tablet1->set_storage_policy_id(kStoragePolicyId); + // Tablet with only rowset[0-1] will not be as suitable as cooldown candidate + ASSERT_FALSE(tablet1->_has_data_to_cooldown()); + + TabletSharedPtr tablet2; + createTablet(&tablet2, kReplicaId + 2, kSchemaHash + 2, kTabletId + 2, kTxnId + 2, + kPartitionId + 2); + // test cooldown + tablet2->set_storage_policy_id(kStoragePolicyId); + Status st = tablet2->cooldown(); // rowset [0-1] + ASSERT_NE(Status::OK(), st); + tablet2->update_cooldown_conf(1, kReplicaId + 2); + // cooldown for upload node + st = tablet2->cooldown(); // rowset [0-1] + ASSERT_EQ(Status::OK(), st); + st = tablet2->cooldown(); // rowset [2-2] + ASSERT_EQ(Status::OK(), st); + // Write one empty local rowset into tablet2 to test if this rowset would be uploaded or not + TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col(); + ObjectPool obj_pool; + DescriptorTbl* desc_tbl = nullptr; + static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl)); + TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); + + PUniqueId load_id; + load_id.set_hi(1); + load_id.set_lo(1); + write_rowset(&tablet2, std::move(load_id), kReplicaId + 2, kSchemaHash + 2, kTabletId + 2, + kTxnId + 3, kPartitionId + 2, tuple_desc, false); + + st = tablet2->cooldown(); // rowset [3-3] + ASSERT_EQ(Status::OK(), st); +} + } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org