mymeiyi commented on code in PR #23053: URL: https://github.com/apache/doris/pull/23053#discussion_r1314410302
########## be/src/olap/wal_table.cpp: ########## @@ -0,0 +1,397 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_table.h" + +#include <thrift/protocol/TDebugProtocol.h> + +#include "http/action/stream_load.h" +#include "io/fs/local_file_system.h" +#include "olap/wal_manager.h" +#include "runtime/client_cache.h" +#include "runtime/fragment_mgr.h" +#include "runtime/plan_fragment_executor.h" +#include "util/path_util.h" +#include "util/thrift_rpc_helper.h" +#include "vec/exec/format/wal/wal_reader.h" + +namespace doris { + +#ifdef BE_TEST +TCheckWalResult k_check_wal_result; +TUpdateWalMapResult k_update_wal_map_result; +#endif + +WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id) + : _exec_env(exec_env), _db_id(db_id), _table_id(table_id) {} +WalTable::~WalTable() {} + +#ifdef BE_TEST +TStreamLoadPutResult wal_stream_load_put_result; +#endif + +Status WalTable::add_wals(std::vector<std::string> wals) { + std::lock_guard<std::mutex> lock(_replay_wal_lock); + for (const auto& wal : wals) { + LOG(INFO) << "add replay wal " << wal; + _replay_wal_map.emplace(wal, replay_wal_info {0, UnixMillis(), false}); + } + return Status::OK(); +} +Status WalTable::replay_wals() { + std::vector<std::string> need_replay_wals; + { + std::lock_guard<std::mutex> lock(_replay_wal_lock); + if (_replay_wal_map.empty()) { + return Status::OK(); + } + VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" << _table_id + << ", wal size=" << _replay_wal_map.size(); + for (auto& [wal, info] : _replay_wal_map) { + auto& [retry_num, start_ts, replaying] = info; + if (replaying) { + continue; + } + if (retry_num >= config::group_commit_replay_wal_retry_num) { + LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", table=" << _table_id + << ", wal=" << wal + << ", retry_num=" << config::group_commit_replay_wal_retry_num; + std::string rename_path = get_tmp_path(wal); + LOG(INFO) << "rename wal from " << wal << " to " << rename_path; + std::rename(wal.c_str(), rename_path.c_str()); + _replay_wal_map.erase(wal); + continue; + } + if (need_replay(info)) { + replaying = true; + need_replay_wals.push_back(wal); + } + } + std::sort(need_replay_wals.begin(), need_replay_wals.end()); + } + for (const auto& wal : need_replay_wals) { + auto st = replay_wal_internal(wal); + if (!st.ok()) { + std::lock_guard<std::mutex> lock(_replay_wal_lock); + auto it = _replay_wal_map.find(wal); + if (it != _replay_wal_map.end()) { + auto& [retry_num, start_time, replaying] = it->second; + replaying = false; + } + LOG(WARNING) << "failed replay wal, drop this round, db=" << _db_id + << ", table=" << _table_id << ", wal=" << wal << ", st=" << st.to_string(); + break; + } + VLOG_NOTICE << "replay wal, db=" << _db_id << ", table=" << _table_id << ", label=" << wal + << ", st=" << st.to_string(); + } + return Status::OK(); +} + +std::string WalTable::get_tmp_path(const std::string wal) { + std::vector<std::string> path_element; + doris::vectorized::WalReader::string_split(wal, "/", path_element); + std::stringstream ss; + int index = 0; + while (index < path_element.size() - 3) { + ss << path_element[index] << "/"; + index++; + } + ss << "tmp/"; + while (index < path_element.size()) { + if (index != path_element.size() - 1) { + ss << path_element[index] << "_"; + } else { + ss << path_element[index]; + } + index++; + } + return ss.str(); +} + +bool WalTable::need_replay(const doris::WalTable::replay_wal_info& info) { +#ifndef BE_TEST + auto& [retry_num, start_ts, replaying] = info; + auto replay_interval = + pow(2, retry_num) * config::group_commit_replay_wal_retry_interval_seconds * 1000; + return UnixMillis() - start_ts >= replay_interval; +#else + return true; +#endif +} + +Status WalTable::replay_wal_internal(const std::string& wal) { + LOG(INFO) << "Start replay wal for db=" << _db_id << ", table=" << _table_id << ", wal=" << wal; + // start a new stream load + { + std::lock_guard<std::mutex> lock(_replay_wal_lock); + auto it = _replay_wal_map.find(wal); + if (it != _replay_wal_map.end()) { + auto& [retry_num, start_time, replaying] = it->second; + ++retry_num; + replaying = true; + } else { + LOG(WARNING) << "can not find wal in stream load replay map. db=" << _db_id + << ", table=" << _table_id << ", wal=" << wal; + return Status::OK(); + } + } + // check whether wal need to do recover or not + bool needRecovery = true; + int64_t wal_id = get_wal_id(wal); + auto st = check_wal(wal_id, needRecovery); + if (!st.ok()) { + LOG(WARNING) << "fail to check status for wal=" << wal << ", st=" << st.to_string(); + return st; + } else if (!needRecovery) { + LOG(INFO) << "wal is already committed, skip recovery, wal=" << wal_id; + _exec_env->wal_mgr()->delete_wal(wal_id); + +// RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(std::to_string(wal_id))); +// LOG(INFO) << "delete wal=" << wal; + std::lock_guard<std::mutex> lock(_replay_wal_lock); + _replay_wal_map.erase(wal); + return Status::OK(); + } + // generate and execute a plan fragment + std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env); + TStreamLoadPutRequest request; + st = begin_txn(ctx, wal_id); + if (!st.ok()) { + LOG(WARNING) << "fail to begin txn files for wal=" << wal << ", st=" << st.to_string(); Review Comment: should be "fail to begin txn for wal" ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org