This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fdb5891c3ec [Improvement](sink) optimization for parallel result sink (#36305)
fdb5891c3ec is described below

commit fdb5891c3eccefad7a354436dfb0eae82da5bd6e
Author: Pxl <pxl...@qq.com>
AuthorDate: Wed Jun 19 19:53:33 2024 +0800

    [Improvement](sink) optimization for parallel result sink (#36305)
    
    ## Proposed changes
    optimization for parallel result sink #36053
---
 be/src/pipeline/exec/result_file_sink_operator.cpp |   5 +-
 be/src/pipeline/exec/result_file_sink_operator.h   |   2 +-
 be/src/pipeline/exec/result_sink_operator.cpp      |  13 +-
 be/src/pipeline/exec/result_sink_operator.h        |   2 +-
 be/src/pipeline/local_exchange/local_exchanger.cpp |  22 +-
 be/src/runtime/buffer_control_block.cpp            | 258 ++++++++++-----------
 be/src/runtime/buffer_control_block.h              |  33 +--
 be/src/runtime/result_buffer_mgr.cpp               |   6 +-
 be/src/runtime/result_buffer_mgr.h                 |   3 +-
 be/src/runtime/result_writer.h                     |   2 +-
 be/src/service/point_query_executor.cpp            |  14 +-
 be/src/service/point_query_executor.h              |   2 +-
 be/src/vec/sink/varrow_flight_result_writer.cpp    |   4 +-
 be/src/vec/sink/varrow_flight_result_writer.h      |   2 +-
 be/src/vec/sink/vmysql_result_writer.cpp           |   4 +-
 be/src/vec/sink/vmysql_result_writer.h             |   2 +-
 be/src/vec/sink/writer/async_result_writer.cpp     |   2 +-
 .../sink/writer/iceberg/viceberg_table_writer.cpp  |   2 +-
 .../sink/writer/iceberg/viceberg_table_writer.h    |   2 +-
 be/src/vec/sink/writer/vfile_result_writer.cpp     |   5 +-
 be/src/vec/sink/writer/vfile_result_writer.h       |   2 +-
 be/src/vec/sink/writer/vhive_table_writer.cpp      |   2 +-
 be/src/vec/sink/writer/vhive_table_writer.h        |   4 +-
 be/src/vec/sink/writer/vjdbc_table_writer.cpp      |   2 +-
 be/src/vec/sink/writer/vjdbc_table_writer.h        |   2 +-
 be/src/vec/sink/writer/vmysql_table_writer.cpp     |   2 +-
 be/src/vec/sink/writer/vmysql_table_writer.h       |   2 +-
 be/src/vec/sink/writer/vodbc_table_writer.cpp      |   2 +-
 be/src/vec/sink/writer/vodbc_table_writer.h        |   2 +-
 be/src/vec/sink/writer/vtablet_writer.cpp          |   4 +-
 be/src/vec/sink/writer/vtablet_writer.h            |   2 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |   4 +-
 be/src/vec/sink/writer/vtablet_writer_v2.h         |   2 +-
 .../serde/data_type_serde_mysql_test.cpp           |   2 +-
 34 files changed, 206 insertions(+), 213 deletions(-)

diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp 
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 0cd14899f52..029bea7494e 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -99,7 +99,8 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& i
     if (p._is_top_sink) {
         // create sender
         RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-                state->fragment_instance_id(), p._buf_size, &_sender, 
state->execution_timeout()));
+                state->fragment_instance_id(), p._buf_size, &_sender, 
state->execution_timeout(),
+                state->batch_size()));
         // create writer
         _writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
                 p._file_opts.get(), p._storage_type, 
state->fragment_instance_id(),
@@ -175,7 +176,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, 
Status exec_status)
         // close sender, this is normal path end
         if (_sender) {
             _sender->update_return_rows(_writer == nullptr ? 0 : 
_writer->get_written_rows());
-            RETURN_IF_ERROR(_sender->close(final_status));
+            RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), 
final_status));
         }
         state->exec_env()->result_mgr()->cancel_at_time(
                 time(nullptr) + config::result_buffer_cancelled_interval_time,
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h 
b/be/src/pipeline/exec/result_file_sink_operator.h
index 4fa31f615ce..7623dae7fea 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -107,7 +107,7 @@ private:
 
     // Owned by the RuntimeState.
     RowDescriptor _output_row_descriptor;
-    int _buf_size = 1024; // Allocated from _pool
+    int _buf_size = 4096; // Allocated from _pool
     bool _is_top_sink = true;
     std::string _header;
     std::string _header_type;
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp
index 24c5162c4f4..378fea18eea 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -49,10 +49,10 @@ Status ResultSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info)
         _sender = _parent->cast<ResultSinkOperatorX>()._sender;
     } else {
         RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-                state->fragment_instance_id(), RESULT_SINK_BUFFER_SIZE, 
&_sender,
-                state->execution_timeout()));
+                fragment_instance_id, RESULT_SINK_BUFFER_SIZE, &_sender, 
state->execution_timeout(),
+                state->batch_size()));
     }
-    _sender->set_dependency(_dependency->shared_from_this());
+    _sender->set_dependency(fragment_instance_id, 
_dependency->shared_from_this());
     return Status::OK();
 }
 
