This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 2d099802744 [fix] std::sort coredump and low mem mode buffer limit 
(#46006)
2d099802744 is described below

commit 2d09980274434df222a25a7243302502e075776d
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Thu Dec 26 14:54:10 2024 +0800

    [fix] std::sort coredump and low mem mode buffer limit (#46006)
---
 be/src/pipeline/dependency.h                       | 10 +++---
 be/src/pipeline/exec/multi_cast_data_streamer.h    |  6 +---
 .../local_exchange_sink_operator.cpp               |  2 +-
 be/src/runtime/runtime_state.h                     |  8 +++++
 be/src/vec/spill/spill_stream_manager.cpp          | 40 +++++-----------------
 .../java/org/apache/doris/qe/SessionVariable.java  | 10 ++++--
 gensrc/thrift/PaloInternalService.thrift           |  2 ++
 7 files changed, 35 insertions(+), 43 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 13f983db3dd..a023e5661e2 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -840,8 +840,9 @@ public:
         }
     }
 
-    virtual void set_low_memory_mode() {
-        _buffer_mem_limit = 
std::min<int64_t>(config::local_exchange_buffer_mem_limit, 512 * 1024);
+    virtual void set_low_memory_mode(RuntimeState* state) {
+        _buffer_mem_limit = 
std::min<int64_t>(config::local_exchange_buffer_mem_limit,
+                                              
state->low_memory_mode_buffer_limit());
     }
 };
 
@@ -888,8 +889,9 @@ struct LocalMergeExchangeSharedState : public 
LocalExchangeSharedState {
         source_deps[channel_id]->set_ready();
     }
 
-    void set_low_memory_mode() override {
-        _buffer_mem_limit = 
std::min<int64_t>(config::local_exchange_buffer_mem_limit, 512 * 1024);
+    void set_low_memory_mode(RuntimeState* state) override {
+        _buffer_mem_limit = 
std::min<int64_t>(config::local_exchange_buffer_mem_limit,
+                                              
state->low_memory_mode_buffer_limit());
         DCHECK(!_queues_mem_usage.empty());
         _each_queue_limit =
                 std::max<int64_t>(64 * 1024, _buffer_mem_limit / 
_queues_mem_usage.size());
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h 
b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 0b925f7a5fe..4669a0389c7 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -81,11 +81,7 @@ public:
         _process_rows = ADD_COUNTER(profile(), "ProcessRows", TUnit::UNIT);
     };
 
-    ~MultiCastDataStreamer() {
-        for (auto& item : _spill_readers) {
-            DCHECK(item.empty());
-        }
-    }
+    ~MultiCastDataStreamer() = default;
 
     Status pull(RuntimeState* state, int sender_idx, vectorized::Block* block, 
bool* eos);
 
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index 968812e594e..7f9228e246d 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -146,7 +146,7 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
 
     if (state->get_query_ctx()->low_memory_mode()) {
-        local_state._shared_state->set_low_memory_mode();
+        local_state._shared_state->set_low_memory_mode(state);
         local_state._exchanger->set_low_memory_mode();
     }
 
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 2220c1fc41e..a47c5d6f202 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -26,6 +26,7 @@
 #include <stdint.h>
 
 #include <atomic>
+#include <cstdint>
 #include <fstream>
 #include <functional>
 #include <memory>
@@ -599,6 +600,13 @@ public:
         return 32;
     }
 
