github-actions[bot] commented on code in PR #29466:
URL: https://github.com/apache/doris/pull/29466#discussion_r1440237568


##########
be/src/vec/sink/writer/async_result_writer.cpp:
##########
@@ -93,48 +95,70 @@ void AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profil
 
 void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profile) {
     if (auto status = open(state, profile); !status.ok()) {
-        force_close(status);
+        auto close_st = close(status);
+        return;
     }
 
-    if (_writer_status.ok()) {
-        while (true) {
-            if (!_eos && _data_queue.empty() && _writer_status.ok()) {
-                std::unique_lock l(_m);
-                while (!_eos && _data_queue.empty() && _writer_status.ok()) {
-                    _cv.wait(l);
-                }
-            }
-
-            if ((_eos && _data_queue.empty()) || !_writer_status.ok()) {
-                _data_queue.clear();
-                break;
-            }
-
-            auto block = _get_block_from_queue();
-            auto status = write(block);
-            if (!status.ok()) [[unlikely]] {
-                std::unique_lock l(_m);
-                _writer_status = status;
-                if (_dependency && _is_finished()) {
-                    _dependency->set_ready();
-                }
-                break;
-            }
+    while (true) {
+        std::unique_lock<std::mutex> lock(_m);
+
+        while (!_is_closed && !_eos && _data_queue.empty()) {
+            _cv.wait_for(lock, std::chrono::seconds(1));
+        }
+
+        if (_is_closed) {
+            break;
+        }
+
+        if (_eos && _data_queue.empty()) {
+            break;
+        }
+
+        // if _eos == true && !_data_queue.empty()
+        // we will write block to downstream one by one until _data_queue is empty
+        // and loop will be breaked then.
 
+        // release lock before IO
+        lock.unlock();
+
+        auto block = _get_block_from_queue();
+        // for VFileResultWriter, we have three types of transformer:
+        // 1. VCSVTransformer
+        // 2. VParquetTransformer
+        // 3. FORMAT_ORC
+        // what if method write blocks for a very long time? seems that we cannot
+        // notify them to stop unless they are using async writing pattern.
+        auto write_st = write(block);
+
+        if (write_st.ok()) {
             _return_free_block(std::move(block));
+            continue;
         }
-    }
 
-    // if not in transaction or status is in error or force close we can do close in
-    // async IO thread
-    if (!_writer_status.ok() || !in_transaction()) {
-        _writer_status = close(_writer_status);
-        _need_normal_close = false;
+        lock.lock();
+        // from this point on, there will be no further block added to _data_queue
+        _is_closed = true;
+
+        if (_dependency && _is_finished()) {
+            _dependency->set_ready();
+        }
+
+        lock.unlock();
+        break;
     }
+
+    std::lock_guard<std::mutex> lg(_m);
+    // we need this to notify try_close to return.
     _writer_thread_closed = true;
+    _data_queue.clear();
+
     if (_finish_dependency) {
         _finish_dependency->set_ready();
     }
+    // notify try_close thread
+    _cv.notify_one();
+
+    return;

Review Comment:
   warning: redundant return statement at the end of a function with a void return type [readability-redundant-control-flow]
   
   be/src/vec/sink/writer/async_result_writer.cpp:159:
   ```diff
   - 
   -     return;
   - }
   + }
   ```
   



##########
be/src/vec/sink/vresult_file_sink.cpp:
##########
@@ -110,44 +113,54 @@ Status VResultFileSink::open(RuntimeState* state) {
     return AsyncWriterSink::open(state);
 }
 
-Status VResultFileSink::close(RuntimeState* state, Status exec_status) {
+Status VResultFileSink::try_close(RuntimeState* state, Status exec_status) {

Review Comment:
   warning: method 'try_close' can be made const [readability-make-member-function-const]
   
   ```suggestion
   Status VResultFileSink::try_close(RuntimeState* state, Status exec_status) const {
   ```
   
   be/src/vec/sink/vresult_file_sink.h:65:
   ```diff
   -     Status try_close(RuntimeState* rs, Status exec_status) override;
   +     Status try_close(RuntimeState* rs, Status exec_status) const override;
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to