github-actions[bot] commented on code in PR #61495:
URL: https://github.com/apache/doris/pull/61495#discussion_r2959361692


##########
be/src/exec/operator/bucketed_aggregation_sink_operator.h:
##########
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "exec/operator/operator.h"
+#include "runtime/exec_env.h"
+#include "runtime/runtime_profile.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+
+class BucketedAggSinkOperatorX;
+
+/// Sink-side local state for bucketed hash aggregation.
+/// Each pipeline instance builds 256 per-bucket hash tables (two-level hash 
table).
+/// No locking: each instance writes to per_instance_data[_instance_idx].
+class BucketedAggSinkLocalState : public 
PipelineXSinkLocalState<BucketedAggSharedState> {
+public:
+    ENABLE_FACTORY_CREATOR(BucketedAggSinkLocalState);
+    using Base = PipelineXSinkLocalState<BucketedAggSharedState>;
+    BucketedAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* 
state);
+    ~BucketedAggSinkLocalState() override = default;
+
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status open(RuntimeState* state) override;
+    Status close(RuntimeState* state, Status exec_status) override;
+
+private:
+    friend class BucketedAggSinkOperatorX;
+
+    Status _execute_with_serialized_key(Block* block);
+    void _emplace_into_hash_table(AggregateDataPtr* places, ColumnRawPtrs& 
key_columns,
+                                  uint32_t num_rows);
+    Status _create_agg_status(AggregateDataPtr data);
+    Status _destroy_agg_status(AggregateDataPtr data);
+    Status _init_hash_method(const VExprContextSPtrs& probe_exprs);
+    size_t _get_hash_table_size() const;
+    void _update_memusage();
+
+    int _instance_idx = 0;
+    /// Pointers into shared_state->per_instance_data[_instance_idx].
+    std::vector<BucketedAggDataVariants*> _bucket_agg_data; // [256]
+    Arena* _arena = nullptr;
+
+    /// Per-instance clones of aggregate evaluators. Required because
+    /// AggFnEvaluator::_calc_argument_columns() mutates internal state
+    /// (_agg_columns), which causes data races when multiple sink instances
+    /// share the same evaluator and call execute_batch_add() concurrently.
+    std::vector<AggFnEvaluator*> _aggregate_evaluators;
+
+    PODArray<AggregateDataPtr> _places;
+
+    /// Pre-grouped row indices by bucket, reused across blocks.
+    /// _bucket_row_indices[b] holds row indices that map to bucket b.
+    DorisVector<uint32_t> _bucket_row_indices[BUCKETED_AGG_NUM_BUCKETS];
+
+    RuntimeProfile::Counter* _hash_table_compute_timer = nullptr;
+    RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr;
+    RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
+    RuntimeProfile::Counter* _build_timer = nullptr;
+    RuntimeProfile::Counter* _expr_timer = nullptr;
+    RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
+    RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
+    RuntimeProfile::Counter* _memory_usage_arena = nullptr;
+};
+
+/// Bucketed hash aggregation sink operator.
+/// Fuses local + global aggregation for single-BE deployments.
+/// Each pipeline instance builds 256 per-bucket hash tables from raw input.
+/// The source operator then merges across instances per-bucket (second-phase 
agg).
+class BucketedAggSinkOperatorX final : public 
DataSinkOperatorX<BucketedAggSinkLocalState> {
+public:
+    BucketedAggSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id, 
const TPlanNode& tnode,
+                             const DescriptorTbl& descs);
+    ~BucketedAggSinkOperatorX() override = default;
+
+    Status init(const TDataSink& tsink) override {
+        return Status::InternalError("{} should not init with TDataSink",
+                                     
DataSinkOperatorX<BucketedAggSinkLocalState>::_name);
+    }
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
+    Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+
+    // No local exchange needed — each instance builds its own hash tables 
independently.
+    DataDistribution required_data_distribution(RuntimeState* state) const 
override {
+        return DataDistribution(ExchangeType::NOOP);
+    }
+
+    size_t get_reserve_mem_size(RuntimeState* state, bool eos) override { 
return 0; }

Review Comment:
   **Significant: Memory reservation completely bypassed**
   
   `get_reserve_mem_size()` returns 0, which disables the pipeline memory 
reservation protocol for this operator. For comparison, the existing 
`AggSinkOperatorX::get_reserve_mem_size()` returns 
`hash_table->estimate_memory(batch_size) + _memory_usage_last_executing`.
   
   With 256 hash tables per instance × N pipeline instances, hash table resizes 
