morningman commented on a change in pull request #5101:
URL: https://github.com/apache/incubator-doris/pull/5101#discussion_r555864664



##########
File path: be/src/http/action/tablet_migration_action.cpp
##########
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/tablet_migration_action.h"
+
+#include <string>
+
+#include "gutil/strings/substitute.h"
+#include "http/http_channel.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/http_status.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_manager.h"
+#include "olap/task/engine_storage_migration_task.h"
+#include "util/json_util.h"
+
+namespace doris {
+
+const static std::string HEADER_JSON = "application/json";
+
+TabletMigrationAction::TabletMigrationAction() {
+    _init_migration_action();
+}
+
+void TabletMigrationAction::_init_migration_action() {
+    int32_t max_thread_num = config::max_tablet_migration_threads;
+    int32_t min_thread_num = config::min_tablet_migration_threads;
+    ThreadPoolBuilder("MigrationTaskThreadPool")
+            .set_min_threads(min_thread_num)
+            .set_max_threads(max_thread_num)
+            .build(&_migration_thread_pool);
+}
+
+void TabletMigrationAction::handle(HttpRequest* req) {
+    int64_t tablet_id = 0;
+    int32_t schema_hash = 0;
+    string dest_disk = "";
+    string goal = "";
+    Status status = _check_param(req, tablet_id, schema_hash, dest_disk, goal);
+    if (status.ok()) {
+        if (goal == "run") {
+            MigrationTask current_task(tablet_id, schema_hash, dest_disk);
+            TabletSharedPtr tablet;
+            DataDir* dest_store;
+            Status status =
+                    _check_migrate_request(tablet_id, schema_hash, dest_disk, 
tablet, &dest_store);
+            if (status.ok()) {
+                do {
+                    {
+                        std::unique_lock<std::mutex> 
lock(_migration_status_mutex);
+                        std::map<MigrationTask, std::string>::iterator it_task 
=
+                                _migration_tasks.begin();
+                        for (; it_task != _migration_tasks.end(); it_task++) {
+                            if (tablet_id == (it_task->first)._tablet_id &&
+                                schema_hash == (it_task->first)._schema_hash) {
+                                status = 
Status::AlreadyExist(strings::Substitute(
+                                        "There is a migration task for this 
tablet already exists. "
+                                        "dest_disk is $0 .",
+                                        (it_task->first)._dest_disk));
+                                break;
+                            }
+                        }
+                        if (it_task != _migration_tasks.end()) {
+                            break;
+                        }
+                    }
+                    {
+                        std::unique_lock<std::mutex> 
lock(_migration_status_mutex);

Review comment:
       Looks like you can move this block to the above code block, to avoid 
lock `_migration_status_mutex ` twice?

##########
File path: be/src/http/action/tablet_migration_action.cpp
##########
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/tablet_migration_action.h"
+
+#include <string>
+
+#include "gutil/strings/substitute.h"
+#include "http/http_channel.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/http_status.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_manager.h"
+#include "olap/task/engine_storage_migration_task.h"
+#include "util/json_util.h"
+
+namespace doris {
+
+const static std::string HEADER_JSON = "application/json";
+
+TabletMigrationAction::TabletMigrationAction() {
+    _init_migration_action();
+}
+
+void TabletMigrationAction::_init_migration_action() {
+    int32_t max_thread_num = config::max_tablet_migration_threads;
+    int32_t min_thread_num = config::min_tablet_migration_threads;
+    ThreadPoolBuilder("MigrationTaskThreadPool")
+            .set_min_threads(min_thread_num)
+            .set_max_threads(max_thread_num)
+            .build(&_migration_thread_pool);
+}
+
+void TabletMigrationAction::handle(HttpRequest* req) {
+    int64_t tablet_id = 0;
+    int32_t schema_hash = 0;
+    string dest_disk = "";
+    string goal = "";
+    Status status = _check_param(req, tablet_id, schema_hash, dest_disk, goal);
+    if (status.ok()) {
+        if (goal == "run") {
+            MigrationTask current_task(tablet_id, schema_hash, dest_disk);
+            TabletSharedPtr tablet;
+            DataDir* dest_store;
+            Status status =
+                    _check_migrate_request(tablet_id, schema_hash, dest_disk, 
tablet, &dest_store);
+            if (status.ok()) {
+                do {
+                    {
+                        std::unique_lock<std::mutex> 
lock(_migration_status_mutex);
+                        std::map<MigrationTask, std::string>::iterator it_task 
=
+                                _migration_tasks.begin();
+                        for (; it_task != _migration_tasks.end(); it_task++) {
+                            if (tablet_id == (it_task->first)._tablet_id &&
+                                schema_hash == (it_task->first)._schema_hash) {
+                                status = 
Status::AlreadyExist(strings::Substitute(
+                                        "There is a migration task for this 
tablet already exists. "
+                                        "dest_disk is $0 .",
+                                        (it_task->first)._dest_disk));
+                                break;
+                            }
+                        }
+                        if (it_task != _migration_tasks.end()) {
+                            break;
+                        }
+                    }
+                    {
+                        std::unique_lock<std::mutex> 
lock(_migration_status_mutex);
+                        _migration_tasks[current_task] = "submitted";
+                    }
+                    auto st = _migration_thread_pool->submit_func([&, 
tablet_id, schema_hash,
+                                                                   dest_disk, 
current_task]() {
+                        {
+                            std::unique_lock<std::mutex> 
lock(_migration_status_mutex);
+                            std::map<MigrationTask, std::string>::iterator 
it_task =
+                                    _migration_tasks.find(current_task);
+                            if (it_task != _migration_tasks.end()) {
+                                it_task->second = "running";
+                            }
+                        }
+                        Status result_status = 
_execute_tablet_migration(tablet, dest_store);
+                        {
+                            std::unique_lock<std::mutex> 
lock(_migration_status_mutex);
+                            std::map<MigrationTask, std::string>::iterator 
it_task =
+                                    _migration_tasks.find(current_task);
+                            if (it_task != _migration_tasks.end()) {
+                                _migration_tasks.erase(it_task);
+                            }
+                            std::pair<MigrationTask, Status> finished_task =
+                                    make_pair(current_task, result_status);
+                            if (_finished_migration_tasks.size() >=
+                                config::finished_migration_tasks_size) {
+                                _finished_migration_tasks.pop_front();
+                            }
+                            _finished_migration_tasks.push_back(finished_task);
+                        }
+                    });
+                    if (!st.ok()) {
+                        status = Status::InternalError("Migration task 
submission failed");
+                        std::unique_lock<std::mutex> 
lock(_migration_status_mutex);
+                        std::map<MigrationTask, std::string>::iterator it_task 
=
+                                _migration_tasks.find(current_task);
+                        if (it_task != _migration_tasks.end()) {
+                            _migration_tasks.erase(it_task);
+                        }
+                    }
+                } while (0);
+            }
+            std::string status_result;
+            if (!status.ok()) {
+                status_result = to_json(status);
+            } else {
+                status_result =
+                        "{\"status\": \"Success\", \"msg\": \"migration task 
is successfully "
+                        "submitted.\"}";
+            }
+            req->add_output_header(HttpHeaders::CONTENT_TYPE, 
HEADER_JSON.c_str());
+            HttpChannel::send_reply(req, HttpStatus::OK, status_result);
+        } else {
+            DCHECK(goal == "status");
+            std::string status_result;
+            do {
+                std::unique_lock<std::mutex> lock(_migration_status_mutex);
+                std::map<MigrationTask, std::string>::iterator it_task = 
_migration_tasks.begin();
+                for (; it_task != _migration_tasks.end(); it_task++) {

Review comment:
       `_migration_tasks` is a map, so why not just find the task by key?
   You can refer to `https://www.cnblogs.com/xupeidong/p/11976671.html`

##########
File path: be/src/http/action/tablet_migration_action.cpp
##########
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/tablet_migration_action.h"
+
+#include <string>
+
+#include "gutil/strings/substitute.h"
+#include "http/http_channel.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/http_status.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_manager.h"
+#include "olap/task/engine_storage_migration_task.h"
+#include "util/json_util.h"
+
+namespace doris {
+
+const static std::string HEADER_JSON = "application/json";
+
+TabletMigrationAction::TabletMigrationAction() {
+    _init_migration_action();
+}
+
+void TabletMigrationAction::_init_migration_action() {
+    int32_t max_thread_num = config::max_tablet_migration_threads;
+    int32_t min_thread_num = config::min_tablet_migration_threads;
+    ThreadPoolBuilder("MigrationTaskThreadPool")
+            .set_min_threads(min_thread_num)
+            .set_max_threads(max_thread_num)
+            .build(&_migration_thread_pool);
+}
+
+void TabletMigrationAction::handle(HttpRequest* req) {
+    int64_t tablet_id = 0;
+    int32_t schema_hash = 0;
+    string dest_disk = "";
+    string goal = "";
+    Status status = _check_param(req, tablet_id, schema_hash, dest_disk, goal);
+    if (status.ok()) {
+        if (goal == "run") {
+            MigrationTask current_task(tablet_id, schema_hash, dest_disk);
+            TabletSharedPtr tablet;
+            DataDir* dest_store;
+            Status status =
+                    _check_migrate_request(tablet_id, schema_hash, dest_disk, 
tablet, &dest_store);
+            if (status.ok()) {
+                do {
+                    {
+                        std::unique_lock<std::mutex> 
lock(_migration_status_mutex);
+                        std::map<MigrationTask, std::string>::iterator it_task 
=
+                                _migration_tasks.begin();
+                        for (; it_task != _migration_tasks.end(); it_task++) {
+                            if (tablet_id == (it_task->first)._tablet_id &&
+                                schema_hash == (it_task->first)._schema_hash) {
+                                status = 
Status::AlreadyExist(strings::Substitute(
+                                        "There is a migration task for this 
tablet already exists. "
+                                        "dest_disk is $0 .",
+                                        (it_task->first)._dest_disk));
+                                break;
+                            }
+                        }
+                        if (it_task != _migration_tasks.end()) {
+                            break;
+                        }
+                    }
+                    {
+                        std::unique_lock<std::mutex> 
lock(_migration_status_mutex);

Review comment:
       And the `"submitted"` status seems not necessary? It will be changed to 
"running" very soon. And it it submit failed, it will be removed from 
_migration_tasks.

##########
File path: be/src/http/action/tablet_migration_action.cpp
##########
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/tablet_migration_action.h"
+
+#include <string>
+
+#include "gutil/strings/substitute.h"
+#include "http/http_channel.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/http_status.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_manager.h"
+#include "olap/task/engine_storage_migration_task.h"
+#include "util/json_util.h"
+
+namespace doris {
+
+const static std::string HEADER_JSON = "application/json";
+
+TabletMigrationAction::TabletMigrationAction() {
+    _init_migration_action();
+}
+
+void TabletMigrationAction::_init_migration_action() {
+    int32_t max_thread_num = config::max_tablet_migration_threads;
+    int32_t min_thread_num = config::min_tablet_migration_threads;
+    ThreadPoolBuilder("MigrationTaskThreadPool")
+            .set_min_threads(min_thread_num)
+            .set_max_threads(max_thread_num)
+            .build(&_migration_thread_pool);
+}
+
+void TabletMigrationAction::handle(HttpRequest* req) {
+    int64_t tablet_id = 0;
+    int32_t schema_hash = 0;
+    string dest_disk = "";
+    string goal = "";
+    Status status = _check_param(req, tablet_id, schema_hash, dest_disk, goal);
+    if (status.ok()) {
+        if (goal == "run") {
+            MigrationTask current_task(tablet_id, schema_hash, dest_disk);
+            TabletSharedPtr tablet;
+            DataDir* dest_store;
+            Status status =
+                    _check_migrate_request(tablet_id, schema_hash, dest_disk, 
tablet, &dest_store);
+            if (status.ok()) {
+                do {
+                    {
+                        std::unique_lock<std::mutex> 
lock(_migration_status_mutex);
+                        std::map<MigrationTask, std::string>::iterator it_task 
=
+                                _migration_tasks.begin();
+                        for (; it_task != _migration_tasks.end(); it_task++) {
+                            if (tablet_id == (it_task->first)._tablet_id &&
+                                schema_hash == (it_task->first)._schema_hash) {
+                                status = 
Status::AlreadyExist(strings::Substitute(
+                                        "There is a migration task for this 
tablet already exists. "
+                                        "dest_disk is $0 .",
+                                        (it_task->first)._dest_disk));
+                                break;
+                            }
+                        }
+                        if (it_task != _migration_tasks.end()) {
+                            break;
+                        }
+                    }
+                    {
+                        std::unique_lock<std::mutex> 
lock(_migration_status_mutex);
+                        _migration_tasks[current_task] = "submitted";
+                    }
+                    auto st = _migration_thread_pool->submit_func([&, 
tablet_id, schema_hash,
+                                                                   dest_disk, 
current_task]() {
+                        {
+                            std::unique_lock<std::mutex> 
lock(_migration_status_mutex);
+                            std::map<MigrationTask, std::string>::iterator 
it_task =
+                                    _migration_tasks.find(current_task);
+                            if (it_task != _migration_tasks.end()) {
+                                it_task->second = "running";
+                            }
+                        }
+                        Status result_status = 
_execute_tablet_migration(tablet, dest_store);
+                        {
+                            std::unique_lock<std::mutex> 
lock(_migration_status_mutex);
+                            std::map<MigrationTask, std::string>::iterator 
it_task =
+                                    _migration_tasks.find(current_task);
+                            if (it_task != _migration_tasks.end()) {
+                                _migration_tasks.erase(it_task);
+                            }
+                            std::pair<MigrationTask, Status> finished_task =
+                                    make_pair(current_task, result_status);
+                            if (_finished_migration_tasks.size() >=
+                                config::finished_migration_tasks_size) {
+                                _finished_migration_tasks.pop_front();
+                            }
+                            _finished_migration_tasks.push_back(finished_task);
+                        }
+                    });
+                    if (!st.ok()) {
+                        status = Status::InternalError("Migration task 
submission failed");
+                        std::unique_lock<std::mutex> 
lock(_migration_status_mutex);
+                        std::map<MigrationTask, std::string>::iterator it_task 
=
+                                _migration_tasks.find(current_task);
+                        if (it_task != _migration_tasks.end()) {
+                            _migration_tasks.erase(it_task);
+                        }
+                    }
+                } while (0);
+            }
+            std::string status_result;
+            if (!status.ok()) {
+                status_result = to_json(status);
+            } else {
+                status_result =
+                        "{\"status\": \"Success\", \"msg\": \"migration task 
is successfully "
+                        "submitted.\"}";
+            }
+            req->add_output_header(HttpHeaders::CONTENT_TYPE, 
HEADER_JSON.c_str());
+            HttpChannel::send_reply(req, HttpStatus::OK, status_result);
+        } else {
+            DCHECK(goal == "status");
+            std::string status_result;
+            do {
+                std::unique_lock<std::mutex> lock(_migration_status_mutex);
+                std::map<MigrationTask, std::string>::iterator it_task = 
_migration_tasks.begin();
+                for (; it_task != _migration_tasks.end(); it_task++) {
+                    if (tablet_id == (it_task->first)._tablet_id &&
+                        schema_hash == (it_task->first)._schema_hash) {
+                        status_result = "{\"status\": \"Success\", \"msg\": 
\"migration task is " +
+                                        it_task->second + "\", \"dest_disk\": 
\"" +
+                                        (it_task->first)._dest_disk + "\"}";
+                        break;
+                    }
+                }
+                if (it_task != _migration_tasks.end()) {
+                    break;
+                }
+
+                int i = _finished_migration_tasks.size() - 1;
+                for (; i >= 0; i--) {
+                    MigrationTask finished_task = 
_finished_migration_tasks[i].first;
+                    if (finished_task._tablet_id == tablet_id &&
+                        finished_task._schema_hash == schema_hash) {
+                        status = _finished_migration_tasks[i].second;
+                        if (status.ok()) {
+                            status_result =
+                                    "{\"status\": \"Success\", \"msg\": 
\"migration task has "
+                                    "finished successfully\", \"dest_disk\": 
\"" +
+                                    finished_task._dest_disk + "\"}";
+                        }
+                        break;
+                    }
+                }
+                if (i < 0) {
+                    status = Status::NotFound("Migration task not found");
+                }
+            } while (0);
+            if (!status.ok()) {
+                status_result = to_json(status);
+            }
+            req->add_output_header(HttpHeaders::CONTENT_TYPE, 
HEADER_JSON.c_str());
+            HttpChannel::send_reply(req, HttpStatus::OK, status_result);
+        }
+    } else {
+        std::string status_result = to_json(status);
+        req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());
+        HttpChannel::send_reply(req, HttpStatus::OK, status_result);
+    }
+}
+
+Status TabletMigrationAction::_check_param(HttpRequest* req, int64_t& 
tablet_id,
+                                           int32_t& schema_hash, string& 
dest_disk, string& goal) {
+    const std::string& req_tablet_id = req->param("tablet_id");
+    const std::string& req_schema_hash = req->param("schema_hash");
+    try {
+        tablet_id = std::stoull(req_tablet_id);
+        schema_hash = std::stoul(req_schema_hash);
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "invalid argument.tablet_id:" << req_tablet_id
+                     << ", schema_hash:" << req_schema_hash;
+        return Status::InternalError(strings::Substitute("Convert failed, $0", 
e.what()));
+    }
+    dest_disk = req->param("disk");
+    goal = req->param("goal");
+    if (goal != "run" && goal != "status") {
+        return Status::InternalError(strings::Substitute("invalid goal 
argument."));
+    }
+    return Status::OK();
+}
+
+Status TabletMigrationAction::_check_migrate_request(int64_t tablet_id, 
int32_t schema_hash,
+                                                     string dest_disk, 
TabletSharedPtr& tablet,
+                                                     DataDir** dest_store) {
+    tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash);
+    if (tablet == nullptr) {
+        LOG(WARNING) << "no tablet for tablet_id:" << tablet_id << " schema 
hash:" << schema_hash;
+        return Status::NotFound("Tablet not found");
+    }
+
+    // request specify the data dir
+    *dest_store = StorageEngine::instance()->get_store(dest_disk);
+    if (*dest_store == nullptr) {
+        LOG(WARNING) << "data dir not found: " << dest_disk;
+        return Status::NotFound("Disk not found");
+    }
+
+    if (tablet->data_dir() == *dest_store) {
+        LOG(WARNING) << "tablet already exist in destine disk: " << dest_disk;
+        return Status::AlreadyExist("Tablet already exist in destination 
disk");
+    }
+
+    // check disk capacity
+    int64_t tablet_size = tablet->tablet_footprint();
+    if ((*dest_store)->reach_capacity_limit(tablet_size)) {
+        LOG(WARNING) << "reach the capacity limit of path: " << 
(*dest_store)->path()
+                     << ", tablet size: " << tablet_size;
+        return Status::InternalError("Insufficient disk capacity");
+    }
+
+    return Status::OK();
+}
+
+Status TabletMigrationAction::_execute_tablet_migration(TabletSharedPtr tablet,
+                                                        DataDir* dest_store) {
+    int64_t tablet_id = tablet->tablet_id();
+    int32_t schema_hash = tablet->schema_hash();
+    string dest_disk = dest_store->path();
+    EngineStorageMigrationTask engine_task(tablet, dest_store);
+    OLAPStatus res = StorageEngine::instance()->execute_task(&engine_task);
+    Status status = Status::OK();
+    if (res != OLAP_SUCCESS) {
+        LOG(WARNING) << "tablet migrate failed. tablet_id=" << tablet_id
+                     << ", schema_hash=" << schema_hash << ", dest_disk=" << 
dest_disk
+                     << ", status:" << res;
+        status = Status::InternalError("migration task failed");

Review comment:
       better to add `res` to the message in `status`, so that user can known 
what's wrong




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to