This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4c215b1721e [opt](Arena)Release Arena memory earlier in pipeline
operators. (#59045)
4c215b1721e is described below
commit 4c215b1721eaf9f7294b891bd775a9ca7eb132ad
Author: Mryange <[email protected]>
AuthorDate: Wed Jan 7 17:07:47 2026 +0800
[opt](Arena)Release Arena memory earlier in pipeline operators. (#59045)
### What problem does this PR solve?
Previously we put the Arena into the local state, which is only released
when the fragment is freed.
That was done because the shared state might use memory from the Arena.
```C++
RETURN_IF_ERROR(
Base::_shared_state->aggregate_evaluators[i]->execute_batch_add_selected(
block,
Base::_parent->template
cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
_places.data(), _agg_arena_pool));
```
However, this is actually wrong — we should place the Arena directly
into the shared state so the Arena is released at task granularity
instead of fragment granularity.
```C++
Status PipelineTask::finalize() {
auto fragment = _fragment_context.lock();
if (!fragment) {
return Status::OK();
}
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(fragment->get_query_ctx()->query_mem_tracker());
RETURN_IF_ERROR(_state_transition(State::FINALIZED));
std::unique_lock<std::mutex> lc(_dependency_lock);
_sink_shared_state.reset();
_op_shared_states.clear();
_shared_state_map.clear();
_block.reset();
_operators.clear();
_sink.reset();
_pipeline.reset();
return Status::OK();
}
```
For operators that don't require shared state, release it in close().
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [x] No need to test or manual test. Explain why:
- [x] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [x] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [x] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/pipeline/dependency.h | 6 +++
be/src/pipeline/exec/aggregation_sink_operator.cpp | 46 +++++++++++-----------
be/src/pipeline/exec/aggregation_sink_operator.h | 2 -
.../pipeline/exec/aggregation_source_operator.cpp | 13 +++---
be/src/pipeline/exec/aggregation_source_operator.h | 1 -
be/src/pipeline/exec/analytic_sink_operator.cpp | 9 +++--
be/src/pipeline/exec/analytic_sink_operator.h | 1 -
.../distinct_streaming_aggregation_operator.cpp | 4 +-
.../exec/distinct_streaming_aggregation_operator.h | 2 -
be/src/pipeline/exec/set_sink_operator.cpp | 2 +-
be/src/pipeline/exec/set_sink_operator.h | 1 -
.../exec/streaming_aggregation_operator.cpp | 2 +-
.../pipeline/exec/streaming_aggregation_operator.h | 1 -
13 files changed, 46 insertions(+), 44 deletions(-)
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 01f37cf133b..1c0e46a2fb7 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -382,6 +382,9 @@ public:
// Refresh the top limit heap with a new row
void refresh_top_limit(size_t row_id, const vectorized::ColumnRawPtrs&
key_columns);
+ vectorized::Arena agg_arena_pool;
+ vectorized::Arena agg_profile_arena;
+
private:
vectorized::MutableColumns _get_keys_hash_table();
@@ -577,6 +580,7 @@ public:
std::mutex buffer_mutex;
bool sink_eos = false;
std::mutex sink_eos_lock;
+ vectorized::Arena agg_arena_pool;
};
struct JoinSharedState : public BasicSharedState {
@@ -699,6 +703,8 @@ public:
std::atomic<bool> ready_for_read = false;
+ vectorized::Arena arena;
+
/// called in setup_local_state
Status hash_table_init();
};
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index a065f53916a..ddb4bbfbe6f 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -102,8 +102,8 @@ Status AggSinkLocalState::open(RuntimeState* state) {
}
if (Base::_shared_state->probe_expr_ctxs.empty()) {
- _agg_data->without_key =
-
reinterpret_cast<vectorized::AggregateDataPtr>(_agg_profile_arena.aligned_alloc(
+ _agg_data->without_key =
reinterpret_cast<vectorized::AggregateDataPtr>(
+ Base::_shared_state->agg_profile_arena.aligned_alloc(
p._total_size_of_aggregate_states,
p._align_aggregate_states));
if (p._is_merge) {
@@ -187,7 +187,7 @@ Status
AggSinkLocalState::_execute_without_key(vectorized::Block* block) {
block,
_agg_data->without_key + Base::_parent->template
cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
- _agg_arena_pool));
+ Base::_shared_state->agg_arena_pool));
}
return Status::OK();
}
@@ -207,7 +207,7 @@ size_t AggSinkLocalState::_memory_usage() const {
return 0;
}
size_t usage = 0;
- usage += _agg_arena_pool.size();
+ usage += Base::_shared_state->agg_arena_pool.size();
if (Base::_shared_state->aggregate_data_container) {
usage += Base::_shared_state->aggregate_data_container->memory_usage();
@@ -240,7 +240,7 @@ void
AggSinkLocalState::_update_memusage_with_serialized_key() {
},
[&](auto& agg_method) -> void {
auto& data = *agg_method.hash_table;
- int64_t memory_usage_arena = _agg_arena_pool.size();
+ int64_t memory_usage_arena =
Base::_shared_state->agg_arena_pool.size();
int64_t memory_usage_container =
_shared_state->aggregate_data_container->memory_usage();
int64_t hash_table_memory_usage =
data.get_buffer_size_in_bytes();
@@ -321,8 +321,8 @@ Status
AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
_places.data(),
Base::_parent->template
cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
- _deserialize_buffer.data(), column.get(),
_agg_arena_pool,
- rows);
+ _deserialize_buffer.data(), column.get(),
+ Base::_shared_state->agg_arena_pool, rows);
}
} else {
RETURN_IF_ERROR(
@@ -330,7 +330,7 @@ Status
AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
block,
Base::_parent->template
cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
- _places.data(), _agg_arena_pool));
+ _places.data(),
Base::_shared_state->agg_arena_pool));
}
}
} else {
@@ -375,15 +375,15 @@ Status
AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
_places.data(),
Base::_parent->template
cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
- _deserialize_buffer.data(),
column.get(), _agg_arena_pool,
- rows);
+ _deserialize_buffer.data(),
column.get(),
+ Base::_shared_state->agg_arena_pool,
rows);
}
} else {
RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_batch_add(
block,
Base::_parent->template cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
- _places.data(), _agg_arena_pool));
+ _places.data(),
Base::_shared_state->agg_arena_pool));
}
}
}
@@ -423,20 +423,20 @@ Status
AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
_agg_data->without_key +
Base::_parent->template
cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
- *column, _agg_arena_pool);
+ *column, Base::_shared_state->agg_arena_pool);
} else {
RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_single_add(
block,
_agg_data->without_key + Base::_parent->template
cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
- _agg_arena_pool));
+ Base::_shared_state->agg_arena_pool));
}
}
return Status::OK();
}
void AggSinkLocalState::_update_memusage_without_key() {
- int64_t arena_memory_usage = _agg_arena_pool.size();
+ int64_t arena_memory_usage = Base::_shared_state->agg_arena_pool.size();
COUNTER_SET(_memory_used_counter, arena_memory_usage);
COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
}
@@ -487,7 +487,7 @@ Status
AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block*
block,
Base::_parent->template cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
- _places.data(), _agg_arena_pool));
+ _places.data(),
Base::_shared_state->agg_arena_pool));
}
} else {
auto do_aggregate_evaluators = [&] {
@@ -496,7 +496,7 @@ Status
AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block*
block,
Base::_parent->template cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
- _places.data(), _agg_arena_pool));
+ _places.data(), Base::_shared_state->agg_arena_pool));
}
return Status::OK();
};
@@ -550,8 +550,8 @@ void
AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* p
agg_method.init_serialized_keys(key_columns,
num_rows);
auto creator = [this](const auto& ctor, auto& key,
auto& origin) {
- HashMethodType::try_presis_key_and_origin(key,
origin,
-
_agg_arena_pool);
+ HashMethodType::try_presis_key_and_origin(
+ key, origin,
Base::_shared_state->agg_arena_pool);
auto mapped =
Base::_shared_state->aggregate_data_container->append_data(
origin);
@@ -563,7 +563,7 @@ void
AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* p
};
auto creator_for_null_key = [&](auto& mapped) {
- mapped = _agg_arena_pool.aligned_alloc(
+ mapped =
Base::_shared_state->agg_arena_pool.aligned_alloc(
Base::_parent->template
cast<AggSinkOperatorX>()
._total_size_of_aggregate_states,
Base::_parent->template
cast<AggSinkOperatorX>()
@@ -627,8 +627,8 @@ bool
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
auto creator = [&](const auto& ctor, auto& key,
auto& origin) {
try {
-
HashMethodType::try_presis_key_and_origin(key, origin,
-
_agg_arena_pool);
+ HashMethodType::try_presis_key_and_origin(
+ key, origin,
Base::_shared_state->agg_arena_pool);
_shared_state->refresh_top_limit(i,
key_columns);
auto mapped =
_shared_state->aggregate_data_container->append_data(
@@ -647,7 +647,7 @@ bool
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
};
auto creator_for_null_key = [&](auto& mapped) {
- mapped = _agg_arena_pool.aligned_alloc(
+ mapped =
Base::_shared_state->agg_arena_pool.aligned_alloc(
Base::_parent->template
cast<AggSinkOperatorX>()
._total_size_of_aggregate_states,
Base::_parent->template
cast<AggSinkOperatorX>()
@@ -909,7 +909,7 @@ Status AggSinkOperatorX::reset_hash_table(RuntimeState*
state) {
auto& ss = *local_state.Base::_shared_state;
RETURN_IF_ERROR(ss.reset_hash_table());
local_state._serialize_key_arena_memory_usage->set((int64_t)0);
- local_state._agg_arena_pool.clear(true);
+ local_state.Base::_shared_state->agg_arena_pool.clear(true);
return Status::OK();
}
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 6a89d857df5..6946578109d 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -122,8 +122,6 @@ protected:
vectorized::Block _preagg_block = vectorized::Block();
AggregatedDataVariants* _agg_data = nullptr;
- vectorized::Arena _agg_arena_pool;
- vectorized::Arena _agg_profile_arena;
std::unique_ptr<ExecutorBase> _executor = nullptr;
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 24e13d5c0ec..989a7cd478f 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -514,7 +514,8 @@ Status
AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block)
SCOPED_TIMER(_deserialize_data_timer);
Base::_shared_state->aggregate_evaluators[i]->function()->deserialize_and_merge_vec(
_places.data(),
_shared_state->offsets_of_aggregate_states[i],
- _deserialize_buffer.data(), column.get(), _agg_arena_pool,
rows);
+ _deserialize_buffer.data(), column.get(),
Base::_shared_state->agg_arena_pool,
+ rows);
}
}
@@ -558,7 +559,8 @@ void
AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
agg_method.init_serialized_keys(key_columns, num_rows);
auto creator = [this](const auto& ctor, auto& key,
auto& origin) {
- HashMethodType::try_presis_key_and_origin(key,
origin, _agg_arena_pool);
+ HashMethodType::try_presis_key_and_origin(
+ key, origin,
Base::_shared_state->agg_arena_pool);
auto mapped =
Base::_shared_state->aggregate_data_container->append_data(
origin);
@@ -570,7 +572,7 @@ void
AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
};
auto creator_for_null_key = [&](auto& mapped) {
- mapped = _agg_arena_pool.aligned_alloc(
+ mapped =
Base::_shared_state->agg_arena_pool.aligned_alloc(
_shared_state->total_size_of_aggregate_states,
_shared_state->align_aggregate_states);
auto st = _create_agg_status(mapped);
@@ -595,8 +597,9 @@ void
AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
_memory_usage_container,
static_cast<int64_t>(
_shared_state->aggregate_data_container->memory_usage()));
- COUNTER_SET(_memory_usage_arena,
-
static_cast<int64_t>(_agg_arena_pool.size()));
+ COUNTER_SET(
+ _memory_usage_arena,
+
static_cast<int64_t>(Base::_shared_state->agg_arena_pool.size()));
}},
_shared_state->agg_data->method_variant);
}
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h
b/be/src/pipeline/exec/aggregation_source_operator.h
index d2cff32246a..1bf4edabf5d 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -68,7 +68,6 @@ protected:
vectorized::ColumnRawPtrs& key_columns,
uint32_t num_rows);
vectorized::PODArray<vectorized::AggregateDataPtr> _places;
- vectorized::Arena _agg_arena_pool;
std::vector<char> _deserialize_buffer;
RuntimeProfile::Counter* _get_results_timer = nullptr;
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 7eff2c62cb6..a2f30a30337 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -168,8 +168,8 @@ Status AnalyticSinkLocalState::open(RuntimeState* state) {
_range_between_expr_ctxs[i]->root()->data_type()->create_column();
}
- _fn_place_ptr =
_agg_arena_pool.aligned_alloc(p._total_size_of_aggregate_states,
- p._align_aggregate_states);
+ _fn_place_ptr =
_shared_state->agg_arena_pool.aligned_alloc(p._total_size_of_aggregate_states,
+
p._align_aggregate_states);
_create_agg_status();
return Status::OK();
}
@@ -388,13 +388,14 @@ void
AnalyticSinkLocalState::_execute_for_function(int64_t partition_start, int6
_agg_functions[i]->function()->execute_function_with_incremental(
partition_start, partition_end, frame_start, frame_end,
_fn_place_ptr + _offsets_of_aggregate_states[i],
agg_columns.data(),
- _agg_arena_pool, false, false, false, &_use_null_result[i],
+ _shared_state->agg_arena_pool, false, false, false,
&_use_null_result[i],
&_could_use_previous_result[i]);
} else {
_agg_functions[i]->function()->add_range_single_place(
partition_start, partition_end, frame_start, frame_end,
_fn_place_ptr + _offsets_of_aggregate_states[i],
agg_columns.data(),
- _agg_arena_pool, &(_use_null_result[i]),
&_could_use_previous_result[i]);
+ _shared_state->agg_arena_pool, &(_use_null_result[i]),
+ &_could_use_previous_result[i]);
}
}
}
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h
b/be/src/pipeline/exec/analytic_sink_operator.h
index a35fd91cf05..4815102f6ff 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -133,7 +133,6 @@ private:
size_t _agg_functions_size = 0;
bool _agg_functions_created = false;
vectorized::AggregateDataPtr _fn_place_ptr = nullptr;
- vectorized::Arena _agg_arena_pool;
std::vector<vectorized::AggFnEvaluator*> _agg_functions;
std::vector<size_t> _offsets_of_aggregate_states;
std::vector<bool> _result_column_nullable_flags;
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 1331a9e38ae..6363355db73 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -60,9 +60,7 @@
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta
OperatorXBase*
parent)
: PipelineXLocalState<FakeSharedState>(state, parent),
batch_size(state->batch_size()),
- _agg_arena_pool(std::make_unique<vectorized::Arena>()),
_agg_data(std::make_unique<DistinctDataVariants>()),
- _agg_profile_arena(std::make_unique<vectorized::Arena>()),
_child_block(vectorized::Block::create_unique()),
_aggregated_block(vectorized::Block::create_unique()) {}
@@ -459,6 +457,8 @@ Status DistinctStreamingAggLocalState::close(RuntimeState*
state) {
}
}
_cache_block.clear();
+
+ _arena.clear();
return Base::close(state);
}
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index edb4ecbe063..b0baf64ff96 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -72,11 +72,9 @@ private:
bool _should_expand_hash_table = true;
bool _stop_emplace_flag = false;
const int batch_size;
- std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
std::unique_ptr<DistinctDataVariants> _agg_data = nullptr;
// group by k1,k2
vectorized::VExprContextSPtrs _probe_expr_ctxs;
- std::unique_ptr<vectorized::Arena> _agg_profile_arena = nullptr;
std::unique_ptr<vectorized::Block> _child_block = nullptr;
bool _child_eos = false;
bool _reach_limit = false;
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp
b/be/src/pipeline/exec/set_sink_operator.cpp
index d1580b4f3ac..ae2024f831d 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -128,7 +128,7 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block(
if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
vectorized::HashTableBuild<HashTableCtxType, is_intersect>
hash_table_build_process(&local_state,
uint32_t(rows), raw_ptrs, state);
- st = hash_table_build_process(arg, local_state._arena);
+ st = hash_table_build_process(arg,
local_state._shared_state->arena);
} else {
LOG(FATAL) << "FATAL: uninited hash table";
__builtin_unreachable();
diff --git a/be/src/pipeline/exec/set_sink_operator.h
b/be/src/pipeline/exec/set_sink_operator.h
index df0e084ac5a..1d70c8681a3 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -57,7 +57,6 @@ private:
vectorized::MutableBlock _mutable_block;
// every child has its result expr list
vectorized::VExprContextSPtrs _child_exprs;
- vectorized::Arena _arena;
RuntimeProfile::Counter* _merge_block_timer = nullptr;
RuntimeProfile::Counter* _build_timer = nullptr;
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 77673a4a3d3..5959ef2cf03 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -81,7 +81,6 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state,
OperatorXBase* parent)
: Base(state, parent),
_agg_data(std::make_unique<AggregatedDataVariants>()),
- _agg_profile_arena(std::make_unique<vectorized::Arena>()),
_child_block(vectorized::Block::create_unique()),
_pre_aggregated_block(vectorized::Block::create_unique()) {}
@@ -1008,6 +1007,7 @@ Status StreamingAggLocalState::close(RuntimeState* state)
{
_agg_data->method_variant);
}
_close_with_serialized_key();
+ _agg_arena_pool.clear(true);
return Base::close(state);
}
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 08e28ab5e9f..71396f351e3 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -103,7 +103,6 @@ private:
std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
// group by k1,k2
vectorized::VExprContextSPtrs _probe_expr_ctxs;
- std::unique_ptr<vectorized::Arena> _agg_profile_arena = nullptr;
std::unique_ptr<AggregateDataContainer> _aggregate_data_container =
nullptr;
bool _reach_limit = false;
size_t _input_num_rows = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]