can cause massive uncontrolled memory growth with no back-pressure mechanism. 
The operator is also missing:
   - `SCOPED_PEAK_MEM` instrumentation (unlike the regular agg operator)
   - A spill path as a fallback
   - `_memory_sufficient_dependency` wiring (per `be/src/exec/AGENTS.md` 
requirements)
   
   Even if spill support is deferred, the reservation protocol should still 
provide accurate estimates so the scheduler can apply back-pressure before OOM.



##########
be/src/exec/operator/bucketed_aggregation_sink_operator.cpp:
##########
@@ -0,0 +1,477 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/operator/bucketed_aggregation_sink_operator.h"
+
+#include <memory>
+#include <string>
+
+#include "common/status.h"
+#include "exec/common/hash_table/hash.h"
+#include "exec/operator/operator.h"
+#include "exprs/vectorized_agg_fn.h"
+#include "runtime/runtime_profile.h"
+#include "runtime/thread_context.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+
+BucketedAggSinkLocalState::BucketedAggSinkLocalState(DataSinkOperatorXBase* 
parent,
+                                                     RuntimeState* state)
+        : Base(parent, state) {}
+
+Status BucketedAggSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(Base::exec_time_counter());
+    SCOPED_TIMER(Base::_init_timer);
+
+    _instance_idx = info.task_idx;
+
+    // Sink dependencies start as ready=false by default. We must explicitly
+    // set them to ready so the pipeline task can execute (call sink()).
+    // This follows the same pattern as HashJoinBuildSinkLocalState::init().
+    _dependency->set_ready();
+
+    _hash_table_size_counter = ADD_COUNTER(custom_profile(), "HashTableSize", 
TUnit::UNIT);
+    _hash_table_memory_usage =
+            ADD_COUNTER_WITH_LEVEL(Base::custom_profile(), 
"MemoryUsageHashTable", TUnit::BYTES, 1);
+
+    _build_timer = ADD_TIMER(Base::custom_profile(), "BuildTime");
+    _expr_timer = ADD_TIMER(Base::custom_profile(), "ExprTime");
+    _hash_table_compute_timer = ADD_TIMER(Base::custom_profile(), 
"HashTableComputeTime");
+    _hash_table_emplace_timer = ADD_TIMER(Base::custom_profile(), 
"HashTableEmplaceTime");
+    _hash_table_input_counter =
+            ADD_COUNTER(Base::custom_profile(), "HashTableInputCount", 
TUnit::UNIT);
+    _memory_usage_arena = ADD_COUNTER(custom_profile(), "MemoryUsageArena", 
TUnit::BYTES);
+
+    return Status::OK();
+}
+
+Status BucketedAggSinkLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(Base::exec_time_counter());
+    SCOPED_TIMER(Base::_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
+
+    auto& p = Base::_parent->template cast<BucketedAggSinkOperatorX>();
+    auto& shared_state = *Base::_shared_state;
+
+    // Initialize per-instance data and shared metadata. Multiple sink 
instances call open()
+    // concurrently, so all shared-state writes must be inside call_once to 
avoid data races.
+    Status init_status;
+    shared_state.init_instances(state->task_num(), [&]() {
+        // Copy metadata to shared state (once, from the first instance to 
reach here).
+        shared_state.align_aggregate_states = p._align_aggregate_states;
+        shared_state.total_size_of_aggregate_states = 
p._total_size_of_aggregate_states;
+        shared_state.offsets_of_aggregate_states = 
p._offsets_of_aggregate_states;
+        shared_state.make_nullable_keys = p._make_nullable_keys;
+
+        shared_state.probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
+        for (size_t i = 0; i < shared_state.probe_expr_ctxs.size(); i++) {
+            auto st = p._probe_expr_ctxs[i]->clone(state, 
shared_state.probe_expr_ctxs[i]);
+            if (!st) {
+                init_status = st;
+                return;
+            }
+        }
+
+        for (auto& evaluator : p._aggregate_evaluators) {
+            
shared_state.aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
+        }
+
+        // Detect simple_count: exactly one COUNT(*) with no args, with GROUP 
BY present.
+        // Bucketed agg always has GROUP BY (without-key not supported).
+        if (p._aggregate_evaluators.size() == 1 &&
+            p._aggregate_evaluators[0]->function()->get_name() == "count" &&
+            
p._aggregate_evaluators[0]->function()->get_argument_types().empty()) {
+            shared_state.use_simple_count = true;
+        }
+    });
+    RETURN_IF_ERROR(init_status);
+
+    // Now safe to access per_instance_data since init_instances has been 
called.
+    auto& inst = shared_state.per_instance_data[_instance_idx];
+    _arena = inst.arena.get();
+
+    // Clone aggregate evaluators for this sink instance. Each instance needs
+    // its own evaluators because AggFnEvaluator::_calc_argument_columns()
+    // mutates internal state (_agg_columns), causing data races when multiple
+    // sink instances call execute_batch_add() concurrently on shared 
evaluators.
+    for (auto& evaluator : p._aggregate_evaluators) {
+        _aggregate_evaluators.push_back(evaluator->clone(state, p._pool));
+    }
+
+    // Set up _bucket_agg_data as 256 pointers into this instance's bucket 
hash tables.
+    _bucket_agg_data.resize(BUCKETED_AGG_NUM_BUCKETS);
+    for (int b = 0; b < BUCKETED_AGG_NUM_BUCKETS; ++b) {
+        _bucket_agg_data[b] = inst.bucket_agg_data[b].get();
+    }
+
+    // Initialize hash method for all 256 bucket hash tables. Each bucket uses
+    // the same hash key type. We read shared_state.probe_expr_ctxs here, which
+    // is safe because init_instances has completed (call_once guarantees 
visibility).
+    RETURN_IF_ERROR(_init_hash_method(shared_state.probe_expr_ctxs));
+
+    return Status::OK();
+}
+
+Status BucketedAggSinkLocalState::_create_agg_status(AggregateDataPtr data) {
+    auto& shared_state = *Base::_shared_state;
+    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+        try {
+            _aggregate_evaluators[i]->create(data + 
shared_state.offsets_of_aggregate_states[i]);
+        } catch (...) {
+            for (int j = 0; j < i; ++j) {
+                _aggregate_evaluators[j]->destroy(data +
+                                                  
shared_state.offsets_of_aggregate_states[j]);
+            }
+            throw;
+        }
+    }
+    return Status::OK();
+}
+
+Status BucketedAggSinkLocalState::_destroy_agg_status(AggregateDataPtr data) {
+    auto& shared_state = *Base::_shared_state;
+    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+        _aggregate_evaluators[i]->function()->destroy(data +
+                                                      
shared_state.offsets_of_aggregate_states[i]);
+    }
+    return Status::OK();
+}
+
+Status BucketedAggSinkLocalState::_execute_with_serialized_key(Block* block) {
+    SCOPED_TIMER(_build_timer);
+    DCHECK(!Base::_shared_state->probe_expr_ctxs.empty());
+
+    auto& shared_state = *Base::_shared_state;
+    auto& p = Base::_parent->template cast<BucketedAggSinkOperatorX>();
+    size_t key_size = shared_state.probe_expr_ctxs.size();
+    ColumnRawPtrs key_columns(key_size);
+
+    {
+        SCOPED_TIMER(_expr_timer);
+        for (size_t i = 0; i < key_size; ++i) {
+            int result_column_id = -1;
+            RETURN_IF_ERROR(shared_state.probe_expr_ctxs[i]->execute(block, 
&result_column_id));

Review Comment:
   **Critical: Data race on shared `probe_expr_ctxs`**
   
   `shared_state.probe_expr_ctxs[i]->execute()` is called by ALL sink instances 
concurrently on the SAME shared `VExprContext` objects. 
`VExprContext::execute()` mutates `_last_result_column_id` (a non-atomic `int`) 
and potentially `FunctionContext` internal state.
   
   The `probe_expr_ctxs` are cloned only once into shared state, inside the 
`init_instances` callback that runs under `call_once` (lines 88-94), and are 
then used concurrently by all sink instances without per-instance cloning.
   
   Note that `_aggregate_evaluators` are already correctly cloned per-instance 
(lines 113-115) with a comment explaining the same class of bug. The same 
treatment is needed here.
   
   **Fix**: Each sink instance should clone its own `probe_expr_ctxs` in 
`open()` (similar to lines 113-115), and `_execute_with_serialized_key` should 
use the per-instance clones instead of `shared_state.probe_expr_ctxs`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to