github-actions[bot] commented on code in PR #23053: URL: https://github.com/apache/doris/pull/23053#discussion_r1334696944
########## be/src/olap/wal_manager.cpp: ########## @@ -0,0 +1,252 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_manager.h" + +#include <thrift/protocol/TDebugProtocol.h> + +#include <chrono> +#include <filesystem> + +#include "io/fs/local_file_system.h" +#include "runtime/client_cache.h" +#include "runtime/fragment_mgr.h" +#include "runtime/plan_fragment_executor.h" +#include "runtime/stream_load/stream_load_context.h" +#include "util/path_util.h" +#include "util/thrift_rpc_helper.h" +#include "vec/exec/format/wal/wal_reader.h" + +namespace doris { +WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) + : _exec_env(exec_env), _stop_background_threads_latch(1) { + doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs); +} + +WalManager::~WalManager() { + _stop_background_threads_latch.count_down(); + if (_replay_thread) { + _replay_thread->join(); + } + LOG(INFO) << "WalManager is destoried"; +} +void WalManager::stop() { + _stop = true; + LOG(INFO) << "WalManager is stopped"; +} + +Status WalManager::init() { + bool exists = false; + for (auto wal_dir : _wal_dirs) { + std::string tmp_dir = wal_dir + "/tmp"; + LOG(INFO) << "wal_dir:" << wal_dir << 
",tmp_dir:" << tmp_dir; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_dir)); + } + RETURN_IF_ERROR(io::global_local_filesystem()->exists(tmp_dir, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_dir)); + } + RETURN_IF_ERROR(scan_wals(wal_dir)); + } + return Thread::create( + "WalMgr", "replay_wal", [this]() { this->replay(); }, &_replay_thread); +} + +Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, + const std::string& label) { + std::string base_path = + _wal_dirs.size() == 1 ? _wal_dirs[0] : _wal_dirs[rand() % _wal_dirs.size()]; + std::stringstream ss; Review Comment: warning: variable 'ss' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::stringstream ss = 0; ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -0,0 +1,252 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "olap/wal_manager.h" + +#include <thrift/protocol/TDebugProtocol.h> + +#include <chrono> +#include <filesystem> + +#include "io/fs/local_file_system.h" +#include "runtime/client_cache.h" +#include "runtime/fragment_mgr.h" +#include "runtime/plan_fragment_executor.h" +#include "runtime/stream_load/stream_load_context.h" +#include "util/path_util.h" +#include "util/thrift_rpc_helper.h" +#include "vec/exec/format/wal/wal_reader.h" + +namespace doris { +WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) + : _exec_env(exec_env), _stop_background_threads_latch(1) { + doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs); +} + +WalManager::~WalManager() { + _stop_background_threads_latch.count_down(); + if (_replay_thread) { + _replay_thread->join(); + } + LOG(INFO) << "WalManager is destoried"; +} +void WalManager::stop() { + _stop = true; + LOG(INFO) << "WalManager is stopped"; +} + +Status WalManager::init() { + bool exists = false; + for (auto wal_dir : _wal_dirs) { + std::string tmp_dir = wal_dir + "/tmp"; + LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_dir)); + } + RETURN_IF_ERROR(io::global_local_filesystem()->exists(tmp_dir, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_dir)); + } + RETURN_IF_ERROR(scan_wals(wal_dir)); + } + return Thread::create( + "WalMgr", "replay_wal", [this]() { this->replay(); }, &_replay_thread); +} + +Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, + const std::string& label) { + std::string base_path = + _wal_dirs.size() == 1 ? 
_wal_dirs[0] : _wal_dirs[rand() % _wal_dirs.size()]; + std::stringstream ss; + ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/" + << std::to_string(wal_id) << "_" << label; + { + std::lock_guard<std::shared_mutex> wrlock(_wal_lock); + _wal_path_map.emplace(wal_id, ss.str()); + } + return Status::OK(); +} + +Status WalManager::get_wal_path(int64_t wal_id, std::string& wal_path) { + std::shared_lock rdlock(_wal_lock); + auto it = _wal_path_map.find(wal_id); + if (it != _wal_path_map.end()) { + wal_path = _wal_path_map[wal_id]; + } else { + return Status::InternalError("can not find wal_id {} in wal_path_map", wal_id); + } + return Status::OK(); +} + +Status WalManager::create_wal_reader(const std::string& wal_path, + std::shared_ptr<WalReader>& wal_reader) { + wal_reader = std::make_shared<WalReader>(wal_path); + RETURN_IF_ERROR(wal_reader->init()); + return Status::OK(); +} + +Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>& wal_writer) { + std::string wal_path; Review Comment: warning: variable 'wal_path' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::string wal_path = 0; ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -0,0 +1,252 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_manager.h" + +#include <thrift/protocol/TDebugProtocol.h> + +#include <chrono> +#include <filesystem> + +#include "io/fs/local_file_system.h" +#include "runtime/client_cache.h" +#include "runtime/fragment_mgr.h" +#include "runtime/plan_fragment_executor.h" +#include "runtime/stream_load/stream_load_context.h" +#include "util/path_util.h" +#include "util/thrift_rpc_helper.h" +#include "vec/exec/format/wal/wal_reader.h" + +namespace doris { +WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) + : _exec_env(exec_env), _stop_background_threads_latch(1) { + doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs); +} + +WalManager::~WalManager() { + _stop_background_threads_latch.count_down(); + if (_replay_thread) { + _replay_thread->join(); + } + LOG(INFO) << "WalManager is destoried"; +} +void WalManager::stop() { + _stop = true; + LOG(INFO) << "WalManager is stopped"; +} + +Status WalManager::init() { + bool exists = false; + for (auto wal_dir : _wal_dirs) { + std::string tmp_dir = wal_dir + "/tmp"; + LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_dir)); + } + RETURN_IF_ERROR(io::global_local_filesystem()->exists(tmp_dir, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_dir)); + } + RETURN_IF_ERROR(scan_wals(wal_dir)); + } + return Thread::create( + "WalMgr", "replay_wal", [this]() { this->replay(); }, &_replay_thread); +} + +Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, + const std::string& label) { + std::string base_path = + _wal_dirs.size() == 1 ? 
_wal_dirs[0] : _wal_dirs[rand() % _wal_dirs.size()]; + std::stringstream ss; + ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/" + << std::to_string(wal_id) << "_" << label; + { + std::lock_guard<std::shared_mutex> wrlock(_wal_lock); + _wal_path_map.emplace(wal_id, ss.str()); + } + return Status::OK(); +} + +Status WalManager::get_wal_path(int64_t wal_id, std::string& wal_path) { + std::shared_lock rdlock(_wal_lock); + auto it = _wal_path_map.find(wal_id); + if (it != _wal_path_map.end()) { + wal_path = _wal_path_map[wal_id]; + } else { + return Status::InternalError("can not find wal_id {} in wal_path_map", wal_id); + } + return Status::OK(); +} + +Status WalManager::create_wal_reader(const std::string& wal_path, + std::shared_ptr<WalReader>& wal_reader) { + wal_reader = std::make_shared<WalReader>(wal_path); + RETURN_IF_ERROR(wal_reader->init()); + return Status::OK(); +} + +Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>& wal_writer) { + std::string wal_path; + RETURN_IF_ERROR(get_wal_path(wal_id, wal_path)); + std::vector<std::string> path_element; + doris::vectorized::WalReader::string_split(wal_path, "/", path_element); + std::stringstream ss; Review Comment: warning: variable 'ss' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::stringstream ss = 0; ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -0,0 +1,252 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_manager.h" + +#include <thrift/protocol/TDebugProtocol.h> + +#include <chrono> +#include <filesystem> + +#include "io/fs/local_file_system.h" +#include "runtime/client_cache.h" +#include "runtime/fragment_mgr.h" +#include "runtime/plan_fragment_executor.h" +#include "runtime/stream_load/stream_load_context.h" +#include "util/path_util.h" +#include "util/thrift_rpc_helper.h" +#include "vec/exec/format/wal/wal_reader.h" + +namespace doris { +WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) + : _exec_env(exec_env), _stop_background_threads_latch(1) { + doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs); +} + +WalManager::~WalManager() { + _stop_background_threads_latch.count_down(); + if (_replay_thread) { + _replay_thread->join(); + } + LOG(INFO) << "WalManager is destoried"; +} +void WalManager::stop() { + _stop = true; + LOG(INFO) << "WalManager is stopped"; +} + +Status WalManager::init() { + bool exists = false; + for (auto wal_dir : _wal_dirs) { + std::string tmp_dir = wal_dir + "/tmp"; + LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_dir)); + } + RETURN_IF_ERROR(io::global_local_filesystem()->exists(tmp_dir, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_dir)); + } + RETURN_IF_ERROR(scan_wals(wal_dir)); + } + return Thread::create( + "WalMgr", 
"replay_wal", [this]() { this->replay(); }, &_replay_thread); +} + +Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, + const std::string& label) { + std::string base_path = + _wal_dirs.size() == 1 ? _wal_dirs[0] : _wal_dirs[rand() % _wal_dirs.size()]; + std::stringstream ss; + ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/" + << std::to_string(wal_id) << "_" << label; + { + std::lock_guard<std::shared_mutex> wrlock(_wal_lock); + _wal_path_map.emplace(wal_id, ss.str()); + } + return Status::OK(); +} + +Status WalManager::get_wal_path(int64_t wal_id, std::string& wal_path) { + std::shared_lock rdlock(_wal_lock); + auto it = _wal_path_map.find(wal_id); + if (it != _wal_path_map.end()) { + wal_path = _wal_path_map[wal_id]; + } else { + return Status::InternalError("can not find wal_id {} in wal_path_map", wal_id); + } + return Status::OK(); +} + +Status WalManager::create_wal_reader(const std::string& wal_path, + std::shared_ptr<WalReader>& wal_reader) { + wal_reader = std::make_shared<WalReader>(wal_path); + RETURN_IF_ERROR(wal_reader->init()); + return Status::OK(); +} + +Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>& wal_writer) { + std::string wal_path; + RETURN_IF_ERROR(get_wal_path(wal_id, wal_path)); + std::vector<std::string> path_element; + doris::vectorized::WalReader::string_split(wal_path, "/", path_element); + std::stringstream ss; + for (int i = 0; i < path_element.size() - 1; i++) { + ss << path_element[i] << "/"; + } + std::string base_path = ss.str(); Review Comment: warning: variable 'base_path' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::string base_path = 0 = ss.str(); ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -0,0 +1,252 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_manager.h" + +#include <thrift/protocol/TDebugProtocol.h> + +#include <chrono> +#include <filesystem> + +#include "io/fs/local_file_system.h" +#include "runtime/client_cache.h" +#include "runtime/fragment_mgr.h" +#include "runtime/plan_fragment_executor.h" +#include "runtime/stream_load/stream_load_context.h" +#include "util/path_util.h" +#include "util/thrift_rpc_helper.h" +#include "vec/exec/format/wal/wal_reader.h" + +namespace doris { +WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) + : _exec_env(exec_env), _stop_background_threads_latch(1) { + doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs); +} + +WalManager::~WalManager() { + _stop_background_threads_latch.count_down(); + if (_replay_thread) { + _replay_thread->join(); + } + LOG(INFO) << "WalManager is destoried"; +} +void WalManager::stop() { + _stop = true; + LOG(INFO) << "WalManager is stopped"; +} + +Status WalManager::init() { + bool exists = false; + for (auto wal_dir : _wal_dirs) { + std::string tmp_dir = wal_dir + "/tmp"; + LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir, &exists)); + if (!exists) { + 
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_dir)); + } + RETURN_IF_ERROR(io::global_local_filesystem()->exists(tmp_dir, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_dir)); + } + RETURN_IF_ERROR(scan_wals(wal_dir)); + } + return Thread::create( + "WalMgr", "replay_wal", [this]() { this->replay(); }, &_replay_thread); +} + +Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, + const std::string& label) { + std::string base_path = + _wal_dirs.size() == 1 ? _wal_dirs[0] : _wal_dirs[rand() % _wal_dirs.size()]; + std::stringstream ss; + ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/" + << std::to_string(wal_id) << "_" << label; + { + std::lock_guard<std::shared_mutex> wrlock(_wal_lock); + _wal_path_map.emplace(wal_id, ss.str()); + } + return Status::OK(); +} + +Status WalManager::get_wal_path(int64_t wal_id, std::string& wal_path) { + std::shared_lock rdlock(_wal_lock); + auto it = _wal_path_map.find(wal_id); + if (it != _wal_path_map.end()) { + wal_path = _wal_path_map[wal_id]; + } else { + return Status::InternalError("can not find wal_id {} in wal_path_map", wal_id); + } + return Status::OK(); +} + +Status WalManager::create_wal_reader(const std::string& wal_path, + std::shared_ptr<WalReader>& wal_reader) { + wal_reader = std::make_shared<WalReader>(wal_path); + RETURN_IF_ERROR(wal_reader->init()); + return Status::OK(); +} + +Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>& wal_writer) { + std::string wal_path; + RETURN_IF_ERROR(get_wal_path(wal_id, wal_path)); + std::vector<std::string> path_element; + doris::vectorized::WalReader::string_split(wal_path, "/", path_element); + std::stringstream ss; + for (int i = 0; i < path_element.size() - 1; i++) { + ss << path_element[i] << "/"; + } + std::string base_path = ss.str(); + bool exists = false; + 
RETURN_IF_ERROR(io::global_local_filesystem()->exists(base_path, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(base_path)); + } + LOG(INFO) << "create wal " << wal_path; + wal_writer = std::make_shared<WalWriter>(wal_path); + RETURN_IF_ERROR(wal_writer->init()); + return Status::OK(); +} + +Status WalManager::scan_wals(const std::string& wal_path) { + size_t count = 0; + bool exists = true; + std::vector<io::FileInfo> dbs; + Status st = io::global_local_filesystem()->list(wal_path, false, &dbs, &exists); + if (!st.ok()) { + LOG(WARNING) << "Failed list files for dir=" << wal_path << ", st=" << st.to_string(); + return st; + } + for (const auto& db_id : dbs) { + if (db_id.is_file) { + continue; + } + std::vector<io::FileInfo> tables; + auto db_path = wal_path + "/" + db_id.file_name; + st = io::global_local_filesystem()->list(db_path, false, &tables, &exists); + if (!st.ok()) { + LOG(WARNING) << "Failed list files for dir=" << db_path << ", st=" << st.to_string(); + return st; + } + for (const auto& table_id : tables) { + if (table_id.is_file) { + continue; + } + std::vector<io::FileInfo> wals; + auto table_path = db_path + "/" + table_id.file_name; + st = io::global_local_filesystem()->list(table_path, false, &wals, &exists); + if (!st.ok()) { + LOG(WARNING) << "Failed list files for dir=" << table_path + << ", st=" << st.to_string(); + return st; + } + if (wals.size() == 0) { + continue; + } + std::vector<std::string> res; + for (const auto& wal : wals) { + auto wal_file = table_path + "/" + wal.file_name; + res.emplace_back(wal_file); + { + std::lock_guard<std::shared_mutex> wrlock(_wal_lock); + int64_t wal_id = std::strtoll(wal.file_name.c_str(), NULL, 10); + _wal_path_map.emplace(wal_id, wal_file); + } + } + st = add_recover_wal(db_id.file_name, table_id.file_name, res); + count += res.size(); + if (!st.ok()) { + LOG(WARNING) << "Failed add replay wal, db=" << db_id.file_name + << ", table=" << table_id.file_name 
<< ", st=" << st.to_string(); + return st; + } + } + } + LOG(INFO) << "Finish list all wals, size:" << count; + return Status::OK(); +} + +Status WalManager::replay() { + do { + if (_stop || _exec_env->master_info() == nullptr) { + break; + } + // port == 0 means not received heartbeat yet + while (_exec_env->master_info()->network_address.port == 0) { + sleep(1); + continue; + } Review Comment: warning: redundant continue statement at the end of loop statement [readability-redundant-control-flow] ```suggestion } ``` ########## be/src/olap/wal_reader.cpp: ########## @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "olap/wal_reader.h" + +#include "common/status.h" +#include "io/fs/file_reader.h" +#include "io/fs/local_file_system.h" +#include "io/fs/path.h" +#include "util/crc32c.h" +#include "wal_writer.h" + +namespace doris { + +WalReader::WalReader(const std::string& file_name) : _file_name(file_name), _offset(0) {} + +WalReader::~WalReader() {} + +Status WalReader::init() { + RETURN_IF_ERROR(io::global_local_filesystem()->open_file(_file_name, &file_reader)); + return Status::OK(); +} + +Status WalReader::finalize() { + return file_reader->close(); +} + +Status WalReader::read_block(PBlock& block) { + if (_offset >= file_reader->size()) { + return Status::EndOfFile("end of wal file"); + } + size_t bytes_read = 0; + uint8_t row_len_buf[WalWriter::LENGTH_SIZE]; + RETURN_IF_ERROR( + file_reader->read_at(_offset, {row_len_buf, WalWriter::LENGTH_SIZE}, &bytes_read)); + _offset += WalWriter::LENGTH_SIZE; + size_t block_len; + memcpy(&block_len, row_len_buf, WalWriter::LENGTH_SIZE); + // read block + std::string block_buf; Review Comment: warning: variable 'block_buf' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::string block_buf = 0; ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -0,0 +1,252 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_manager.h" + +#include <thrift/protocol/TDebugProtocol.h> + +#include <chrono> +#include <filesystem> + +#include "io/fs/local_file_system.h" +#include "runtime/client_cache.h" +#include "runtime/fragment_mgr.h" +#include "runtime/plan_fragment_executor.h" +#include "runtime/stream_load/stream_load_context.h" +#include "util/path_util.h" +#include "util/thrift_rpc_helper.h" +#include "vec/exec/format/wal/wal_reader.h" + +namespace doris { +WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) + : _exec_env(exec_env), _stop_background_threads_latch(1) { + doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs); +} + +WalManager::~WalManager() { + _stop_background_threads_latch.count_down(); + if (_replay_thread) { + _replay_thread->join(); + } + LOG(INFO) << "WalManager is destoried"; +} +void WalManager::stop() { + _stop = true; + LOG(INFO) << "WalManager is stopped"; +} + +Status WalManager::init() { + bool exists = false; + for (auto wal_dir : _wal_dirs) { + std::string tmp_dir = wal_dir + "/tmp"; + LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_dir)); + } + RETURN_IF_ERROR(io::global_local_filesystem()->exists(tmp_dir, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_dir)); + } + RETURN_IF_ERROR(scan_wals(wal_dir)); + } + return Thread::create( + "WalMgr", "replay_wal", [this]() { this->replay(); }, &_replay_thread); +} + +Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, + const std::string& label) { + std::string base_path = + _wal_dirs.size() == 1 ? 
_wal_dirs[0] : _wal_dirs[rand() % _wal_dirs.size()]; + std::stringstream ss; + ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/" + << std::to_string(wal_id) << "_" << label; + { + std::lock_guard<std::shared_mutex> wrlock(_wal_lock); + _wal_path_map.emplace(wal_id, ss.str()); + } + return Status::OK(); +} + +Status WalManager::get_wal_path(int64_t wal_id, std::string& wal_path) { + std::shared_lock rdlock(_wal_lock); + auto it = _wal_path_map.find(wal_id); + if (it != _wal_path_map.end()) { + wal_path = _wal_path_map[wal_id]; + } else { + return Status::InternalError("can not find wal_id {} in wal_path_map", wal_id); + } + return Status::OK(); +} + +Status WalManager::create_wal_reader(const std::string& wal_path, + std::shared_ptr<WalReader>& wal_reader) { + wal_reader = std::make_shared<WalReader>(wal_path); + RETURN_IF_ERROR(wal_reader->init()); + return Status::OK(); +} + +Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>& wal_writer) { + std::string wal_path; + RETURN_IF_ERROR(get_wal_path(wal_id, wal_path)); + std::vector<std::string> path_element; + doris::vectorized::WalReader::string_split(wal_path, "/", path_element); + std::stringstream ss; + for (int i = 0; i < path_element.size() - 1; i++) { + ss << path_element[i] << "/"; + } + std::string base_path = ss.str(); + bool exists = false; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(base_path, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(base_path)); + } + LOG(INFO) << "create wal " << wal_path; + wal_writer = std::make_shared<WalWriter>(wal_path); + RETURN_IF_ERROR(wal_writer->init()); + return Status::OK(); +} + +Status WalManager::scan_wals(const std::string& wal_path) { + size_t count = 0; + bool exists = true; + std::vector<io::FileInfo> dbs; + Status st = io::global_local_filesystem()->list(wal_path, false, &dbs, &exists); + if (!st.ok()) { + LOG(WARNING) << "Failed list 
files for dir=" << wal_path << ", st=" << st.to_string(); + return st; + } + for (const auto& db_id : dbs) { + if (db_id.is_file) { + continue; + } + std::vector<io::FileInfo> tables; + auto db_path = wal_path + "/" + db_id.file_name; + st = io::global_local_filesystem()->list(db_path, false, &tables, &exists); + if (!st.ok()) { + LOG(WARNING) << "Failed list files for dir=" << db_path << ", st=" << st.to_string(); + return st; + } + for (const auto& table_id : tables) { + if (table_id.is_file) { + continue; + } + std::vector<io::FileInfo> wals; + auto table_path = db_path + "/" + table_id.file_name; + st = io::global_local_filesystem()->list(table_path, false, &wals, &exists); + if (!st.ok()) { + LOG(WARNING) << "Failed list files for dir=" << table_path + << ", st=" << st.to_string(); + return st; + } + if (wals.size() == 0) { + continue; + } + std::vector<std::string> res; + for (const auto& wal : wals) { + auto wal_file = table_path + "/" + wal.file_name; + res.emplace_back(wal_file); + { + std::lock_guard<std::shared_mutex> wrlock(_wal_lock); + int64_t wal_id = std::strtoll(wal.file_name.c_str(), NULL, 10); + _wal_path_map.emplace(wal_id, wal_file); + } + } + st = add_recover_wal(db_id.file_name, table_id.file_name, res); + count += res.size(); + if (!st.ok()) { + LOG(WARNING) << "Failed add replay wal, db=" << db_id.file_name + << ", table=" << table_id.file_name << ", st=" << st.to_string(); + return st; + } + } + } + LOG(INFO) << "Finish list all wals, size:" << count; + return Status::OK(); +} + +Status WalManager::replay() { + do { + if (_stop || _exec_env->master_info() == nullptr) { + break; + } + // port == 0 means not received heartbeat yet + while (_exec_env->master_info()->network_address.port == 0) { + sleep(1); + continue; + } + std::vector<std::string> replay_tables; + { + std::lock_guard<std::shared_mutex> wrlock(_lock); + auto it = _table_map.begin(); + while (it != _table_map.end()) { + if (it->second->size() == 0) { + it = 
_table_map.erase(it); + } else { + replay_tables.push_back(it->first); + it++; + } + } + } + for (const auto& table_id : replay_tables) { + auto st = _table_map[table_id]->replay_wals(); + if (!st.ok()) { + LOG(WARNING) << "Failed add replay wal on table " << table_id; + } + } + } while (!_stop_background_threads_latch.wait_for( + std::chrono::seconds(config::group_commit_replay_wal_retry_interval_seconds))); + return Status::OK(); +} + +Status WalManager::add_recover_wal(const std::string& db_id, const std::string& table_id, + std::vector<std::string> wals) { + std::lock_guard<std::shared_mutex> wrlock(_lock); + std::shared_ptr<WalTable> table_ptr; + auto it = _table_map.find(table_id); + if (it == _table_map.end()) { + table_ptr = std::make_shared<WalTable>(_exec_env, std::stoll(db_id), std::stoll(table_id)); + _table_map.emplace(table_id, table_ptr); + } else { + table_ptr = it->second; + } + table_ptr->add_wals(wals); + return Status::OK(); +} + +size_t WalManager::get_wal_table_size(const std::string& table_id) { + std::shared_lock rdlock(_lock); + auto it = _table_map.find(table_id); + if (it != _table_map.end()) { + return it->second->size(); + } else { + return 0; + } +} + +Status WalManager::delete_wal(int64_t wal_id) { + { + std::lock_guard<std::shared_mutex> wrlock(_wal_lock); + std::string wal_path = _wal_path_map[wal_id]; Review Comment: warning: variable 'wal_path' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::string wal_path = 0 = _wal_path_map[wal_id]; ``` ########## be/src/olap/wal_manager.h: ########## @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "common/config.h" +#include "gen_cpp/FrontendService.h" Review Comment: warning: 'gen_cpp/FrontendService.h' file not found [clang-diagnostic-error] ```cpp #include "gen_cpp/FrontendService.h" ^ ``` ########## be/src/olap/wal_reader.cpp: ########## @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "olap/wal_reader.h" + +#include "common/status.h" +#include "io/fs/file_reader.h" +#include "io/fs/local_file_system.h" +#include "io/fs/path.h" +#include "util/crc32c.h" +#include "wal_writer.h" + +namespace doris { + +WalReader::WalReader(const std::string& file_name) : _file_name(file_name), _offset(0) {} + +WalReader::~WalReader() {} + +Status WalReader::init() { + RETURN_IF_ERROR(io::global_local_filesystem()->open_file(_file_name, &file_reader)); + return Status::OK(); +} + +Status WalReader::finalize() { + return file_reader->close(); +} + +Status WalReader::read_block(PBlock& block) { + if (_offset >= file_reader->size()) { + return Status::EndOfFile("end of wal file"); + } + size_t bytes_read = 0; + uint8_t row_len_buf[WalWriter::LENGTH_SIZE]; + RETURN_IF_ERROR( + file_reader->read_at(_offset, {row_len_buf, WalWriter::LENGTH_SIZE}, &bytes_read)); + _offset += WalWriter::LENGTH_SIZE; + size_t block_len; Review Comment: warning: variable 'block_len' is not initialized [cppcoreguidelines-init-variables] ```suggestion size_t block_len = 0; ``` ########## be/src/olap/wal_table.cpp: ########## @@ -0,0 +1,265 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "olap/wal_table.h" + +#include <event2/bufferevent.h> +#include <event2/event.h> +#include <event2/event_struct.h> +#include <event2/http.h> +#include <thrift/protocol/TDebugProtocol.h> + +#include "evhttp.h" +#include "http/action/stream_load.h" +#include "http/ev_http_server.h" +#include "http/http_common.h" +#include "http/http_headers.h" +#include "http/utils.h" +#include "io/fs/local_file_system.h" +#include "olap/wal_manager.h" +#include "runtime/client_cache.h" +#include "runtime/fragment_mgr.h" +#include "runtime/plan_fragment_executor.h" +#include "util/path_util.h" +#include "util/thrift_rpc_helper.h" +#include "vec/exec/format/wal/wal_reader.h" + +namespace doris { + +WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id) + : _exec_env(exec_env), _db_id(db_id), _table_id(table_id) {} +WalTable::~WalTable() {} Review Comment: warning: use '= default' to define a trivial destructor [modernize-use-equals-default] ```suggestion WalTable::~WalTable() = default; ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -0,0 +1,252 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "olap/wal_manager.h" + +#include <thrift/protocol/TDebugProtocol.h> + +#include <chrono> +#include <filesystem> + +#include "io/fs/local_file_system.h" +#include "runtime/client_cache.h" +#include "runtime/fragment_mgr.h" +#include "runtime/plan_fragment_executor.h" +#include "runtime/stream_load/stream_load_context.h" +#include "util/path_util.h" +#include "util/thrift_rpc_helper.h" +#include "vec/exec/format/wal/wal_reader.h" + +namespace doris { +WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) + : _exec_env(exec_env), _stop_background_threads_latch(1) { + doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs); +} + +WalManager::~WalManager() { + _stop_background_threads_latch.count_down(); + if (_replay_thread) { + _replay_thread->join(); + } + LOG(INFO) << "WalManager is destoried"; +} +void WalManager::stop() { + _stop = true; + LOG(INFO) << "WalManager is stopped"; +} + +Status WalManager::init() { + bool exists = false; + for (auto wal_dir : _wal_dirs) { + std::string tmp_dir = wal_dir + "/tmp"; + LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_dir)); + } + RETURN_IF_ERROR(io::global_local_filesystem()->exists(tmp_dir, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_dir)); + } + RETURN_IF_ERROR(scan_wals(wal_dir)); + } + return Thread::create( + "WalMgr", "replay_wal", [this]() { this->replay(); }, &_replay_thread); +} + +Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, + const std::string& label) { + std::string base_path = Review Comment: warning: variable 'base_path' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::string base_path = 0 = ``` ########## be/src/olap/wal_reader.cpp: ########## @@ -0,0 +1,87 @@ +// Licensed 
to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_reader.h" + +#include "common/status.h" +#include "io/fs/file_reader.h" +#include "io/fs/local_file_system.h" +#include "io/fs/path.h" +#include "util/crc32c.h" +#include "wal_writer.h" + +namespace doris { + +WalReader::WalReader(const std::string& file_name) : _file_name(file_name), _offset(0) {} + +WalReader::~WalReader() {} Review Comment: warning: use '= default' to define a trivial destructor [modernize-use-equals-default] ```suggestion WalReader::~WalReader() = default; ``` ########## be/src/olap/wal_reader.cpp: ########## @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_reader.h" + +#include "common/status.h" +#include "io/fs/file_reader.h" +#include "io/fs/local_file_system.h" +#include "io/fs/path.h" +#include "util/crc32c.h" +#include "wal_writer.h" + +namespace doris { + +WalReader::WalReader(const std::string& file_name) : _file_name(file_name), _offset(0) {} + +WalReader::~WalReader() {} + +Status WalReader::init() { + RETURN_IF_ERROR(io::global_local_filesystem()->open_file(_file_name, &file_reader)); + return Status::OK(); +} + +Status WalReader::finalize() { + return file_reader->close(); +} + +Status WalReader::read_block(PBlock& block) { + if (_offset >= file_reader->size()) { + return Status::EndOfFile("end of wal file"); + } + size_t bytes_read = 0; + uint8_t row_len_buf[WalWriter::LENGTH_SIZE]; + RETURN_IF_ERROR( + file_reader->read_at(_offset, {row_len_buf, WalWriter::LENGTH_SIZE}, &bytes_read)); + _offset += WalWriter::LENGTH_SIZE; + size_t block_len; + memcpy(&block_len, row_len_buf, WalWriter::LENGTH_SIZE); + // read block + std::string block_buf; + block_buf.resize(block_len); + RETURN_IF_ERROR(file_reader->read_at(_offset, {block_buf.c_str(), block_len}, &bytes_read)); + _offset += block_len; + RETURN_IF_ERROR(_deserialize(block, block_buf)); + // checksum + uint8_t checksum_len_buf[WalWriter::CHECKSUM_SIZE]; + RETURN_IF_ERROR(file_reader->read_at(_offset, {checksum_len_buf, WalWriter::CHECKSUM_SIZE}, + &bytes_read)); + _offset += WalWriter::CHECKSUM_SIZE; + uint32_t checksum; Review Comment: warning: variable 'checksum' is not initialized 
[cppcoreguidelines-init-variables] ```suggestion uint32_t checksum = 0; ``` ########## be/src/olap/wal_writer.cpp: ########## @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_writer.h" + +#include "io/fs/file_writer.h" +#include "io/fs/local_file_system.h" +#include "io/fs/path.h" +#include "olap/storage_engine.h" +#include "util/crc32c.h" + +namespace doris { + +WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name) {} + +WalWriter::~WalWriter() {} Review Comment: warning: use '= default' to define a trivial destructor [modernize-use-equals-default] ```suggestion WalWriter::~WalWriter() = default; ``` ########## be/src/olap/wal_table.h: ########## @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once +#include <mutex> +#include <string> +#include <unordered_map> +#include <vector> + +#include "common/status.h" +#include "gen_cpp/FrontendService.h" +#include "gen_cpp/FrontendService_types.h" +#include "gen_cpp/HeartbeatService_types.h" +#include "runtime/exec_env.h" +#include "runtime/stream_load/stream_load_context.h" +namespace doris { +class WalTable { +public: + WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id); + ~WalTable(); + // <retry_num, start_time_ms, is_doing_replay> + using replay_wal_info = std::tuple<int64_t, int64_t, bool>; + // used when be start and there are wals need to do recovery + void add_wals(std::vector<std::string> wals); + Status replay_wals(); + size_t size(); + +private: + std::pair<int64_t, std::string> get_wal_info(const std::string& wal); + std::string get_tmp_path(const std::string wal); + Status send_request(int64_t wal_id, const std::string& wal, const std::string& label); + +private: Review Comment: warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers] ```suggestion ``` <details> <summary>Additional context</summary> **be/src/olap/wal_table.h:40:** previously declared here ```cpp private: ^ ``` </details> ########## be/src/runtime/group_commit_mgr.cpp: ########## @@ -308,9 +311,25 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ << ", instance_id=" << print_id(instance_id) << ", executor status=" << status.to_string() << ", request commit status=" << st.to_string(); + 
if (!prepare_failed) { + _exec_env->wal_mgr()->add_wal_path(_db_id, table_id, txn_id, label); + std::string wal_path; Review Comment: warning: variable 'wal_path' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::string wal_path = 0; ``` ########## be/src/runtime/runtime_state.h: ########## @@ -229,6 +229,12 @@ class RuntimeState { const std::string& db_name() { return _db_name; } + void set_wal_id(int64_t wal_id) { _wal_id = wal_id; } + + int64_t wal_id() { return _wal_id; } Review Comment: warning: method 'wal_id' can be made const [readability-make-member-function-const] ```suggestion int64_t wal_id() const { return _wal_id; } ``` ########## be/src/runtime/group_commit_mgr.cpp: ########## @@ -185,16 +186,17 @@ Status GroupCommitTable::get_first_block_load_queue( Status GroupCommitTable::_create_group_commit_load( int64_t table_id, std::shared_ptr<LoadBlockQueue>& load_block_queue) { TStreamLoadPutRequest request; - std::stringstream ss; - ss << "insert into " << table_id << " select * from group_commit(\"table_id\"=\"" << table_id - << "\")"; - request.__set_load_sql(ss.str()); UniqueId load_id = UniqueId::gen_uid(); TUniqueId tload_id; tload_id.__set_hi(load_id.hi); tload_id.__set_lo(load_id.lo); + std::regex reg("-"); + std::string label = "group_commit_" + std::regex_replace(load_id.to_string(), reg, "_"); Review Comment: warning: variable 'label' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::string label = 0 = "group_commit_" + std::regex_replace(load_id.to_string(), reg, "_"); ``` ########## be/src/olap/wal_writer.cpp: ########## @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_writer.h" + +#include "io/fs/file_writer.h" +#include "io/fs/local_file_system.h" +#include "io/fs/path.h" +#include "olap/storage_engine.h" +#include "util/crc32c.h" + +namespace doris { + +WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name) {} + +WalWriter::~WalWriter() {} + +Status WalWriter::init() { + _batch = config::group_commit_sync_wal_batch; + RETURN_IF_ERROR(io::global_local_filesystem()->create_file(_file_name, &_file_writer)); + return Status::OK(); +} + +Status WalWriter::finalize() { + RETURN_IF_ERROR(_file_writer->close()); + return Status::OK(); +} + +Status WalWriter::append_blocks(const PBlockArray& blocks) { + size_t total_size = 0; + for (const auto& block : blocks) { + total_size += LENGTH_SIZE + block->ByteSizeLong() + CHECKSUM_SIZE; + } + std::string binary(total_size, '\0'); + char* row_binary = binary.data(); Review Comment: warning: variable 'row_binary' is not initialized [cppcoreguidelines-init-variables] ```suggestion char* row_binary = nullptr = binary.data(); ``` ########## be/src/runtime/group_commit_mgr.cpp: ########## @@ -308,9 +311,25 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ << ", instance_id=" << print_id(instance_id) << ", executor status=" << status.to_string() << ", request commit status=" << st.to_string(); + if (!prepare_failed) { + 
_exec_env->wal_mgr()->add_wal_path(_db_id, table_id, txn_id, label); + std::string wal_path; + _exec_env->wal_mgr()->get_wal_path(txn_id, wal_path); + _exec_env->wal_mgr()->add_recover_wal(std::to_string(db_id), std::to_string(table_id), + std::vector<std::string> {wal_path}); + } return st; } // TODO handle execute and commit error + if (!prepare_failed && !result_status.ok()) { + _exec_env->wal_mgr()->add_wal_path(_db_id, table_id, txn_id, label); + std::string wal_path; Review Comment: warning: variable 'wal_path' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::string wal_path = 0; ``` ########## be/src/runtime/group_commit_mgr.cpp: ########## @@ -185,16 +186,17 @@ Status GroupCommitTable::get_first_block_load_queue( Status GroupCommitTable::_create_group_commit_load( int64_t table_id, std::shared_ptr<LoadBlockQueue>& load_block_queue) { TStreamLoadPutRequest request; - std::stringstream ss; - ss << "insert into " << table_id << " select * from group_commit(\"table_id\"=\"" << table_id - << "\")"; - request.__set_load_sql(ss.str()); UniqueId load_id = UniqueId::gen_uid(); TUniqueId tload_id; tload_id.__set_hi(load_id.hi); tload_id.__set_lo(load_id.lo); + std::regex reg("-"); + std::string label = "group_commit_" + std::regex_replace(load_id.to_string(), reg, "_"); + std::stringstream ss; Review Comment: warning: variable 'ss' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::stringstream ss = 0; ``` ########## be/src/vec/exec/format/wal/wal_reader.cpp: ########## @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "wal_reader.h" + +#include "common/logging.h" +#include "olap/wal_manager.h" +#include "runtime/runtime_state.h" +namespace doris::vectorized { +WalReader::WalReader(RuntimeState* state) : _state(state) { + _wal_id = state->wal_id(); +} +WalReader::~WalReader() { + if (_wal_reader.get() != nullptr) { + _wal_reader->finalize(); + } +} +Status WalReader::init_reader() { + RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_path(_wal_id, _wal_path)); + RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_reader(_wal_path, _wal_reader)); + return Status::OK(); +} +Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + PBlock pblock; + auto st = _wal_reader->read_block(pblock); + if (st.is<ErrorCode::END_OF_FILE>()) { + LOG(INFO) << "read eof on wal:" << _wal_path; + *read_rows = 0; + *eof = true; + return Status::OK(); + } + if (!st.ok()) { + LOG(WARNING) << "Failed to read wal on path = " << _wal_path; + return st; + } + vectorized::Block tmp_block; + tmp_block.deserialize(pblock); + block->swap(tmp_block); + *read_rows = block->rows(); + VLOG_DEBUG << "read block rows:" << *read_rows; + return Status::OK(); +} + +void WalReader::string_split(const std::string& str, const std::string& splits, + std::vector<std::string>& res) { + if (str == "") return; Review Comment: warning: statement should be inside braces [readability-braces-around-statements] ```suggestion if (str == "") { return; } ``` ########## be/test/olap/wal_reader_writer_test.cpp: ########## @@ -0,0 +1,137 @@ +// Licensed to the Apache Software 
Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include <gtest/gtest.h> Review Comment: warning: 'gtest/gtest.h' file not found [clang-diagnostic-error] ```cpp #include <gtest/gtest.h> ^ ``` ########## be/src/vec/exec/format/wal/wal_reader.cpp: ########## @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "wal_reader.h" + +#include "common/logging.h" +#include "olap/wal_manager.h" +#include "runtime/runtime_state.h" +namespace doris::vectorized { +WalReader::WalReader(RuntimeState* state) : _state(state) { + _wal_id = state->wal_id(); +} +WalReader::~WalReader() { + if (_wal_reader.get() != nullptr) { + _wal_reader->finalize(); + } +} +Status WalReader::init_reader() { + RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_path(_wal_id, _wal_path)); + RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_reader(_wal_path, _wal_reader)); + return Status::OK(); +} +Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + PBlock pblock; + auto st = _wal_reader->read_block(pblock); + if (st.is<ErrorCode::END_OF_FILE>()) { + LOG(INFO) << "read eof on wal:" << _wal_path; + *read_rows = 0; + *eof = true; + return Status::OK(); + } + if (!st.ok()) { + LOG(WARNING) << "Failed to read wal on path = " << _wal_path; + return st; + } + vectorized::Block tmp_block; + tmp_block.deserialize(pblock); + block->swap(tmp_block); + *read_rows = block->rows(); + VLOG_DEBUG << "read block rows:" << *read_rows; + return Status::OK(); +} + +void WalReader::string_split(const std::string& str, const std::string& splits, + std::vector<std::string>& res) { + if (str == "") return; + std::string strs = str + splits; + size_t pos = strs.find(splits); + int step = splits.size(); + while (pos != strs.npos) { + std::string temp = strs.substr(0, pos); Review Comment: warning: variable 'temp' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::string temp = 0 = strs.substr(0, pos); ``` ########## be/src/vec/exec/format/wal/wal_reader.cpp: ########## @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "wal_reader.h" + +#include "common/logging.h" +#include "olap/wal_manager.h" +#include "runtime/runtime_state.h" +namespace doris::vectorized { +WalReader::WalReader(RuntimeState* state) : _state(state) { + _wal_id = state->wal_id(); +} +WalReader::~WalReader() { + if (_wal_reader.get() != nullptr) { + _wal_reader->finalize(); + } +} +Status WalReader::init_reader() { + RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_path(_wal_id, _wal_path)); + RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_reader(_wal_path, _wal_reader)); + return Status::OK(); +} +Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + PBlock pblock; + auto st = _wal_reader->read_block(pblock); + if (st.is<ErrorCode::END_OF_FILE>()) { + LOG(INFO) << "read eof on wal:" << _wal_path; + *read_rows = 0; + *eof = true; + return Status::OK(); + } + if (!st.ok()) { + LOG(WARNING) << "Failed to read wal on path = " << _wal_path; + return st; + } + vectorized::Block tmp_block; + tmp_block.deserialize(pblock); + block->swap(tmp_block); + *read_rows = block->rows(); + VLOG_DEBUG << "read block rows:" << *read_rows; + return Status::OK(); +} + +void WalReader::string_split(const std::string& str, const std::string& splits, + std::vector<std::string>& res) { + if (str == "") return; + std::string strs = str + splits; Review Comment: warning: variable 'strs' is not 
initialized [cppcoreguidelines-init-variables] ```suggestion std::string strs = 0 = str + splits; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org