This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ebabec1d3e6 [fix](migrate) Copy binlog files (#41083)
ebabec1d3e6 is described below

commit ebabec1d3e60b5b4ee8d40ea26dd52a8909f14fe
Author: walter <w41te...@gmail.com>
AuthorDate: Sat Sep 21 09:19:26 2024 +0800

    [fix](migrate) Copy binlog files (#41083)
---
 be/src/olap/rowset/rowset_meta_manager.cpp         |  63 +++++++++++++
 be/src/olap/rowset/rowset_meta_manager.h           |   3 +
 be/src/olap/tablet.cpp                             |   5 +
 be/src/olap/tablet.h                               |   1 +
 be/src/olap/tablet_manager.cpp                     |   1 +
 be/src/olap/task/engine_storage_migration_task.cpp | 102 ++++++++++++++++++++-
 be/src/olap/task/engine_storage_migration_task.h   |   5 +-
 7 files changed, 176 insertions(+), 4 deletions(-)

diff --git a/be/src/olap/rowset/rowset_meta_manager.cpp b/be/src/olap/rowset/rowset_meta_manager.cpp
index 9d1cbd88589..9ba6f9540db 100644
--- a/be/src/olap/rowset/rowset_meta_manager.cpp
+++ b/be/src/olap/rowset/rowset_meta_manager.cpp
@@ -357,6 +357,69 @@ Status RowsetMetaManager::_get_rowset_binlog_metas(OlapMeta* meta, const TabletU
     return status;
 }
 
+Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, TabletUid 
tablet_uid,
+                                                  Version version, 
RowsetBinlogMetasPB* metas_pb) {
+    Status status;
+    auto tablet_uid_str = tablet_uid.to_string();
+    auto prefix_key = make_binlog_meta_key_prefix(tablet_uid);
+    auto begin_key = make_binlog_meta_key_prefix(tablet_uid, version.first);
+    auto end_key = make_binlog_meta_key_prefix(tablet_uid, version.second + 1);
+    auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str, &end_key](
+                                 std::string_view key, std::string_view value) 
-> bool {
+        VLOG_DEBUG << fmt::format("get rowset binlog metas, key={}, value={}", 
key, value);
+        if (key.compare(end_key) > 0) { // the binlog meta key is binary 
comparable.
+            // All binlog meta has been scanned
+            return false;
+        }
+
+        if (!starts_with_binlog_meta(key)) {
+            auto err_msg = fmt::format("invalid binlog meta key:{}", key);
+            status = Status::InternalError(err_msg);
+            LOG(WARNING) << err_msg;
+            return false;
+        }
+
+        BinlogMetaEntryPB binlog_meta_entry_pb;
+        if (!binlog_meta_entry_pb.ParseFromArray(value.data(), value.size())) {
+            auto err_msg = fmt::format("fail to parse binlog meta value:{}", 
value);
+            status = Status::InternalError(err_msg);
+            LOG(WARNING) << err_msg;
+            return false;
+        }
+
+        const auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2();
+        auto* binlog_meta_pb = metas_pb->add_rowset_binlog_metas();
+        binlog_meta_pb->set_rowset_id(rowset_id);
+        binlog_meta_pb->set_version(binlog_meta_entry_pb.version());
+        binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments());
+        binlog_meta_pb->set_meta_key(std::string {key});
+        binlog_meta_pb->set_meta(std::string {value});
+
+        auto binlog_data_key =
+                make_binlog_data_key(tablet_uid_str, 
binlog_meta_entry_pb.version(), rowset_id);
+        std::string binlog_data;
+        status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, 
&binlog_data);
+        if (!status.ok()) {
+            LOG(WARNING) << status.to_string();
+            return false;
+        }
+        binlog_meta_pb->set_data_key(binlog_data_key);
+        binlog_meta_pb->set_data(binlog_data);
+
+        return false;
+    };
+
+    Status iterStatus =
+            meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, prefix_key, 
traverse_func);
+    if (!iterStatus.ok()) {
+        LOG(WARNING) << fmt::format(
+                "fail to iterate binlog meta. prefix_key:{}, version:{}, 
status:{}", prefix_key,
+                version.to_string(), iterStatus.to_string());
+        return iterStatus;
+    }
+    return status;
+}
+
 Status RowsetMetaManager::_get_all_rowset_binlog_metas(OlapMeta* meta, const 
TabletUid tablet_uid,
                                                        RowsetBinlogMetasPB* 
metas_pb) {
     Status status;
diff --git a/be/src/olap/rowset/rowset_meta_manager.h b/be/src/olap/rowset/rowset_meta_manager.h
index b61e8c02769..eb04128fded 100644
--- a/be/src/olap/rowset/rowset_meta_manager.h
+++ b/be/src/olap/rowset/rowset_meta_manager.h
@@ -72,6 +72,9 @@ public:
     static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid 
tablet_uid,
                                           const std::vector<int64_t>& 
binlog_versions,
                                           RowsetBinlogMetasPB* metas_pb);
