This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b86dd11a7d [fix](pipeline) refactor olap table sink close (#20771)
b86dd11a7d is described below

commit b86dd11a7d7aed5696cfa1eccb5f4af9889f1ae8
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Tue Jul 4 11:27:51 2023 +0800

    [fix](pipeline) refactor olap table sink close (#20771)
    
    For the pipeline engine, olap table sink close is divided into three stages:
    try_close() --> pending_finish() --> close().
    Only after all node channels are done or canceled will pending_finish()
    return false and close() start. This avoids blocking the pipeline on close().
    
    In close(), the index channel's intolerable-failure status is checked after
    each node channel failure. If an intolerable failure is found, close is
    terminated early and all node channels are canceled to avoid meaningless
    blocking.
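    
    Illustration (a minimal sketch under assumed names, not the committed
    code) of how a pipeline-style driver exercises the three stages; MockSink
    is a hypothetical stand-in for the DataSink interface extended here:
    
        #include <chrono>
        #include <thread>
        
        // Hypothetical stand-in for the sink side of the three-stage protocol.
        struct MockSink {
            void try_close() { /* mark_close() all node channels; never blocks */ }
            bool is_close_done() { return ++polls > 3; } // true once all rpcs finish
            void close() { /* gather results, update counters; no waiting left */ }
            int polls = 0;
        };
        
        int main() {
            MockSink sink;
            sink.try_close();               // stage 1: request close, non-blocking
            while (!sink.is_close_done()) { // stage 2: the pending_finish() stage
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
            sink.close();                   // stage 3: safe, all rpcs already done
        }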
---
 be/src/exec/data_sink.h                |   5 +
 be/src/pipeline/exec/operator.h        |   9 +-
 be/src/pipeline/exec/scan_operator.cpp |   4 +-
 be/src/pipeline/exec/scan_operator.h   |   2 +-
 be/src/pipeline/pipeline_task.cpp      |   3 +-
 be/src/vec/exec/scan/vscan_node.cpp    |   2 +-
 be/src/vec/exec/scan/vscan_node.h      |   2 +-
 be/src/vec/sink/vtablet_sink.cpp       | 272 +++++++++++++++++++++------------
 be/src/vec/sink/vtablet_sink.h         |  32 +++-
 9 files changed, 227 insertions(+), 104 deletions(-)

diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index 542dc428cc..cf7b774fcd 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -66,6 +66,11 @@ public:
     virtual Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) {
         return Status::NotSupported("Not support send block");
     }
+
+    virtual void try_close(RuntimeState* state, Status exec_status) {}
+
+    virtual bool is_close_done() { return true; }
+
     // Releases all resources that were allocated in prepare()/send().
     // Further send() calls are illegal after calling close().
     // It must be okay to call this multiple times. Subsequent calls should
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index fcd22eedcb..12a117b4c4 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -231,7 +231,7 @@ public:
      */
     virtual bool is_pending_finish() const { return false; }
 
-    virtual Status try_close() { return Status::OK(); }
+    virtual Status try_close(RuntimeState* state) { return Status::OK(); }
 
     bool is_closed() const { return _is_closed; }
 
@@ -284,6 +284,13 @@ public:
         return Status::OK();
     }
 
+    Status try_close(RuntimeState* state) override {
+        _sink->try_close(state, state->query_status());
+        return Status::OK();
+    }
+
+    [[nodiscard]] bool is_pending_finish() const override { return !_sink->is_close_done(); }
+
     Status close(RuntimeState* state) override {
         if (is_closed()) {
             return Status::OK();
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 83e41d93ba..f34461a9fd 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -56,8 +56,8 @@ bool ScanOperator::is_pending_finish() const {
     return _node->_scanner_ctx && !_node->_scanner_ctx->no_schedule();
 }
 
-Status ScanOperator::try_close() {
-    return _node->try_close();
+Status ScanOperator::try_close(RuntimeState* state) {
+    return _node->try_close(state);
 }
 
 bool ScanOperator::runtime_filters_are_ready_or_timeout() {
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index dbcf8f0ed7..850a1ab020 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -50,7 +50,7 @@ public:
 
     std::string debug_string() const override;
 
-    Status try_close() override;
+    Status try_close(RuntimeState* state) override;
 };
 
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index a6dfd238c2..11aeb620fa 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -265,7 +265,8 @@ Status PipelineTask::finalize() {
 }
 
 Status PipelineTask::try_close() {
-    return _source->try_close();
+    _sink->try_close(_state);
+    return _source->try_close(_state);
 }
 
 Status PipelineTask::close() {
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 2af6fc87c5..aaefef32ee 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -334,7 +334,7 @@ void VScanNode::release_resource(RuntimeState* state) {
     ExecNode::release_resource(state);
 }
 
-Status VScanNode::try_close() {
+Status VScanNode::try_close(RuntimeState* state) {
     if (_scanner_ctx.get()) {
         // mark this scanner ctx as should_stop to make sure scanners will not be scheduled anymore
         // TODO: there is a lock in `set_should_stop` may cause some slight impact
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 112ca47b54..ee0dadefdc 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -154,7 +154,7 @@ public:
     Status alloc_resource(RuntimeState* state) override;
     void release_resource(RuntimeState* state) override;
 
-    Status try_close();
+    Status try_close(RuntimeState* state);
 
     bool should_run_serial() const {
         return _should_run_serial || _state->enable_scan_node_run_serial();
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index f95c2b13c5..e1870e48e8 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -119,6 +119,7 @@ public:
                                                      vnode_channel->channel_info(), ss.str()),
                                          -1);
         }
+        done = true;
     }
 
     void join() { brpc::Join(cntl.call_id()); }
@@ -128,6 +129,7 @@ public:
     VNodeChannel* vnode_channel;
     IndexChannel* index_channel;
     int64_t partition_id;
+    std::atomic<bool> done {false};
 };
 
 IndexChannel::~IndexChannel() {}
@@ -552,6 +554,15 @@ void VNodeChannel::open_partition_wait() {
     }
 }
 
+bool VNodeChannel::open_partition_finished() const {
+    for (auto& open_partition_closure : _open_partition_closures) {
+        if (!open_partition_closure->done) {
+            return false;
+        }
+    }
+    return true;
+}
+
 Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, bool is_append) {
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
     if (payload->second.empty()) {
@@ -681,6 +692,7 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
     }
 
     if (!_add_block_closure->try_set_in_flight()) {
+        // There is a packet in flight, skip this send.
         return _send_finished ? 0 : 1;
     }
 
@@ -858,6 +870,10 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
 }
 
 void VNodeChannel::cancel(const std::string& cancel_msg) {
+    if (_is_closed) {
+        // skip channels that have already been canceled or closed via close_wait().
+        return;
+    }
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
     // set _is_closed to true finally
     Defer set_closed {[&]() {
@@ -888,6 +904,11 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
     request.release_id();
 }
 
+bool VNodeChannel::is_rpc_done() const {
+    return (_add_batches_finished || (_cancelled && !_add_block_closure->is_packet_in_flight())) &&
+           open_partition_finished();
+}
+
 Status VNodeChannel::close_wait(RuntimeState* state) {
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
     // set _is_closed to true finally
@@ -908,6 +929,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
     }
 
     // waiting for finished, it may take a long time, so we couldn't set a timeout
+    // In pipeline, close() is entered only after is_close_done() is true, so this will not block.
     while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) {
         // std::this_thread::sleep_for(std::chrono::milliseconds(1));
         bthread_usleep(1000);
@@ -925,15 +947,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
         return Status::OK();
     }
 
-    std::stringstream ss;
-    ss << "close wait failed coz rpc error";
-    {
-        std::lock_guard<doris::SpinLock> l(_cancel_msg_lock);
-        if (_cancel_msg != "") {
-            ss << ". " << _cancel_msg;
-        }
-    }
-    return Status::InternalError(ss.str());
+    return Status::InternalError(get_cancel_msg());
 }
 
 void VNodeChannel::_close_check() {
@@ -1121,14 +1135,14 @@ Status VOlapTableSink::open(RuntimeState* state) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
     fmt::memory_buffer buf;
-    for (auto index_channel : _channels) {
+    for (const auto& index_channel : _channels) {
         fmt::format_to(buf, "index id:{}", index_channel->_index_id);
         index_channel->for_each_node_channel(
                 [](const std::shared_ptr<VNodeChannel>& ch) { ch->open(); });
     }
     VLOG_DEBUG << "list of open index id = " << fmt::to_string(buf);
 
-    for (auto index_channel : _channels) {
+    for (const auto& index_channel : _channels) {
         index_channel->for_each_node_channel([&index_channel](
                                                     const std::shared_ptr<VNodeChannel>& ch) {
             auto st = ch->open_wait();
@@ -1181,7 +1195,7 @@ void VOlapTableSink::_send_batch_process() {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     while (true) {
         int running_channels_num = 0;
-        for (auto index_channel : _channels) {
+        for (const auto& index_channel : _channels) {
             index_channel->for_each_node_channel([&running_channels_num,
                                                  this](const std::shared_ptr<VNodeChannel>& ch) {
                 running_channels_num +=
@@ -1201,8 +1215,8 @@ void VOlapTableSink::_send_batch_process() {
 
 size_t VOlapTableSink::get_pending_bytes() const {
     size_t mem_consumption = 0;
-    for (auto& indexChannel : _channels) {
-        mem_consumption += indexChannel->get_pending_bytes();
+    for (const auto& index_channel : _channels) {
+        mem_consumption += index_channel->get_pending_bytes();
     }
     return mem_consumption;
 }
@@ -1453,13 +1467,95 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block,
     return Status::OK();
 }
 
+Status VOlapTableSink::_cancel_channel_and_check_intolerable_failure(
+        Status status, const std::string& err_msg, const std::shared_ptr<IndexChannel> ich,
+        const std::shared_ptr<VNodeChannel> nch) {
+    LOG(WARNING) << nch->channel_info() << ", close channel failed, err: " << err_msg;
+    ich->mark_as_failed(nch->node_id(), nch->host(), err_msg, -1);
+    // cancel the node channel in best effort
+    nch->cancel(err_msg);
+
+    // check if index has intolerable failure
+    Status index_st = ich->check_intolerable_failure();
+    if (!index_st.ok()) {
+        status = index_st;
+    } else if (Status st = ich->check_tablet_received_rows_consistency(); !st.ok()) {
+        status = st;
+    }
+    return status;
+}
+
+void VOlapTableSink::_cancel_all_channel(Status status) {
+    for (const auto& index_channel : _channels) {
+        index_channel->for_each_node_channel([&status](const std::shared_ptr<VNodeChannel>& ch) {
+            ch->cancel(status.to_string());
+        });
+    }
+    LOG(INFO) << fmt::format(
+            "close olap table sink. load_id={}, txn_id={}, canceled all node channels due to "
+            "error: {}",
+            print_id(_load_id), _txn_id, status);
+}
+
+void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) {
+    if (_try_close) {
+        return;
+    }
+    SCOPED_TIMER(_close_timer);
+    Status status = exec_status;
+    if (status.ok()) {
+        // only if status is ok can we call this _profile->total_time_counter().
+        // if status is not ok, this sink may not be prepared, so that _profile is null
+        SCOPED_TIMER(_profile->total_time_counter());
+        {
+            for (const auto& index_channel : _channels) {
+                if (!status.ok()) {
+                    break;
+                }
+                index_channel->for_each_node_channel(
+                        [this, &index_channel, &status](const std::shared_ptr<VNodeChannel>& ch) {
+                            if (!status.ok() || ch->is_closed()) {
+                                return;
+                            }
+                            // only on the first try_close will all node channels mark_close()
+                            ch->mark_close();
+                            if (ch->is_cancelled()) {
+                                status = this->_cancel_channel_and_check_intolerable_failure(
+                                        status, ch->get_cancel_msg(), index_channel, ch);
+                            }
+                        });
+            } // end for index channels
+        }
+    }
+
+    if (!status.ok()) {
+        _cancel_all_channel(status);
+        _close_status = status;
+        _try_close = true;
+    }
+}
+
+bool VOlapTableSink::is_close_done() {
+    bool close_done = true;
+    for (const auto& index_channel : _channels) {
+        index_channel->for_each_node_channel(
+                [&close_done](const std::shared_ptr<VNodeChannel>& ch) {
+                    close_done &= ch->is_rpc_done();
+                });
+    }
+    return close_done;
+}
+
 Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
     if (_closed) {
         return _close_status;
     }
+    try_close(state, exec_status);
     SCOPED_TIMER(_close_timer);
-    Status status = exec_status;
-    if (status.ok()) {
+    // If _close_status is not ok, all nodes have been canceled in try_close.
+    if (_close_status.ok()) {
+        DCHECK(exec_status.ok());
+        auto status = Status::OK();
         // only if status is ok can we call this _profile->total_time_counter().
         // if status is not ok, this sink may not be prepared, so that _profile is null
         SCOPED_TIMER(_profile->total_time_counter());
@@ -1472,7 +1568,7 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
         VNodeChannelStat channel_stat;
         {
             if (config::enable_lazy_open_partition) {
-                for (auto index_channel : _channels) {
+                for (const auto& index_channel : _channels) {
                     index_channel->for_each_node_channel(
                             [](const std::shared_ptr<VNodeChannel>& ch) {
                                 ch->open_partition_wait();
@@ -1480,30 +1576,27 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
                 }
             }
 
-            for (auto index_channel : _channels) {
-                index_channel->for_each_node_channel(
-                        [](const std::shared_ptr<VNodeChannel>& ch) { ch->mark_close(); });
-                num_node_channels += index_channel->num_node_channels();
-            }
-
-            for (auto index_channel : _channels) {
+            for (const auto& index_channel : _channels) {
+                if (!status.ok()) {
+                    break;
+                }
                 int64_t add_batch_exec_time = 0;
                 int64_t wait_exec_time = 0;
                 index_channel->for_each_node_channel(
-                        [&index_channel, &state, &node_add_batch_counter_map, &serialize_batch_ns,
-                         &channel_stat, &queue_push_lock_ns, &actual_consume_ns,
-                         &total_add_batch_exec_time_ns, &add_batch_exec_time,
+                        [this, &index_channel, &status, &state, &node_add_batch_counter_map,
+                         &serialize_batch_ns, &channel_stat, &queue_push_lock_ns,
+                         &actual_consume_ns, &total_add_batch_exec_time_ns, &add_batch_exec_time,
                          &total_wait_exec_time_ns, &wait_exec_time,
                          &total_add_batch_num](const std::shared_ptr<VNodeChannel>& ch) {
+                            if (!status.ok() || ch->is_closed()) {
+                                return;
+                            }
+                            // in pipeline, all node channels are done or canceled, so this will not block.
+                            // without pipeline, close may block waiting.
                             auto s = ch->close_wait(state);
                             if (!s.ok()) {
-                                auto err_msg = s.to_string();
-                                index_channel->mark_as_failed(ch->node_id(), ch->host(), err_msg,
-                                                              -1);
-                                // cancel the node channel in best effort
-                                ch->cancel(err_msg);
-                                LOG(WARNING) << ch->channel_info()
-                                             << ", close channel failed, err: " << err_msg;
+                                status = this->_cancel_channel_and_check_intolerable_failure(
+                                        status, s.to_string(), index_channel, ch);
                             }
                             ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns,
                                             &channel_stat, &queue_push_lock_ns, &actual_consume_ns,
@@ -1511,75 +1604,63 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
                                             &total_wait_exec_time_ns, &wait_exec_time,
                                             &total_add_batch_num);
                         });
-
+                num_node_channels += index_channel->num_node_channels();
                 if (add_batch_exec_time > max_add_batch_exec_time_ns) {
                     max_add_batch_exec_time_ns = add_batch_exec_time;
                 }
                 if (wait_exec_time > max_wait_exec_time_ns) {
                     max_wait_exec_time_ns = wait_exec_time;
                 }
-
-                // check if index has intolerable failure
-                Status index_st = index_channel->check_intolerable_failure();
-                if (!index_st.ok()) {
-                    status = index_st;
-                } else if (Status st = index_channel->check_tablet_received_rows_consistency();
-                           !st.ok()) {
-                    status = st;
-                }
             } // end for index channels
         }
-        // TODO need to be improved
-        LOG(INFO) << "total mem_exceeded_block_ns=" << channel_stat.mem_exceeded_block_ns
-                  << ", total queue_push_lock_ns=" << queue_push_lock_ns
-                  << ", total actual_consume_ns=" << actual_consume_ns
-                  << ", load id=" << print_id(_load_id);
-
-        COUNTER_SET(_input_rows_counter, _number_input_rows);
-        COUNTER_SET(_output_rows_counter, _number_output_rows);
-        COUNTER_SET(_filtered_rows_counter, _number_filtered_rows);
-        COUNTER_SET(_send_data_timer, _send_data_ns);
-        COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
-        COUNTER_SET(_filter_timer, _filter_ns);
-        COUNTER_SET(_append_node_channel_timer, channel_stat.append_node_channel_ns);
-        COUNTER_SET(_where_clause_timer, channel_stat.where_clause_ns);
-        COUNTER_SET(_wait_mem_limit_timer, channel_stat.mem_exceeded_block_ns);
-        COUNTER_SET(_validate_data_timer, _validate_data_ns);
-        COUNTER_SET(_serialize_batch_timer, serialize_batch_ns);
-        COUNTER_SET(_non_blocking_send_work_timer, actual_consume_ns);
-        COUNTER_SET(_total_add_batch_exec_timer, total_add_batch_exec_time_ns);
-        COUNTER_SET(_max_add_batch_exec_timer, max_add_batch_exec_time_ns);
-        COUNTER_SET(_total_wait_exec_timer, total_wait_exec_time_ns);
-        COUNTER_SET(_max_wait_exec_timer, max_wait_exec_time_ns);
-        COUNTER_SET(_add_batch_number, total_add_batch_num);
-        COUNTER_SET(_num_node_channels, num_node_channels);
-        // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
-        int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() +
-                                      state->num_rows_load_unselected();
-        state->set_num_rows_load_total(num_rows_load_total);
-        state->update_num_rows_load_filtered(_number_filtered_rows);
-        state->update_num_rows_load_unselected(_number_immutable_partition_filtered_rows);
-
-        // print log of add batch time of all node, for tracing load performance easily
-        std::stringstream ss;
-        ss << "finished to close olap table sink. load_id=" << print_id(_load_id)
-           << ", txn_id=" << _txn_id
-           << ", node add batch time(ms)/wait execution time(ms)/close time(ms)/num: ";
-        for (auto const& pair : node_add_batch_counter_map) {
-            ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_us / 1000)
-               << ")(" << (pair.second.add_batch_wait_execution_time_us / 1000) << ")("
-               << pair.second.close_wait_time_ms << ")(" << pair.second.add_batch_num << ")} ";
-        }
-        LOG(INFO) << ss.str();
-    } else {
-        for (auto channel : _channels) {
-            channel->for_each_node_channel([&status](const std::shared_ptr<VNodeChannel>& ch) {
-                ch->cancel(status.to_string());
-            });
+
+        if (status.ok()) {
+            // TODO need to be improved
+            LOG(INFO) << "total mem_exceeded_block_ns=" << channel_stat.mem_exceeded_block_ns
+                      << ", total queue_push_lock_ns=" << queue_push_lock_ns
+                      << ", total actual_consume_ns=" << actual_consume_ns
+                      << ", load id=" << print_id(_load_id);
+
+            COUNTER_SET(_input_rows_counter, _number_input_rows);
+            COUNTER_SET(_output_rows_counter, _number_output_rows);
+            COUNTER_SET(_filtered_rows_counter, _number_filtered_rows);
+            COUNTER_SET(_send_data_timer, _send_data_ns);
+            COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
+            COUNTER_SET(_filter_timer, _filter_ns);
+            COUNTER_SET(_append_node_channel_timer, channel_stat.append_node_channel_ns);
+            COUNTER_SET(_where_clause_timer, channel_stat.where_clause_ns);
+            COUNTER_SET(_wait_mem_limit_timer, channel_stat.mem_exceeded_block_ns);
+            COUNTER_SET(_validate_data_timer, _validate_data_ns);
+            COUNTER_SET(_serialize_batch_timer, serialize_batch_ns);
+            COUNTER_SET(_non_blocking_send_work_timer, actual_consume_ns);
+            COUNTER_SET(_total_add_batch_exec_timer, total_add_batch_exec_time_ns);
+            COUNTER_SET(_max_add_batch_exec_timer, max_add_batch_exec_time_ns);
+            COUNTER_SET(_total_wait_exec_timer, total_wait_exec_time_ns);
+            COUNTER_SET(_max_wait_exec_timer, max_wait_exec_time_ns);
+            COUNTER_SET(_add_batch_number, total_add_batch_num);
+            COUNTER_SET(_num_node_channels, num_node_channels);
+            // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
+            int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() +
+                                          state->num_rows_load_unselected();
+            state->set_num_rows_load_total(num_rows_load_total);
+            state->update_num_rows_load_filtered(_number_filtered_rows);
+            state->update_num_rows_load_unselected(_number_immutable_partition_filtered_rows);
+
+            // print log of add batch time of all node, for tracing load performance easily
+            std::stringstream ss;
+            ss << "finished to close olap table sink. load_id=" << print_id(_load_id)
+               << ", txn_id=" << _txn_id
+               << ", node add batch time(ms)/wait execution time(ms)/close time(ms)/num: ";
+            for (auto const& pair : node_add_batch_counter_map) {
+                ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_us / 1000)
+                   << ")(" << (pair.second.add_batch_wait_execution_time_us / 1000) << ")("
+                   << pair.second.close_wait_time_ms << ")(" << pair.second.add_batch_num << ")} ";
+            }
+            LOG(INFO) << ss.str();
+        } else {
+            _cancel_all_channel(status);
         }
-        LOG(INFO) << "finished to close olap table sink. load_id=" << print_id(_load_id)
-                  << ", txn_id=" << _txn_id
-                  << ", canceled all node channels due to error: " << status;
+        _close_status = status;
     }
 
     // Sender join() must put after node channels mark_close/cancel.
@@ -1592,9 +1673,8 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
         _send_batch_thread_pool_token->wait();
     }
 
-    _close_status = status;
     DataSink::close(state, exec_status);
-    return status;
+    return _close_status;
 }
 
 } // namespace stream_load
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 9c608cda43..7f9dc38f5f 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -113,6 +113,8 @@ struct AddBatchCounter {
 // It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence.
 // So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction.
 // Delete this point is safe, don't worry about RPC callback will run after ReusableClosure deleted.
+// "Ping-pong" between sender and receiver: `try_set_in_flight` when sending, `clear_in_flight` after
+// rpc failure or in the callback, then the next send can start. Destruction waits for the rpc
+// callback to complete.
 template <typename T>
 class ReusableClosure final : public google::protobuf::Closure {
 public:
@@ -233,6 +235,8 @@ public:
 
     void open_partition_wait();
 
+    bool open_partition_finished() const;
+
     Status add_block(vectorized::Block* block, const Payload* payload, bool is_append = false);
 
     int try_send_and_fetch_status(RuntimeState* state,
@@ -247,6 +251,22 @@ public:
     // 2. just cancel()
     void mark_close();
 
+    bool is_rpc_done() const;
+
+    bool is_closed() const { return _is_closed; }
+    bool is_cancelled() const { return _cancelled; }
+    std::string get_cancel_msg() {
+        std::stringstream ss;
+        ss << "close wait failed coz rpc error";
+        {
+            std::lock_guard<doris::SpinLock> l(_cancel_msg_lock);
+            if (_cancel_msg != "") {
+                ss << ". " << _cancel_msg;
+            }
+        }
+        return ss.str();
+    }
+
     // two ways to stop channel:
     // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response.
     // 2. just cancel()
@@ -466,6 +486,9 @@ public:
 
     Status open(RuntimeState* state) override;
 
+    void try_close(RuntimeState* state, Status exec_status) override;
+    // If true, all node channel rpcs are done and close() can start.
+    bool is_close_done() override;
     Status close(RuntimeState* state, Status close_status) override;
     Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override;
 
@@ -501,6 +524,12 @@ private:
 
     void _open_partition(const VOlapTablePartition* partition);
 
+    Status _cancel_channel_and_check_intolerable_failure(Status status, const std::string& err_msg,
+                                                         const std::shared_ptr<IndexChannel> ich,
+                                                         const std::shared_ptr<VNodeChannel> nch);
+
+    void _cancel_all_channel(Status status);
+
     std::shared_ptr<MemTracker> _mem_tracker;
 
     ObjectPool* _pool;
@@ -585,8 +614,9 @@ private:
     int64_t _load_channel_timeout_s = 0;
 
     int32_t _send_batch_parallelism = 1;
-    // Save the status of close() method
+    // Save the status of the try_close() and close() methods
     Status _close_status;
+    bool _try_close = false;
 
     // User can change this config at runtime, avoid it being modified during query or loading process.
     bool _transfer_large_data_by_brpc = false;

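For reference, the in-flight "ping-pong" described in the ReusableClosure
comment of vtablet_sink.h above can be sketched with a single atomic flag
(a hypothetical illustration, not the Doris class itself):

    #include <atomic>

    // Hypothetical sketch: at most one rpc may be outstanding at a time.
    class InFlightFlag {
    public:
        // Called before sending; succeeds only if no packet is in flight.
        bool try_set_in_flight() {
            bool expected = false;
            return _in_flight.compare_exchange_strong(expected, true);
        }
        // Called from the rpc callback, or on send failure, to allow the next send.
        void clear_in_flight() { _in_flight.store(false); }
        bool is_packet_in_flight() const { return _in_flight.load(); }

    private:
        std::atomic<bool> _in_flight {false};
    };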
