HappenLee commented on code in PR #33173:
URL: https://github.com/apache/doris/pull/33173#discussion_r1555950908


##########
be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp:
##########
@@ -245,17 +245,25 @@ Status 
DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
     _distinct_row.clear();
     _distinct_row.reserve(rows);
 
-    RETURN_IF_CATCH_EXCEPTION(
-            _emplace_into_hash_table_to_distinct(_distinct_row, key_columns, 
rows));
-    // need use _cur_num_rows_returned to decide whether to do continue 
emplace into hash table
-    _cur_num_rows_returned += _distinct_row.size();
+    if (!_stop_emplace_flag) {
+        RETURN_IF_CATCH_EXCEPTION(
+                _emplace_into_hash_table_to_distinct(_distinct_row, 
key_columns, rows));
+    }
 
     bool mem_reuse = 
_parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
                      out_block->mem_reuse();
     if (mem_reuse) {
+        if (_stop_emplace_flag && !out_block->empty()) {
+            // when out_block row >= batch_size, push it to data_queue, so 
when _stop_emplace_flag = true, maybe have some data in block
+            // need output those data firstly
+            for (int i = 0; i < rows; ++i) {
+                _distinct_row.push_back(i);
+            }

Review Comment:
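   Use `_distinct_row.resize(rows)` followed by `std::iota(_distinct_row.begin(), _distinct_row.end(), 0)` to fill it with 0..rows-1 instead of the `push_back` loop.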



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to