+    // get all binlog metas of a tablet in version.
+    static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid 
tablet_uid,
+                                          Version version, 
RowsetBinlogMetasPB* metas_pb);
     static Status remove_binlog(OlapMeta* meta, const std::string& suffix);
     static Status ingest_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
                                       RowsetBinlogMetasPB* metas_pb);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index b23404583f7..51eabe5495e 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2512,6 +2512,11 @@ Status Tablet::get_rowset_binlog_metas(const std::vector<int64_t>& binlog_versio
                                                       binlog_versions, 
metas_pb);
 }
 
+Status Tablet::get_rowset_binlog_metas(Version binlog_versions, 
RowsetBinlogMetasPB* metas_pb) {
+    return RowsetMetaManager::get_rowset_binlog_metas(_data_dir->get_meta(), 
tablet_uid(),
+                                                      binlog_versions, 
metas_pb);
+}
+
 std::string Tablet::get_segment_filepath(std::string_view rowset_id,
                                          std::string_view segment_index) const 
{
     return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id, 
segment_index);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 800c720a1c4..33253e82ced 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -436,6 +436,7 @@ public:
                                        std::string_view rowset_id) const;
     Status get_rowset_binlog_metas(const std::vector<int64_t>& binlog_versions,
                                    RowsetBinlogMetasPB* metas_pb);
+    Status get_rowset_binlog_metas(Version binlog_versions, 
RowsetBinlogMetasPB* metas_pb);
     std::string get_segment_filepath(std::string_view rowset_id,
                                      std::string_view segment_index) const;
     std::string get_segment_filepath(std::string_view rowset_id, int64_t 
segment_index) const;
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index e7679da0603..468a6b2fb12 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -972,6 +972,7 @@ Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id,
         if (binlog_meta_filesize > 0) {
             contain_binlog = true;
             RETURN_IF_ERROR(read_pb(binlog_metas_file, 
&rowset_binlog_metas_pb));
+            VLOG_DEBUG << "load rowset binlog metas from file. file_path=" << 
binlog_metas_file;
         }
         
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(binlog_metas_file));
     }
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp
index 7c870a5e8ea..21be34a334d 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -37,6 +37,7 @@
 #include "olap/data_dir.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
+#include "olap/pb_helper.h"
 #include "olap/rowset/rowset_meta.h"
 #include "olap/snapshot_manager.h"
 #include "olap/storage_engine.h"
@@ -262,9 +263,11 @@ Status EngineStorageMigrationTask::_migrate() {
     }
 
     std::vector<RowsetSharedPtr> temp_consistent_rowsets(consistent_rowsets);
