github-actions[bot] commented on code in PR #29466:
URL: https://github.com/apache/doris/pull/29466#discussion_r1440172276


##########
be/src/vec/sink/vresult_file_sink.cpp:
##########
@@ -110,44 +113,54 @@ Status VResultFileSink::open(RuntimeState* state) {
     return AsyncWriterSink::open(state);
 }
 
-Status VResultFileSink::close(RuntimeState* state, Status exec_status) {
+Status VResultFileSink::try_close(RuntimeState* rs, Status exec_status) {

Review Comment:
   warning: method 'try_close' can be made const [readability-make-member-function-const]
   
   ```suggestion
   Status VResultFileSink::try_close(RuntimeState* rs, Status exec_status) const {
   ```
   
   be/src/vec/sink/vresult_file_sink.h:65:
   ```diff
   -     Status try_close(RuntimeState* rs, Status exec_status) override;
   +     Status try_close(RuntimeState* rs, Status exec_status) const override;
   ```
   



##########
be/src/vec/sink/vresult_file_sink.cpp:
##########
@@ -110,44 +113,54 @@
     return AsyncWriterSink::open(state);
 }
 
-Status VResultFileSink::close(RuntimeState* state, Status exec_status) {
+Status VResultFileSink::try_close(RuntimeState* rs, Status exec_status) {
     if (_closed) {
+        LOG_WARNING("Closing a closed result file sink");
         return Status::OK();
     }
 
-    Status final_status = exec_status;
-    // close the writer
-    if (_writer && _writer->need_normal_close()) {
-        Status st = _writer->close();
-        if (!st.ok() && exec_status.ok()) {
-            // close file writer failed, should return this error to client
-            final_status = st;
-        }
+    if (_writer == nullptr) {
+        LOG_WARNING("writer of result file sink is not initialized");
+        return Status::RuntimeError("writer of result file sink is not initialized");
     }
+
+    // TaskScheduler should call close if exec_status is not ok.
+    // actually we should remove arg exec_status.
+    RETURN_IF_ERROR(exec_status);
+
+    // here we should rename _writer->cloes to _writer->try_close
+    // to make read code easily
+    // check of EOF should be left to operator
+    RETURN_IF_ERROR(_writer->try_close(exec_status));
+
     if (_is_top_sink) {
-        // close sender, this is normal path end
         if (_sender) {
-            _sender->update_num_written_rows(_writer == nullptr ? 0 : _writer->get_written_rows());
-            static_cast<void>(_sender->close(final_status));
+            _sender->update_num_written_rows(_writer->get_written_rows());
+            // WIP: close -> flush
+            RETURN_IF_ERROR(_sender->close(exec_status));
         }
-        static_cast<void>(state->exec_env()->result_mgr()->cancel_at_time(
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->result_mgr()->cancel_at_time(
                 time(nullptr) + config::result_buffer_cancelled_interval_time,
-                state->fragment_instance_id()));
+                rs->fragment_instance_id()));
     } else {
-        if (final_status.ok()) {
-            auto st = _stream_sender->send(state, _output_block.get(), true);
-            if (!st.template is<ErrorCode::END_OF_FILE>()) {
-                RETURN_IF_ERROR(st);
-            }
-        }
-        RETURN_IF_ERROR(_stream_sender->close(state, final_status));
+        RETURN_IF_ERROR(_stream_sender->send(rs, _output_block.get(), true));
+        // WIP: close -> flush
+        RETURN_IF_ERROR(_stream_sender->close(rs, exec_status));
         _output_block->clear();
     }
-
     _closed = true;
     return Status::OK();
 }
 
+Status VResultFileSink::close(RuntimeState* state, Status exec_status) {

Review Comment:
   warning: method 'close' can be made static [readability-convert-member-functions-to-static]
   
   be/src/vec/sink/vresult_file_sink.h:68:
   ```diff
   -     Status close(RuntimeState* state, Status exec_status) override;
   +     static Status close(RuntimeState* state, Status exec_status) override;
   ```
   



##########
be/src/vec/sink/writer/async_result_writer.h:
##########
@@ -98,19 +115,19 @@
 
     void _return_free_block(std::unique_ptr<Block>);
 
-private:
+protected:

Review Comment:
   warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers]
   
   ```suggestion
   
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/vec/sink/writer/async_result_writer.h:109:** previously declared here
   ```cpp
   protected:
   ^
   ```
   
   </details>
   



##########
be/src/vec/sink/writer/async_result_writer.cpp:
##########
@@ -93,48 +95,68 @@ void AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profil
 
 void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profile) {
     if (auto status = open(state, profile); !status.ok()) {
-        force_close(status);
+        auto close_st = close(status);
+        return;
     }
 
-    if (_writer_status.ok()) {
-        while (true) {
-            if (!_eos && _data_queue.empty() && _writer_status.ok()) {
-                std::unique_lock l(_m);
-                while (!_eos && _data_queue.empty() && _writer_status.ok()) {
-                    _cv.wait(l);
-                }
-            }
-
-            if ((_eos && _data_queue.empty()) || !_writer_status.ok()) {
-                _data_queue.clear();
-                break;
-            }
-
-            auto block = _get_block_from_queue();
-            auto status = write(block);
-            if (!status.ok()) [[unlikely]] {
-                std::unique_lock l(_m);
-                _writer_status = status;
-                if (_dependency && _is_finished()) {
-                    _dependency->set_ready();
-                }
-                break;
-            }
+    while (true) {
+        std::unique_lock<std::mutex> lock(_m);
+
+        while (!_is_closed && !_eos && _data_queue.empty()) {
+            _cv.wait_for(lock, std::chrono::seconds(1));
+        }
+
+        if (_is_closed) {
+            break;
+        }
+
+        if (_eos && _data_queue.empty()) {
+            break;
+        }
+
+        // if _eos == true && !_data_queue.empty()
+        // we will write block to downstream one by one until _data_queue is empty
+        // and loop will be breaked then.
+
+        // release lock before IO
+        lock.unlock();
+
+        auto block = _get_block_from_queue();
+        // for VFileResultWriter, we have three types of transformer:
+        // 1. VCSVTransformer
+        // 2. VParquetTransformer
+        // 3. FORMAT_ORC
+        // what if method write blocks for a very long time? seems that we cannot
+        // notify them to stop unless they are using async writing pattern.
+        auto write_st = write(block);
 
+        if (write_st.ok()) {
             _return_free_block(std::move(block));
+            continue;
+        }
+
+        lock.lock();
+        // from this point on, there will be no further block added to _data_queue
+        _is_closed = true;
+
+        if (_dependency && _is_finished()) {
+            _dependency->set_ready();
         }
-    }
 
-    // if not in transaction or status is in error or force close we can do close in
-    // async IO thread
-    if (!_writer_status.ok() || !in_transaction()) {
-        _writer_status = close(_writer_status);
-        _need_normal_close = false;
+        lock.unlock();
+        break;
     }
+
+    std::lock_guard<std::mutex> lg(_m);
+    // we need this to notify try_close to return.
     _writer_thread_closed = true;
+    _data_queue.clear();
+
     if (_finish_dependency) {
         _finish_dependency->set_ready();
     }
+
+    return;
 }

Review Comment:
   warning: redundant return statement at the end of a function with a void return type [readability-redundant-control-flow]
   
   ```suggestion
       }
   ```
   



##########
be/src/vec/sink/writer/async_result_writer.h:
##########
@@ -19,8 +19,10 @@
 #include <concurrentqueue.h>

Review Comment:
   warning: 'concurrentqueue.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <concurrentqueue.h>
            ^
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to