This is an automated email from the ASF dual-hosted git repository. The user dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9d41edd9eb [Feature](binlog) Add binlog gc && Auth master_token (#20854) 9d41edd9eb is described below commit 9d41edd9eb713b3b502b4001204bd45e2718cdcb Author: Jack Drogon <jack.xsuper...@gmail.com> AuthorDate: Fri Jun 16 11:25:11 2023 +0800 [Feature](binlog) Add binlog gc && Auth master_token (#20854) Signed-off-by: Jack Drogon <jack.xsuper...@gmail.com> --- be/src/agent/agent_server.cpp | 9 + be/src/agent/agent_server.h | 1 + be/src/agent/task_worker_pool.cpp | 39 ++++ be/src/agent/task_worker_pool.h | 4 + be/src/olap/binlog.h | 8 +- be/src/olap/rowset/rowset_meta_manager.cpp | 1 + be/src/olap/storage_engine.cpp | 144 +-------------- be/src/olap/storage_engine.h | 5 +- be/src/olap/tablet.cpp | 83 +++++++++ be/src/olap/tablet.h | 1 + .../java/org/apache/doris/binlog/BinlogGcer.java | 179 ++++++++++++++++++ .../org/apache/doris/binlog/BinlogManager.java | 94 +++++++--- .../org/apache/doris/binlog/BinlogTombstone.java | 103 +++++++++++ .../java/org/apache/doris/binlog/DBBinlog.java | 204 ++++++++++++++++++++- .../java/org/apache/doris/binlog/TableBinlog.java | 114 ++++++++++++ .../java/org/apache/doris/binlog/UpsertRecord.java | 16 +- .../main/java/org/apache/doris/catalog/Env.java | 12 ++ .../org/apache/doris/journal/JournalEntity.java | 6 + .../org/apache/doris/persist/BinlogGcInfo.java | 67 +++++++ .../java/org/apache/doris/persist/EditLog.java | 10 + .../org/apache/doris/persist/OperationType.java | 2 + .../apache/doris/service/FrontendServiceImpl.java | 38 +++- .../java/org/apache/doris/task/AgentBatchTask.java | 10 + .../java/org/apache/doris/task/BinlogGcTask.java | 46 +++++ gensrc/proto/olap_file.proto | 5 +- gensrc/thrift/AgentService.thrift | 10 + gensrc/thrift/FrontendService.thrift | 13 ++ gensrc/thrift/Types.thrift | 3 +- 28 files changed, 1055 insertions(+), 172 deletions(-) diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index a7f2f9aa09..4660902333 
100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -128,6 +128,7 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info) CREATE_AND_START_THREAD(REPORT_OLAP_TABLE, _report_tablet_workers); CREATE_AND_START_POOL(SUBMIT_TABLE_COMPACTION, _submit_table_compaction_workers); CREATE_AND_START_POOL(PUSH_STORAGE_POLICY, _push_storage_policy_workers); + CREATE_AND_START_THREAD(GC_BINLOG, _gc_binlog_workers); #undef CREATE_AND_START_POOL #undef CREATE_AND_START_THREAD @@ -237,6 +238,14 @@ void AgentServer::submit_tasks(TAgentResult& agent_result, signature); } break; + case TTaskType::GC_BINLOG: + if (task.__isset.gc_binlog_req) { + _gc_binlog_workers->submit_task(task); + } else { + ret_st = Status::InvalidArgument( + "task(signature={}) has wrong request member = gc_binlog_req", signature); + } + break; default: ret_st = Status::InvalidArgument("task(signature={}, type={}) has wrong task type", signature, task_type); diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h index 3d98fd025d..daa1823b07 100644 --- a/be/src/agent/agent_server.h +++ b/be/src/agent/agent_server.h @@ -89,6 +89,7 @@ private: std::unique_ptr<TaskWorkerPool> _push_storage_policy_workers; std::unique_ptr<TopicSubscriber> _topic_subscriber; + std::unique_ptr<TaskWorkerPool> _gc_binlog_workers; }; } // end namespace doris diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 7da010eca0..fd2c8a17dd 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -213,6 +213,10 @@ void TaskWorkerPool::start() { _worker_count = 1; _cb = std::bind<void>(&TaskWorkerPool::_push_cooldown_conf_worker_thread_callback, this); break; + case TaskWorkerType::GC_BINLOG: + _worker_count = 1; + _cb = std::bind<void>(&TaskWorkerPool::_gc_binlog_worker_thread_callback, this); + break; default: // pass break; @@ -1705,6 +1709,41 @@ void AlterTableTaskPool::_alter_tablet(const TAgentTaskRequest& 
agent_task_req, finish_task_request->__set_task_status(status.to_thrift()); } +void TaskWorkerPool::_gc_binlog_worker_thread_callback() { + while (_is_work) { + TAgentTaskRequest agent_task_req; + { + std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); + _worker_thread_condition_variable.wait( + worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); }); + if (!_is_work) { + return; + } + + agent_task_req = _tasks.front(); + _tasks.pop_front(); + } + + std::unordered_map<int64_t, int64_t> gc_tablet_infos; + if (!agent_task_req.__isset.gc_binlog_req) { + LOG(WARNING) << "gc binlog task is not valid"; + return; + } + if (!agent_task_req.gc_binlog_req.__isset.tablet_gc_binlog_infos) { + LOG(WARNING) << "gc binlog task tablet_gc_binlog_infos is not valid"; + return; + } + + auto& tablet_gc_binlog_infos = agent_task_req.gc_binlog_req.tablet_gc_binlog_infos; + for (auto& tablet_info : tablet_gc_binlog_infos) { + // gc_tablet_infos.emplace(tablet_info.tablet_id, tablet_info.schema_hash); + gc_tablet_infos.emplace(tablet_info.tablet_id, tablet_info.version); + } + + StorageEngine::instance()->gc_binlogs(gc_tablet_infos); + } +} + CloneTaskPool::CloneTaskPool(ExecEnv* env, ThreadModel thread_model) : TaskWorkerPool(TaskWorkerType::CLONE, env, *env->master_info(), thread_model) { _worker_count = config::clone_worker_count; diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index a33704480d..598c77d3ef 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -80,6 +80,7 @@ public: PUSH_COOLDOWN_CONF, PUSH_STORAGE_POLICY, ALTER_INVERTED_INDEX, + GC_BINLOG, }; enum ReportType { TASK, DISK, TABLET }; @@ -141,6 +142,8 @@ public: return "PUSH_STORAGE_POLICY"; case ALTER_INVERTED_INDEX: return "ALTER_INVERTED_INDEX"; + case GC_BINLOG: + return "GC_BINLOG"; default: return "Unknown"; } @@ -197,6 +200,7 @@ protected: void _submit_table_compaction_worker_thread_callback(); void 
_push_cooldown_conf_worker_thread_callback(); void _push_storage_policy_worker_thread_callback(); + void _gc_binlog_worker_thread_callback(); void _alter_tablet(const TAgentTaskRequest& alter_tablet_request, int64_t signature, const TTaskType::type task_type, TFinishTaskRequest* finish_task_request); diff --git a/be/src/olap/binlog.h b/be/src/olap/binlog.h index 9ae243d8bb..b6b95a9530 100644 --- a/be/src/olap/binlog.h +++ b/be/src/olap/binlog.h @@ -64,8 +64,12 @@ inline auto make_binlog_filename_key(const TabletUid& tablet_uid, std::string_vi return fmt::format("{}meta_{}_{:0>20}_", kBinlogPrefix, tablet_uid.to_string(), version); } -inline auto make_binlog_meta_key_prefix(int64_t tablet_id) { - return fmt::format("{}meta_{}_", kBinlogPrefix, tablet_id); +inline auto make_binlog_meta_key_prefix(const TabletUid& tablet_uid) { + return fmt::format("{}meta_{}_", kBinlogPrefix, tablet_uid.to_string()); +} + +inline auto make_binlog_meta_key_prefix(const TabletUid& tablet_uid, int64_t version) { + return fmt::format("{}meta_{}_{:020d}_", kBinlogPrefix, tablet_uid.to_string(), version); } inline bool starts_with_binlog_meta(std::string_view str) { diff --git a/be/src/olap/rowset/rowset_meta_manager.cpp b/be/src/olap/rowset/rowset_meta_manager.cpp index 29e5a4eae3..69f189aa78 100644 --- a/be/src/olap/rowset/rowset_meta_manager.cpp +++ b/be/src/olap/rowset/rowset_meta_manager.cpp @@ -146,6 +146,7 @@ Status RowsetMetaManager::_save_with_binlog(OlapMeta* meta, TabletUid tablet_uid binlog_meta_entry_pb.set_rowset_id(rowset_meta_pb.rowset_id()); binlog_meta_entry_pb.set_num_segments(rowset_meta_pb.num_segments()); binlog_meta_entry_pb.set_creation_time(rowset_meta_pb.creation_time()); + binlog_meta_entry_pb.set_rowset_id_v2(rowset_meta_pb.rowset_id_v2()); std::string binlog_meta_value; if (!binlog_meta_entry_pb.SerializeToString(&binlog_meta_value)) { LOG(WARNING) << "serialize binlog pb failed. 
rowset id:" << binlog_meta_key; diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 1d65da4d59..94f1bb5a87 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -97,13 +97,6 @@ using std::vector; using strings::Substitute; namespace doris { -namespace { -inline int64_t now_ms() { - auto duration = std::chrono::steady_clock::now().time_since_epoch(); - return static_cast<int64_t>( - std::chrono::duration_cast<std::chrono::milliseconds>(duration).count()); -} -} // namespace using namespace ErrorCode; DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(unused_rowsets_count, MetricUnit::ROWSETS); @@ -686,8 +679,6 @@ Status StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) { } } - // _gc_binlogs(); - if (usage != nullptr) { *usage = tmp_usage; // update usage } @@ -776,136 +767,13 @@ void StorageEngine::_clean_unused_rowset_metas() { } } -void StorageEngine::_gc_binlogs() { - LOG(INFO) << "start to gc binlogs"; - - auto data_dirs = get_stores(); - struct tablet_info { - std::string tablet_path; - int64_t binlog_ttl_ms; - }; - std::unordered_map<int64_t, tablet_info> tablets_info; - - auto get_tablet_info = [&tablets_info, this](int64_t tablet_id) -> const tablet_info& { - if (auto iter = tablets_info.find(tablet_id); iter != tablets_info.end()) { - return iter->second; - } - - auto tablet = tablet_manager()->get_tablet(tablet_id); - if (tablet == nullptr) { - LOG(WARNING) << "failed to find tablet " << tablet_id; - static tablet_info empty_tablet_info; - return empty_tablet_info; - } - - auto tablet_path = tablet->tablet_path(); - auto binlog_ttl_ms = tablet->binlog_ttl_ms(); - tablets_info.emplace(tablet_id, tablet_info {tablet_path, binlog_ttl_ms}); - return tablets_info[tablet_id]; - }; - - for (auto data_dir : data_dirs) { - std::string prefix_key {kBinlogMetaPrefix}; - OlapMeta* meta = data_dir->get_meta(); - DCHECK(meta != nullptr); - - auto now = now_ms(); - int64_t last_tablet_id = 0; - 
std::vector<std::string> wait_for_deleted_binlog_keys; - std::vector<std::string> wait_for_deleted_binlog_files; - auto add_to_wait_for_deleted_binlog_keys = - [&wait_for_deleted_binlog_keys](std::string_view key) { - wait_for_deleted_binlog_keys.emplace_back(key); - wait_for_deleted_binlog_keys.push_back(get_binlog_data_key_from_meta_key(key)); - }; - - auto add_to_wait_for_deleted = [&add_to_wait_for_deleted_binlog_keys, - &wait_for_deleted_binlog_files]( - std::string_view key, std::string_view tablet_path, - int64_t rowset_id, int64_t num_segments) { - add_to_wait_for_deleted_binlog_keys(key); - for (int64_t i = 0; i < num_segments; ++i) { - auto segment_file = fmt::format("{}_{}.dat", rowset_id, i); - wait_for_deleted_binlog_files.emplace_back( - fmt::format("{}/_binlog/{}", tablet_path, segment_file)); - } - }; - - auto check_binlog_ttl = [now, &get_tablet_info, &last_tablet_id, - &add_to_wait_for_deleted_binlog_keys, &add_to_wait_for_deleted]( - const std::string& key, - const std::string& value) mutable -> bool { - LOG(INFO) << fmt::format("check binlog ttl, key:{}, value:{}", key, value); - if (!starts_with_binlog_meta(key)) { - last_tablet_id = -1; - return false; - } - - BinlogMetaEntryPB binlog_meta_entry_pb; - if (!binlog_meta_entry_pb.ParseFromString(value)) { - LOG(WARNING) << "failed to parse binlog meta entry, key:" << key; - return true; - } - - auto tablet_id = binlog_meta_entry_pb.tablet_id(); - last_tablet_id = tablet_id; - const auto& tablet_info = get_tablet_info(tablet_id); - std::string_view tablet_path = tablet_info.tablet_path; - // tablet has been removed, removed all these binlog meta - if (tablet_path.empty()) { - add_to_wait_for_deleted_binlog_keys(key); - return true; - } - - // check by ttl - auto rowset_id = binlog_meta_entry_pb.rowset_id(); - auto binlog_ttl_ms = tablet_info.binlog_ttl_ms; - auto num_segments = binlog_meta_entry_pb.num_segments(); - // binlog has been disabled, remove all - if (binlog_ttl_ms <= 0) { - 
add_to_wait_for_deleted(key, tablet_path, rowset_id, num_segments); - return true; - } - auto binlog_creation_time_ms = binlog_meta_entry_pb.creation_time(); - if (now - binlog_creation_time_ms > binlog_ttl_ms) { - add_to_wait_for_deleted(key, tablet_path, rowset_id, num_segments); - return true; - } - - // binlog not stale, skip - return false; - }; - - while (last_tablet_id >= 0) { - // every loop iterate one tablet - // get binlog meta by prefix - auto status = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, check_binlog_ttl); - if (!status.ok()) { - LOG(WARNING) << "failed to iterate binlog meta, status:" << status; - break; - } - - prefix_key = make_binlog_meta_key_prefix(last_tablet_id); - } - - // first remove binlog files, if failed, just break, then retry next time - // this keep binlog meta in meta store, so that binlog can be removed next time - bool remove_binlog_files_failed = false; - for (auto& file : wait_for_deleted_binlog_files) { - if (unlink(file.c_str()) != 0) { - // file not exist, continue - if (errno == ENOENT) { - continue; - } +void StorageEngine::gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos) { + for (auto [tablet_id, version] : gc_tablet_infos) { + LOG(INFO) << fmt::format("start to gc binlogs for tablet_id: {}, version: {}", tablet_id, + version); - remove_binlog_files_failed = true; - LOG(WARNING) << "failed to remove binlog file:" << file << ", errno:" << errno; - break; - } - } - if (remove_binlog_files_failed) { - meta->remove(META_COLUMN_FAMILY_INDEX, wait_for_deleted_binlog_keys); - } + TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id); + tablet->gc_binlogs(version); } } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index fdf49c3e14..15b1a98a78 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -218,6 +218,8 @@ public: Status process_index_change_task(const TAlterInvertedIndexReq& reqest); + void 
gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos); + private: // Instance should be inited from `static open()` // MUST NOT be called in other circumstances. @@ -313,7 +315,8 @@ private: SegCompactionCandidatesSharedPtr segments); Status _handle_index_change(IndexBuilderSharedPtr index_builder); - void _gc_binlogs(); + + void _gc_binlogs(int64_t tablet_id, int64_t version); private: struct CompactionCandidate { diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index e7216d9401..dea85713f0 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -74,6 +74,7 @@ #include "io/fs/remote_file_system.h" #include "olap/base_compaction.h" #include "olap/base_tablet.h" +#include "olap/binlog.h" #include "olap/cumulative_compaction.h" #include "olap/cumulative_compaction_policy.h" #include "olap/delete_bitmap_calculator.h" @@ -3445,6 +3446,88 @@ void Tablet::set_binlog_config(BinlogConfig binlog_config) { tablet_meta()->set_binlog_config(std::move(binlog_config)); } +void Tablet::gc_binlogs(int64_t version) { + auto meta = _data_dir->get_meta(); + DCHECK(meta != nullptr); + + const auto& tablet_uid = this->tablet_uid(); + const auto tablet_id = this->tablet_id(); + const auto& tablet_path = this->tablet_path(); + std::string begin_key = make_binlog_meta_key_prefix(tablet_uid); + std::string end_key = make_binlog_meta_key_prefix(tablet_uid, version + 1); + LOG(INFO) << fmt::format("gc binlog meta, tablet_id:{}, begin_key:{}, end_key:{}", tablet_id, + begin_key, end_key); + + std::vector<std::string> wait_for_deleted_binlog_keys; + std::vector<std::string> wait_for_deleted_binlog_files; + auto add_to_wait_for_deleted = [&](std::string_view key, std::string_view rowset_id, + int64_t num_segments) { + // add binlog meta key and binlog data key + wait_for_deleted_binlog_keys.emplace_back(key); + wait_for_deleted_binlog_keys.push_back(get_binlog_data_key_from_meta_key(key)); + + for (int64_t i = 0; i < num_segments; ++i) { + auto 
segment_file = fmt::format("{}_{}.dat", rowset_id, i); + wait_for_deleted_binlog_files.emplace_back( + fmt::format("{}/_binlog/{}", tablet_path, segment_file)); + } + }; + + auto check_binlog_ttl = [&](const std::string& key, const std::string& value) mutable -> bool { + if (key >= end_key) { + return false; + } + + BinlogMetaEntryPB binlog_meta_entry_pb; + if (!binlog_meta_entry_pb.ParseFromString(value)) { + LOG(WARNING) << "failed to parse binlog meta entry, key:" << key; + return true; + } + + auto num_segments = binlog_meta_entry_pb.num_segments(); + std::string rowset_id; + if (binlog_meta_entry_pb.has_rowset_id_v2()) { + rowset_id = binlog_meta_entry_pb.rowset_id_v2(); + } else { + // key is 'binglog_meta_6943f1585fe834b5-e542c2b83a21d0b7_00000000000000000069_020000000000000135449d7cd7eadfe672aa0f928fa99593', extract last part '020000000000000135449d7cd7eadfe672aa0f928fa99593' + auto pos = key.rfind("_"); + if (pos == std::string::npos) { + LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key); + return false; + } + rowset_id = key.substr(pos + 1); + } + add_to_wait_for_deleted(key, rowset_id, num_segments); + + return true; + }; + + auto status = meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, check_binlog_ttl); + if (!status.ok()) { + LOG(WARNING) << "failed to iterate binlog meta, status:" << status; + return; + } + + // first remove binlog files, if failed, just break, then retry next time + // this keep binlog meta in meta store, so that binlog can be removed next time + bool remove_binlog_files_failed = false; + for (auto& file : wait_for_deleted_binlog_files) { + if (unlink(file.c_str()) != 0) { + // file not exist, continue + if (errno == ENOENT) { + continue; + } + + remove_binlog_files_failed = true; + LOG(WARNING) << "failed to remove binlog file:" << file << ", errno:" << errno; + break; + } + } + if (!remove_binlog_files_failed) { + meta->remove(META_COLUMN_FAMILY_INDEX, wait_for_deleted_binlog_keys); + } +} + Status 
Tablet::calc_delete_bitmap_between_segments( RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments, DeleteBitmapPtr delete_bitmap) { diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 7ef38e1514..50eb3469f1 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -504,6 +504,7 @@ public: std::string get_segment_filepath(std::string_view rowset_id, std::string_view segment_index) const; bool can_add_binlog(uint64_t total_binlog_size) const; + void gc_binlogs(int64_t version); inline void increase_io_error_times() { ++_io_error_times; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java new file mode 100644 index 0000000000..a2e6afd1fc --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.binlog; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.MaterializedIndex.IndexExtState; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.persist.BinlogGcInfo; +import org.apache.doris.task.AgentBatchTask; +import org.apache.doris.task.AgentTaskExecutor; +import org.apache.doris.task.BinlogGcTask; + +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; + +public class BinlogGcer extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(BinlogGcer.class); + private static final long GC_DURATION_MS = 313 * 1000L; // 313s + + // TODO(Drogon): use this to control gc frequency by real gc time waste sample + private long lastGcTime = 0L; + + public BinlogGcer() { + super("binlog-gcer", GC_DURATION_MS); + } + + @Override + protected void runAfterCatalogReady() { + LOG.debug("start binlog syncer jobs."); + try { + List<BinlogTombstone> tombstones = Env.getCurrentEnv().getBinlogManager().gc(); + if (tombstones != null && !tombstones.isEmpty()) { + LOG.info("tomebstones size: {}", tombstones.size()); + } else { + LOG.info("no gc binlogg"); + return; + } + + try { + sendGcInfoToBe(tombstones); + } catch (Throwable e) { + // TODO(Drogon): retry + // if send gc info to be failed, next gc depend on gc duration + LOG.warn("Failed to send gc info to be", e); + } + + for (BinlogTombstone tombstone : tombstones) { + tombstone.clearTableVersionMap(); + } + BinlogGcInfo info = new BinlogGcInfo(tombstones); + Env.getCurrentEnv().getEditLog().logGcBinlog(info); + } 
catch (Throwable e) { + LOG.warn("Failed to process one round of BinlogGcer", e); + } + } + + private void sendGcInfoToBe(List<BinlogTombstone> tombstones) { + if (tombstones == null || tombstones.isEmpty()) { + return; + } + + Map<Long, BinlogGcTask> beBinlogGcTaskMap = Maps.newHashMap(); + for (BinlogTombstone tombstone : tombstones) { + sendDbGcInfoToBe(beBinlogGcTaskMap, tombstone); + } + + if (beBinlogGcTaskMap.isEmpty()) { + return; + } + + AgentBatchTask batchTask = new AgentBatchTask(); + for (BinlogGcTask task : beBinlogGcTaskMap.values()) { + batchTask.addTask(task); + } + AgentTaskExecutor.submit(batchTask); + } + + private void sendDbGcInfoToBe(Map<Long, BinlogGcTask> beBinlogGcTaskMap, BinlogTombstone tombstone) { + long dbId = tombstone.getDbId(); + Database db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(dbId); + if (db == null) { + LOG.warn("db {} does not exist", dbId); + return; + } + + Map<Long, UpsertRecord.TableRecord> tableVersionMap = tombstone.getTableVersionMap(); + for (Map.Entry<Long, UpsertRecord.TableRecord> entry : tableVersionMap.entrySet()) { + long tableId = entry.getKey(); + + OlapTable table = null; + try { + Table tbl = db.getTableOrMetaException(tableId); + if (tbl == null) { + LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId); + continue; + } + if (!(tbl instanceof OlapTable)) { + LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId); + continue; + } + table = (OlapTable) tbl; + } catch (Exception e) { + LOG.warn("fail to get table. 
db: {}, table id: {}", db.getFullName(), tableId); + continue; + } + + UpsertRecord.TableRecord record = entry.getValue(); + sendTableGcInfoToBe(beBinlogGcTaskMap, table, record); + } + } + + private void sendTableGcInfoToBe(Map<Long, BinlogGcTask> beBinlogGcTaskMap, OlapTable table, + UpsertRecord.TableRecord tableRecord) { + OlapTable olapTable = (OlapTable) table; + + olapTable.readLock(); + try { + for (UpsertRecord.TableRecord.PartitionRecord partitionRecord : tableRecord.getPartitionRecords()) { + long partitionId = partitionRecord.partitionId; + Partition partition = olapTable.getPartition(partitionId); + if (partition == null) { + LOG.warn("fail to get partition. table: {}, partition id: {}", olapTable.getName(), partitionId); + continue; + } + + long version = partitionRecord.version; + + List<MaterializedIndex> indexes = partition.getMaterializedIndices(IndexExtState.VISIBLE); + for (MaterializedIndex index : indexes) { + List<Tablet> tablets = index.getTablets(); + for (Tablet tablet : tablets) { + List<Replica> replicas = tablet.getReplicas(); + for (Replica replica : replicas) { + long beId = replica.getBackendId(); + long signature = -1; + BinlogGcTask binlogGcTask = null; + if (beBinlogGcTaskMap.containsKey(beId)) { + binlogGcTask = beBinlogGcTaskMap.get(beId); + } else { + binlogGcTask = new BinlogGcTask(beId, signature); + beBinlogGcTaskMap.put(beId, binlogGcTask); + } + + binlogGcTask.addTask(tablet.getId(), version); + } + } + } + } + } finally { + table.readUnlock(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index c2e4800935..59ba596152 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -17,13 +17,17 @@ package org.apache.doris.binlog; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; import 
org.apache.doris.common.Config; import org.apache.doris.common.Pair; +import org.apache.doris.persist.BinlogGcInfo; import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -39,25 +43,22 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import java.util.Iterator; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; public class BinlogManager { - private static final Logger LOG = LogManager.getLogger(BinlogManager.class); private static final int BUFFER_SIZE = 16 * 1024; + private static final Logger LOG = LogManager.getLogger(BinlogManager.class); + private ReentrantReadWriteLock lock; private Map<Long, DBBinlog> dbBinlogMap; - // Pair(commitSeq, timestamp), used for gc - // need UpsertRecord to add timestamps for gc - private List<Pair<Long, Long>> timestamps; public BinlogManager() { lock = new ReentrantReadWriteLock(); dbBinlogMap = Maps.newHashMap(); - timestamps = new ArrayList<Pair<Long, Long>>(); } private void addBinlog(TBinlog binlog) { @@ -65,7 +66,15 @@ public class BinlogManager { return; } + // find db BinlogConfig long dbId = binlog.getDbId(); + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); + if (db == null) { + LOG.warn("db not found. 
dbId: {}", dbId); + return; + } + boolean dbBinlogEnable = db.getBinlogConfig().isEnable(); + DBBinlog dbBinlog; lock.writeLock().lock(); try { @@ -74,14 +83,11 @@ public class BinlogManager { dbBinlog = new DBBinlog(dbId); dbBinlogMap.put(dbId, dbBinlog); } - if (binlog.getTimestamp() > 0) { - timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp())); - } } finally { lock.writeLock().unlock(); } - dbBinlog.addBinlog(binlog); + dbBinlog.addBinlog(binlog, dbBinlogEnable); } private void addBinlog(long dbId, List<Long> tableIds, long commitSeq, long timestamp, TBinlogType type, @@ -140,35 +146,75 @@ public class BinlogManager { } } - // gc binlog, remove all binlog timestamp < minTimestamp - // TODO(Drogon): get minCommitSeq from timestamps - public void gc(long minTimestamp) { + public List<BinlogTombstone> gc() { + LOG.info("begin gc binlog"); + lock.writeLock().lock(); - long minCommitSeq = -1; + Map<Long, DBBinlog> gcDbBinlogMap = null; try { - // user iterator to remove element in timestamps - for (Iterator<Pair<Long, Long>> iterator = timestamps.iterator(); iterator.hasNext();) { - Pair<Long, Long> pair = iterator.next(); - // long commitSeq = pair.first; - long timestamp = pair.second; + gcDbBinlogMap = new HashMap<Long, DBBinlog>(dbBinlogMap); + } finally { + lock.writeLock().unlock(); + } - if (timestamp >= minTimestamp) { - break; - } + if (gcDbBinlogMap.isEmpty()) { + LOG.info("gc binlog, dbBinlogMap is null"); + return null; + } - iterator.remove(); + List<BinlogTombstone> tombstones = Lists.newArrayList(); + for (DBBinlog dbBinlog : gcDbBinlogMap.values()) { + List<BinlogTombstone> dbTombstones = dbBinlog.gc(); + if (dbTombstones != null) { + tombstones.addAll(dbTombstones); } + } + return tombstones; + } + + public void replayGc(BinlogGcInfo binlogGcInfo) { + lock.writeLock().lock(); + Map<Long, DBBinlog> gcDbBinlogMap = null; + try { + gcDbBinlogMap = new HashMap<Long, DBBinlog>(dbBinlogMap); } finally { lock.writeLock().unlock(); } - if 
(minCommitSeq == -1) { + if (gcDbBinlogMap.isEmpty()) { + LOG.info("replay gc binlog, dbBinlogMap is null"); return; } + for (BinlogTombstone tombstone : binlogGcInfo.getTombstones()) { + long dbId = tombstone.getDbId(); + DBBinlog dbBinlog = gcDbBinlogMap.get(dbId); + dbBinlog.replayGc(tombstone); + } + } + + public void removeDB(long dbId) { lock.writeLock().lock(); + try { + dbBinlogMap.remove(dbId); + } finally { + lock.writeLock().unlock(); + } } + public void removeTable(long dbId, long tableId) { + lock.writeLock().lock(); + try { + DBBinlog dbBinlog = dbBinlogMap.get(dbId); + if (dbBinlog != null) { + dbBinlog.removeTable(tableId); + } + } finally { + lock.writeLock().unlock(); + } + } + + private static void writeTBinlogToStream(DataOutputStream dos, TBinlog binlog) throws TException, IOException { TMemoryBuffer buffer = new TMemoryBuffer(BUFFER_SIZE); TBinaryProtocol protocol = new TBinaryProtocol(buffer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java new file mode 100644 index 0000000000..e1c01a5cfa --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.binlog; + +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; + +import java.util.List; +import java.util.Map; + +public class BinlogTombstone { + @SerializedName(value = "dbBinlogTombstone") + private boolean dbBinlogTombstone; + + @SerializedName(value = "dbId") + private long dbId; + + @SerializedName(value = "commitSeq") + private long commitSeq; + + @SerializedName(value = "tableIds") + private List<Long> tableIds; + + @SerializedName(value = "tableVersionMap") + // this map keep last upsert record <tableId, UpsertRecord> + // only for master fe to send be gc task, not need persist + private Map<Long, UpsertRecord.TableRecord> tableVersionMap = Maps.newHashMap(); + + public BinlogTombstone(long dbId, List<Long> tableIds, long commitSeq) { + this.dbBinlogTombstone = true; + this.dbId = dbId; + this.tableIds = tableIds; + this.commitSeq = commitSeq; + } + + public BinlogTombstone(long dbId, long commitSeq) { + this.dbBinlogTombstone = false; + this.dbId = dbId; + this.tableIds = null; + this.commitSeq = commitSeq; + } + + public void addTableRecord(long tableId, UpsertRecord upsertRecord) { + Map<Long, UpsertRecord.TableRecord> tableRecords = upsertRecord.getTableRecords(); + UpsertRecord.TableRecord tableRecord = tableRecords.get(tableId); + tableVersionMap.put(tableId, tableRecord); + } + + public void addTableRecord(long tableId, UpsertRecord.TableRecord record) { + tableVersionMap.put(tableId, record); + } + + public boolean isDbBinlogTomstone() { + return dbBinlogTombstone; + } + + public long getDbId() { + return dbId; + } + + public List<Long> getTableIds() { + return tableIds; + } + + public long getCommitSeq() { + return commitSeq; + } + + public Map<Long, UpsertRecord.TableRecord> getTableVersionMap() { + return tableVersionMap; + 
} + + // only call when log editlog + public void clearTableVersionMap() { + tableVersionMap.clear(); + } + + public String toJson() { + return GsonUtils.GSON.toJson(this); + } + + @Override + public String toString() { + return toJson(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index c4312d2134..d6408b3076 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -17,18 +17,29 @@ package org.apache.doris.binlog; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; import org.apache.doris.common.Pair; import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeSet; import java.util.concurrent.locks.ReentrantReadWriteLock; public class DBBinlog { + private static final Logger LOG = LogManager.getLogger(BinlogManager.class); + private long dbId; // guard for allBinlogs && tableBinlogMap private ReentrantReadWriteLock lock; @@ -37,6 +48,10 @@ public class DBBinlog { // table binlogs private Map<Long, TableBinlog> tableBinlogMap; + // Pair(commitSeq, timestamp), used for gc + // need UpsertRecord to add timestamps for gc + private List<Pair<Long, Long>> timestamps; + public DBBinlog(long dbId) { lock = new ReentrantReadWriteLock(); this.dbId = dbId; @@ -51,13 +66,19 @@ public class DBBinlog { } }); tableBinlogMap = new HashMap<Long, TableBinlog>(); + timestamps = new ArrayList<Pair<Long, Long>>(); } - public void addBinlog(TBinlog binlog) { + public void addBinlog(TBinlog binlog, boolean dbBinlogEnable) 
{ List<Long> tableIds = binlog.getTableIds(); lock.writeLock().lock(); try { + if (binlog.getTimestamp() > 0 && dbBinlogEnable) { + timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp())); + } + allBinlogs.add(binlog); + if (tableIds == null) { return; } @@ -98,8 +119,189 @@ public class DBBinlog { } } + public List<BinlogTombstone> gc() { + // check db + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); + if (db == null) { + LOG.error("db not found. dbId: {}", dbId); + return null; + } + + boolean dbBinlogEnable = db.getBinlogConfig().isEnable(); + if (dbBinlogEnable) { + // db binlog is enable, only one binlogTombstones + long ttlSeconds = db.getBinlogConfig().getTtlSeconds(); + long currentSeconds = System.currentTimeMillis() / 1000; + long expireSeconds = currentSeconds - ttlSeconds; + long expireMs = expireSeconds * 1000; + + BinlogTombstone tombstone = dbBinlogEnableGc(expireMs); + List<BinlogTombstone> tombstones = new ArrayList<BinlogTombstone>(); + if (tombstone != null) { + tombstones.add(tombstone); + } + return tombstones; + } else { + return dbBinlogDisableGc(db); + } + } + + private List<BinlogTombstone> dbBinlogDisableGc(Database db) { + List<BinlogTombstone> tombstones = new ArrayList<BinlogTombstone>(); + List<TableBinlog> tableBinlogs = null; + + lock.writeLock().lock(); + try { + tableBinlogs = new ArrayList<TableBinlog>(tableBinlogMap.values()); + } finally { + lock.writeLock().unlock(); + } + + for (TableBinlog tableBinlog : tableBinlogs) { + BinlogTombstone tombstone = tableBinlog.gc(db); + if (tombstone != null) { + tombstones.add(tombstone); + } + } + return tombstones; + } + + private BinlogTombstone dbBinlogEnableGc(long expireMs) { + // find commitSeq from timestamps, if commitSeq's timestamp is less than expireSeconds, then remove it + long largestExpiredCommitSeq = -1; + TBinlog tombstoneBinlog = null; + List<Long> tableIds = null; + List<TableBinlog> tableBinlogs = null; + + lock.writeLock().lock(); + try 
{ + Iterator<Pair<Long, Long>> iterator = timestamps.iterator(); + while (iterator.hasNext()) { + Pair<Long, Long> pair = iterator.next(); + if (pair.second < expireMs) { + largestExpiredCommitSeq = pair.first; + iterator.remove(); + } else { + break; + } + } + + Iterator<TBinlog> binlogIterator = allBinlogs.iterator(); + while (binlogIterator.hasNext()) { + TBinlog binlog = binlogIterator.next(); + if (binlog.getCommitSeq() <= largestExpiredCommitSeq) { + tombstoneBinlog = binlog; + binlogIterator.remove(); + } else { + break; + } + } + + tableIds = new ArrayList<Long>(tableBinlogMap.keySet()); + tableBinlogs = new ArrayList<TableBinlog>(tableBinlogMap.values()); + } finally { + lock.writeLock().unlock(); + } + LOG.info("gc binlog. dbId: {}, expireMs: {}, largestExpiredCommitSeq: {}", + dbId, expireMs, largestExpiredCommitSeq); + if (tombstoneBinlog == null) { + return null; + } + + BinlogTombstone tombstone = new BinlogTombstone(dbId, tableIds, tombstoneBinlog.getCommitSeq()); + for (TableBinlog tableBinlog : tableBinlogs) { + BinlogTombstone binlogTombstone = tableBinlog.gc(largestExpiredCommitSeq); + if (binlogTombstone == null) { + continue; + } + + Map<Long, UpsertRecord.TableRecord> tableVersionMap = binlogTombstone.getTableVersionMap(); + if (tableVersionMap.size() > 1) { + LOG.warn("tableVersionMap size is greater than 1. 
tableVersionMap: {}", tableVersionMap); + } + for (Map.Entry<Long, UpsertRecord.TableRecord> entry : tableVersionMap.entrySet()) { + long tableId = entry.getKey(); + UpsertRecord.TableRecord record = entry.getValue(); + tombstone.addTableRecord(tableId, record); + } + } + + return tombstone; + } + + public void replayGc(BinlogTombstone tombstone) { + if (tombstone.isDbBinlogTomstone()) { + dbBinlogEnableReplayGc(tombstone); + } else { + dbBinlogDisableReplayGc(tombstone); + } + } + + public void dbBinlogEnableReplayGc(BinlogTombstone tombstone) { + long largestExpiredCommitSeq = tombstone.getCommitSeq(); + + lock.writeLock().lock(); + try { + Iterator<Pair<Long, Long>> iterator = timestamps.iterator(); + while (iterator.hasNext()) { + Pair<Long, Long> pair = iterator.next(); + if (pair.first <= largestExpiredCommitSeq) { + iterator.remove(); + } else { + break; + } + } + + Iterator<TBinlog> binlogIterator = allBinlogs.iterator(); + while (binlogIterator.hasNext()) { + TBinlog binlog = binlogIterator.next(); + if (binlog.getCommitSeq() <= largestExpiredCommitSeq) { + binlogIterator.remove(); + } else { + break; + } + } + } finally { + lock.writeLock().unlock(); + } + + dbBinlogDisableReplayGc(tombstone); + } + + public void dbBinlogDisableReplayGc(BinlogTombstone tombstone) { + List<TableBinlog> tableBinlogs = null; + + lock.writeLock().lock(); + try { + tableBinlogs = new ArrayList<TableBinlog>(tableBinlogMap.values()); + } finally { + lock.writeLock().unlock(); + } + + if (tableBinlogs.isEmpty()) { + return; + } + + Set<Long> tableIds = new HashSet<Long>(tombstone.getTableIds()); + long largestExpiredCommitSeq = tombstone.getCommitSeq(); + for (TableBinlog tableBinlog : tableBinlogs) { + if (tableIds.contains(tableBinlog.getTableId())) { + tableBinlog.replayGc(largestExpiredCommitSeq); + } + } + } + // not thread safety, do this without lock public void getAllBinlogs(List<TBinlog> binlogs) { binlogs.addAll(allBinlogs); } + + public void removeTable(long tableId) { 
+ lock.writeLock().lock(); + try { + tableBinlogMap.remove(tableId); + } finally { + lock.writeLock().unlock(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java index 8a3391847b..659f32e6f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java @@ -17,14 +17,24 @@ package org.apache.doris.binlog; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; import org.apache.doris.common.Pair; import org.apache.doris.thrift.TBinlog; +import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Iterator; import java.util.TreeSet; import java.util.concurrent.locks.ReentrantReadWriteLock; public class TableBinlog { + private static final Logger LOG = LogManager.getLogger(TableBinlog.class); + private long tableId; private ReentrantReadWriteLock lock; private TreeSet<TBinlog> binlogs; @@ -65,4 +75,108 @@ public class TableBinlog { lock.readLock().unlock(); } } + + // this method call when db binlog enable + public BinlogTombstone gc(long largestExpiredCommitSeq) { + TBinlog tombstoneUpsert = null; + + lock.writeLock().lock(); + try { + Iterator<TBinlog> iter = binlogs.iterator(); + while (iter.hasNext()) { + TBinlog binlog = iter.next(); + if (binlog.getCommitSeq() <= largestExpiredCommitSeq) { + if (binlog.getType() == TBinlogType.UPSERT) { + tombstoneUpsert = binlog; + } + iter.remove(); + } else { + break; + } + } + } finally { + lock.writeLock().unlock(); + } + + if (tombstoneUpsert == null) { + return null; + } + + BinlogTombstone tombstone = new BinlogTombstone(-1, largestExpiredCommitSeq); + UpsertRecord upsertRecord = UpsertRecord.fromJson(tombstoneUpsert.getData()); + 
tombstone.addTableRecord(tableId, upsertRecord); + return tombstone; + } + + // this method call when db binlog disable + public BinlogTombstone gc(Database db) { + OlapTable table = null; + try { + Table tbl = db.getTableOrMetaException(tableId); + if (tbl == null) { + LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId); + return null; + } + if (!(tbl instanceof OlapTable)) { + LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId); + return null; + } + table = (OlapTable) tbl; + } catch (Exception e) { + LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId); + return null; + } + + long dbId = db.getId(); + long ttlSeconds = table.getBinlogConfig().getTtlSeconds(); + long currentSeconds = System.currentTimeMillis() / 1000; + long expireSeconds = currentSeconds - ttlSeconds; + long expireMs = expireSeconds * 1000; + + TBinlog tombstoneUpsert = null; + long largestExpiredCommitSeq = 0; + lock.writeLock().lock(); + try { + Iterator<TBinlog> iter = binlogs.iterator(); + while (iter.hasNext()) { + TBinlog binlog = iter.next(); + if (binlog.getTimestamp() <= expireMs) { + if (binlog.getType() == TBinlogType.UPSERT) { + tombstoneUpsert = binlog; + } + largestExpiredCommitSeq = binlog.getCommitSeq(); + iter.remove(); + } else { + break; + } + } + } finally { + lock.writeLock().unlock(); + } + + BinlogTombstone tombstone = new BinlogTombstone(dbId, largestExpiredCommitSeq); + if (tombstoneUpsert != null) { + UpsertRecord upsertRecord = UpsertRecord.fromJson(tombstoneUpsert.getData()); + tombstone.addTableRecord(tableId, upsertRecord); + } + + return tombstone; + } + + public void replayGc(long largestExpiredCommitSeq) { + lock.writeLock().lock(); + try { + Iterator<TBinlog> iter = binlogs.iterator(); + while (iter.hasNext()) { + TBinlog binlog = iter.next(); + if (binlog.getCommitSeq() <= largestExpiredCommitSeq) { + iter.remove(); + } else { + break; + } + } + } finally { + 
lock.writeLock().unlock(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java index e564250093..32052f798a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java @@ -31,8 +31,8 @@ import java.util.List; import java.util.Map; public class UpsertRecord { - class TableRecord { - class PartitionRecord { + public static class TableRecord { + public static class PartitionRecord { @SerializedName(value = "partitionId") public long partitionId; @SerializedName(value = "version") @@ -52,6 +52,10 @@ public class UpsertRecord { partitionRecord.version = partitionCommitInfo.getVersion(); partitionRecords.add(partitionRecord); } + + public List<PartitionRecord> getPartitionRecords() { + return partitionRecords; + } } @SerializedName(value = "commitSeq") @@ -105,10 +109,18 @@ public class UpsertRecord { return new ArrayList<>(tableRecords.keySet()); } + public Map<Long, TableRecord> getTableRecords() { + return tableRecords; + } + public String toJson() { return GsonUtils.GSON.toJson(this); } + public static UpsertRecord fromJson(String json) { + return GsonUtils.GSON.fromJson(json, UpsertRecord.class); + } + @Override public String toString() { return toJson(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index b853232f42..dc822b1470 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -79,6 +79,7 @@ import org.apache.doris.analysis.TableRenameClause; import org.apache.doris.analysis.TruncateTableStmt; import org.apache.doris.analysis.UninstallPluginStmt; import org.apache.doris.backup.BackupHandler; +import org.apache.doris.binlog.BinlogGcer; import org.apache.doris.binlog.BinlogManager; import 
org.apache.doris.blockrule.SqlBlockRuleMgr; import org.apache.doris.catalog.ColocateTableIndex.GroupId; @@ -172,6 +173,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.AlterMultiMaterializedView; import org.apache.doris.persist.BackendReplicasInfo; import org.apache.doris.persist.BackendTabletsInfo; +import org.apache.doris.persist.BinlogGcInfo; import org.apache.doris.persist.CleanQueryStatsInfo; import org.apache.doris.persist.DropPartitionInfo; import org.apache.doris.persist.EditLog; @@ -448,6 +450,8 @@ public class Env { private BinlogManager binlogManager; + private BinlogGcer binlogGcer; + /** * TODO(tsy): to be removed after load refactor */ @@ -666,6 +670,7 @@ public class Env { this.loadManagerAdapter = new LoadManagerAdapter(); this.hiveTransactionMgr = new HiveTransactionMgr(); this.binlogManager = new BinlogManager(); + this.binlogGcer = new BinlogGcer(); } public static void destroyCheckpoint() { @@ -1480,6 +1485,9 @@ public class Env { // start mtmv jobManager mtmvJobManager.start(); getRefreshManager().start(); + + // binlog gcer + binlogGcer.start(); } // start threads that should running on all FE @@ -2805,6 +2813,10 @@ public class Env { getInternalCatalog().replayRecoverPartition(info); } + public void replayGcBinlog(BinlogGcInfo binlogGcInfo) { + binlogManager.replayGc(binlogGcInfo); + } + public static void getDdlStmt(TableIf table, List<String> createTableStmt, List<String> addPartitionStmt, List<String> createRollupStmt, boolean separatePartition, boolean hidePassword, long specificVersion) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 1626bedbf2..39b804eb40 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -73,6 +73,7 @@ import org.apache.doris.persist.BatchDropInfo; import 
org.apache.doris.persist.BatchModifyPartitionsInfo; import org.apache.doris.persist.BatchRemoveTransactionsOperation; import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; +import org.apache.doris.persist.BinlogGcInfo; import org.apache.doris.persist.CleanLabelOperationLog; import org.apache.doris.persist.CleanQueryStatsInfo; import org.apache.doris.persist.ColocatePersistInfo; @@ -823,6 +824,11 @@ public class JournalEntity implements Writable { isRead = true; break; } + case OperationType.OP_GC_BINLOG: { + data = BinlogGcInfo.read(in); + isRead = true; + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BinlogGcInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BinlogGcInfo.java new file mode 100644 index 0000000000..55b13ad5ce --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BinlogGcInfo.java @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.persist; + + +import org.apache.doris.binlog.BinlogTombstone; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +public class BinlogGcInfo implements Writable { + @SerializedName(value = "tombstones") + private List<BinlogTombstone> tombstones = null; + + public BinlogGcInfo() { + // for persist + this.tombstones = null; + } + + public BinlogGcInfo(List<BinlogTombstone> tombstones) { + this.tombstones = tombstones; + } + + public List<BinlogTombstone> getTombstones() { + return tombstones; + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public static BinlogGcInfo read(DataInput in) throws IOException { + return GsonUtils.GSON.fromJson(Text.readString(in), BinlogGcInfo.class); + } + + public String toJson() { + return GsonUtils.GSON.toJson(this); + } + + @Override + public String toString() { + return toJson(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 541518148f..04c1b58d6e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -1031,6 +1031,12 @@ public class EditLog { alterDatabasePropertyInfo.getProperties()); break; } + case OperationType.OP_GC_BINLOG: { + BinlogGcInfo binlogGcInfo = (BinlogGcInfo) journal.getData(); + LOG.info("replay gc binlog: {}", binlogGcInfo); + env.replayGcBinlog(binlogGcInfo); + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); @@ -1797,4 +1803,8 @@ public class EditLog { public void 
logAlterDatabaseProperty(AlterDatabasePropertyInfo log) { logEdit(OperationType.OP_ALTER_DATABASE_PROPERTY, log); } + + public void logGcBinlog(BinlogGcInfo log) { + logEdit(OperationType.OP_GC_BINLOG, log); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 4893bbc1ec..674374f917 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -306,6 +306,8 @@ public class OperationType { public static final short OP_ALTER_DATABASE_PROPERTY = 434; + public static final short OP_GC_BINLOG = 435; + /** * Get opcode name by op code. diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 2815c9229e..d296663745 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -110,6 +110,8 @@ import org.apache.doris.thrift.TGetBinlogRequest; import org.apache.doris.thrift.TGetBinlogResult; import org.apache.doris.thrift.TGetDbsParams; import org.apache.doris.thrift.TGetDbsResult; +import org.apache.doris.thrift.TGetMasterTokenRequest; +import org.apache.doris.thrift.TGetMasterTokenResult; import org.apache.doris.thrift.TGetQueryStatsRequest; import org.apache.doris.thrift.TGetSnapshotRequest; import org.apache.doris.thrift.TGetSnapshotResult; @@ -940,6 +942,17 @@ public class FrontendServiceImpl implements FrontendService.Iface { } } + private void checkPassword(String cluster, String user, String passwd, String clientIp) + throws AuthenticationException { + if (Strings.isNullOrEmpty(cluster)) { + cluster = SystemInfoService.DEFAULT_CLUSTER; + } + final String fullUserName = ClusterNamespace.getFullName(cluster, user); + List<UserIdentity> 
currentUser = Lists.newArrayList(); + Env.getCurrentEnv().getAuth().checkPlainPassword(fullUserName, clientIp, passwd, currentUser); + Preconditions.checkState(currentUser.size() == 1); + } + @Override public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TException { String clientAddr = getClientAddrAsString(); @@ -2296,7 +2309,6 @@ public class FrontendServiceImpl implements FrontendService.Iface { Env env = Env.getCurrentEnv(); String fullDbName = ClusterNamespace.getFullName(cluster, request.getDb()); Database db = env.getInternalCatalog().getDbNullable(fullDbName); - long dbId = db.getId(); if (db == null) { String dbName = fullDbName; if (Strings.isNullOrEmpty(request.getCluster())) { @@ -2318,6 +2330,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { } // step 6: get binlog + long dbId = db.getId(); TGetBinlogResult result = new TGetBinlogResult(); result.setStatus(new TStatus(TStatusCode.OK)); long prevCommitSeq = request.getPrevCommitSeq(); @@ -2516,4 +2529,27 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } + + public TGetMasterTokenResult getMasterToken(TGetMasterTokenRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + LOG.debug("receive get master token request: {}", request); + + TGetMasterTokenResult result = new TGetMasterTokenResult(); + TStatus status = new TStatus(TStatusCode.OK); + result.setStatus(status); + try { + checkPassword(request.getCluster(), request.getUser(), request.getPassword(), clientAddr); + result.setToken(Env.getCurrentEnv().getToken()); + } catch (AuthenticationException e) { + LOG.warn("failed to get master token: {}", e.getMessage()); + status.setStatusCode(TStatusCode.NOT_AUTHORIZED); + status.addToErrorMsgs(e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatusCode(TStatusCode.INTERNAL_ERROR); + status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage())); + } + 
+ return result; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java index 4f9d9ef2f6..3c12160610 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java @@ -34,6 +34,7 @@ import org.apache.doris.thrift.TCompactionReq; import org.apache.doris.thrift.TCreateTabletReq; import org.apache.doris.thrift.TDownloadReq; import org.apache.doris.thrift.TDropTabletReq; +import org.apache.doris.thrift.TGcBinlogReq; import org.apache.doris.thrift.TMoveDirReq; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPublishVersionRequest; @@ -375,6 +376,15 @@ public class AgentBatchTask implements Runnable { tAgentTaskRequest.setPushCooldownConf(request); return tAgentTaskRequest; } + case GC_BINLOG: { + BinlogGcTask binlogGcTask = (BinlogGcTask) task; + TGcBinlogReq request = binlogGcTask.toThrift(); + if (LOG.isDebugEnabled()) { + LOG.debug(request.toString()); + } + tAgentTaskRequest.setGcBinlogReq(request); + return tAgentTaskRequest; + } default: LOG.debug("could not find task type for task [{}]", task); return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/BinlogGcTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/BinlogGcTask.java new file mode 100644 index 0000000000..15bd81473e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/BinlogGcTask.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +import org.apache.doris.thrift.TGcBinlogReq; +import org.apache.doris.thrift.TTabletGcBinlogInfo; +import org.apache.doris.thrift.TTaskType; + +import java.util.ArrayList; +import java.util.List; + +public class BinlogGcTask extends AgentTask { + private List<TTabletGcBinlogInfo> tabletGcBinlogInfos = new ArrayList<>(); + + public BinlogGcTask(long backendId, long signature) { + super(null, backendId, TTaskType.GC_BINLOG, -1, -1, -1, -1, -1, signature); + } + + public void addTask(long tabletId, long version) { + TTabletGcBinlogInfo tabletGcBinlogInfo = new TTabletGcBinlogInfo(); + tabletGcBinlogInfo.setTabletId(tabletId); + tabletGcBinlogInfo.setVersion(version); + tabletGcBinlogInfos.add(tabletGcBinlogInfo); + } + + public TGcBinlogReq toThrift() { + TGcBinlogReq req = new TGcBinlogReq(); + req.setTabletGcBinlogInfos(tabletGcBinlogInfos); + return req; + } +} diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 9bf2abe5c8..6a982fb58c 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -59,7 +59,7 @@ message KeyBoundsPB { } message RowsetMetaPB { - required int64 rowset_id = 1; + required int64 rowset_id = 1; // Deprecated. Use rowset_id_v2 instead. 
optional int64 partition_id = 2; optional int64 tablet_id = 3; // only for pending rowset @@ -328,7 +328,8 @@ message DeleteBitmapPB { message BinlogMetaEntryPB { optional int64 version = 1; optional int64 tablet_id = 2; - optional int64 rowset_id = 3; + optional int64 rowset_id = 3; // Deprecated use rowset_id_v2 instead optional int64 num_segments = 4; optional int64 creation_time = 5; + optional string rowset_id_v2 = 6; } diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index b30c7ed26a..f92f873fc4 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -198,6 +198,15 @@ struct TAlterInvertedIndexReq { 10: optional i64 expiration } +struct TTabletGcBinlogInfo { + 1: optional Types.TTabletId tablet_id + 2: optional i64 version +} + +struct TGcBinlogReq { + 1: optional list<TTabletGcBinlogInfo> tablet_gc_binlog_infos +} + struct TStorageMigrationReqV2 { 1: optional Types.TTabletId base_tablet_id 2: optional Types.TTabletId new_tablet_id @@ -449,6 +458,7 @@ struct TAgentTaskRequest { 30: optional TPushCooldownConfReq push_cooldown_conf 31: optional TPushStoragePolicyReq push_storage_policy_req 32: optional TAlterInvertedIndexReq alter_inverted_index_req + 33: optional TGcBinlogReq gc_binlog_req } struct TAgentResult { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index e4bff3a4fc..28b77a3f20 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1029,6 +1029,17 @@ struct TRestoreSnapshotResult { 1: optional Status.TStatus status } +struct TGetMasterTokenRequest { + 1: optional string cluster + 2: optional string user + 3: optional string password +} + +struct TGetMasterTokenResult { + 1: optional Status.TStatus status + 2: optional string token +} + service FrontendService { TGetDbsResult getDbNames(1: TGetDbsParams params) TGetTablesResult getTableNames(1: TGetTablesParams params) @@ -1089,4 +1100,6 @@ service 
FrontendService { TQueryStatsResult getQueryStats(1: TGetQueryStatsRequest request) TGetTabletReplicaInfosResult getTabletReplicaInfos(1: TGetTabletReplicaInfosRequest request) + + TGetMasterTokenResult getMasterToken(1: TGetMasterTokenRequest request) } diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 91074669e7..4949ca55f2 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -218,7 +218,8 @@ enum TTaskType { NOTIFY_UPDATE_STORAGE_POLICY, // deprecated PUSH_COOLDOWN_CONF, PUSH_STORAGE_POLICY, - ALTER_INVERTED_INDEX + ALTER_INVERTED_INDEX, + GC_BINLOG } enum TStmtType { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org