Tanya-W commented on code in PR #19063:
URL: https://github.com/apache/doris/pull/19063#discussion_r1203734755


##########
be/src/agent/task_worker_pool.cpp:
##########
@@ -498,116 +499,59 @@ void 
TaskWorkerPool::_drop_tablet_worker_thread_callback() {
 void TaskWorkerPool::_alter_inverted_index_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
+        TAlterInvertedIndexReq alter_inverted_index_rq;
+
         {
             std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
 
             agent_task_req = _tasks.front();
+            alter_inverted_index_rq = agent_task_req.alter_inverted_index_req;
             _tasks.pop_front();
         }
-        int64_t signature = agent_task_req.signature;
-        LOG(INFO) << "get alter inverted index task, signature: " << 
agent_task_req.signature;
-        bool is_task_timeout = false;
-        if (agent_task_req.__isset.recv_time) {
-            int64_t time_elapsed = time(nullptr) - agent_task_req.recv_time;
-            if (time_elapsed > config::report_task_interval_seconds * 20) {
-                LOG(INFO) << "task elapsed " << time_elapsed
-                          << " seconds since it is inserted to queue, it is 
timeout";
-                is_task_timeout = true;
-            }
-        }
-        if (!is_task_timeout) {
-            TFinishTaskRequest finish_task_request;
-            TTaskType::type task_type = agent_task_req.task_type;
-            switch (task_type) {
-            case TTaskType::ALTER_INVERTED_INDEX:
-                _alter_inverted_index(agent_task_req, signature, task_type, 
&finish_task_request);
-                break;
-            default:
-                // pass
-                break;
-            }
-            _finish_task(finish_task_request);
-        }
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
-    }
-}
-
-void TaskWorkerPool::_alter_inverted_index(const TAgentTaskRequest& 
alter_inverted_index_request,
-                                           int64_t signature, const 
TTaskType::type task_type,
-                                           TFinishTaskRequest* 
finish_task_request) {
-    Status status = Status::OK();
-    string process_name;
-    switch (task_type) {
-    case TTaskType::ALTER_INVERTED_INDEX:
-        process_name = "AlterInvertedIndex";
-        break;
-    default:
-        std::string task_name;
-        EnumToString(TTaskType, task_type, task_name);
-        LOG(WARNING) << "schema change type invalid. type: " << task_name
-                     << ", signature: " << signature;
-        status = Status::NotSupported("Schema change type invalid");
-        break;
-    }
-
-    TTabletId tablet_id;
-    TSchemaHash schema_hash = 0;
-    if (status.ok()) {
-        tablet_id = 
alter_inverted_index_request.alter_inverted_index_req.tablet_id;
-        schema_hash = 
alter_inverted_index_request.alter_inverted_index_req.schema_hash;
-        EngineAlterInvertedIndexTask engine_task(
-                alter_inverted_index_request.alter_inverted_index_req);
-        Status sc_status = _env->storage_engine()->execute_task(&engine_task);
-        if (!sc_status.ok()) {
-            status = Status::DataQualityError("The data quality does not 
satisfy");
-        } else {
-            status = Status::OK();
-        }
-    }
 
-    if (status.ok()) {
-        ++_s_report_version;
-        LOG(INFO) << process_name << " finished. signature: " << signature;
-    }
-
-    // Return result to fe
-    finish_task_request->__set_backend(BackendOptions::get_local_backend());
-    finish_task_request->__set_report_version(_s_report_version);
-    finish_task_request->__set_task_type(task_type);
-    finish_task_request->__set_signature(signature);
+        LOG(INFO) << "get alter inverted index task. signature=" << 
agent_task_req.signature
+                  << ", tablet_id=" << alter_inverted_index_rq.tablet_id
+                  << ", job_id=" << alter_inverted_index_rq.job_id;
 
-    std::vector<TTabletInfo> finish_tablet_infos;
-    if (status.ok()) {
-        TTabletInfo tablet_info;
-        status = _get_tablet_info(tablet_id, schema_hash, signature, 
&tablet_info);
+        TabletSharedPtr tablet_ptr =
+                
StorageEngine::instance()->tablet_manager()->get_tablet(alter_inverted_index_rq.tablet_id);
+        if (tablet_ptr != nullptr) {
+            // auto data_dir = tablet_ptr->data_dir();
+            // Return result to fe
+            Status status = Status::OK();
+            TFinishTaskRequest finish_task_request;
+            finish_task_request.__set_backend(_backend);
+            finish_task_request.__set_task_type(agent_task_req.task_type);
+            finish_task_request.__set_signature(agent_task_req.signature);
 
-        if (!status.ok()) {
-            LOG(WARNING) << process_name << " success, but get tablet info 
failed."
-                         << "tablet_id: " << tablet_id << ", schema_hash: " << 
schema_hash
-                         << ", signature: " << signature;
-        } else {
-            finish_tablet_infos.push_back(tablet_info);
+            EngineIndexChangeTask engine_task(alter_inverted_index_rq);
+            status = _env->storage_engine()->execute_task(&engine_task);
+            std::vector<TTabletInfo> finish_tablet_infos;
+            if (!status.ok()) {
+                LOG(WARNING) << "failed to alter inverted index task, 
signature=" << agent_task_req.signature
+                             << ", job_id=" << alter_inverted_index_rq.job_id
+                             << ", error=" << status;
+            } else {
+                LOG(WARNING) << "successfully alter inverted index task, 
signature=" << agent_task_req.signature
+                             << ", job_id=" << alter_inverted_index_rq.job_id;
+                TTabletInfo tablet_info;
+                status = _get_tablet_info(alter_inverted_index_rq.tablet_id, 
alter_inverted_index_rq.schema_hash,
+                                          agent_task_req.signature, 
&tablet_info);
+                if (status.ok()) {
+                    finish_tablet_infos.push_back(tablet_info);
+                }
+                
finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
+            }
+            finish_task_request.__set_task_status(status.to_thrift());
+            _finish_task(finish_task_request);
+            _remove_task_info(agent_task_req.task_type, 
agent_task_req.signature);
         }

Review Comment:
   this is a bug, I will fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to