+    int64_t low_memory_mode_buffer_limit() const {
+        if (_query_options.__isset.low_memory_mode_buffer_limit) {
+            return std::max(_query_options.low_memory_mode_buffer_limit, 
(int64_t)1);
+        }
+        return 32L * 1024 * 1024;
+    }
+
     int spill_revocable_memory_high_watermark_percent() const {
         if (_query_options.__isset.revocable_memory_high_watermark_percent) {
             return _query_options.revocable_memory_high_watermark_percent;
diff --git a/be/src/vec/spill/spill_stream_manager.cpp 
b/be/src/vec/spill/spill_stream_manager.cpp
index b323e1ab2e8..07a947b5ef3 100644
--- a/be/src/vec/spill/spill_stream_manager.cpp
+++ b/be/src/vec/spill/spill_stream_manager.cpp
@@ -108,45 +108,23 @@ Status SpillStreamManager::_init_spill_store_map() {
 
 std::vector<SpillDataDir*> SpillStreamManager::_get_stores_for_spill(
         TStorageMedium::type storage_medium) {
-    std::vector<SpillDataDir*> stores;
+    std::vector<std::pair<SpillDataDir*, double>> stores_with_usage;
     for (auto& [_, store] : _spill_store_map) {
         if (store->storage_medium() == storage_medium && 
!store->reach_capacity_limit(0)) {
-            stores.push_back(store.get());
+            stores_with_usage.emplace_back(store.get(), 
store->_get_disk_usage(0));
         }
     }
-    if (stores.empty()) {
-        return stores;
+    if (stores_with_usage.empty()) {
+        return {};
     }
 
-    std::sort(stores.begin(), stores.end(), [](SpillDataDir* a, SpillDataDir* 
b) {
-        return a->_get_disk_usage(0) < b->_get_disk_usage(0);
-    });
+    std::sort(stores_with_usage.begin(), stores_with_usage.end(),
+              [](auto&& a, auto&& b) { return a.second < b.second; });
 
-    size_t seventy_percent_index = stores.size();
-    size_t eighty_five_percent_index = stores.size();
-    for (size_t index = 0; index < stores.size(); index++) {
-        // If the usage of the store is less than 70%, we choose disk randomly.
-        if (stores[index]->_get_disk_usage(0) > 0.7 && seventy_percent_index 
== stores.size()) {
-            seventy_percent_index = index;
-        }
-        if (stores[index]->_get_disk_usage(0) > 0.85 &&
-            eighty_five_percent_index == stores.size()) {
-            eighty_five_percent_index = index;
-            break;
-        }
-    }
-
-    std::random_device rd;
-    std::mt19937 g(rd());
-    std::shuffle(stores.begin(), stores.begin() + seventy_percent_index, g);
-    if (seventy_percent_index != stores.size()) {
-        std::shuffle(stores.begin() + seventy_percent_index,
-                     stores.begin() + eighty_five_percent_index, g);
-    }
-    if (eighty_five_percent_index != stores.size()) {
-        std::shuffle(stores.begin() + eighty_five_percent_index, stores.end(), 
g);
+    std::vector<SpillDataDir*> stores;
+    for (const auto& [store, _] : stores_with_usage) {
+        stores.emplace_back(store);
     }
-
     return stores;
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index bab6a34528e..7ce2c922caf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -568,8 +568,10 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String SPILL_AGGREGATION_PARTITION_COUNT = 
"spill_aggregation_partition_count";
     public static final String SPILL_STREAMING_AGG_MEM_LIMIT = 
"spill_streaming_agg_mem_limit";
     public static final String SPILL_HASH_JOIN_PARTITION_COUNT = 
"spill_hash_join_partition_count";
-    public static final String SPILL_REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT = 
"spill_revocable_memory_high_watermark_percent";
+    public static final String SPILL_REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT =
+            "spill_revocable_memory_high_watermark_percent";
     public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks";
+    public static final String LOW_MEMORY_MODE_BUFFER_LIMIT = 
"low_memory_mode_buffer_limit";
 
     public static final String GENERATE_STATS_FACTOR = "generate_stats_factor";
 
@@ -2249,6 +2251,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = SPILL_AGGREGATION_PARTITION_COUNT, fuzzy = 
true)
     public int spillAggregationPartitionCount = 32;
 
+    @VariableMgr.VarAttr(name = LOW_MEMORY_MODE_BUFFER_LIMIT, fuzzy = false)
+    public long lowMemoryModeBufferLimit = 33554432;
+
     // The memory limit of streaming agg when spilling is enabled
     // NOTE: streaming agg operator will not spill to disk.
     @VariableMgr.VarAttr(name = SPILL_STREAMING_AGG_MEM_LIMIT, fuzzy = true)
@@ -3914,7 +3919,7 @@ public class SessionVariable implements Serializable, 
Writable {
         
tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner);
         tResult.setSkipBadTablet(skipBadTablet);
         tResult.setDisableFileCache(disableFileCache);
-        
+
         // for spill
         tResult.setEnableSpill(enableSpill);
         tResult.setEnableForceSpill(enableForceSpill);
@@ -3928,6 +3933,7 @@ public class SessionVariable implements Serializable, 
Writable {
         
tResult.setRevocableMemoryHighWatermarkPercent(spillRevocableMemoryHighWatermarkPercent);
 
         tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
+        tResult.setLowMemoryModeBufferLimit(lowMemoryModeBufferLimit);
 
         tResult.setEnableLocalMergeSort(enableLocalMergeSort);
         tResult.setEnableParallelResultSink(enableParallelResultSink);
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index f99a88e55b6..52388284a73 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -371,10 +371,12 @@ struct TQueryOptions {
   145: optional bool enable_spill = false
   146: optional bool enable_reserve_memory = true
   147: optional i32 revocable_memory_high_watermark_percent = -1
+
   148: optional i64 spill_sort_mem_limit = 134217728
   149: optional i64 spill_sort_batch_bytes = 8388608
   150: optional i32 spill_aggregation_partition_count = 32
   151: optional i32 spill_hash_join_partition_count = 32
+  152: optional i64 low_memory_mode_buffer_limit = 33554432
 
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to