This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 2d099802744 [fix] std::sort coredump and low mem mode buffer limit (#46006) 2d099802744 is described below commit 2d09980274434df222a25a7243302502e075776d Author: Jerry Hu <hushengg...@selectdb.com> AuthorDate: Thu Dec 26 14:54:10 2024 +0800 [fix] std::sort coredump and low mem mode buffer limit (#46006) --- be/src/pipeline/dependency.h | 10 +++--- be/src/pipeline/exec/multi_cast_data_streamer.h | 6 +--- .../local_exchange_sink_operator.cpp | 2 +- be/src/runtime/runtime_state.h | 8 +++++ be/src/vec/spill/spill_stream_manager.cpp | 40 +++++----------------- .../java/org/apache/doris/qe/SessionVariable.java | 10 ++++-- gensrc/thrift/PaloInternalService.thrift | 2 ++ 7 files changed, 35 insertions(+), 43 deletions(-) diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 13f983db3dd..a023e5661e2 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -840,8 +840,9 @@ public: } } - virtual void set_low_memory_mode() { - _buffer_mem_limit = std::min<int64_t>(config::local_exchange_buffer_mem_limit, 512 * 1024); + virtual void set_low_memory_mode(RuntimeState* state) { + _buffer_mem_limit = std::min<int64_t>(config::local_exchange_buffer_mem_limit, + state->low_memory_mode_buffer_limit()); } }; @@ -888,8 +889,9 @@ struct LocalMergeExchangeSharedState : public LocalExchangeSharedState { source_deps[channel_id]->set_ready(); } - void set_low_memory_mode() override { - _buffer_mem_limit = std::min<int64_t>(config::local_exchange_buffer_mem_limit, 512 * 1024); + void set_low_memory_mode(RuntimeState* state) override { + _buffer_mem_limit = std::min<int64_t>(config::local_exchange_buffer_mem_limit, + state->low_memory_mode_buffer_limit()); DCHECK(!_queues_mem_usage.empty()); _each_queue_limit = std::max<int64_t>(64 * 1024, _buffer_mem_limit / _queues_mem_usage.size()); diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h b/be/src/pipeline/exec/multi_cast_data_streamer.h index 0b925f7a5fe..4669a0389c7 100644 --- a/be/src/pipeline/exec/multi_cast_data_streamer.h +++ b/be/src/pipeline/exec/multi_cast_data_streamer.h @@ -81,11 +81,7 @@ public: _process_rows = ADD_COUNTER(profile(), "ProcessRows", TUnit::UNIT); }; - ~MultiCastDataStreamer() { - for (auto& item : _spill_readers) { - DCHECK(item.empty()); - } - } + ~MultiCastDataStreamer() = default; Status pull(RuntimeState* state, int sender_idx, vectorized::Block* block, bool* eos); diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index 968812e594e..7f9228e246d 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -146,7 +146,7 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); if (state->get_query_ctx()->low_memory_mode()) { - local_state._shared_state->set_low_memory_mode(); + local_state._shared_state->set_low_memory_mode(state); local_state._exchanger->set_low_memory_mode(); } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 2220c1fc41e..a47c5d6f202 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -26,6 +26,7 @@ #include <stdint.h> #include <atomic> +#include <cstdint> #include <fstream> #include <functional> #include <memory> @@ -599,6 +600,13 @@ public: return 32; } + int64_t low_memory_mode_buffer_limit() const { + if (_query_options.__isset.low_memory_mode_buffer_limit) { + return std::max(_query_options.low_memory_mode_buffer_limit, (int64_t)1); + } + return 32L * 1024 * 1024; + } + int spill_revocable_memory_high_watermark_percent() const { if (_query_options.__isset.revocable_memory_high_watermark_percent) { return _query_options.revocable_memory_high_watermark_percent; diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp index b323e1ab2e8..07a947b5ef3 100644 --- a/be/src/vec/spill/spill_stream_manager.cpp +++ b/be/src/vec/spill/spill_stream_manager.cpp @@ -108,45 +108,23 @@ Status SpillStreamManager::_init_spill_store_map() { std::vector<SpillDataDir*> SpillStreamManager::_get_stores_for_spill( TStorageMedium::type storage_medium) { - std::vector<SpillDataDir*> stores; + std::vector<std::pair<SpillDataDir*, double>> stores_with_usage; for (auto& [_, store] : _spill_store_map) { if (store->storage_medium() == storage_medium && !store->reach_capacity_limit(0)) { - stores.push_back(store.get()); + stores_with_usage.emplace_back(store.get(), store->_get_disk_usage(0)); } } - if (stores.empty()) { - return stores; + if (stores_with_usage.empty()) { + return {}; } - std::sort(stores.begin(), stores.end(), [](SpillDataDir* a, SpillDataDir* b) { - return a->_get_disk_usage(0) < b->_get_disk_usage(0); - }); + std::sort(stores_with_usage.begin(), stores_with_usage.end(), + [](auto&& a, auto&& b) { return a.second < b.second; }); - size_t seventy_percent_index = stores.size(); - size_t eighty_five_percent_index = stores.size(); - for (size_t index = 0; index < stores.size(); index++) { - // If the usage of the store is less than 70%, we choose disk randomly. - if (stores[index]->_get_disk_usage(0) > 0.7 && seventy_percent_index == stores.size()) { - seventy_percent_index = index; - } - if (stores[index]->_get_disk_usage(0) > 0.85 && - eighty_five_percent_index == stores.size()) { - eighty_five_percent_index = index; - break; - } - } - - std::random_device rd; - std::mt19937 g(rd()); - std::shuffle(stores.begin(), stores.begin() + seventy_percent_index, g); - if (seventy_percent_index != stores.size()) { - std::shuffle(stores.begin() + seventy_percent_index, - stores.begin() + eighty_five_percent_index, g); - } - if (eighty_five_percent_index != stores.size()) { - std::shuffle(stores.begin() + eighty_five_percent_index, stores.end(), g); + std::vector<SpillDataDir*> stores; + for (const auto& [store, _] : stores_with_usage) { + stores.emplace_back(store); } - return stores; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index bab6a34528e..7ce2c922caf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -568,8 +568,10 @@ public class SessionVariable implements Serializable, Writable { public static final String SPILL_AGGREGATION_PARTITION_COUNT = "spill_aggregation_partition_count"; public static final String SPILL_STREAMING_AGG_MEM_LIMIT = "spill_streaming_agg_mem_limit"; public static final String SPILL_HASH_JOIN_PARTITION_COUNT = "spill_hash_join_partition_count"; - public static final String SPILL_REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT = "spill_revocable_memory_high_watermark_percent"; + public static final String SPILL_REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT = + "spill_revocable_memory_high_watermark_percent"; public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks"; + public static final String LOW_MEMORY_MODE_BUFFER_LIMIT = "low_memory_mode_buffer_limit"; public static final String GENERATE_STATS_FACTOR = "generate_stats_factor"; @@ -2249,6 +2251,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = SPILL_AGGREGATION_PARTITION_COUNT, fuzzy = true) public int spillAggregationPartitionCount = 32; + @VariableMgr.VarAttr(name = LOW_MEMORY_MODE_BUFFER_LIMIT, fuzzy = false) + public long lowMemoryModeBufferLimit = 33554432; + // The memory limit of streaming agg when spilling is enabled // NOTE: streaming agg operator will not spill to disk. @VariableMgr.VarAttr(name = SPILL_STREAMING_AGG_MEM_LIMIT, fuzzy = true) @@ -3914,7 +3919,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner); tResult.setSkipBadTablet(skipBadTablet); tResult.setDisableFileCache(disableFileCache); - + // for spill tResult.setEnableSpill(enableSpill); tResult.setEnableForceSpill(enableForceSpill); @@ -3928,6 +3933,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setRevocableMemoryHighWatermarkPercent(spillRevocableMemoryHighWatermarkPercent); tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks); + tResult.setLowMemoryModeBufferLimit(lowMemoryModeBufferLimit); tResult.setEnableLocalMergeSort(enableLocalMergeSort); tResult.setEnableParallelResultSink(enableParallelResultSink); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index f99a88e55b6..52388284a73 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -371,10 +371,12 @@ struct TQueryOptions { 145: optional bool enable_spill = false 146: optional bool enable_reserve_memory = true 147: optional i32 revocable_memory_high_watermark_percent = -1 + 148: optional i64 spill_sort_mem_limit = 134217728 149: optional i64 spill_sort_batch_bytes = 8388608 150: optional i32 spill_aggregation_partition_count = 32 151: optional i32 spill_hash_join_partition_count = 32 + 152: optional i64 low_memory_mode_buffer_limit = 33554432 // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org