dataroaring commented on code in PR #23053:
URL: https://github.com/apache/doris/pull/23053#discussion_r1297246703


##########
be/src/olap/wal_manager.cpp:
##########
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/wal_manager.h"
+
+#include <thrift/protocol/TDebugProtocol.h>
+
+#include <chrono>
+#include <filesystem>
+
+#include "io/fs/local_file_system.h"
+#include "runtime/client_cache.h"
+#include "runtime/fragment_mgr.h"
+#include "runtime/plan_fragment_executor.h"
+#include "runtime/stream_load/stream_load_context.h"
+#include "util/path_util.h"
+#include "util/thrift_rpc_helper.h"
+#include "vec/exec/format/wal/wal_reader.h"
+
+namespace doris {
+WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list)
+        : _exec_env(exec_env), _stop_background_threads_latch(1) {
+    doris::vectorized::WalReader::string_split(wal_dir_list, ",", wal_dirs);
+}
+
+WalManager::~WalManager() {
+    _stop_background_threads_latch.count_down();
+    if (_replay_thread) {
+        _replay_thread->join();
+    }
+}
+
+Status WalManager::init() {
+    bool exists = false;
+    for (auto wal_dir : wal_dirs) {
+        std::string tmp_dir = wal_dir + "/tmp";
+        LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir;
+        io::global_local_filesystem()->exists(wal_dir, &exists);
+        if (!exists) {
+            io::global_local_filesystem()->create_directory(wal_dir);
+        }
+        io::global_local_filesystem()->exists(tmp_dir, &exists);
+        if (!exists) {
+            io::global_local_filesystem()->create_directory(tmp_dir);
+        }
+        RETURN_IF_ERROR(scan_wals(wal_dir));
+    }
+    return Thread::create(
+            "WalMgr", "replay_wal", [this]() { this->replay(); }, 
&_replay_thread);
+}
+
+Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t 
wal_id) {
+    std::string base_path = wal_dirs.size() == 1 ? wal_dirs[0] : 
wal_dirs[rand() % wal_dirs.size()];
+    std::stringstream ss;
+    ss << base_path << "/" << std::to_string(db_id) << "/" << 
std::to_string(table_id) << "/"
+       << std::to_string(wal_id);
+    {
+        std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
+        wal_path_map.emplace(wal_id, ss.str());
+    }
+    return Status::OK();
+}
+
+Status WalManager::get_wal_path(int64_t wal_id, std::string& wal_path) {
+    std::shared_lock rdlock(_wal_lock);
+    auto it = wal_path_map.find(wal_id);
+    if (it != wal_path_map.end()) {
+        wal_path = wal_path_map[wal_id];
+    } else {
+        return Status::InternalError("can not find wal_id {} in wal_path_map", 
wal_id);
+    }
+    return Status::OK();
+}
+
+Status WalManager::create_wal_reader(const std::string& wal_path,
+                                     std::shared_ptr<WalReader>& wal_reader) {
+    wal_reader = std::make_shared<WalReader>(wal_path);
+    RETURN_IF_ERROR(wal_reader->init());
+    return Status::OK();
+}
+
+Status WalManager::create_wal_writer(int64_t wal_id, 
std::shared_ptr<WalWriter>& wal_writer) {
+    std::string wal_path;
+    get_wal_path(wal_id, wal_path);
+    std::vector<std::string> path_element;
+    doris::vectorized::WalReader::string_split(wal_path, "/", path_element);
+    std::stringstream ss;
+    for (int i = 0; i < path_element.size() - 1; i++) {
+        ss << path_element[i] << "/";
+    }
+    std::string base_path = ss.str();
+    bool exists = false;
+    io::global_local_filesystem()->exists(base_path, &exists);
+    if (!exists) {
+        io::global_local_filesystem()->create_directory(base_path);

Review Comment:
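   Same as the other comments on this file: the result of `create_directory(base_path)` is ignored here; please check it and propagate the error.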



##########
be/src/olap/wal_manager.h:
##########
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/config.h"
+#include "gen_cpp/FrontendService.h"
+#include "gen_cpp/FrontendService_types.h"
+#include "gen_cpp/HeartbeatService_types.h"
+#include "olap/wal_reader.h"
+#include "olap/wal_table.h"
+#include "olap/wal_writer.h"
+#include "runtime/exec_env.h"
+#include "runtime/stream_load/stream_load_context.h"
+#include "util/thread.h"
+
+namespace doris {
+class WalManager {
+public:
+    WalManager(ExecEnv* exec_env, const std::string& wal_dir);
+    ~WalManager();
+    Status delete_wal(int64_t wal_id);
+    Status init();
+    Status scan_wals(const std::string& wal_path);
+    Status replay();
+    Status create_wal_reader(const std::string& wal_path, 
std::shared_ptr<WalReader>& wal_reader);
+    Status create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>& 
wal_writer);
+    Status scan();
+    size_t get_wal_table_size(const std::string& table_id);
+    Status add_recover_wal(const std::string& db_id, const std::string& 
table_id,
+                           std::vector<std::string> wals);
+    Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id);
+    Status get_wal_path(int64_t wal_id, std::string& wal_path);
+
+private:
+    ExecEnv* _exec_env;
+    std::shared_mutex _lock;
+    scoped_refptr<Thread> _replay_thread;
+    CountDownLatch _stop_background_threads_latch;
+    std::map<std::string, std::shared_ptr<WalTable>> _table_map;
+    std::vector<std::string> wal_dirs;
+    std::shared_mutex _wal_lock;
+    std::unordered_map<int64_t, std::string> wal_path_map;

Review Comment:
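   Please rename this to `_wal_path_map`; the other private members of this class (`_exec_env`, `_lock`, `_wal_lock`) use a leading underscore.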



##########
be/src/olap/wal_manager.h:
##########
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/config.h"
+#include "gen_cpp/FrontendService.h"
+#include "gen_cpp/FrontendService_types.h"
+#include "gen_cpp/HeartbeatService_types.h"
+#include "olap/wal_reader.h"
+#include "olap/wal_table.h"
+#include "olap/wal_writer.h"
+#include "runtime/exec_env.h"
+#include "runtime/stream_load/stream_load_context.h"
+#include "util/thread.h"
+
+namespace doris {
+class WalManager {
+public:
+    WalManager(ExecEnv* exec_env, const std::string& wal_dir);
+    ~WalManager();
+    Status delete_wal(int64_t wal_id);
+    Status init();
+    Status scan_wals(const std::string& wal_path);
+    Status replay();
+    Status create_wal_reader(const std::string& wal_path, 
std::shared_ptr<WalReader>& wal_reader);
+    Status create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>& 
wal_writer);
+    Status scan();
+    size_t get_wal_table_size(const std::string& table_id);
+    Status add_recover_wal(const std::string& db_id, const std::string& 
table_id,
+                           std::vector<std::string> wals);
+    Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id);
+    Status get_wal_path(int64_t wal_id, std::string& wal_path);
+
+private:
+    ExecEnv* _exec_env;
+    std::shared_mutex _lock;
+    scoped_refptr<Thread> _replay_thread;
+    CountDownLatch _stop_background_threads_latch;
+    std::map<std::string, std::shared_ptr<WalTable>> _table_map;
+    std::vector<std::string> wal_dirs;

Review Comment:
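   Please rename this to `_wal_dirs`; the other private members of this class (`_exec_env`, `_lock`, `_table_map`) use a leading underscore.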



##########
be/src/olap/wal_manager.cpp:
##########
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/wal_manager.h"
+
+#include <thrift/protocol/TDebugProtocol.h>
+
+#include <chrono>
+#include <filesystem>
+
+#include "io/fs/local_file_system.h"
+#include "runtime/client_cache.h"
+#include "runtime/fragment_mgr.h"
+#include "runtime/plan_fragment_executor.h"
+#include "runtime/stream_load/stream_load_context.h"
+#include "util/path_util.h"
+#include "util/thrift_rpc_helper.h"
+#include "vec/exec/format/wal/wal_reader.h"
+
+namespace doris {
+WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list)
+        : _exec_env(exec_env), _stop_background_threads_latch(1) {
+    doris::vectorized::WalReader::string_split(wal_dir_list, ",", wal_dirs);
+}
+
+WalManager::~WalManager() {
+    _stop_background_threads_latch.count_down();
+    if (_replay_thread) {
+        _replay_thread->join();
+    }
+}
+
+Status WalManager::init() {
+    bool exists = false;
+    for (auto wal_dir : wal_dirs) {
+        std::string tmp_dir = wal_dir + "/tmp";
+        LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir;
+        io::global_local_filesystem()->exists(wal_dir, &exists);
+        if (!exists) {
+            io::global_local_filesystem()->create_directory(wal_dir);

Review Comment:
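   Handle the error: the result of `create_directory(wal_dir)` is ignored here, so a failure to create the WAL directory goes unnoticed.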



##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -1057,19 +1057,31 @@ private TLoadTxnBeginResult 
loadTxnBeginImpl(TLoadTxnBeginRequest request, Strin
         if (Strings.isNullOrEmpty(request.getLabel())) {
             throw new UserException("empty label in begin request");
         }
+        OlapTable table;
+        Database db;
         // check database
         Env env = Env.getCurrentEnv();
-        String fullDbName = ClusterNamespace.getFullName(cluster, 
request.getDb());
-        Database db = env.getInternalCatalog().getDbNullable(fullDbName);
-        if (db == null) {
-            String dbName = fullDbName;
-            if (Strings.isNullOrEmpty(request.getCluster())) {
-                dbName = request.getDb();
+        if (Strings.isNullOrEmpty(request.getDb()) && request.getTableId() != 
-1) {

Review Comment:
   Check `isSetTableId()` in addition to `getTableId()`, i.e. `request.isSetTableId() && request.getTableId() != -1`.
   ```suggestion
           if (Strings.isNullOrEmpty(request.getDb()) && request.getTableId() 
!= -1) {
   ```



##########
be/src/olap/wal_manager.cpp:
##########
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/wal_manager.h"
+
+#include <thrift/protocol/TDebugProtocol.h>
+
+#include <chrono>
+#include <filesystem>
+
+#include "io/fs/local_file_system.h"
+#include "runtime/client_cache.h"
+#include "runtime/fragment_mgr.h"
+#include "runtime/plan_fragment_executor.h"
+#include "runtime/stream_load/stream_load_context.h"
+#include "util/path_util.h"
+#include "util/thrift_rpc_helper.h"
+#include "vec/exec/format/wal/wal_reader.h"
+
+namespace doris {
+WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list)
+        : _exec_env(exec_env), _stop_background_threads_latch(1) {
+    doris::vectorized::WalReader::string_split(wal_dir_list, ",", wal_dirs);
+}
+
+WalManager::~WalManager() {
+    _stop_background_threads_latch.count_down();
+    if (_replay_thread) {
+        _replay_thread->join();
+    }
+}
+
+Status WalManager::init() {
+    bool exists = false;
+    for (auto wal_dir : wal_dirs) {
+        std::string tmp_dir = wal_dir + "/tmp";
+        LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir;
+        io::global_local_filesystem()->exists(wal_dir, &exists);
+        if (!exists) {
+            io::global_local_filesystem()->create_directory(wal_dir);
+        }
+        io::global_local_filesystem()->exists(tmp_dir, &exists);
+        if (!exists) {
+            io::global_local_filesystem()->create_directory(tmp_dir);
+        }
+        RETURN_IF_ERROR(scan_wals(wal_dir));
+    }
+    return Thread::create(
+            "WalMgr", "replay_wal", [this]() { this->replay(); }, 
&_replay_thread);
+}
+
+Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t 
wal_id) {
+    std::string base_path = wal_dirs.size() == 1 ? wal_dirs[0] : 
wal_dirs[rand() % wal_dirs.size()];
+    std::stringstream ss;
+    ss << base_path << "/" << std::to_string(db_id) << "/" << 
std::to_string(table_id) << "/"
+       << std::to_string(wal_id);
+    {
+        std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
+        wal_path_map.emplace(wal_id, ss.str());
+    }
+    return Status::OK();
+}
+
+Status WalManager::get_wal_path(int64_t wal_id, std::string& wal_path) {
+    std::shared_lock rdlock(_wal_lock);
+    auto it = wal_path_map.find(wal_id);
+    if (it != wal_path_map.end()) {
+        wal_path = wal_path_map[wal_id];
+    } else {
+        return Status::InternalError("can not find wal_id {} in wal_path_map", 
wal_id);
+    }
+    return Status::OK();
+}
+
+Status WalManager::create_wal_reader(const std::string& wal_path,
+                                     std::shared_ptr<WalReader>& wal_reader) {
+    wal_reader = std::make_shared<WalReader>(wal_path);
+    RETURN_IF_ERROR(wal_reader->init());
+    return Status::OK();
+}
+
+Status WalManager::create_wal_writer(int64_t wal_id, 
std::shared_ptr<WalWriter>& wal_writer) {
+    std::string wal_path;
+    get_wal_path(wal_id, wal_path);

Review Comment:
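   Handle the return value: `get_wal_path` returns a `Status`, but it is ignored here, so `wal_path` may be left empty when the lookup fails.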



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to