This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new a701945c91 [branch-1.2](improvement) avoid BufferControlBlock block 
and cancel fail (#16917)
a701945c91 is described below

commit a701945c91ec29be795130465917c91f165161e3
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Tue Feb 21 14:13:29 2023 +0800

    [branch-1.2](improvement) avoid BufferControlBlock block and cancel fail 
(#16917)
    
    cherry-pick part of #16304
---
 be/src/exec/es/es_scroll_parser.h       | 1 +
 be/src/runtime/buffer_control_block.cpp | 2 +-
 be/src/runtime/buffer_control_block.h   | 2 +-
 be/src/vec/exec/scan/vscanner.cpp       | 7 ++++++-
 4 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/be/src/exec/es/es_scroll_parser.h 
b/be/src/exec/es/es_scroll_parser.h
index 2803777776..b2fc83595f 100644
--- a/be/src/exec/es/es_scroll_parser.h
+++ b/be/src/exec/es/es_scroll_parser.h
@@ -54,6 +54,7 @@ private:
     Status fill_date_slot_with_strval(void* slot, const rapidjson::Value& col, 
PrimitiveType type);
     Status fill_date_slot_with_timestamp(void* slot, const rapidjson::Value& 
col,
                                          PrimitiveType type);
+
 private:
     std::string _scroll_id;
     int _size;
diff --git a/be/src/runtime/buffer_control_block.cpp 
b/be/src/runtime/buffer_control_block.cpp
index 03bd6d3466..4c4d148017 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -105,7 +105,7 @@ Status 
BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result)
     int num_rows = result->result_batch.rows.size();
 
     while ((!_batch_queue.empty() && (num_rows + _buffer_rows) > 
_buffer_limit) && !_is_cancelled) {
-        _data_removal.wait(l);
+        _data_removal.wait_for(l, std::chrono::seconds(1), [&]() { return 
_is_cancelled.load(); });
     }
 
     if (_is_cancelled) {
diff --git a/be/src/runtime/buffer_control_block.h 
b/be/src/runtime/buffer_control_block.h
index 8528f74164..3ad27bfd19 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -106,7 +106,7 @@ private:
     // result's query id
     TUniqueId _fragment_id;
     bool _is_close;
-    bool _is_cancelled;
+    std::atomic_bool _is_cancelled;
     Status _status;
     int _buffer_rows;
     int _buffer_limit;
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
index e0a5814df6..8679f37f89 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -69,7 +69,12 @@ Status VScanner::get_block(RuntimeState* state, Block* 
block, bool* eof) {
                 // record rows return (after filter) for _limit check
                 _num_rows_return += block->rows();
             }
-        } while (block->rows() == 0 && !(*eof) && raw_rows_read() < 
raw_rows_threshold);
+        } while (!state->is_cancelled() && block->rows() == 0 && !(*eof) &&
+                 raw_rows_read() < raw_rows_threshold);
+    }
+
+    if (state->is_cancelled()) {
+        return Status::Cancelled("cancelled");
     }
 
     // set eof to true if per scanner limit is reached


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to