liaoxin01 commented on code in PR #51404:
URL: https://github.com/apache/doris/pull/51404#discussion_r2126827046


##########
be/src/vec/sink/writer/vtablet_writer.cpp:
##########
@@ -271,6 +271,55 @@ Status 
IndexChannel::check_tablet_filtered_rows_consistency() {
     return Status::OK();
 }
 
+static Status cancel_channel_and_check_intolerable_failure(Status status,
+                                                           const std::string& 
err_msg,
+                                                           IndexChannel& ich, 
VNodeChannel& nch) {
+    LOG(WARNING) << nch.channel_info() << ", close channel failed, err: " << 
err_msg;
+    ich.mark_as_failed(&nch, err_msg, -1);
+    // cancel the node channel in best effort
+    nch.cancel(err_msg);
+
+    // check if index has intolerable failure
+    if (Status index_st = ich.check_intolerable_failure(); !index_st.ok()) {
+        status = std::move(index_st);
+    } else if (Status receive_st = 
ich.check_tablet_received_rows_consistency(); !receive_st.ok()) {
+        status = std::move(receive_st);
+    } else if (Status filter_st = 
ich.check_tablet_filtered_rows_consistency(); !filter_st.ok()) {
+        status = std::move(filter_st);
+    }
+    return status;
+}
+
+Status IndexChannel::close_wait(
+        RuntimeState* state, WriterStats* writer_stats,
+        std::unordered_map<int64_t, AddBatchCounter>* 
node_add_batch_counter_map) {
+    Status status = Status::OK();
+    while (true) {
+        bool all_node_channel_closed = true;
+        for (auto& it : _node_channels) {
+            if (!status.ok() || (it.second->is_closed() && 
!it.second->is_cancelled())) {
+                continue;
+            }
+            bool node_channel_closed = false;
+            auto s = it.second->close_wait(state, &node_channel_closed);
+            if (node_channel_closed) {

Review Comment:
   Should we first check the status of s?



##########
be/src/vec/sink/writer/vtablet_writer.cpp:
##########
@@ -936,13 +985,19 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
     static_cast<void>(request->release_id());
 }
 
-Status VNodeChannel::close_wait(RuntimeState* state) {
+Status VNodeChannel::close_wait(RuntimeState* state, bool* is_closed) {
     DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", {
         std::thread t(injection_full_gc_fn);
         t.join();
     });
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
 
+    *is_closed = true;
+
+    if (_rpc_timeout_ms < _timeout_watch.elapsed_time() / NANOS_PER_MILLIS) {

Review Comment:
   Why did you add a timeout logic?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to