@@ -122,7 +122,8 @@ Status ResultSinkOperatorX::prepare(RuntimeState* state) {
 
     if (state->query_options().enable_parallel_result_sink) {
         RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-                state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender, 
state->execution_timeout()));
+                state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender, 
state->execution_timeout(),
+                state->batch_size()));
     }
     return Status::OK();
 }
@@ -139,7 +140,7 @@ Status ResultSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block,
     if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
         RETURN_IF_ERROR(_second_phase_fetch_data(state, block));
     }
-    RETURN_IF_ERROR(local_state._writer->write(*block));
+    RETURN_IF_ERROR(local_state._writer->write(state, *block));
     if (_fetch_option.use_two_phase_fetch) {
         // Block structure may be changed by calling 
_second_phase_fetch_data().
         // So we should clear block in case of unmatched columns
@@ -185,7 +186,7 @@ Status ResultSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
         if (_writer) {
             _sender->update_return_rows(_writer->get_written_rows());
         }
-        RETURN_IF_ERROR(_sender->close(final_status));
+        RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), 
final_status));
     }
     state->exec_env()->result_mgr()->cancel_at_time(
             time(nullptr) + config::result_buffer_cancelled_interval_time,
diff --git a/be/src/pipeline/exec/result_sink_operator.h 
b/be/src/pipeline/exec/result_sink_operator.h
index 0ccb7f4946b..1d2490f486d 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -104,7 +104,7 @@ struct ResultFileOptions {
     }
 };
 
-constexpr int RESULT_SINK_BUFFER_SIZE = 4096;
+constexpr int RESULT_SINK_BUFFER_SIZE = 4096 * 8;
 
 class ResultSinkLocalState final : public 
PipelineXSinkLocalState<BasicSharedState> {
     ENABLE_FACTORY_CREATOR(ResultSinkLocalState);
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 51d2c8268e7..a8dc13438c1 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -200,8 +200,9 @@ Status PassthroughExchanger::sink(RuntimeState* state, 
vectorized::Block* in_blo
     }
     new_block.swap(*in_block);
     auto channel_id = (local_state._channel_id++) % _num_partitions;
-    local_state._shared_state->add_mem_usage(channel_id, 
new_block.allocated_bytes());
+    size_t allocated_bytes = new_block.allocated_bytes();
     if (_data_queue[channel_id].enqueue(std::move(new_block))) {
+        local_state._shared_state->add_mem_usage(channel_id, allocated_bytes);
         local_state._shared_state->set_ready_to_read(channel_id);
     }
 
@@ -220,25 +221,16 @@ void 
PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
 Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos,
                                        LocalExchangeSourceLocalState& 
local_state) {
     vectorized::Block next_block;
-    if (_running_sink_operators == 0) {
-        if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
-            block->swap(next_block);
-            local_state._shared_state->sub_mem_usage(local_state._channel_id,
-                                                     block->allocated_bytes());
-            if (_free_block_limit == 0 ||
-                _free_blocks.size_approx() < _free_block_limit * _num_sources) 
{
-                _free_blocks.enqueue(std::move(next_block));
-            }
-        } else {
-            *eos = true;
-        }
-    } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+    bool all_finished = _running_sink_operators == 0;
+    if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
         block->swap(next_block);
+        local_state._shared_state->sub_mem_usage(local_state._channel_id, 
block->allocated_bytes());
         if (_free_block_limit == 0 ||
             _free_blocks.size_approx() < _free_block_limit * _num_sources) {
             _free_blocks.enqueue(std::move(next_block));
         }
-        local_state._shared_state->sub_mem_usage(local_state._channel_id, 
block->allocated_bytes());
+    } else if (all_finished) {
+        *eos = true;
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();
diff --git a/be/src/runtime/buffer_control_block.cpp 
b/be/src/runtime/buffer_control_block.cpp
index 8ef23265e3f..a1a83b22840 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -31,7 +31,7 @@
 
 #include "arrow/record_batch.h"
 #include "arrow/type_fwd.h"
-#include "pipeline/exec/result_sink_operator.h"
+#include "pipeline/dependency.h"
 #include "runtime/exec_env.h"
 #include "runtime/thread_context.h"
 #include "util/thrift_util.h"
@@ -85,13 +85,14 @@ void GetResultBatchCtx::on_data(const 
std::unique_ptr<TFetchDataResult>& t_resul
     delete this;
 }
 
-BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size)
+BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, 
int batch_size)
         : _fragment_id(id),
           _is_close(false),
           _is_cancelled(false),
           _buffer_rows(0),
           _buffer_limit(buffer_size),
