This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ebabec1d3e6 [fix](migrate) Copy binlog files (#41083) ebabec1d3e6 is described below commit ebabec1d3e60b5b4ee8d40ea26dd52a8909f14fe Author: walter <w41te...@gmail.com> AuthorDate: Sat Sep 21 09:19:26 2024 +0800 [fix](migrate) Copy binlog files (#41083) --- be/src/olap/rowset/rowset_meta_manager.cpp | 63 +++++++++++++ be/src/olap/rowset/rowset_meta_manager.h | 3 + be/src/olap/tablet.cpp | 5 + be/src/olap/tablet.h | 1 + be/src/olap/tablet_manager.cpp | 1 + be/src/olap/task/engine_storage_migration_task.cpp | 102 ++++++++++++++++++++- be/src/olap/task/engine_storage_migration_task.h | 5 +- 7 files changed, 176 insertions(+), 4 deletions(-) diff --git a/be/src/olap/rowset/rowset_meta_manager.cpp b/be/src/olap/rowset/rowset_meta_manager.cpp index 9d1cbd88589..9ba6f9540db 100644 --- a/be/src/olap/rowset/rowset_meta_manager.cpp +++ b/be/src/olap/rowset/rowset_meta_manager.cpp @@ -357,6 +357,69 @@ Status RowsetMetaManager::_get_rowset_binlog_metas(OlapMeta* meta, const TabletU return status; } +Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, TabletUid tablet_uid, + Version version, RowsetBinlogMetasPB* metas_pb) { + Status status; + auto tablet_uid_str = tablet_uid.to_string(); + auto prefix_key = make_binlog_meta_key_prefix(tablet_uid); + auto begin_key = make_binlog_meta_key_prefix(tablet_uid, version.first); + auto end_key = make_binlog_meta_key_prefix(tablet_uid, version.second + 1); + auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str, &end_key]( + std::string_view key, std::string_view value) -> bool { + VLOG_DEBUG << fmt::format("get rowset binlog metas, key={}, value={}", key, value); + if (key.compare(end_key) > 0) { // the binlog meta key is binary comparable. + // All binlog meta has been scanned + return false; + } + + if (!starts_with_binlog_meta(key)) { + auto err_msg = fmt::format("invalid binlog meta key:{}", key); + status = Status::InternalError(err_msg); + LOG(WARNING) << err_msg; + return false; + } + + BinlogMetaEntryPB binlog_meta_entry_pb; + if (!binlog_meta_entry_pb.ParseFromArray(value.data(), value.size())) { + auto err_msg = fmt::format("fail to parse binlog meta value:{}", value); + status = Status::InternalError(err_msg); + LOG(WARNING) << err_msg; + return false; + } + + const auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2(); + auto* binlog_meta_pb = metas_pb->add_rowset_binlog_metas(); + binlog_meta_pb->set_rowset_id(rowset_id); + binlog_meta_pb->set_version(binlog_meta_entry_pb.version()); + binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments()); + binlog_meta_pb->set_meta_key(std::string {key}); + binlog_meta_pb->set_meta(std::string {value}); + + auto binlog_data_key = + make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id); + std::string binlog_data; + status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data); + if (!status.ok()) { + LOG(WARNING) << status.to_string(); + return false; + } + binlog_meta_pb->set_data_key(binlog_data_key); + binlog_meta_pb->set_data(binlog_data); + + return false; + }; + + Status iterStatus = + meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, prefix_key, traverse_func); + if (!iterStatus.ok()) { + LOG(WARNING) << fmt::format( + "fail to iterate binlog meta. prefix_key:{}, version:{}, status:{}", prefix_key, + version.to_string(), iterStatus.to_string()); + return iterStatus; + } + return status; +} + Status RowsetMetaManager::_get_all_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, RowsetBinlogMetasPB* metas_pb) { Status status; diff --git a/be/src/olap/rowset/rowset_meta_manager.h b/be/src/olap/rowset/rowset_meta_manager.h index b61e8c02769..eb04128fded 100644 --- a/be/src/olap/rowset/rowset_meta_manager.h +++ b/be/src/olap/rowset/rowset_meta_manager.h @@ -72,6 +72,9 @@ public: static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, const std::vector<int64_t>& binlog_versions, RowsetBinlogMetasPB* metas_pb); + // get all binlog metas of a tablet in version. + static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, + Version version, RowsetBinlogMetasPB* metas_pb); static Status remove_binlog(OlapMeta* meta, const std::string& suffix); static Status ingest_binlog_metas(OlapMeta* meta, TabletUid tablet_uid, RowsetBinlogMetasPB* metas_pb); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index b23404583f7..51eabe5495e 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2512,6 +2512,11 @@ Status Tablet::get_rowset_binlog_metas(const std::vector<int64_t>& binlog_versio binlog_versions, metas_pb); } +Status Tablet::get_rowset_binlog_metas(Version binlog_versions, RowsetBinlogMetasPB* metas_pb) { + return RowsetMetaManager::get_rowset_binlog_metas(_data_dir->get_meta(), tablet_uid(), + binlog_versions, metas_pb); +} + std::string Tablet::get_segment_filepath(std::string_view rowset_id, std::string_view segment_index) const { return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id, segment_index); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 800c720a1c4..33253e82ced 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -436,6 +436,7 @@ public: std::string_view rowset_id) const; Status get_rowset_binlog_metas(const std::vector<int64_t>& binlog_versions, RowsetBinlogMetasPB* metas_pb); + Status get_rowset_binlog_metas(Version binlog_versions, RowsetBinlogMetasPB* metas_pb); std::string get_segment_filepath(std::string_view rowset_id, std::string_view segment_index) const; std::string get_segment_filepath(std::string_view rowset_id, int64_t segment_index) const; diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index e7679da0603..468a6b2fb12 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -972,6 +972,7 @@ Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id, if (binlog_meta_filesize > 0) { contain_binlog = true; RETURN_IF_ERROR(read_pb(binlog_metas_file, &rowset_binlog_metas_pb)); + VLOG_DEBUG << "load rowset binlog metas from file. file_path=" << binlog_metas_file; } RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(binlog_metas_file)); } diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp index 7c870a5e8ea..21be34a334d 100644 --- a/be/src/olap/task/engine_storage_migration_task.cpp +++ b/be/src/olap/task/engine_storage_migration_task.cpp @@ -37,6 +37,7 @@ #include "olap/data_dir.h" #include "olap/olap_common.h" #include "olap/olap_define.h" +#include "olap/pb_helper.h" #include "olap/rowset/rowset_meta.h" #include "olap/snapshot_manager.h" #include "olap/storage_engine.h" @@ -262,9 +263,11 @@ Status EngineStorageMigrationTask::_migrate() { } std::vector<RowsetSharedPtr> temp_consistent_rowsets(consistent_rowsets); + RowsetBinlogMetasPB rowset_binlog_metas_pb; do { // migrate all index and data files but header file - res = _copy_index_and_data_files(full_path, temp_consistent_rowsets); + res = _copy_index_and_data_files(full_path, temp_consistent_rowsets, + &rowset_binlog_metas_pb); if (!res.ok()) { break; } @@ -292,7 +295,8 @@ Status EngineStorageMigrationTask::_migrate() { // we take the lock to complete it to avoid long-term competition with other tasks if (_is_rowsets_size_less_than_threshold(temp_consistent_rowsets)) { // force to copy the remaining data and index - res = _copy_index_and_data_files(full_path, temp_consistent_rowsets); + res = _copy_index_and_data_files(full_path, temp_consistent_rowsets, + &rowset_binlog_metas_pb); if (!res.ok()) { break; } @@ -307,6 +311,16 @@ Status EngineStorageMigrationTask::_migrate() { } } + // save rowset binlog metas + if (rowset_binlog_metas_pb.rowset_binlog_metas_size() > 0) { + auto rowset_binlog_metas_pb_filename = + fmt::format("{}/rowset_binlog_metas.pb", full_path); + res = write_pb(rowset_binlog_metas_pb_filename, rowset_binlog_metas_pb); + if (!res.ok()) { + break; + } + } + // generate new tablet meta and write to hdr file res = _gen_and_write_header_to_hdr_file(shard, full_path, consistent_rowsets, end_version); if (!res.ok()) { @@ -350,10 +364,92 @@ void EngineStorageMigrationTask::_generate_new_header( } Status EngineStorageMigrationTask::_copy_index_and_data_files( - const string& full_path, const std::vector<RowsetSharedPtr>& consistent_rowsets) const { + const string& full_path, const std::vector<RowsetSharedPtr>& consistent_rowsets, + RowsetBinlogMetasPB* all_binlog_metas_pb) const { + RowsetBinlogMetasPB rowset_binlog_metas_pb; for (const auto& rs : consistent_rowsets) { RETURN_IF_ERROR(rs->copy_files_to(full_path, rs->rowset_id())); + + Version binlog_versions = rs->version(); + RETURN_IF_ERROR(_tablet->get_rowset_binlog_metas(binlog_versions, &rowset_binlog_metas_pb)); + } + + // copy index binlog files. + for (const auto& rowset_binlog_meta : rowset_binlog_metas_pb.rowset_binlog_metas()) { + auto num_segments = rowset_binlog_meta.num_segments(); + std::string_view rowset_id = rowset_binlog_meta.rowset_id(); + + RowsetMetaPB rowset_meta_pb; + if (!rowset_meta_pb.ParseFromString(rowset_binlog_meta.data())) { + auto err_msg = fmt::format("fail to parse binlog meta data value:{}", + rowset_binlog_meta.data()); + LOG(WARNING) << err_msg; + return Status::InternalError(err_msg); + } + const auto& tablet_schema_pb = rowset_meta_pb.tablet_schema(); + TabletSchema tablet_schema; + tablet_schema.init_from_pb(tablet_schema_pb); + + // copy segment files and index files + for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) { + std::string segment_file_path = _tablet->get_segment_filepath(rowset_id, segment_index); + auto snapshot_segment_file_path = + fmt::format("{}/{}_{}.binlog", full_path, rowset_id, segment_index); + + Status status = io::global_local_filesystem()->copy_path(segment_file_path, + snapshot_segment_file_path); + if (!status.ok()) { + LOG(WARNING) << "fail to copy binlog segment file. [src=" << segment_file_path + << ", dest=" << snapshot_segment_file_path << "]" << status; + return status; + } + VLOG_DEBUG << "copy " << segment_file_path << " to " << snapshot_segment_file_path; + + if (tablet_schema.get_inverted_index_storage_format() == + InvertedIndexStorageFormatPB::V1) { + for (const auto& index : tablet_schema.indexes()) { + if (index.index_type() != IndexType::INVERTED) { + continue; + } + auto index_id = index.index_id(); + auto index_file = + _tablet->get_segment_index_filepath(rowset_id, segment_index, index_id); + auto snapshot_segment_index_file_path = + fmt::format("{}/{}_{}_{}.binlog-index", full_path, rowset_id, + segment_index, index_id); + VLOG_DEBUG << "copy " << index_file << " to " + << snapshot_segment_index_file_path; + status = io::global_local_filesystem()->copy_path( + index_file, snapshot_segment_index_file_path); + if (!status.ok()) { + LOG(WARNING) + << "fail to copy binlog index file. [src=" << index_file + << ", dest=" << snapshot_segment_index_file_path << "]" << status; + return status; + } + } + } else if (tablet_schema.has_inverted_index()) { + auto index_file = InvertedIndexDescriptor::get_index_file_path_v2( + InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path)); + auto snapshot_segment_index_file_path = + fmt::format("{}/{}_{}.binlog-index", full_path, rowset_id, segment_index); + VLOG_DEBUG << "copy " << index_file << " to " << snapshot_segment_index_file_path; + status = io::global_local_filesystem()->copy_path(index_file, + snapshot_segment_index_file_path); + if (!status.ok()) { + LOG(WARNING) << "fail to copy binlog index file. [src=" << index_file + << ", dest=" << snapshot_segment_index_file_path << "]" << status; + return status; + } + } + } } + + std::move(rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->begin(), + rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->end(), + google::protobuf::RepeatedFieldBackInserter( + all_binlog_metas_pb->mutable_rowset_binlog_metas())); + return Status::OK(); } diff --git a/be/src/olap/task/engine_storage_migration_task.h b/be/src/olap/task/engine_storage_migration_task.h index 8858854de92..7578b7de94f 100644 --- a/be/src/olap/task/engine_storage_migration_task.h +++ b/be/src/olap/task/engine_storage_migration_task.h @@ -17,6 +17,8 @@ #pragma once +#include <gen_cpp/olap_file.pb.h> + #include <mutex> #include <shared_mutex> #include <string> @@ -69,7 +71,8 @@ private: // TODO: hkp // rewrite this function Status _copy_index_and_data_files(const std::string& full_path, - const std::vector<RowsetSharedPtr>& consistent_rowsets) const; + const std::vector<RowsetSharedPtr>& consistent_rowsets, + RowsetBinlogMetasPB* all_binlog_metas_pb) const; private: StorageEngine& _engine; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org