Copilot commented on code in PR #60284:
URL: https://github.com/apache/doris/pull/60284#discussion_r2732414853


##########
be/src/runtime/load_stream.h:
##########
@@ -42,7 +42,7 @@ class OlapTableSchemaParam;
 // origin_segid(index) -> new_segid(value in vector)
 using SegIdMapping = std::vector<uint32_t>;
 using FailedTablets = std::vector<std::pair<int64_t, Status>>;
-class TabletStream : public std::enable_shared_from_this<TabletStream> {
+class TabletStream {

Review Comment:
   TabletStream should have a destructor that calls wait_for_flush_tasks() so
that all pending and running flush tasks are drained before the object is
destroyed. Currently, the code relies on IndexStream::~IndexStream() to call
wait_for_flush_tasks() on every TabletStream, which is a fragile dependency. If
a TabletStream is ever destroyed outside this path (e.g., if future code holds a
separate shared_ptr reference), its flush tasks won't be waited for, and the
lambdas that capture the raw 'this' pointer could then dereference a destroyed
object (use-after-free).
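A minimal sketch of the suggested destructor, assuming a heavily simplified model. `FakeFlushToken` and `TabletStreamSketch` are hypothetical stand-ins (the real flush token is a Doris thread-pool token, and the real class has many more members). The destructor calls the idempotent wait, so tasks that capture raw `this` always finish before the object goes away; if an explicit close path already waited, the destructor's call is a no-op.

```cpp
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for the flush token: each task runs on its own
// std::thread, and wait() joins every task submitted so far.
class FakeFlushToken {
public:
    void submit(std::function<void()> task) {
        std::lock_guard<std::mutex> lock(_mu);
        _threads.emplace_back(std::move(task));
    }
    void wait() {
        std::vector<std::thread> threads;
        {
            std::lock_guard<std::mutex> lock(_mu);
            threads.swap(_threads);
        }
        for (auto& t : threads) t.join();
    }
    ~FakeFlushToken() { wait(); }

private:
    std::mutex _mu;
    std::vector<std::thread> _threads;
};

// Hypothetical simplified TabletStream.
class TabletStreamSketch {
public:
    explicit TabletStreamSketch(int* flushed) : _flushed(flushed) {}

    // Waiting here means no task capturing `this` can outlive the object,
    // no matter which owner drops the last reference.
    ~TabletStreamSketch() { wait_for_flush_tasks(); }

    void append_data() {
        // Captures raw `this`, like the lambdas under review.
        _token.submit([this]() {
            std::lock_guard<std::mutex> lock(_lock);
            ++*_flushed;
        });
    }

    // Idempotent: only the first call actually waits.
    void wait_for_flush_tasks() {
        {
            std::lock_guard<std::mutex> lock(_lock);
            if (_flush_tasks_done) return;
            _flush_tasks_done = true;
        }
        // _lock is released before waiting, so running tasks can take it.
        _token.wait();
    }

private:
    std::mutex _lock;
    bool _flush_tasks_done = false;
    int* _flushed;
    FakeFlushToken _token;
};
```

With this shape, an explicit call from the close path and the call from the destructor compose safely: whichever runs first does the waiting.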



##########
be/src/runtime/load_stream.cpp:
##########
@@ -294,21 +290,36 @@ Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
     return st;
 }
 
-void TabletStream::pre_close() {
+void TabletStream::wait_for_flush_tasks() {
+    {
+        std::lock_guard lock_guard(_lock);
+        if (_flush_tasks_done) {
+            return;
+        }
+        _flush_tasks_done = true;
+    }
+
     if (!_status.ok()) {
-        // cancel all pending tasks, wait all running tasks to finish
         _flush_token->shutdown();
         return;
     }
 
-    SCOPED_TIMER(_close_wait_timer);
-    _status.update(_run_in_heavy_work_pool([this]() {
+    // Use heavy_work_pool to avoid blocking bthread
+    auto st = _run_in_heavy_work_pool([this]() {
         _flush_token->wait();

Review Comment:
   There is a potential race condition between wait_for_flush_tasks() and 
append_data()/add_segment(). After _flush_tasks_done is set to true and the 
lock is released (line 300), new tasks could still be submitted via 
append_data() or add_segment() before _flush_token->wait() is called (line 
309). While the expected usage pattern is that no data is appended after 
close(), there is no explicit check to prevent this. Consider adding a check 
for _flush_tasks_done in append_data() and add_segment() to reject new 
submissions after wait_for_flush_tasks() has been called, similar to the 
_status check at the beginning of these methods.
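One way to close the window described above is to check `_flush_tasks_done` under the same `_lock` that `wait_for_flush_tasks()` uses to set it, and to submit while still holding that lock. This is a sketch of ordering only: `SketchStatus` is a hypothetical stand-in for Doris's `Status`, and a counter replaces the real `_flush_token->submit(...)`.

```cpp
#include <mutex>
#include <string>

// Hypothetical minimal status type standing in for Doris's Status.
struct SketchStatus {
    bool ok_flag = true;
    std::string msg;
    bool ok() const { return ok_flag; }
};

class StreamSketch {
public:
    // Stand-in for append_data()/add_segment(): the flag check and the
    // submit happen under one lock, so nothing slips in after the flag flips.
    SketchStatus append_data() {
        std::lock_guard<std::mutex> lock(_lock);
        if (_flush_tasks_done) {
            return SketchStatus{false, "append_data after wait_for_flush_tasks"};
        }
        ++_submitted;  // hypothetical stand-in for _flush_token->submit(...)
        return SketchStatus{};
    }

    void wait_for_flush_tasks() {
        {
            std::lock_guard<std::mutex> lock(_lock);
            if (_flush_tasks_done) {
                return;
            }
            _flush_tasks_done = true;
        }
        // _flush_token->wait() would run here; every accepted task was
        // submitted before the flag was set, so wait() covers all of them.
    }

    int submitted() const {
        std::lock_guard<std::mutex> lock(_lock);
        return _submitted;
    }

private:
    mutable std::mutex _lock;
    bool _flush_tasks_done = false;
    int _submitted = 0;
};
```

This assumes submission is cheap; if the real submit can block, holding `_lock` across it may add contention on that lock.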



##########
be/src/runtime/load_stream.cpp:
##########
@@ -294,21 +290,36 @@ Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
     return st;
 }
 
-void TabletStream::pre_close() {
+void TabletStream::wait_for_flush_tasks() {
+    {
+        std::lock_guard lock_guard(_lock);
+        if (_flush_tasks_done) {
+            return;
+        }
+        _flush_tasks_done = true;
+    }
+
     if (!_status.ok()) {
-        // cancel all pending tasks, wait all running tasks to finish
         _flush_token->shutdown();
         return;
     }
 
-    SCOPED_TIMER(_close_wait_timer);
-    _status.update(_run_in_heavy_work_pool([this]() {
+    // Use heavy_work_pool to avoid blocking bthread
+    auto st = _run_in_heavy_work_pool([this]() {
         _flush_token->wait();
         return Status::OK();
-    }));
-    // it is necessary to check status after wait_func,
-    // for create_rowset could fail during add_segment when loading to MOW table,
-    // in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump.
+    });
+    if (!st.ok()) {
+        // If heavy_work_pool is unavailable, fall back to shutdown
+        // which will cancel pending tasks and wait for running tasks
+        _flush_token->shutdown();
+        _status.update(st);
+    }
+}

Review Comment:
   The SCOPED_TIMER for _close_wait_timer was removed from the flush token wait 
logic. Previously, the time spent waiting for flush tasks was tracked under 
CloseWaitTime, but now wait_for_flush_tasks() has no timing instrumentation. 
This means the time spent waiting for flush tasks to complete is no longer 
visible in performance profiles. Consider adding timing instrumentation to 
wait_for_flush_tasks() if this metric is important for performance monitoring.
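If CloseWaitTime should keep covering the flush wait, a scoped timer could be restored inside wait_for_flush_tasks(). The removed line used Doris's SCOPED_TIMER macro on _close_wait_timer; the sketch below instead uses a hypothetical `ScopedTimerSketch` with a `std::atomic` counter so it is self-contained. The timer starts only after the idempotency check, so a repeated no-op call adds nothing to the metric.

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <thread>

// Hypothetical RAII timer in the spirit of SCOPED_TIMER: on scope exit it
// adds the elapsed steady-clock time, in nanoseconds, to a counter.
class ScopedTimerSketch {
public:
    explicit ScopedTimerSketch(std::atomic<int64_t>* counter)
        : _counter(counter), _start(std::chrono::steady_clock::now()) {}
    ~ScopedTimerSketch() {
        auto elapsed = std::chrono::steady_clock::now() - _start;
        _counter->fetch_add(
            std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count());
    }
    ScopedTimerSketch(const ScopedTimerSketch&) = delete;
    ScopedTimerSketch& operator=(const ScopedTimerSketch&) = delete;

private:
    std::atomic<int64_t>* _counter;
    std::chrono::steady_clock::time_point _start;
};

// Hypothetical model of the timer's placement in wait_for_flush_tasks().
struct FlushWaiterSketch {
    std::atomic<int64_t> close_wait_ns{0};  // plays the role of _close_wait_timer
    bool flush_tasks_done = false;

    void wait_for_flush_tasks(std::chrono::milliseconds simulated_wait) {
        if (flush_tasks_done) {
            return;  // repeated calls return before the timer starts
        }
        flush_tasks_done = true;
        ScopedTimerSketch timer(&close_wait_ns);
        std::this_thread::sleep_for(simulated_wait);  // stands in for _flush_token->wait()
    }
};
```

Because the timer is RAII-based, it also records time on any early return added after it, which keeps the metric honest if the fallback-to-shutdown branch grows.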



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