-          _packet_num(0) {
+          _packet_num(0),
+          _batch_size(batch_size) {
     _query_statistics = std::make_unique<QueryStatistics>();
 }
 
@@ -103,165 +104,153 @@ Status BufferControlBlock::init() {
     return Status::OK();
 }
 
-Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& 
result) {
-    {
-        std::unique_lock<std::mutex> l(_lock);
-
-        if (_is_cancelled) {
-            return Status::Cancelled("Cancelled");
-        }
-
-        int num_rows = result->result_batch.rows.size();
-
-        while ((!_fe_result_batch_queue.empty() && _buffer_rows > 
_buffer_limit) &&
-               !_is_cancelled) {
-            _data_removal.wait_for(l, std::chrono::seconds(1));
-        }
+Status BufferControlBlock::add_batch(RuntimeState* state,
+                                     std::unique_ptr<TFetchDataResult>& 
result) {
+    std::unique_lock<std::mutex> l(_lock);
 
-        if (_is_cancelled) {
-            return Status::Cancelled("Cancelled");
-        }
+    if (_is_cancelled) {
+        return Status::Cancelled("Cancelled");
+    }
 
-        if (_waiting_rpc.empty()) {
-            // Merge result into batch to reduce rpc times
-            if (!_fe_result_batch_queue.empty() &&
-                ((_fe_result_batch_queue.back()->result_batch.rows.size() + 
num_rows) <
-                 _buffer_limit) &&
-                !result->eos) {
-                std::vector<std::string>& back_rows =
-                        _fe_result_batch_queue.back()->result_batch.rows;
-                std::vector<std::string>& result_rows = 
result->result_batch.rows;
-                back_rows.insert(back_rows.end(), 
std::make_move_iterator(result_rows.begin()),
-                                 std::make_move_iterator(result_rows.end()));
-            } else {
-                _fe_result_batch_queue.push_back(std::move(result));
-            }
-            _buffer_rows += num_rows;
+    int num_rows = result->result_batch.rows.size();
+    if (_waiting_rpc.empty()) {
+        // Merge result into batch to reduce rpc times
+        if (!_fe_result_batch_queue.empty() &&
+            ((_fe_result_batch_queue.back()->result_batch.rows.size() + 
num_rows) <
+             _buffer_limit) &&
+            !result->eos) {
+            std::vector<std::string>& back_rows = 
_fe_result_batch_queue.back()->result_batch.rows;
+            std::vector<std::string>& result_rows = result->result_batch.rows;
+            back_rows.insert(back_rows.end(), 
std::make_move_iterator(result_rows.begin()),
+                             std::make_move_iterator(result_rows.end()));
         } else {
-            auto* ctx = _waiting_rpc.front();
-            _waiting_rpc.pop_front();
-            ctx->on_data(result, _packet_num);
-            _packet_num++;
+            _instance_rows_in_queue.emplace_back();
+            _fe_result_batch_queue.push_back(std::move(result));
         }
+        _buffer_rows += num_rows;
+        _instance_rows[state->fragment_instance_id()] += num_rows;
+        _instance_rows_in_queue.back()[state->fragment_instance_id()] += 
num_rows;
+    } else {
+        auto* ctx = _waiting_rpc.front();
+        _waiting_rpc.pop_front();
+        ctx->on_data(result, _packet_num);
+        _packet_num++;
     }
+
     _update_dependency();
     return Status::OK();
 }
 
-Status 
BufferControlBlock::add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& 
result) {
-    {
-        std::unique_lock<std::mutex> l(_lock);
-
-        if (_is_cancelled) {
-            return Status::Cancelled("Cancelled");
-        }
+Status BufferControlBlock::add_arrow_batch(RuntimeState* state,
+                                           
std::shared_ptr<arrow::RecordBatch>& result) {
+    std::unique_lock<std::mutex> l(_lock);
 
-        int num_rows = result->num_rows();
+    if (_is_cancelled) {
+        return Status::Cancelled("Cancelled");
+    }
 
-        while ((!_arrow_flight_batch_queue.empty() && _buffer_rows > 
_buffer_limit) &&
-               !_is_cancelled) {
-            _data_removal.wait_for(l, std::chrono::seconds(1));
-        }
+    int num_rows = result->num_rows();
 
-        if (_is_cancelled) {
-            return Status::Cancelled("Cancelled");
-        }
+    if (_is_cancelled) {
+        return Status::Cancelled("Cancelled");
+    }
 
-        // TODO: merge RocordBatch, ToStructArray -> Make again
+    // TODO: merge RocordBatch, ToStructArray -> Make again
 
-        _arrow_flight_batch_queue.push_back(std::move(result));
-        _buffer_rows += num_rows;
-        _data_arrival.notify_one();
-    }
+    _arrow_flight_batch_queue.push_back(std::move(result));
+    _buffer_rows += num_rows;
+    _instance_rows_in_queue.emplace_back();
+    _instance_rows[state->fragment_instance_id()] += num_rows;
+    _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
     _update_dependency();
     return Status::OK();
 }
 
 void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        if (!_status.ok()) {
-            ctx->on_failure(_status);
-            _update_dependency();
-            return;
-        }
-        if (_is_cancelled) {
-            ctx->on_failure(Status::Cancelled("Cancelled"));
-            _update_dependency();
-            return;
-        }
-        if (!_fe_result_batch_queue.empty()) {
-            // get result
-            std::unique_ptr<TFetchDataResult> result = 
std::move(_fe_result_batch_queue.front());
-            _fe_result_batch_queue.pop_front();
-            _buffer_rows -= result->result_batch.rows.size();
-            _data_removal.notify_one();
-
-            ctx->on_data(result, _packet_num);
-            _packet_num++;
-            _update_dependency();
-            return;
-        }
-        if (_is_close) {
-            ctx->on_close(_packet_num, _query_statistics.get());
-            _update_dependency();
-            return;
+    std::lock_guard<std::mutex> l(_lock);
+    if (!_status.ok()) {
+        ctx->on_failure(_status);
+        _update_dependency();
+        return;
+    }
+    if (_is_cancelled) {
+        ctx->on_failure(Status::Cancelled("Cancelled"));
+        _update_dependency();
+        return;
+    }
+    if (!_fe_result_batch_queue.empty()) {
+        // get result
+        std::unique_ptr<TFetchDataResult> result = 
std::move(_fe_result_batch_queue.front());
+        _fe_result_batch_queue.pop_front();
+        _buffer_rows -= result->result_batch.rows.size();
+        for (auto it : _instance_rows_in_queue.front()) {
+            _instance_rows[it.first] -= it.second;
         }
-        // no ready data, push ctx to waiting list
-        _waiting_rpc.push_back(ctx);
+        _instance_rows_in_queue.pop_front();
+
+        ctx->on_data(result, _packet_num);
+        _packet_num++;
+        _update_dependency();
+        return;
+    }
+    if (_is_close) {
+        ctx->on_close(_packet_num, _query_statistics.get());
+        _update_dependency();
+        return;
     }
+    // no ready data, push ctx to waiting list
+    _waiting_rpc.push_back(ctx);
     _update_dependency();
 }
 
 Status 
BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* 
result) {
-    {
-        std::unique_lock<std::mutex> l(_lock);
-        if (!_status.ok()) {
-            return _status;
-        }
-        if (_is_cancelled) {
-            return Status::Cancelled("Cancelled");
-        }
-
-        while (_arrow_flight_batch_queue.empty() && !_is_cancelled && 
!_is_close) {
-            _data_arrival.wait_for(l, std::chrono::seconds(1));
-        }
+    std::unique_lock<std::mutex> l(_lock);
+    if (!_status.ok()) {
+        return _status;
+    }
+    if (_is_cancelled) {
+        return Status::Cancelled("Cancelled");
+    }
 
-        if (_is_cancelled) {
-            return Status::Cancelled("Cancelled");
-        }
+    if (_is_cancelled) {
+        return Status::Cancelled("Cancelled");
+    }
 
-        if (!_arrow_flight_batch_queue.empty()) {
-            *result = std::move(_arrow_flight_batch_queue.front());
-            _arrow_flight_batch_queue.pop_front();
-            _buffer_rows -= (*result)->num_rows();
-            _data_removal.notify_one();
-            _packet_num++;
-            _update_dependency();
-            return Status::OK();
+    if (!_arrow_flight_batch_queue.empty()) {
+        *result = std::move(_arrow_flight_batch_queue.front());
+        _arrow_flight_batch_queue.pop_front();
+        _buffer_rows -= (*result)->num_rows();
+        for (auto it : _instance_rows_in_queue.front()) {
+            _instance_rows[it.first] -= it.second;
         }
+        _instance_rows_in_queue.pop_front();
+        _packet_num++;
+        _update_dependency();
+        return Status::OK();
+    }
 
-        // normal path end
-        if (_is_close) {
-            _update_dependency();
-            return Status::OK();
-        }
+    // normal path end
+    if (_is_close) {
+        _update_dependency();
+        return Status::OK();
     }
     return Status::InternalError("Abnormal Ending");
 }
 
-Status BufferControlBlock::close(Status exec_status) {
+Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
     std::unique_lock<std::mutex> l(_lock);
-    close_cnt++;
-    if (close_cnt < _result_sink_dependencys.size()) {
+    auto it = _result_sink_dependencys.find(id);
+    if (it != _result_sink_dependencys.end()) {
+        it->second->set_always_ready();
+        _result_sink_dependencys.erase(it);
+    }
+    if (!_result_sink_dependencys.empty()) {
         return Status::OK();
     }
 
     _is_close = true;
     _status = exec_status;
 
-    // notify blocked get thread
-    _data_arrival.notify_all();
     if (!_waiting_rpc.empty()) {
         if (_status.ok()) {
             for (auto& ctx : _waiting_rpc) {
@@ -280,8 +269,6 @@ Status BufferControlBlock::close(Status exec_status) {
 void BufferControlBlock::cancel() {
     std::unique_lock<std::mutex> l(_lock);
     _is_cancelled = true;
-    _data_removal.notify_all();
-    _data_arrival.notify_all();
     for (auto& ctx : _waiting_rpc) {
         ctx->on_failure(Status::Cancelled("Cancelled"));
     }
@@ -290,18 +277,25 @@ void BufferControlBlock::cancel() {
 }
 
 void BufferControlBlock::set_dependency(
-        std::shared_ptr<pipeline::Dependency> result_sink_dependency) {
-    _result_sink_dependencys.push_back(result_sink_dependency);
+        const TUniqueId& id, std::shared_ptr<pipeline::Dependency> 
result_sink_dependency) {
+    std::unique_lock<std::mutex> l(_lock);
+    _result_sink_dependencys[id] = result_sink_dependency;
+    _update_dependency();
 }
 
 void BufferControlBlock::_update_dependency() {
-    if (_batch_queue_empty || _buffer_rows < _buffer_limit || _is_cancelled) {
-        for (auto dependency : _result_sink_dependencys) {
-            dependency->set_ready();
+    if (_is_cancelled) {
+        for (auto it : _result_sink_dependencys) {
+            it.second->set_ready();
         }
-    } else if (!_batch_queue_empty && _buffer_rows < _buffer_limit && 
!_is_cancelled) {
-        for (auto dependency : _result_sink_dependencys) {
-            dependency->block();
+        return;
+    }
+
+    for (auto it : _result_sink_dependencys) {
+        if (_instance_rows[it.first] > _batch_size) {
+            it.second->block();
+        } else {
+            it.second->set_ready();
         }
     }
 }
diff --git a/be/src/runtime/buffer_control_block.h 
b/be/src/runtime/buffer_control_block.h
index c8c240f928a..1296f2c606b 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -27,15 +27,16 @@
 #include <list>
 #include <memory>
 #include <mutex>
+#include <unordered_map>
 
 #include "common/status.h"
 #include "runtime/query_statistics.h"
+#include "runtime/runtime_state.h"
+#include "util/hash_util.hpp"
 
-namespace google {
-namespace protobuf {
+namespace google::protobuf {
 class Closure;
-}
-} // namespace google
+} // namespace google::protobuf
 
 namespace arrow {
 class RecordBatch;
@@ -71,19 +72,19 @@ struct GetResultBatchCtx {
 // buffer used for result customer and producer
 class BufferControlBlock {
 public:
-    BufferControlBlock(const TUniqueId& id, int buffer_size);
+    BufferControlBlock(const TUniqueId& id, int buffer_size, int batch_size);
     ~BufferControlBlock();
 
     Status init();
-    Status add_batch(std::unique_ptr<TFetchDataResult>& result);
-    Status add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result);
+    Status add_batch(RuntimeState* state, std::unique_ptr<TFetchDataResult>& 
result);
+    Status add_arrow_batch(RuntimeState* state, 
std::shared_ptr<arrow::RecordBatch>& result);
 
     void get_batch(GetResultBatchCtx* ctx);
     Status get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result);
 
     // close buffer block, set _status to exec_status and set _is_close to 
true;
     // called because data has been read or error happened.
-    Status close(Status exec_status);
+    Status close(const TUniqueId& id, Status exec_status);
     // this is called by RPC, called from coordinator
     void cancel();
 
@@ -98,7 +99,8 @@ public:
         }
     }
 
-    void set_dependency(std::shared_ptr<pipeline::Dependency> 
result_sink_dependency);
+    void set_dependency(const TUniqueId& id,
+                        std::shared_ptr<pipeline::Dependency> 
result_sink_dependency);
 
 protected:
     void _update_dependency();
@@ -121,18 +123,17 @@ protected:
 
     // protects all subsequent data in this block
     std::mutex _lock;
-    // signal arrival of new batch or the eos/cancelled condition
-    std::condition_variable _data_arrival;
-    // signal removal of data by stream consumer
-    std::condition_variable _data_removal;
 
     std::deque<GetResultBatchCtx*> _waiting_rpc;
 
     // only used for FE using return rows to check limit
     std::unique_ptr<QueryStatistics> _query_statistics;
-    std::atomic_bool _batch_queue_empty = false;
-    std::vector<std::shared_ptr<pipeline::Dependency>> 
_result_sink_dependencys;
-    size_t close_cnt = 0;
+    // instance id to dependency
+    std::unordered_map<TUniqueId, std::shared_ptr<pipeline::Dependency>> 
_result_sink_dependencys;
+    std::unordered_map<TUniqueId, size_t> _instance_rows;
+    std::list<std::unordered_map<TUniqueId, size_t>> _instance_rows_in_queue;
+
+    int _batch_size;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/result_buffer_mgr.cpp 
b/be/src/runtime/result_buffer_mgr.cpp
index 23f440d1909..ccbf0c3ff67 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -67,8 +67,8 @@ Status ResultBufferMgr::init() {
 }
 
 Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int 
buffer_size,
-                                      std::shared_ptr<BufferControlBlock>* 
sender,
-                                      int exec_timout) {
+                                      std::shared_ptr<BufferControlBlock>* 
sender, int exec_timout,
+                                      int batch_size) {
     *sender = find_control_block(query_id);
     if (*sender != nullptr) {
         LOG(WARNING) << "already have buffer control block for this instance " 
<< query_id;
@@ -77,7 +77,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& 
query_id, int buffer_size
 
     std::shared_ptr<BufferControlBlock> control_block = nullptr;
 
-    control_block = std::make_shared<BufferControlBlock>(query_id, 
buffer_size);
+    control_block = std::make_shared<BufferControlBlock>(query_id, 
buffer_size, batch_size);
 
     {
         std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
diff --git a/be/src/runtime/result_buffer_mgr.h 
b/be/src/runtime/result_buffer_mgr.h
index 30b1b61eb7d..8bac69c23ac 100644
--- a/be/src/runtime/result_buffer_mgr.h
+++ b/be/src/runtime/result_buffer_mgr.h
@@ -58,7 +58,8 @@ public:
     // the returned sender do not need release
     // sender is not used when call cancel or unregister
     Status create_sender(const TUniqueId& query_id, int buffer_size,
-                         std::shared_ptr<BufferControlBlock>* sender, int 
exec_timeout);
+                         std::shared_ptr<BufferControlBlock>* sender, int 
exec_timeout,
+                         int batch_size);
 
     // fetch data result to FE
     void fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx);
diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h
index 78082956d0e..df1b7a808d9 100644
--- a/be/src/runtime/result_writer.h
+++ b/be/src/runtime/result_writer.h
@@ -47,7 +47,7 @@ public:
     [[nodiscard]] bool output_object_data() const { return 
_output_object_data; }
 
     // Write is sync, it will do real IO work.
-    virtual Status write(vectorized::Block& block) = 0;
+    virtual Status write(RuntimeState* state, vectorized::Block& block) = 0;
 
     void set_output_object_data(bool output_object_data) {
         _output_object_data = output_object_data;
diff --git a/be/src/service/point_query_executor.cpp 
b/be/src/service/point_query_executor.cpp
index d4d20ea5a48..8078467d5ca 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -432,7 +432,7 @@ Status PointQueryExecutor::_lookup_row_data() {
                     _reusable->get_col_default_values(), 
_reusable->include_col_uids());
         }
         if (!_reusable->missing_col_uids().empty()) {
-            if 
(!_reusable->runtime_state().enable_short_circuit_query_access_column_store()) {
+            if 
(!_reusable->runtime_state()->enable_short_circuit_query_access_column_store()) 
{
                 std::string missing_columns;
                 for (int cid : _reusable->missing_col_uids()) {
                     missing_columns += 
_tablet->tablet_schema()->column_by_uid(cid).name() + ",";
@@ -487,10 +487,10 @@ Status PointQueryExecutor::_lookup_row_data() {
 }
 
 template <typename MysqlWriter>
-Status _serialize_block(MysqlWriter& mysql_writer, vectorized::Block& block,
-                        PTabletKeyLookupResponse* response) {
+Status serialize_block(RuntimeState* state, MysqlWriter& mysql_writer, 
vectorized::Block& block,
+                       PTabletKeyLookupResponse* response) {
     block.clear_names();
-    RETURN_IF_ERROR(mysql_writer.write(block));
+    RETURN_IF_ERROR(mysql_writer.write(state, block));
     assert(mysql_writer.results().size() == 1);
     uint8_t* buf = nullptr;
     uint32_t len = 0;
@@ -508,11 +508,13 @@ Status PointQueryExecutor::_output_data() {
         if (_binary_row_format) {
             vectorized::VMysqlResultWriter<true> mysql_writer(nullptr, 
_reusable->output_exprs(),
                                                               nullptr);
-            RETURN_IF_ERROR(_serialize_block(mysql_writer, *_result_block, 
_response));
+            RETURN_IF_ERROR(serialize_block(_reusable->runtime_state(), 
mysql_writer,
+                                            *_result_block, _response));
         } else {
             vectorized::VMysqlResultWriter<false> mysql_writer(nullptr, 
_reusable->output_exprs(),
                                                                nullptr);
-            RETURN_IF_ERROR(_serialize_block(mysql_writer, *_result_block, 
_response));
+            RETURN_IF_ERROR(serialize_block(_reusable->runtime_state(), 
mysql_writer,
+                                            *_result_block, _response));
         }
         VLOG_DEBUG << "dump block " << _result_block->dump_data();
     } else {
diff --git a/be/src/service/point_query_executor.h 
b/be/src/service/point_query_executor.h
index 1bed53891c3..f374e094806 100644
--- a/be/src/service/point_query_executor.h
+++ b/be/src/service/point_query_executor.h
@@ -98,7 +98,7 @@ public:
 
     const std::unordered_set<int32_t> include_col_uids() const { return 
_include_col_uids; }
 
-    const RuntimeState& runtime_state() const { return *_runtime_state; }
+    RuntimeState* runtime_state() { return _runtime_state.get(); }
 
 private:
     // caching TupleDescriptor, output_expr, etc...
diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp 
b/be/src/vec/sink/varrow_flight_result_writer.cpp
index d646cf66f34..b23d1668465 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.cpp
+++ b/be/src/vec/sink/varrow_flight_result_writer.cpp
@@ -53,7 +53,7 @@ void VArrowFlightResultWriter::_init_profile() {
     _bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES);
 }
 
-Status VArrowFlightResultWriter::write(Block& input_block) {
+Status VArrowFlightResultWriter::write(RuntimeState* state, Block& input_block) {
     SCOPED_TIMER(_append_row_batch_timer);
     Status status = Status::OK();
     if (UNLIKELY(input_block.rows() == 0)) {
@@ -80,7 +80,7 @@ Status VArrowFlightResultWriter::write(Block& input_block) {
         SCOPED_TIMER(_result_send_timer);
         // If this is a dry run task, no need to send data block
         if (!_is_dry_run) {
-            status = _sinker->add_arrow_batch(result);
+            status = _sinker->add_arrow_batch(state, result);
         }
         if (status.ok()) {
             _written_rows += num_rows;
diff --git a/be/src/vec/sink/varrow_flight_result_writer.h b/be/src/vec/sink/varrow_flight_result_writer.h
index 862b074cb35..ab2578421c8 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.h
+++ b/be/src/vec/sink/varrow_flight_result_writer.h
@@ -44,7 +44,7 @@ public:
 
     Status init(RuntimeState* state) override;
 
-    Status write(Block& block) override;
+    Status write(RuntimeState* state, Block& block) override;
 
     Status close(Status) override;
 
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp
index 804f50f0fc8..45941173d4d 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -102,7 +102,7 @@ void VMysqlResultWriter<is_binary_format>::_init_profile() {
 }
 
 template <bool is_binary_format>
-Status VMysqlResultWriter<is_binary_format>::write(Block& input_block) {
+Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& input_block) {
     SCOPED_TIMER(_append_row_batch_timer);
     Status status = Status::OK();
     if (UNLIKELY(input_block.rows() == 0)) {
@@ -194,7 +194,7 @@ Status VMysqlResultWriter<is_binary_format>::write(Block& input_block) {
         // If this is a dry run task, no need to send data block
         if (!_is_dry_run) {
             if (_sinker) {
-                status = _sinker->add_batch(result);
+                status = _sinker->add_batch(state, result);
             } else {
                 _results.push_back(std::move(result));
             }
diff --git a/be/src/vec/sink/vmysql_result_writer.h b/be/src/vec/sink/vmysql_result_writer.h
index da3cdcf0690..306d062a6be 100644
--- a/be/src/vec/sink/vmysql_result_writer.h
+++ b/be/src/vec/sink/vmysql_result_writer.h
@@ -47,7 +47,7 @@ public:
 
     Status init(RuntimeState* state) override;
 
-    Status write(Block& block) override;
+    Status write(RuntimeState* state, Block& block) override;
 
     Status close(Status status) override;
 
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 814d1b754c4..42fd8468e86 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -140,7 +140,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
             }
 
             auto block = _get_block_from_queue();
-            auto status = write(*block);
+            auto status = write(state, *block);
             if (!status.ok()) [[unlikely]] {
                 std::unique_lock l(_m);
                 _writer_status.update(status);
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 2703330406c..fc8aacdbfa1 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -105,7 +105,7 @@ VIcebergTableWriter::_to_iceberg_partition_columns() {
     return partition_columns;
 }
 
-Status VIcebergTableWriter::write(vectorized::Block& block) {
+Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) {
     SCOPED_RAW_TIMER(&_send_data_ns);
     if (block.rows() == 0) {
         return Status::OK();
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
index 35e71d1960f..e2e582e04ad 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
@@ -54,7 +54,7 @@ public:
 
     Status open(RuntimeState* state, RuntimeProfile* profile) override;
 
-    Status write(vectorized::Block& block) override;
+    Status write(RuntimeState* state, vectorized::Block& block) override;
 
     Status close(Status) override;
 
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp
index 96c4edc82b5..c897892cbfc 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -194,7 +194,7 @@ std::string VFileResultWriter::_file_format_to_name() {
     }
 }
 
-Status VFileResultWriter::write(Block& block) {
+Status VFileResultWriter::write(RuntimeState* state, Block& block) {
     if (block.rows() == 0) {
         return Status::OK();
     }
@@ -291,7 +291,8 @@ Status VFileResultWriter::_send_result() {
     attach_infos.insert(std::make_pair("URL", file_url));
 
     result->result_batch.__set_attached_infos(attach_infos);
-    RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result), "failed to send outfile result");
+    RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(_state, result),
+                                   "failed to send outfile result");
     return Status::OK();
 }
 
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h b/be/src/vec/sink/writer/vfile_result_writer.h
index 44b0695505f..42753a5e261 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -60,7 +60,7 @@ public:
 
     VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs);
 
-    Status write(Block& block) override;
+    Status write(RuntimeState* state, Block& block) override;
 
     Status close(Status exec_status) override;
 
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp b/be/src/vec/sink/writer/vhive_table_writer.cpp
index 0e64060eb0b..f90c7134ccd 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -81,7 +81,7 @@ Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
     return Status::OK();
 }
 
-Status VHiveTableWriter::write(vectorized::Block& block) {
+Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) {
     SCOPED_RAW_TIMER(&_send_data_ns);
 
     if (block.rows() == 0) {
diff --git a/be/src/vec/sink/writer/vhive_table_writer.h b/be/src/vec/sink/writer/vhive_table_writer.h
index 4989ba443c7..6c8b972f280 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.h
+++ b/be/src/vec/sink/writer/vhive_table_writer.h
@@ -41,13 +41,13 @@ class VHiveTableWriter final : public AsyncResultWriter {
 public:
     VHiveTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs);
 
-    ~VHiveTableWriter() = default;
+    ~VHiveTableWriter() override = default;
 
     Status init_properties(ObjectPool* pool);
 
     Status open(RuntimeState* state, RuntimeProfile* profile) override;
 
-    Status write(vectorized::Block& block) override;
+    Status write(RuntimeState* state, vectorized::Block& block) override;
 
     Status close(Status) override;
 
diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.cpp b/be/src/vec/sink/writer/vjdbc_table_writer.cpp
index 9493bfbf072..b7c8d1f78dd 100644
--- a/be/src/vec/sink/writer/vjdbc_table_writer.cpp
+++ b/be/src/vec/sink/writer/vjdbc_table_writer.cpp
@@ -60,7 +60,7 @@ VJdbcTableWriter::VJdbcTableWriter(const TDataSink& t_sink,
                                    const VExprContextSPtrs& output_expr_ctxs)
         : AsyncResultWriter(output_expr_ctxs), JdbcConnector(create_connect_param(t_sink)) {}
 
-Status VJdbcTableWriter::write(vectorized::Block& block) {
+Status VJdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) {
     Block output_block;
     RETURN_IF_ERROR(_projection_block(block, &output_block));
     auto num_rows = output_block.rows();
diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.h b/be/src/vec/sink/writer/vjdbc_table_writer.h
index a683259c992..b8216c3bcd6 100644
--- a/be/src/vec/sink/writer/vjdbc_table_writer.h
+++ b/be/src/vec/sink/writer/vjdbc_table_writer.h
@@ -44,7 +44,7 @@ public:
         return init_to_write(profile);
     }
 
-    Status write(vectorized::Block& block) override;
+    Status write(RuntimeState* state, vectorized::Block& block) override;
 
     Status finish(RuntimeState* state) override { return JdbcConnector::finish_trans(); }
 
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.cpp b/be/src/vec/sink/writer/vmysql_table_writer.cpp
index d9ca6d96f99..45afe8ce019 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.cpp
+++ b/be/src/vec/sink/writer/vmysql_table_writer.cpp
@@ -109,7 +109,7 @@ Status VMysqlTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
     return Status::OK();
 }
 
-Status VMysqlTableWriter::write(vectorized::Block& block) {
+Status VMysqlTableWriter::write(RuntimeState* state, vectorized::Block& block) {
     Block output_block;
     RETURN_IF_ERROR(_projection_block(block, &output_block));
     auto num_rows = output_block.rows();
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.h b/be/src/vec/sink/writer/vmysql_table_writer.h
index 856d0a21ec5..072885b176b 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.h
+++ b/be/src/vec/sink/writer/vmysql_table_writer.h
@@ -51,7 +51,7 @@ public:
     // connect to mysql server
     Status open(RuntimeState* state, RuntimeProfile* profile) override;
 
-    Status write(vectorized::Block& block) override;
+    Status write(RuntimeState* state, vectorized::Block& block) override;
 
     Status close(Status) override;
 
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.cpp b/be/src/vec/sink/writer/vodbc_table_writer.cpp
index da068c3d677..c70bdd4ca19 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.cpp
+++ b/be/src/vec/sink/writer/vodbc_table_writer.cpp
@@ -45,7 +45,7 @@ VOdbcTableWriter::VOdbcTableWriter(const doris::TDataSink& t_sink,
                                    const VExprContextSPtrs& output_expr_ctxs)
         : AsyncResultWriter(output_expr_ctxs), ODBCConnector(create_connect_param(t_sink)) {}
 
-Status VOdbcTableWriter::write(vectorized::Block& block) {
+Status VOdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) {
     Block output_block;
     RETURN_IF_ERROR(_projection_block(block, &output_block));
     auto num_rows = output_block.rows();
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.h b/be/src/vec/sink/writer/vodbc_table_writer.h
index 687b5106a8b..fa4dc47b77f 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.h
+++ b/be/src/vec/sink/writer/vodbc_table_writer.h
@@ -44,7 +44,7 @@ public:
         return init_to_write(profile);
     }
 
-    Status write(vectorized::Block& block) override;
+    Status write(RuntimeState* state, vectorized::Block& block) override;
 
     Status finish(RuntimeState* state) override { return ODBCConnector::finish_trans(); }
 
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index 4f0f11e9db8..a7df57e1beb 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1369,7 +1369,7 @@ Status VTabletWriter::_send_new_partition_batch() {
         //  2. deal batched block
         //  3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
         _row_distribution.clear_batching_stats();
-        RETURN_IF_ERROR(this->write(tmp_block));
+        RETURN_IF_ERROR(this->write(_state, tmp_block));
         _row_distribution._batching_block->set_mutable_columns(
                 tmp_block.mutate_columns()); // Recovery back
         _row_distribution._batching_block->clear_column_data();
@@ -1668,7 +1668,7 @@ void VTabletWriter::_generate_index_channels_payloads(
     }
 }
 
-Status VTabletWriter::write(doris::vectorized::Block& input_block) {
+Status VTabletWriter::write(RuntimeState* state, doris::vectorized::Block& input_block) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     Status status = Status::OK();
 
diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h
index 21d7b1c9f17..b9fbc4d0873 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -544,7 +544,7 @@ class VTabletWriter final : public AsyncResultWriter {
 public:
     VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs);
 
-    Status write(Block& block) override;
+    Status write(RuntimeState* state, Block& block) override;
 
     Status close(Status) override;
 
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 3c8dede657f..9bd154ce212 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -373,7 +373,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id,
     return Status::OK();
 }
 
-Status VTabletWriterV2::write(Block& input_block) {
+Status VTabletWriterV2::write(RuntimeState* state, Block& input_block) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     Status status = Status::OK();
 
@@ -502,7 +502,7 @@ Status VTabletWriterV2::_send_new_partition_batch() {
         //  2. deal batched block
         //  3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
         _row_distribution.clear_batching_stats();
-        RETURN_IF_ERROR(this->write(tmp_block));
+        RETURN_IF_ERROR(this->write(_state, tmp_block));
         _row_distribution._batching_block->set_mutable_columns(
                 tmp_block.mutate_columns()); // Recovery back
         _row_distribution._batching_block->clear_column_data();
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h
index ff31e1552dd..363dea54c3b 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -106,7 +106,7 @@ public:
 
     ~VTabletWriterV2() override;
 
-    Status write(Block& block) override;
+    Status write(RuntimeState* state, Block& block) override;
 
     Status open(RuntimeState* state, RuntimeProfile* profile) override;
 
diff --git a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
index 5ba8af8b81f..97e78f05c54 100644
--- a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
+++ b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
@@ -317,7 +317,7 @@ void serialize_and_deserialize_mysql_test() {
     // mysql_writer init
     vectorized::VMysqlResultWriter<false> mysql_writer(nullptr, _output_vexpr_ctxs, nullptr);
 
-    Status st = mysql_writer.write(block);
+    Status st = mysql_writer.write(&runtime_stat, block);
     EXPECT_TRUE(st.ok());
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to