This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d2688ae6021 [feature](merge-cloud) schema change for mow table (#31819) d2688ae6021 is described below commit d2688ae6021314f4e9d2341a7e08f8fe9d43e97a Author: Xin Liao <liaoxin...@126.com> AuthorDate: Wed Mar 6 00:11:50 2024 +0800 [feature](merge-cloud) schema change for mow table (#31819) --- be/src/cloud/cloud_schema_change_job.cpp | 80 +++++++++++++++++++++++++++++++- be/src/cloud/cloud_schema_change_job.h | 3 ++ be/src/olap/base_tablet.cpp | 55 ++++++++++++++++++++++ be/src/olap/base_tablet.h | 3 ++ be/src/olap/tablet.cpp | 55 ---------------------- be/src/olap/tablet.h | 3 -- 6 files changed, 140 insertions(+), 59 deletions(-) diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index 2099f22f1cd..cd7e0744324 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -39,6 +39,7 @@ namespace doris { using namespace ErrorCode; static constexpr int ALTER_TABLE_BATCH_SIZE = 4096; +static constexpr int SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID = -2; static std::unique_ptr<SchemaChange> get_sc_procedure(const BlockChanger& changer, bool sc_sorting) { @@ -234,6 +235,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam } // 3. Convert historical data + bool already_exist_any_version = false; for (const auto& rs_reader : sc_params.ref_rowset_readers) { VLOG_TRACE << "Begin to convert a history rowset. version=" << rs_reader->version(); @@ -264,6 +266,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam RETURN_IF_ERROR(RowsetFactory::create_rowset(nullptr, _new_tablet->tablet_path(), existed_rs_meta, &rowset)); _output_rowsets.push_back(std::move(rowset)); + already_exist_any_version = true; continue; } else { return st; @@ -327,7 +330,18 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam _output_cumulative_point = std::min(_output_cumulative_point, sc_job->alter_version() + 1); sc_job->set_output_cumulative_point(_output_cumulative_point); - // TODO(Lchangliang): process delete bitmap if the table is MOW + // process delete bitmap if the table is MOW + if (_new_tablet->enable_unique_key_merge_on_write()) { + int64_t initiator = boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) & + std::numeric_limits<int64_t>::max(); + // If there are historical versions of rowsets, we need to recalculate their delete + // bitmaps, otherwise we will miss the delete bitmaps of incremental rowsets + int64_t start_calc_delete_bitmap_version = + already_exist_any_version ? 0 : sc_job->alter_version() + 1; + RETURN_IF_ERROR(_process_delete_bitmap(sc_job->alter_version(), + start_calc_delete_bitmap_version, initiator)); + sc_job->set_delete_bitmap_lock_initiator(initiator); + } cloud::FinishTabletJobResponse finish_resp; st = _cloud_storage_engine.meta_mgr().commit_tablet_job(job, &finish_resp); @@ -361,4 +375,68 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam } return Status::OK(); } + +Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version, + int64_t start_calc_delete_bitmap_version, + int64_t initiator) { + LOG_INFO("process mow table") + .tag("new_tablet_id", _new_tablet->tablet_id()) + .tag("out_rowset_size", _output_rowsets.size()) + .tag("start_calc_delete_bitmap_version", start_calc_delete_bitmap_version) + .tag("alter_version", alter_version); + TabletMetaSharedPtr tmp_meta = std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta())); + tmp_meta->delete_bitmap().delete_bitmap.clear(); + std::shared_ptr<CloudTablet> tmp_tablet = + std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta); + { + std::unique_lock wlock(tmp_tablet->get_header_lock()); + tmp_tablet->add_rowsets(_output_rowsets, true, wlock); + } + + // step 1, process incremental rowset without delete bitmap update lock + std::vector<RowsetSharedPtr> incremental_rowsets; + RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get())); + int64_t max_version = tmp_tablet->max_version().second; + LOG(INFO) << "alter table for mow table, calculate delete bitmap of " + << "incremental rowsets without lock, version: " << start_calc_delete_bitmap_version + << "-" << max_version << " new_table_id: " << _new_tablet->tablet_id(); + if (max_version >= start_calc_delete_bitmap_version) { + RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked( + {start_calc_delete_bitmap_version, max_version}, &incremental_rowsets)); + for (auto rowset : incremental_rowsets) { + RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset)); + } + } + + // step 2, process incremental rowset with delete bitmap update lock + RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().get_delete_bitmap_update_lock( + *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator)); + RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get())); + int64_t new_max_version = tmp_tablet->max_version().second; + LOG(INFO) << "alter table for mow table, calculate delete bitmap of " + << "incremental rowsets with lock, version: " << max_version + 1 << "-" + << new_max_version << " new_tablet_id: " << _new_tablet->tablet_id(); + std::vector<RowsetSharedPtr> new_incremental_rowsets; + if (new_max_version > max_version) { + RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked( + {max_version + 1, new_max_version}, &new_incremental_rowsets)); + { + std::unique_lock wlock(tmp_tablet->get_header_lock()); + tmp_tablet->add_rowsets(_output_rowsets, true, wlock); + } + for (auto rowset : new_incremental_rowsets) { + RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset)); + } + } + + auto& delete_bitmap = tmp_tablet->tablet_meta()->delete_bitmap(); + + // step4, store delete bitmap + RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().update_delete_bitmap( + *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator, &delete_bitmap)); + + _new_tablet->tablet_meta()->delete_bitmap() = delete_bitmap; + return Status::OK(); +} + } // namespace doris diff --git a/be/src/cloud/cloud_schema_change_job.h b/be/src/cloud/cloud_schema_change_job.h index 7bb03fda12a..d587111df71 100644 --- a/be/src/cloud/cloud_schema_change_job.h +++ b/be/src/cloud/cloud_schema_change_job.h @@ -39,6 +39,9 @@ public: private: Status _convert_historical_rowsets(const SchemaChangeParams& sc_params); + Status _process_delete_bitmap(int64_t alter_version, int64_t start_calc_delete_bitmap_version, + int64_t initiator); + private: CloudStorageEngine& _cloud_storage_engine; std::shared_ptr<CloudTablet> _base_tablet; diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index fe76d43c7f0..e5a41abdbd9 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1375,4 +1375,59 @@ Status BaseTablet::check_rowid_conversion( return Status::OK(); } +// The caller should hold _rowset_update_lock and _meta_lock lock. +Status BaseTablet::update_delete_bitmap_without_lock(const BaseTabletSPtr& self, + const RowsetSharedPtr& rowset) { + DBUG_EXECUTE_IF("Tablet.update_delete_bitmap_without_lock.random_failed", { + if (rand() % 100 < (100 * dp->param("percent", 0.1))) { + LOG_WARNING("Tablet.update_delete_bitmap_without_lock.random_failed"); + return Status::InternalError( + "debug tablet update delete bitmap without lock random failed"); + } + }); + int64_t cur_version = rowset->end_version(); + std::vector<segment_v2::SegmentSharedPtr> segments; + RETURN_IF_ERROR(std::dynamic_pointer_cast<BetaRowset>(rowset)->load_segments(&segments)); + + // If this rowset does not have a segment, there is no need for an update. + if (segments.empty()) { + LOG(INFO) << "[Schema Change or Clone] skip to construct delete bitmap tablet: " + << self->tablet_id() << " cur max_version: " << cur_version; + return Status::OK(); + } + RowsetIdUnorderedSet cur_rowset_ids; + RETURN_IF_ERROR(self->get_all_rs_id_unlocked(cur_version - 1, &cur_rowset_ids)); + DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(self->tablet_id()); + RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap)); + + std::vector<RowsetSharedPtr> specified_rowsets = self->get_rowset_by_ids(&cur_rowset_ids); + OlapStopWatch watch; + auto token = self->calc_delete_bitmap_executor()->create_token(); + RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, specified_rowsets, delete_bitmap, + cur_version - 1, token.get())); + RETURN_IF_ERROR(token->wait()); + size_t total_rows = std::accumulate( + segments.begin(), segments.end(), 0, + [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); + LOG(INFO) << "[Schema Change or Clone] construct delete bitmap tablet: " << self->tablet_id() + << ", rowset_ids: " << cur_rowset_ids.size() << ", cur max_version: " << cur_version + << ", transaction_id: " << -1 << ", cost: " << watch.get_elapse_time_us() + << "(us), total rows: " << total_rows; + if (config::enable_merge_on_write_correctness_check) { + // check if all the rowset has ROWSET_SENTINEL_MARK + auto st = self->check_delete_bitmap_correctness(delete_bitmap, cur_version - 1, -1, + cur_rowset_ids, &specified_rowsets); + if (!st.ok()) { + LOG(WARNING) << fmt::format("delete bitmap correctness check failed in publish phase!"); + } + self->_remove_sentinel_mark_from_delete_bitmap(delete_bitmap); + } + for (auto& iter : delete_bitmap->delete_bitmap) { + self->_tablet_meta->delete_bitmap().merge( + {std::get<0>(iter.first), std::get<1>(iter.first), cur_version}, iter.second); + } + + return Status::OK(); +} + } // namespace doris diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 867ff9c1e3f..b59a4303a0b 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -229,6 +229,9 @@ public: const std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>& location_map); + static Status update_delete_bitmap_without_lock(const BaseTabletSPtr& self, + const RowsetSharedPtr& rowset); + //////////////////////////////////////////////////////////////////////////// // end MoW functions //////////////////////////////////////////////////////////////////////////// diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 431577726ad..d1bb734d903 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2300,61 +2300,6 @@ void Tablet::update_max_version_schema(const TabletSchemaSPtr& tablet_schema) { } } -// The caller should hold _rowset_update_lock and _meta_lock lock. -Status Tablet::update_delete_bitmap_without_lock(const TabletSharedPtr& self, - const RowsetSharedPtr& rowset) { - DBUG_EXECUTE_IF("Tablet.update_delete_bitmap_without_lock.random_failed", { - if (rand() % 100 < (100 * dp->param("percent", 0.1))) { - LOG_WARNING("Tablet.update_delete_bitmap_without_lock.random_failed"); - return Status::InternalError( - "debug tablet update delete bitmap without lock random failed"); - } - }); - int64_t cur_version = rowset->end_version(); - std::vector<segment_v2::SegmentSharedPtr> segments; - RETURN_IF_ERROR(std::dynamic_pointer_cast<BetaRowset>(rowset)->load_segments(&segments)); - - // If this rowset does not have a segment, there is no need for an update. - if (segments.empty()) { - LOG(INFO) << "[Schema Change or Clone] skip to construct delete bitmap tablet: " - << self->tablet_id() << " cur max_version: " << cur_version; - return Status::OK(); - } - RowsetIdUnorderedSet cur_rowset_ids; - RETURN_IF_ERROR(self->get_all_rs_id_unlocked(cur_version - 1, &cur_rowset_ids)); - DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(self->tablet_id()); - RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap)); - - std::vector<RowsetSharedPtr> specified_rowsets = self->get_rowset_by_ids(&cur_rowset_ids); - OlapStopWatch watch; - auto token = self->_engine.calc_delete_bitmap_executor()->create_token(); - RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, specified_rowsets, delete_bitmap, - cur_version - 1, token.get())); - RETURN_IF_ERROR(token->wait()); - size_t total_rows = std::accumulate( - segments.begin(), segments.end(), 0, - [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); - LOG(INFO) << "[Schema Change or Clone] construct delete bitmap tablet: " << self->tablet_id() - << ", rowset_ids: " << cur_rowset_ids.size() << ", cur max_version: " << cur_version - << ", transaction_id: " << -1 << ", cost: " << watch.get_elapse_time_us() - << "(us), total rows: " << total_rows; - if (config::enable_merge_on_write_correctness_check) { - // check if all the rowset has ROWSET_SENTINEL_MARK - auto st = self->check_delete_bitmap_correctness(delete_bitmap, cur_version - 1, -1, - cur_rowset_ids, &specified_rowsets); - if (!st.ok()) { - LOG(WARNING) << fmt::format("delete bitmap correctness check failed in publish phase!"); - } - self->_remove_sentinel_mark_from_delete_bitmap(delete_bitmap); - } - for (auto& iter : delete_bitmap->delete_bitmap) { - self->_tablet_meta->delete_bitmap().merge( - {std::get<0>(iter.first), std::get<1>(iter.first), cur_version}, iter.second); - } - - return Status::OK(); -} - CalcDeleteBitmapExecutor* Tablet::calc_delete_bitmap_executor() { return _engine.calc_delete_bitmap_executor(); } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 0f6cfa9c04f..50970c51c5e 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -369,9 +369,6 @@ public: // end cooldown functions //////////////////////////////////////////////////////////////////////////// - static Status update_delete_bitmap_without_lock(const TabletSharedPtr& self, - const RowsetSharedPtr& rowset); - CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() override; Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id, DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org