HappenLee commented on code in PR #50109: URL: https://github.com/apache/doris/pull/50109#discussion_r2050543018
########## be/src/vec/aggregate_functions/aggregate_function_window_funnel.h: ########## @@ -85,71 +111,54 @@ struct WindowFunnelState { int64_t window; bool enable_mode; WindowFunnelMode window_funnel_mode; - mutable vectorized::MutableBlock mutable_block; - ColumnVector<NativeType>::Container* timestamp_column_data = nullptr; - std::vector<ColumnVector<UInt8>::Container*> event_columns_datas; - SortDescription sort_description {1}; + DataValue events_list; WindowFunnelState() { event_count = 0; window = 0; window_funnel_mode = WindowFunnelMode::INVALID; - - sort_description[0].column_number = 0; - sort_description[0].direction = 1; - sort_description[0].nulls_direction = -1; } WindowFunnelState(int arg_event_count) : WindowFunnelState() { event_count = arg_event_count; - event_columns_datas.resize(event_count); - auto timestamp_column = ColumnVector<NativeType>::create(); - - MutableColumns event_columns; - for (int i = 0; i < event_count; i++) { - event_columns.emplace_back(ColumnVector<UInt8>::create()); - } - Block tmp_block; - tmp_block.insert({std::move(timestamp_column), - DataTypeFactory::instance().create_data_type(TYPE_INDEX), "timestamp"}); - for (int i = 0; i < event_count; i++) { - tmp_block.insert({std::move(event_columns[i]), - DataTypeFactory::instance().create_data_type(TypeIndex::UInt8), - "event_" + std::to_string(i)}); - } - - mutable_block = MutableBlock(std::move(tmp_block)); - _reset_columns_ptr(); + events_list.event_columns_data.resize(event_count); } - void _reset_columns_ptr() { - auto& ts_column = mutable_block.get_column_by_position(0); - timestamp_column_data = &assert_cast<ColumnVector<NativeType>&>(*ts_column).get_data(); - for (int i = 0; i != event_count; i++) { - auto& event_column = mutable_block.get_column_by_position(i + 1); - event_columns_datas[i] = &assert_cast<ColumnVector<UInt8>&>(*event_column).get_data(); - } - } - void reset() { mutable_block.clear_column_data(); } + void reset() { events_list.clear(); } void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, WindowFunnelMode mode) { window = win; window_funnel_mode = enable_mode ? mode : WindowFunnelMode::DEFAULT; - - timestamp_column_data->push_back( + events_list.dt.emplace_back( assert_cast<const ColumnVector<NativeType>&>(*arg_columns[2]).get_data()[row_num]); for (int i = 0; i < event_count; i++) { - event_columns_datas[i]->push_back( + events_list.event_columns_data[i].emplace_back( assert_cast<const ColumnVector<UInt8>&>(*arg_columns[3 + i]) .get_data()[row_num]); } } void sort() { - Block tmp_block = mutable_block.to_block(); - auto block = tmp_block.clone_without_columns(); - sort_block(tmp_block, block, sort_description, 0); - mutable_block = std::move(block); - _reset_columns_ptr(); + std::vector<size_t> indices(events_list.size()); + std::iota(indices.begin(), indices.end(), 0); + std::sort(indices.begin(), indices.end(), + [this](size_t i1, size_t i2) { return events_list.dt[i1] < events_list.dt[i2]; }); + + auto reorder = [&indices](auto& vec) { + std::vector<typename std::decay_t<decltype(vec)>::value_type> temp; + temp.reserve(indices.size()); + for (auto idx : indices) { Review Comment: std::decay_t<deltype(vec)> temp; temp.resize() temp[i] = vec[indices[i]]; std::swap(vec, temp); -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org