This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2c6809df05c034a1ff5cc0d8c568968e15cdd594
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Wed Apr 24 17:54:11 2024 +0800

    [improvement](spill) improve config of spill thread pool (#33992)
---
 be/src/common/config.cpp                           |  13 +-
 be/src/common/config.h                             |   4 +-
 .../exec/partitioned_aggregation_sink_operator.cpp |   2 +-
 .../exec/partitioned_aggregation_sink_operator.h   |  46 +++----
 .../partitioned_aggregation_source_operator.cpp    |   2 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  |  12 +-
 .../exec/partitioned_hash_join_sink_operator.cpp   |   4 +-
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  | 140 ++++++++++-----------
 .../pipeline/exec/spill_sort_source_operator.cpp   |   3 +-
 be/src/pipeline/pipeline_x/dependency.cpp          |   1 -
 be/src/pipeline/pipeline_x/dependency.h            |   6 +-
 be/src/vec/spill/spill_stream.cpp                  |  64 ++--------
 be/src/vec/spill/spill_stream.h                    |  10 +-
 be/src/vec/spill/spill_stream_manager.cpp          |  25 +---
 be/src/vec/spill/spill_stream_manager.h            |  10 +-
 15 files changed, 120 insertions(+), 222 deletions(-)
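The central configuration change (be/src/common/config.cpp below) collapses the per-disk spill IO pool and the separate spill async-task pool into a single pool: spill_io_thread_pool_thread_num defaults to -1, and a validator resolves that at startup to max(48, 2 * CPU cores). A minimal stand-alone sketch of that sizing rule, with std::thread::hardware_concurrency() standing in for Doris's CpuInfo::num_cores() (the fallback to one core is an assumption made only for this sketch):

    #include <algorithm>
    #include <cstdio>
    #include <thread>

    int main() {
        // -1 mirrors the new default of spill_io_thread_pool_thread_num.
        int configured = -1;
        // Stand-in for CpuInfo::num_cores(); hardware_concurrency() can return 0.
        int cores = static_cast<int>(std::max(1u, std::thread::hardware_concurrency()));
        int threads = (configured == -1) ? std::max(48, cores * 2) : configured;
        std::printf("spill IO pool size: %d threads on %d cores\n", threads, cores);
        return 0;
    }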

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index aa50787d5b5..85d17c08e74 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1164,10 +1164,15 @@ DEFINE_String(spill_storage_root_path, "");
 DEFINE_String(spill_storage_limit, "20%");   // 20%
 DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
 DEFINE_mInt32(spill_gc_file_count, "2000");
-DEFINE_Int32(spill_io_thread_pool_per_disk_thread_num, "2");
-DEFINE_Int32(spill_io_thread_pool_queue_size, "1024");
-DEFINE_Int32(spill_async_task_thread_pool_thread_num, "2");
-DEFINE_Int32(spill_async_task_thread_pool_queue_size, "1024");
+DEFINE_Int32(spill_io_thread_pool_thread_num, "-1");
+DEFINE_Validator(spill_io_thread_pool_thread_num, [](const int config) -> bool {
+    if (config == -1) {
+        CpuInfo::init();
+        spill_io_thread_pool_thread_num = std::max(48, CpuInfo::num_cores() * 2);
+    }
+    return true;
+});
+DEFINE_Int32(spill_io_thread_pool_queue_size, "102400");
 
 DEFINE_mBool(check_segment_when_build_rowset_meta, "false");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 1e2f71c452b..d2f95653f1e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1253,10 +1253,8 @@ DECLARE_String(spill_storage_root_path);
 DECLARE_String(spill_storage_limit);
 DECLARE_mInt32(spill_gc_interval_ms);
 DECLARE_mInt32(spill_gc_file_count);
-DECLARE_Int32(spill_io_thread_pool_per_disk_thread_num);
+DECLARE_Int32(spill_io_thread_pool_thread_num);
 DECLARE_Int32(spill_io_thread_pool_queue_size);
-DECLARE_Int32(spill_async_task_thread_pool_thread_num);
-DECLARE_Int32(spill_async_task_thread_pool_queue_size);
 
 DECLARE_mBool(check_segment_when_build_rowset_meta);
 
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 55a0650dc1f..78079a0ddf8 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -253,7 +253,7 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
-    status = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
+    status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
             [this, &parent, state, query_id, execution_context, submit_timer] {
                 auto execution_context_lock = execution_context.lock();
                 if (!execution_context_lock) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 7ec582905d0..1755cd866f2 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -83,7 +83,7 @@ public:
                 for (int i = 0; i < Base::_shared_state->partition_count && !state->is_cancelled();
                      ++i) {
                     if (spill_infos[i].keys_.size() >= spill_batch_rows) {
-                        status = _async_spill_partition_and_wait(
+                        status = _spill_partition(
                                 state, context, Base::_shared_state->spill_partitions[i],
                                 spill_infos[i].keys_, spill_infos[i].values_, nullptr, false);
                         RETURN_IF_ERROR(status);
@@ -98,13 +98,13 @@ public:
             auto spill_null_key_data =
                     (hash_null_key_data && i == Base::_shared_state->partition_count - 1);
             if (spill_infos[i].keys_.size() > 0 || spill_null_key_data) {
-                status = _async_spill_partition_and_wait(
-                        state, context, Base::_shared_state->spill_partitions[i],
-                        spill_infos[i].keys_, spill_infos[i].values_,
-                        spill_null_key_data ? hash_table.template get_null_key_data<
-                                                      vectorized::AggregateDataPtr>()
-                                            : nullptr,
-                        true);
+                status = _spill_partition(state, context, Base::_shared_state->spill_partitions[i],
+                                          spill_infos[i].keys_, spill_infos[i].values_,
+                                          spill_null_key_data
+                                                  ? hash_table.template get_null_key_data<
+                                                            vectorized::AggregateDataPtr>()
+                                                  : nullptr,
+                                          true);
                 RETURN_IF_ERROR(status);
             }
         }
@@ -120,12 +120,10 @@ public:
     }
 
     template <typename HashTableCtxType, typename KeyType>
-    Status _async_spill_partition_and_wait(RuntimeState* state, HashTableCtxType& context,
-                                           AggSpillPartitionSPtr& spill_partition,
-                                           std::vector<KeyType>& keys,
-                                           std::vector<vectorized::AggregateDataPtr>& values,
-                                           const vectorized::AggregateDataPtr null_key_data,
-                                           bool is_last) {
+    Status _spill_partition(RuntimeState* state, HashTableCtxType& context,
+                            AggSpillPartitionSPtr& spill_partition, std::vector<KeyType>& keys,
+                            std::vector<vectorized::AggregateDataPtr>& values,
+                            const vectorized::AggregateDataPtr null_key_data, bool is_last) {
         vectorized::SpillStreamSPtr spill_stream;
         auto status = spill_partition->get_spill_stream(state, Base::_parent->node_id(),
                                                         Base::profile(), spill_stream);
@@ -148,27 +146,15 @@ public:
             keys.clear();
             values.clear();
         }
-
         status = spill_stream->prepare_spill();
         RETURN_IF_ERROR(status);
 
-        status = ExecEnv::GetInstance()
-                         ->spill_stream_mgr()
-                         ->get_spill_io_thread_pool(spill_stream->get_spill_root_dir())
-                         ->submit_func([this, state, &spill_stream] {
-                             (void)state; // avoid ut compile error
-                             SCOPED_ATTACH_TASK(state);
-                             SCOPED_TIMER(_spill_write_disk_timer);
-                             Status status;
-                             Defer defer {[&]() { spill_stream->end_spill(status); }};
-                             status = spill_stream->spill_block(state, block_, false);
-                             return status;
-                         });
-        if (!status.ok()) {
-            spill_stream->end_spill(status);
+        {
+            SCOPED_TIMER(_spill_write_disk_timer);
+            status = spill_stream->spill_block(state, block_, false);
         }
         RETURN_IF_ERROR(status);
-        status = spill_partition->wait_spill(state);
+        status = spill_partition->flush_if_full();
         _reset_tmp_data();
         return status;
     }
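Because revoke_memory() itself is now submitted to the shared spill IO pool, _spill_partition above writes the block directly on the calling thread and no longer re-submits the write to a per-disk pool and blocks on wait_spill(). A rough, self-contained analogue of that simplification (not Doris code; std::async stands in for the old second-pool hop):

    #include <cstdio>
    #include <future>

    int write_block() {  // stand-in for spill_stream->spill_block(...)
        std::puts("writing one spill block");
        return 0;
    }

    int main() {
        // Old shape: hand the write to another pool, then block until it finishes.
        std::future<int> pending = std::async(std::launch::async, write_block);
        int old_status = pending.get();

        // New shape: the caller already sits on an IO thread, so call it directly.
        int new_status = write_block();
        return old_status + new_status;
    }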
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index a5753ef7654..ff4795f2079 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -211,7 +211,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
     submit_timer.start();
 
     RETURN_IF_ERROR(
-            ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
+            ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
                     [this, state, query_id, execution_context, submit_timer] {
                         auto execution_context_lock = execution_context.lock();
                         if (!execution_context_lock) {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 2f766511984..0b52e41df28 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -175,8 +175,7 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
                                                   _spill_write_wait_io_timer);
     }
 
-    auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(
-            build_spilling_stream->get_spill_root_dir());
+    auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     auto execution_context = state->get_task_execution_context();
     _shared_state_holder = _shared_state->shared_from_this();
     MonotonicStopWatch submit_timer;
@@ -230,8 +229,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
                                             _spill_write_wait_io_timer);
     }
 
-    auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(
-            spilling_stream->get_spill_root_dir());
+    auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
 
     auto& blocks = _probe_blocks[partition_index];
     auto& partitioned_block = _partitioned_blocks[partition_index];
@@ -296,7 +294,6 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
 Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_index) {
     auto& build_spilling_stream = _shared_state->spilled_streams[partition_index];
     if (build_spilling_stream) {
-        build_spilling_stream->end_spill(Status::OK());
         RETURN_IF_ERROR(build_spilling_stream->spill_eof());
         build_spilling_stream->set_read_counters(_spill_read_data_time, _spill_deserialize_time,
                                                  _spill_read_bytes, _spill_read_wait_io_timer);
@@ -305,7 +302,6 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
     auto& probe_spilling_stream = _probe_spilling_streams[partition_index];
 
     if (probe_spilling_stream) {
-        probe_spilling_stream->end_spill(Status::OK());
         RETURN_IF_ERROR(probe_spilling_stream->spill_eof());
         probe_spilling_stream->set_read_counters(_spill_read_data_time, _spill_deserialize_time,
                                                  _spill_read_bytes, _spill_read_wait_io_timer);
@@ -387,7 +383,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
         _dependency->set_ready();
     };
 
-    auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool();
+    auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     has_data = true;
     _dependency->block();
 
@@ -453,7 +449,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
         _dependency->set_ready();
     };
 
-    auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool();
+    auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     DCHECK(spill_io_pool != nullptr);
     _dependency->block();
     has_data = true;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index b8454c19bf3..ade9096388b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -219,7 +219,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
 
         _dependency->set_ready();
     };
-    auto* thread_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool();
+    auto* thread_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     return thread_pool->submit_func(spill_func);
 }
 
@@ -247,7 +247,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
         DCHECK(spilling_stream != nullptr);
 
         auto* spill_io_pool =
-                ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool();
+                ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
         DCHECK(spill_io_pool != nullptr);
         auto execution_context = state->get_task_execution_context();
         _shared_state_holder = _shared_state->shared_from_this();
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index f5e74826bb6..af505099e82 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -233,80 +233,74 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
     MonotonicStopWatch submit_timer;
     submit_timer.start();
 
-    status = ExecEnv::GetInstance()
-                     ->spill_stream_mgr()
-                     ->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir())
-                     ->submit_func([this, state, query_id, &parent, execution_context,
-                                    submit_timer] {
-                         auto execution_context_lock = execution_context.lock();
-                         if (!execution_context_lock) {
-                             LOG(INFO) << "query " << print_id(query_id)
-                                       << " execution_context released, maybe query was cancelled.";
-                             return Status::OK();
-                         }
-
-                         _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
-                         SCOPED_ATTACH_TASK(state);
-                         Defer defer {[&]() {
-                             if (!_shared_state->sink_status.ok() || state->is_cancelled()) {
-                                 if (!_shared_state->sink_status.ok()) {
-                                     LOG(WARNING) << "query " << print_id(query_id) << " sort node "
-                                                  << _parent->id() << " revoke memory error: "
-                                                  << _shared_state->sink_status;
-                                 }
-                                 _shared_state->close();
-                             } else {
-                                 VLOG_DEBUG << "query " << print_id(query_id) << " sort node "
-                                            << _parent->id() << " revoke memory finish";
-                             }
-
-                             _spilling_stream->end_spill(_shared_state->sink_status);
-                             if (!_shared_state->sink_status.ok()) {
-                                 _shared_state->close();
-                             }
-
-                             _spilling_stream.reset();
-                             if (_eos) {
-                                 _dependency->set_ready_to_read();
-                                 _finish_dependency->set_ready();
-                             } else {
-                                 _dependency->Dependency::set_ready();
-                             }
-                         }};
-
-                         _shared_state->sink_status = parent._sort_sink_operator->prepare_for_spill(
-                                 _runtime_state.get());
-                         RETURN_IF_ERROR(_shared_state->sink_status);
-
-                         auto* sink_local_state = _runtime_state->get_sink_local_state();
-                         update_profile(sink_local_state->profile());
-
-                         bool eos = false;
-                         vectorized::Block block;
-                         while (!eos && !state->is_cancelled()) {
-                             {
-                                 SCOPED_TIMER(_spill_merge_sort_timer);
-                                 _shared_state->sink_status =
-                                         parent._sort_sink_operator->merge_sort_read_for_spill(
-                                                 _runtime_state.get(), &block,
-                                                 _shared_state->spill_block_batch_row_count, &eos);
-                             }
-                             RETURN_IF_ERROR(_shared_state->sink_status);
-                             {
-                                 SCOPED_TIMER(Base::_spill_timer);
-                                 _shared_state->sink_status =
-                                         _spilling_stream->spill_block(state, block, eos);
-                             }
-                             RETURN_IF_ERROR(_shared_state->sink_status);
-                             block.clear_column_data();
-                         }
-                         parent._sort_sink_operator->reset(_runtime_state.get());
-
-                         return Status::OK();
-                     });
-    if (!status.ok()) {
-        _spilling_stream->end_spill(status);
+    status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
+            [this, state, query_id, &parent, execution_context, submit_timer] {
+                auto execution_context_lock = execution_context.lock();
+                if (!execution_context_lock) {
+                    LOG(INFO) << "query " << print_id(query_id)
+                              << " execution_context released, maybe query was cancelled.";
+                    return Status::OK();
+                }
 
+                _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+                SCOPED_ATTACH_TASK(state);
+                Defer defer {[&]() {
+                    if (!_shared_state->sink_status.ok() || state->is_cancelled()) {
+                        if (!_shared_state->sink_status.ok()) {
+                            LOG(WARNING) << "query " << print_id(query_id) << " sort node "
+                                         << _parent->id()
+                                         << " revoke memory error: " << _shared_state->sink_status;
+                        }
+                        _shared_state->close();
+                    } else {
+                        VLOG_DEBUG << "query " << print_id(query_id) << " sort node "
+                                   << _parent->id() << " revoke memory finish";
+                    }
+
+                    if (!_shared_state->sink_status.ok()) {
+                        _shared_state->close();
+                    }
+
+                    _spilling_stream.reset();
+                    if (_eos) {
+                        _dependency->set_ready_to_read();
+                        _finish_dependency->set_ready();
+                    } else {
+                        _dependency->Dependency::set_ready();
+                    }
+                }};
+
+                _shared_state->sink_status =
+                        parent._sort_sink_operator->prepare_for_spill(_runtime_state.get());
+                RETURN_IF_ERROR(_shared_state->sink_status);
+
+                auto* sink_local_state = _runtime_state->get_sink_local_state();
+                update_profile(sink_local_state->profile());
+
+                bool eos = false;
+                vectorized::Block block;
+                while (!eos && !state->is_cancelled()) {
+                    {
+                        SCOPED_TIMER(_spill_merge_sort_timer);
+                        _shared_state->sink_status =
+                                parent._sort_sink_operator->merge_sort_read_for_spill(
+                                        _runtime_state.get(), &block,
+                                        _shared_state->spill_block_batch_row_count, &eos);
+                    }
+                    RETURN_IF_ERROR(_shared_state->sink_status);
+                    {
+                        SCOPED_TIMER(Base::_spill_timer);
+                        _shared_state->sink_status =
+                                _spilling_stream->spill_block(state, block, eos);
+                    }
+                    RETURN_IF_ERROR(_shared_state->sink_status);
+                    block.clear_column_data();
+                }
+                parent._sort_sink_operator->reset(_runtime_state.get());
+
+                return Status::OK();
+            });
+    if (!status.ok()) {
         if (!_eos) {
             Base::_dependency->Dependency::set_ready();
         }
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 107868f968d..115f40a3636 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -148,7 +148,6 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
                 _status = tmp_stream->prepare_spill();
                 RETURN_IF_ERROR(_status);
 
-                Defer defer {[&]() { tmp_stream->end_spill(_status); }};
                 _shared_state->sorted_streams.emplace_back(tmp_stream);
 
                 bool eos = false;
@@ -173,7 +172,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
         }
         return Status::OK();
     };
-    return ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
+    return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
             spill_func);
 }
 
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp
index 6dccf15dbd8..ec890dac7f1 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -231,7 +231,6 @@ Status AggSpillPartition::get_spill_stream(RuntimeState* state, int node_id,
 }
 void AggSpillPartition::close() {
     if (spilling_stream_) {
-        (void)spilling_stream_->wait_spill();
         spilling_stream_.reset();
     }
     for (auto& stream : spill_streams_) {
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index ebd44bec4d2..d663b0aae8b 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -445,11 +445,9 @@ struct AggSpillPartition {
     Status get_spill_stream(RuntimeState* state, int node_id, RuntimeProfile* profile,
                             vectorized::SpillStreamSPtr& spilling_stream);
 
-    // wait for current bock spilling to finish
-    Status wait_spill(RuntimeState* state) {
+    Status flush_if_full() {
         DCHECK(spilling_stream_);
-        auto status = spilling_stream_->wait_spill();
-        RETURN_IF_ERROR(status);
+        Status status;
         // avoid small spill files
         if (spilling_stream_->get_written_bytes() >= AGG_SPILL_FILE_SIZE) {
             status = spilling_stream_->spill_eof();
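With no in-flight asynchronous write left to wait for, AggSpillPartition::wait_spill() becomes flush_if_full(): its only remaining job is to finish the current file once it has grown past AGG_SPILL_FILE_SIZE, so a partition does not accumulate many small spill files. A hedged sketch of that rollover check (the 128 MB value and the type names are illustrative assumptions; the real constant is defined elsewhere in Doris):

    #include <cstdint>
    #include <cstdio>

    // Assumed stand-in for AGG_SPILL_FILE_SIZE.
    constexpr int64_t kAggSpillFileSize = 128LL << 20;

    struct Stream {
        int64_t written_bytes = 0;
    };

    // Finish the current file only once it is big enough; otherwise keep appending.
    void flush_if_full(Stream& stream) {
        if (stream.written_bytes >= kAggSpillFileSize) {
            std::puts("spill_eof(): close this file; the next block opens a new stream");
            stream.written_bytes = 0;
        }
    }

    int main() {
        Stream s;
        s.written_bytes = 200LL << 20;  // pretend 200 MB have been written
        flush_if_full(s);
        return 0;
    }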
diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp
index ed7be9a0b28..e4631f1e1cd 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -42,10 +42,7 @@ SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* d
           spill_dir_(std::move(spill_dir)),
           batch_rows_(batch_rows),
           batch_bytes_(batch_bytes),
-          profile_(profile) {
-    io_thread_pool_ =
-            ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(data_dir->path());
-}
+          profile_(profile) {}
 
 SpillStream::~SpillStream() {
     bool exists = false;
@@ -70,14 +67,6 @@ void SpillStream::close() {
     }
     VLOG_ROW << "closing: " << stream_id_;
     closed_ = true;
-    if (spill_promise_) {
-        spill_future_.wait();
-        spill_promise_.reset();
-    }
-    if (read_promise_) {
-        read_future_.wait();
-        read_promise_.reset();
-    }
 
     if (writer_) {
         (void)writer_->close();
@@ -97,25 +86,7 @@ const std::string& SpillStream::get_spill_root_dir() const {
     return data_dir_->path();
 }
 Status SpillStream::prepare_spill() {
-    DCHECK(!spill_promise_);
-    RETURN_IF_ERROR(writer_->open());
-
-    spill_promise_ = std::make_unique<std::promise<Status>>();
-    spill_future_ = spill_promise_->get_future();
-    return Status::OK();
-}
-void SpillStream::end_spill(const Status& status) {
-    spill_promise_->set_value(status);
-}
-
-Status SpillStream::wait_spill() {
-    if (spill_promise_) {
-        SCOPED_TIMER(write_wait_io_timer_);
-        auto status = spill_future_.get();
-        spill_promise_.reset();
-        return status;
-    }
-    return Status::OK();
+    return writer_->open();
 }
 
 Status SpillStream::spill_block(RuntimeState* state, const Block& block, bool eof) {
@@ -135,34 +106,13 @@ Status SpillStream::spill_eof() {
 }
 
 Status SpillStream::read_next_block_sync(Block* block, bool* eos) {
-    DCHECK(!read_promise_);
     DCHECK(reader_ != nullptr);
-    Status status;
-    read_promise_ = std::make_unique<std::promise<Status>>();
-    read_future_ = read_promise_->get_future();
-    // use thread pool to limit concurrent io tasks
-    status = io_thread_pool_->submit_func([this, block, eos] {
-        SCOPED_ATTACH_TASK(state_);
-        Status st;
-        Defer defer {[&]() { read_promise_->set_value(st); }};
-        st = reader_->open();
-        if (!st.ok()) {
-            return;
-        }
-        st = reader_->read(block, eos);
-    });
-    if (!status.ok()) {
-        LOG(WARNING) << "read spill data failed: " << status;
-        read_promise_.reset();
-        return status;
-    }
+    DCHECK(!_is_reading);
+    _is_reading = true;
+    Defer defer([this] { _is_reading = false; });
 
-    {
-        SCOPED_TIMER(read_wait_io_timer_);
-        status = read_future_.get();
-    }
-    read_promise_.reset();
-    return status;
+    RETURN_IF_ERROR(reader_->open());
+    return reader_->read(block, eos);
 }
 
 } // namespace doris::vectorized
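SpillStream loses its promise/future plumbing as well: read_next_block_sync() now opens the reader and reads on the calling thread, guarded only by the new _is_reading flag, which is cleared on scope exit via Doris's Defer. A minimal stand-alone analogue of that guard (assert and a global atomic_bool stand in for DCHECK and the member flag):

    #include <atomic>
    #include <cassert>
    #include <cstdio>
    #include <memory>

    std::atomic_bool is_reading{false};

    void read_next_block_sync() {
        // Mirrors the patch: DCHECK(!_is_reading); _is_reading = true; Defer clears it.
        assert(!is_reading.load());
        is_reading.store(true);
        auto clear = [](std::atomic_bool* flag) { flag->store(false); };
        std::unique_ptr<std::atomic_bool, decltype(clear)> guard(&is_reading, clear);

        std::puts("open the reader and read one block synchronously");
    }

    int main() {
        read_next_block_sync();
        return 0;
    }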
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index 68abfa9aaf7..638942d1af1 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -55,12 +55,8 @@ public:
 
     Status spill_block(RuntimeState* state, const Block& block, bool eof);
 
-    void end_spill(const Status& status);
-
     Status spill_eof();
 
-    Status wait_spill();
-
     Status read_next_block_sync(Block* block, bool* eos);
 
     void set_write_counters(RuntimeProfile::Counter* serialize_timer,
@@ -91,7 +87,6 @@ private:
     void close();
 
     RuntimeState* state_ = nullptr;
-    ThreadPool* io_thread_pool_;
     int64_t stream_id_;
     std::atomic_bool closed_ = false;
     SpillDataDir* data_dir_ = nullptr;
@@ -99,10 +94,7 @@ private:
     size_t batch_rows_;
     size_t batch_bytes_;
 
-    std::unique_ptr<std::promise<Status>> spill_promise_;
-    std::future<Status> spill_future_;
-    std::unique_ptr<std::promise<Status>> read_promise_;
-    std::future<Status> read_future_;
+    std::atomic_bool _is_reading = false;
 
     SpillWriterUPtr writer_;
     SpillReaderUPtr reader_;
diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp
index 05a2531c466..2042555e49c 100644
--- a/be/src/vec/spill/spill_stream_manager.cpp
+++ b/be/src/vec/spill/spill_stream_manager.cpp
@@ -50,11 +50,6 @@ Status SpillStreamManager::init() {
     LOG(INFO) << "init spill stream manager";
     RETURN_IF_ERROR(_init_spill_store_map());
 
-    int spill_io_thread_count = config::spill_io_thread_pool_per_disk_thread_num;
-    if (spill_io_thread_count <= 0) {
-        spill_io_thread_count = 2;
-    }
-    int pool_idx = 0;
     for (const auto& [path, store] : _spill_store_map) {
         auto gc_dir_root_dir = fmt::format("{}/{}", path, SPILL_GC_DIR_PREFIX);
         bool exists = true;
@@ -85,20 +80,12 @@ Status SpillStreamManager::init() {
             }
         }
         store->update_spill_data_usage(spill_data_size);
-
-        std::unique_ptr<ThreadPool> io_pool;
-        static_cast<void>(ThreadPoolBuilder(fmt::format("SpillIOThreadPool-{}", pool_idx++))
-                                  .set_min_threads(spill_io_thread_count)
-                                  .set_max_threads(spill_io_thread_count)
-                                  .set_max_queue_size(config::spill_io_thread_pool_queue_size)
-                                  .build(&io_pool));
-        path_to_io_thread_pool_[path] = std::move(io_pool);
     }
-    static_cast<void>(ThreadPoolBuilder("SpillAsyncTaskThreadPool")
-                              .set_min_threads(config::spill_async_task_thread_pool_thread_num)
-                              .set_max_threads(config::spill_async_task_thread_pool_thread_num)
-                              .set_max_queue_size(config::spill_async_task_thread_pool_queue_size)
-                              .build(&async_task_thread_pool_));
+    static_cast<void>(ThreadPoolBuilder("SpillIOThreadPool")
+                              .set_min_threads(config::spill_io_thread_pool_thread_num)
+                              .set_max_threads(config::spill_io_thread_pool_thread_num)
+                              .set_max_queue_size(config::spill_io_thread_pool_queue_size)
+                              .build(&_spill_io_thread_pool));
 
     RETURN_IF_ERROR(Thread::create(
             "Spill", "spill_gc_thread", [this]() { this->_spill_gc_thread_callback(); },
@@ -274,7 +261,7 @@ void SpillStreamManager::gc(int64_t max_file_count) {
 }
 
 void SpillStreamManager::async_cleanup_query(TUniqueId query_id) {
-    (void)get_async_task_thread_pool()->submit_func([this, query_id] {
+    (void)get_spill_io_thread_pool()->submit_func([this, query_id] {
         for (auto& [_, store] : _spill_store_map) {
             std::string query_spill_dir =
                     fmt::format("{}/{}/{}", store->path(), SPILL_DIR_PREFIX, print_id(query_id));
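SpillStreamManager::init() now builds one "SpillIOThreadPool" with min_threads == max_threads (a fixed-size pool) and a bounded queue, instead of one pool per data directory plus a separate "SpillAsyncTaskThreadPool"; every spill task in the BE is funneled through it. A self-contained analogue of that fixed-size, bounded-queue shape (illustration only, not Doris's ThreadPoolBuilder):

    #include <condition_variable>
    #include <cstdio>
    #include <deque>
    #include <functional>
    #include <mutex>
    #include <thread>
    #include <vector>

    class FixedPool {
    public:
        FixedPool(int threads, size_t max_queue) : max_queue_(max_queue) {
            for (int i = 0; i < threads; ++i) {
                workers_.emplace_back([this] { work_loop(); });
            }
        }
        ~FixedPool() {
            {
                std::lock_guard<std::mutex> lock(mu_);
                stop_ = true;
            }
            cv_.notify_all();
            for (auto& w : workers_) {
                w.join();
            }
        }
        // Rejects work when the queue is full, like a failed submit_func().
        bool submit(std::function<void()> task) {
            {
                std::lock_guard<std::mutex> lock(mu_);
                if (queue_.size() >= max_queue_) {
                    return false;
                }
                queue_.push_back(std::move(task));
            }
            cv_.notify_one();
            return true;
        }

    private:
        void work_loop() {
            for (;;) {
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> lock(mu_);
                    cv_.wait(lock, [this] { return stop_ || !queue_.empty(); });
                    if (queue_.empty()) {
                        return;  // stop_ was set and no work is left
                    }
                    task = std::move(queue_.front());
                    queue_.pop_front();
                }
                task();
            }
        }

        std::mutex mu_;
        std::condition_variable cv_;
        std::deque<std::function<void()>> queue_;
        std::vector<std::thread> workers_;
        const size_t max_queue_;
        bool stop_ = false;
    };

    int main() {
        FixedPool pool(/*threads=*/4, /*max_queue=*/16);
        pool.submit([] { std::puts("spill task running on the shared IO pool"); });
        return 0;  // the destructor drains the queue and joins the workers
    }

Rejecting a submission when the queue is full corresponds to submit_func() returning a non-OK Status, which is why the callers in this patch still check the result of every submit.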
diff --git a/be/src/vec/spill/spill_stream_manager.h b/be/src/vec/spill/spill_stream_manager.h
index 36062ce0b46..298af77afcc 100644
--- a/be/src/vec/spill/spill_stream_manager.h
+++ b/be/src/vec/spill/spill_stream_manager.h
@@ -112,12 +112,7 @@ public:
 
     void gc(int64_t max_file_count);
 
-    ThreadPool* get_spill_io_thread_pool(const std::string& path) const {
-        const auto it = path_to_io_thread_pool_.find(path);
-        DCHECK(it != path_to_io_thread_pool_.end());
-        return it->second.get();
-    }
-    ThreadPool* get_async_task_thread_pool() const { return async_task_thread_pool_.get(); }
+    ThreadPool* get_spill_io_thread_pool() const { return _spill_io_thread_pool.get(); }
 
 private:
     Status _init_spill_store_map();
@@ -127,8 +122,7 @@ private:
     std::unordered_map<std::string, std::unique_ptr<SpillDataDir>> _spill_store_map;
 
     CountDownLatch _stop_background_threads_latch;
-    std::unique_ptr<ThreadPool> async_task_thread_pool_;
-    std::unordered_map<std::string, std::unique_ptr<ThreadPool>> path_to_io_thread_pool_;
+    std::unique_ptr<ThreadPool> _spill_io_thread_pool;
     scoped_refptr<Thread> _spill_gc_thread;
 
     std::atomic_uint64_t id_ = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