+    RowsetBinlogMetasPB rowset_binlog_metas_pb;
     do {
         // migrate all index and data files but header file
-        res = _copy_index_and_data_files(full_path, temp_consistent_rowsets);
+        res = _copy_index_and_data_files(full_path, temp_consistent_rowsets,
+                                         &rowset_binlog_metas_pb);
         if (!res.ok()) {
             break;
         }
@@ -292,7 +295,8 @@ Status EngineStorageMigrationTask::_migrate() {
             // we take the lock to complete it to avoid long-term competition 
with other tasks
             if (_is_rowsets_size_less_than_threshold(temp_consistent_rowsets)) 
{
                 // force to copy the remaining data and index
-                res = _copy_index_and_data_files(full_path, 
temp_consistent_rowsets);
+                res = _copy_index_and_data_files(full_path, 
temp_consistent_rowsets,
+                                                 &rowset_binlog_metas_pb);
                 if (!res.ok()) {
                     break;
                 }
@@ -307,6 +311,16 @@ Status EngineStorageMigrationTask::_migrate() {
             }
         }
 
+        // save rowset binlog metas
+        if (rowset_binlog_metas_pb.rowset_binlog_metas_size() > 0) {
+            auto rowset_binlog_metas_pb_filename =
+                    fmt::format("{}/rowset_binlog_metas.pb", full_path);
+            res = write_pb(rowset_binlog_metas_pb_filename, 
rowset_binlog_metas_pb);
+            if (!res.ok()) {
+                break;
+            }
+        }
+
         // generate new tablet meta and write to hdr file
         res = _gen_and_write_header_to_hdr_file(shard, full_path, 
consistent_rowsets, end_version);
         if (!res.ok()) {
@@ -350,10 +364,92 @@ void EngineStorageMigrationTask::_generate_new_header(
 }
 
 Status EngineStorageMigrationTask::_copy_index_and_data_files(
-        const string& full_path, const std::vector<RowsetSharedPtr>& 
consistent_rowsets) const {
+        const string& full_path, const std::vector<RowsetSharedPtr>& 
consistent_rowsets,
+        RowsetBinlogMetasPB* all_binlog_metas_pb) const {
+    RowsetBinlogMetasPB rowset_binlog_metas_pb;
     for (const auto& rs : consistent_rowsets) {
         RETURN_IF_ERROR(rs->copy_files_to(full_path, rs->rowset_id()));
+
+        Version binlog_versions = rs->version();
+        RETURN_IF_ERROR(_tablet->get_rowset_binlog_metas(binlog_versions, 
&rowset_binlog_metas_pb));
+    }
+
+    // copy index binlog files.
+    for (const auto& rowset_binlog_meta : 
rowset_binlog_metas_pb.rowset_binlog_metas()) {
+        auto num_segments = rowset_binlog_meta.num_segments();
+        std::string_view rowset_id = rowset_binlog_meta.rowset_id();
+
+        RowsetMetaPB rowset_meta_pb;
+        if (!rowset_meta_pb.ParseFromString(rowset_binlog_meta.data())) {
+            auto err_msg = fmt::format("fail to parse binlog meta data 
value:{}",
+                                       rowset_binlog_meta.data());
+            LOG(WARNING) << err_msg;
+            return Status::InternalError(err_msg);
+        }
+        const auto& tablet_schema_pb = rowset_meta_pb.tablet_schema();
+        TabletSchema tablet_schema;
+        tablet_schema.init_from_pb(tablet_schema_pb);
+
+        // copy segment files and index files
+        for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
+            std::string segment_file_path = 
_tablet->get_segment_filepath(rowset_id, segment_index);
+            auto snapshot_segment_file_path =
+                    fmt::format("{}/{}_{}.binlog", full_path, rowset_id, 
segment_index);
+
+            Status status = 
io::global_local_filesystem()->copy_path(segment_file_path,
+                                                                     
snapshot_segment_file_path);
+            if (!status.ok()) {
+                LOG(WARNING) << "fail to copy binlog segment file. [src=" << 
segment_file_path
+                             << ", dest=" << snapshot_segment_file_path << "]" 
<< status;
+                return status;
+            }
+            VLOG_DEBUG << "copy " << segment_file_path << " to " << 
snapshot_segment_file_path;
+
+            if (tablet_schema.get_inverted_index_storage_format() ==
+                InvertedIndexStorageFormatPB::V1) {
+                for (const auto& index : tablet_schema.indexes()) {
+                    if (index.index_type() != IndexType::INVERTED) {
+                        continue;
+                    }
+                    auto index_id = index.index_id();
+                    auto index_file =
+                            _tablet->get_segment_index_filepath(rowset_id, 
segment_index, index_id);
+                    auto snapshot_segment_index_file_path =
+                            fmt::format("{}/{}_{}_{}.binlog-index", full_path, 
rowset_id,
+                                        segment_index, index_id);
+                    VLOG_DEBUG << "copy " << index_file << " to "
+                               << snapshot_segment_index_file_path;
+                    status = io::global_local_filesystem()->copy_path(
+                            index_file, snapshot_segment_index_file_path);
+                    if (!status.ok()) {
+                        LOG(WARNING)
+                                << "fail to copy binlog index file. [src=" << 
index_file
+                                << ", dest=" << 
snapshot_segment_index_file_path << "]" << status;
+                        return status;
+                    }
+                }
+            } else if (tablet_schema.has_inverted_index()) {
+                auto index_file = 
InvertedIndexDescriptor::get_index_file_path_v2(
+                        
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path));
+                auto snapshot_segment_index_file_path =
+                        fmt::format("{}/{}_{}.binlog-index", full_path, 
rowset_id, segment_index);
+                VLOG_DEBUG << "copy " << index_file << " to " << 
snapshot_segment_index_file_path;
+                status = io::global_local_filesystem()->copy_path(index_file,
+                                                                  
snapshot_segment_index_file_path);
+                if (!status.ok()) {
+                    LOG(WARNING) << "fail to copy binlog index file. [src=" << 
index_file
+                                 << ", dest=" << 
snapshot_segment_index_file_path << "]" << status;
+                    return status;
+                }
+            }
+        }
     }
+
+    std::move(rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->begin(),
+              rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->end(),
+              google::protobuf::RepeatedFieldBackInserter(
+                      all_binlog_metas_pb->mutable_rowset_binlog_metas()));
+
     return Status::OK();
 }
 
diff --git a/be/src/olap/task/engine_storage_migration_task.h b/be/src/olap/task/engine_storage_migration_task.h
index 8858854de92..7578b7de94f 100644
--- a/be/src/olap/task/engine_storage_migration_task.h
+++ b/be/src/olap/task/engine_storage_migration_task.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <gen_cpp/olap_file.pb.h>
+
 #include <mutex>
 #include <shared_mutex>
 #include <string>
@@ -69,7 +71,8 @@ private:
     // TODO: hkp
     // rewrite this function
     Status _copy_index_and_data_files(const std::string& full_path,
-                                      const std::vector<RowsetSharedPtr>& 
consistent_rowsets) const;
+                                      const std::vector<RowsetSharedPtr>& 
consistent_rowsets,
+                                      RowsetBinlogMetasPB* 
all_binlog_metas_pb) const;
 
 private:
     StorageEngine& _engine;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to