jacktengg commented on code in PR #38954:
URL: https://github.com/apache/doris/pull/38954#discussion_r1708992797


##########
be/src/vec/aggregate_functions/aggregate_function_window_funnel.h:
##########
@@ -72,138 +80,233 @@ WindowFunnelMode string_to_window_funnel_mode(const String& string) {
     }
 }
 
-template <typename DateValueType, typename NativeType>
+template <TypeIndex TYPE_INDEX, typename NativeType>
 struct WindowFunnelState {
-    std::vector<std::pair<DateValueType, int>> events;
-    int max_event_level;
-    bool sorted;
+    using DateValueType = std::conditional_t<TYPE_INDEX == 
TypeIndex::DateTimeV2,
+                                             DateV2Value<DateTimeV2ValueType>, 
VecDateTimeValue>;
+    int event_count = 0;
     int64_t window;
-    WindowFunnelMode window_funnel_mode;
     bool enable_mode;
+    WindowFunnelMode window_funnel_mode;
+    mutable MutableColumnPtr timestamp_column;
+    mutable MutableColumns event_columns;
+    Block block;
+    SortDescription sort_description {1};
+    bool sorted;
+    bool is_merge;
 
     WindowFunnelState() {
-        sorted = true;
-        max_event_level = 0;
+        event_count = 0;
         window = 0;
         window_funnel_mode = WindowFunnelMode::INVALID;
+
+        sort_description[0].column_number = 0;
+        sort_description[0].direction = 1;
+        sort_description[0].nulls_direction = -1;
+        sorted = false;
+        is_merge = false;
+    }
+    WindowFunnelState(int arg_event_count) : WindowFunnelState() {
+        timestamp_column = ColumnVector<NativeType>::create();
+        event_count = arg_event_count;
+        event_columns.resize(event_count);
+        for (int i = 0; i < event_count; i++) {
+            event_columns[i] = ColumnVector<UInt8>::create();
+        }
     }
 
     void reset() {
-        sorted = true;
-        max_event_level = 0;
         window = 0;
-        events.shrink_to_fit();
+        timestamp_column->clear();
+        for (auto& column : event_columns) {
+            column->clear();
+        }
+        block.clear_column_data();
+        sorted = false;
+        is_merge = false;
     }
 
-    void add(const DateValueType& timestamp, int event_idx, int event_num, 
int64_t win,
-             WindowFunnelMode mode) {
+    void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, 
WindowFunnelMode mode) {
         window = win;
-        max_event_level = event_num;
         window_funnel_mode = enable_mode ? mode : WindowFunnelMode::DEFAULT;
 
-        if (sorted && events.size() > 0) {
-            if (events.back().first == timestamp) {
-                sorted = events.back().second <= event_idx;
-            } else {
-                sorted = events.back().first < timestamp;
-            }
+        timestamp_column->insert_from(*arg_columns[2], row_num);
+        for (int i = 0; i < event_count; i++) {
+            event_columns[i]->insert_from(*(arg_columns[3 + i]), row_num);
         }
-        events.emplace_back(timestamp, event_idx);
     }
 
     void sort() {
         if (sorted) {
             return;
         }
-        std::stable_sort(events.begin(), events.end());
-    }
+        if (!is_merge) {
+            Block tmp_block;
+            tmp_block.insert({std::move(timestamp_column),
+                              
DataTypeFactory::instance().create_data_type(TYPE_INDEX),
+                              "timestamp"});
+            for (int i = 0; i < event_count; i++) {
+                tmp_block.insert({std::move(event_columns[i]),
+                                  
DataTypeFactory::instance().create_data_type(TypeIndex::UInt8),
+                                  "event_" + std::to_string(i)});
+            }
 
-    int get() const {
-        if (max_event_level == 0) {
-            return 0;
+            block = tmp_block.clone_without_columns();
+            sort_block(tmp_block, block, sort_description, 0);
+        } else {
+            auto tmp_block = block.clone_without_columns();
+            sort_block(block, tmp_block, sort_description, 0);
+            block = std::move(tmp_block);
         }
-        std::vector<std::optional<std::pair<DateValueType, DateValueType>>> 
events_timestamp(
-                max_event_level);
-        bool is_first_set = false;
-        for (int64_t i = 0; i < events.size(); i++) {
-            const int& event_idx = events[i].second;
-            const DateValueType& timestamp = events[i].first;
-            if (event_idx == 0) {
-                events_timestamp[0] = {timestamp, timestamp};
-                is_first_set = true;
-                continue;
-            }
-            if (window_funnel_mode == WindowFunnelMode::DEDUPLICATION &&
-                events_timestamp[event_idx].has_value()) {
-                break;
-            }
-            if (events_timestamp[event_idx - 1].has_value()) {
-                const DateValueType& first_timestamp =
-                        events_timestamp[event_idx - 1].value().first;
-                DateValueType last_timestamp = first_timestamp;
-                TimeInterval interval(SECOND, window, false);
-                last_timestamp.template date_add_interval<SECOND>(interval);
-
-                if (window_funnel_mode != WindowFunnelMode::INCREASE) {
-                    if (timestamp <= last_timestamp) {
-                        events_timestamp[event_idx] = {first_timestamp, 
timestamp};
-                        if (event_idx + 1 == max_event_level) {
-                            // Usually, max event level is small.
-                            return max_event_level;
+        sorted = true;
+    }
+
+    template <WindowFunnelMode WINDOW_FUNNEL_MODE>
+    int _match_event_list(size_t& start_row, size_t row_count,
+                          const NativeType* timestamp_data) const {
+        int matched_count = 0;
+        DateValueType start_timestamp;
+        DateValueType end_timestamp;
+        TimeInterval interval(SECOND, window, false);
+
+        int column_idx = 1;
+        const auto& first_event_column = block.get_by_position(column_idx);
+        const auto& first_event_data =
+                assert_cast<const 
ColumnVector<UInt8>&>(*first_event_column.column).get_data();
+        auto match_row = simd::find_one(first_event_data.data(), row_count, 
start_row);
+        start_row = match_row + 1;
+        if (match_row < row_count) {
+            auto prev_timestamp = binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+            end_timestamp = prev_timestamp;
+            end_timestamp.template date_add_interval<SECOND>(interval);
+
+            matched_count++;
+
+            column_idx++;
+            auto last_match_row = match_row;
+            for (; column_idx < event_count + 1; column_idx++) {
+                if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::FIXED) {
+                    ++match_row;
+                    const auto& event_column = 
block.get_by_position(column_idx);
+                    const auto& event_data =
+                            assert_cast<const 
ColumnVector<UInt8>&>(*event_column.column)
+                                    .get_data();
+                    if (event_data[match_row] == 1) {
+                        auto current_timestamp =
+                                binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+                        if (current_timestamp <= end_timestamp) {
+                            matched_count++;
+                            continue;
                         }
                     }
-                } else {
-                    if (timestamp <= last_timestamp &&
-                        events_timestamp[event_idx - 1].value().second < 
timestamp) {
-                        if (!events_timestamp[event_idx].has_value() ||
-                            events_timestamp[event_idx].value().second > 
timestamp) {
-                            events_timestamp[event_idx] = {first_timestamp, 
timestamp};
-                        }
-                        if (event_idx + 1 == max_event_level) {
-                            // Usually, max event level is small.
-                            return max_event_level;
+                    break;
+                }
+                const auto& event_column = block.get_by_position(column_idx);
+                const auto& event_data =
+                        assert_cast<const 
ColumnVector<UInt8>&>(*event_column.column).get_data();
+                match_row = simd::find_one(event_data.data(), row_count, 
match_row + 1);
+                if (match_row < row_count) {
+                    auto current_timestamp =
+                            binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+                    bool is_matched = current_timestamp <= end_timestamp;
+                    if (is_matched) {
+                        if constexpr (WINDOW_FUNNEL_MODE == 
WindowFunnelMode::INCREASE) {
+                            is_matched = current_timestamp > prev_timestamp;
                         }
                     }
-                }
-            } else {
-                if (is_first_set && window_funnel_mode == 
WindowFunnelMode::FIXED) {
-                    for (size_t i = 0; i < events_timestamp.size(); i++) {
-                        if (!events_timestamp[i].has_value()) {
-                            return i;
+                    if (!is_matched) {
+                        break;
+                    }
+                    if constexpr (WINDOW_FUNNEL_MODE == 
WindowFunnelMode::INCREASE) {
+                        prev_timestamp =
+                                binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+                    }
+                    if constexpr (WINDOW_FUNNEL_MODE == 
WindowFunnelMode::DEDUPLICATION) {
+                        bool is_dup = false;
+                        if (match_row != last_match_row + 1) {
+                            auto rows_to_match = match_row - last_match_row - 
1;
+                            for (int tmp_column_idx = 1; tmp_column_idx < 
column_idx;
+                                 tmp_column_idx++) {
+                                const auto& tmp_event_column =
+                                        block.get_by_position(tmp_column_idx);
+                                const auto& tmp_event_data =
+                                        assert_cast<const 
ColumnVector<UInt8>&>(
+                                                *tmp_event_column.column)
+                                                .get_data();
+                                auto dup_match_row = simd::find_one(
+                                        tmp_event_data.data(), rows_to_match, 
last_match_row + 1);
+                                if (dup_match_row >= rows_to_match) {

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to