This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 1217886d675 branch-4.1: [Improvement](function) support window funnel v2 #61566 (#61935)
1217886d675 is described below

commit 1217886d67558683b14b37947fab7c3a11e2941a
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Apr 1 13:36:21 2026 +0800

    branch-4.1: [Improvement](function) support window funnel v2 #61566 (#61935)
    
    Cherry-picked from #61566
    
    Co-authored-by: Pxl <[email protected]>
---
 .../aggregate_function_simple_factory.cpp          |    2 +
 .../aggregate/aggregate_function_window_funnel.cpp |    3 +-
 .../aggregate/aggregate_function_window_funnel.h   |    2 +-
 ...cpp => aggregate_function_window_funnel_v2.cpp} |   28 +-
 .../aggregate_function_window_funnel_v2.h          |  613 ++++++++++
 .../exprs/aggregate/vec_window_funnel_v2_test.cpp  | 1167 ++++++++++++++++++++
 .../doris/catalog/BuiltinAggregateFunctions.java   |    4 +-
 .../expressions/functions/agg/WindowFunnel.java    |    2 +-
 .../agg/{WindowFunnel.java => WindowFunnelV2.java} |   33 +-
 .../visitor/AggregateFunctionVisitor.java          |    5 +
 .../apache/doris/nereids/types/AggStateType.java   |    1 +
 .../analysis/CheckExpressionLegalityTest.java      |   27 +
 .../data/nereids_p0/aggregate/window_funnel.out    |    2 +-
 .../data/nereids_p0/aggregate/window_funnel_v2.out |   88 ++
 .../test_aggregate_window_functions.out            |   10 +-
 .../nereids_p0/aggregate/window_funnel_v2.groovy   |  492 +++++++++
 16 files changed, 2443 insertions(+), 36 deletions(-)

diff --git a/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp b/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
index c939a08dd08..d6d60250d0d 100644
--- a/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
+++ b/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
@@ -57,6 +57,7 @@ void register_aggregate_function_percentile(AggregateFunctionSimpleFactory& fact
 void 
register_aggregate_function_percentile_old(AggregateFunctionSimpleFactory& 
factory);
 void register_aggregate_function_window_funnel(AggregateFunctionSimpleFactory& 
factory);
 void 
register_aggregate_function_window_funnel_old(AggregateFunctionSimpleFactory& 
factory);
+void 
register_aggregate_function_window_funnel_v2(AggregateFunctionSimpleFactory& 
factory);
 void register_aggregate_function_regr_union(AggregateFunctionSimpleFactory& 
factory);
 void register_aggregate_function_retention(AggregateFunctionSimpleFactory& 
factory);
 void 
register_aggregate_function_percentile_approx(AggregateFunctionSimpleFactory& 
factory);
@@ -111,6 +112,7 @@ AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() {
         register_aggregate_function_percentile_approx(instance);
         register_aggregate_function_window_funnel(instance);
         register_aggregate_function_window_funnel_old(instance);
+        register_aggregate_function_window_funnel_v2(instance);
         register_aggregate_function_regr_union(instance);
         register_aggregate_function_retention(instance);
         register_aggregate_function_orthogonal_bitmap(instance);
diff --git a/be/src/exprs/aggregate/aggregate_function_window_funnel.cpp b/be/src/exprs/aggregate/aggregate_function_window_funnel.cpp
index 3ef25d7de11..fbd9fde900e 100644
--- a/be/src/exprs/aggregate/aggregate_function_window_funnel.cpp
+++ b/be/src/exprs/aggregate/aggregate_function_window_funnel.cpp
@@ -47,7 +47,8 @@ AggregateFunctionPtr create_aggregate_function_window_funnel(const std::string&
 }
 
 void register_aggregate_function_window_funnel(AggregateFunctionSimpleFactory& 
factory) {
-    factory.register_function_both("window_funnel", 
create_aggregate_function_window_funnel);
+    factory.register_function_both("window_funnel_v1", 
create_aggregate_function_window_funnel);
+    factory.register_alias("window_funnel_v1", "window_funnel");
 }
 void 
register_aggregate_function_window_funnel_old(AggregateFunctionSimpleFactory& 
factory) {
     
BeExecVersionManager::registe_restrict_function_compatibility("window_funnel");
diff --git a/be/src/exprs/aggregate/aggregate_function_window_funnel.h b/be/src/exprs/aggregate/aggregate_function_window_funnel.h
index 9908c8c8808..4e739474ad1 100644
--- a/be/src/exprs/aggregate/aggregate_function_window_funnel.h
+++ b/be/src/exprs/aggregate/aggregate_function_window_funnel.h
@@ -56,7 +56,7 @@ namespace doris {
 
 enum class WindowFunnelMode : Int64 { INVALID, DEFAULT, DEDUPLICATION, FIXED, 
INCREASE };
 
-WindowFunnelMode string_to_window_funnel_mode(const String& string) {
+inline WindowFunnelMode string_to_window_funnel_mode(const String& string) {
     if (string == "default") {
         return WindowFunnelMode::DEFAULT;
     } else if (string == "deduplication") {
diff --git a/be/src/exprs/aggregate/aggregate_function_window_funnel.cpp b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.cpp
similarity index 58%
copy from be/src/exprs/aggregate/aggregate_function_window_funnel.cpp
copy to be/src/exprs/aggregate/aggregate_function_window_funnel_v2.cpp
index 3ef25d7de11..8cb73675c34 100644
--- a/be/src/exprs/aggregate/aggregate_function_window_funnel.cpp
+++ b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "exprs/aggregate/aggregate_function_window_funnel.h"
+#include "exprs/aggregate/aggregate_function_window_funnel_v2.h"
 
 #include <string>
 
@@ -28,17 +28,19 @@
 namespace doris {
 #include "common/compile_check_begin.h"
 
-AggregateFunctionPtr create_aggregate_function_window_funnel(const 
std::string& name,
-                                                             const DataTypes& 
argument_types,
-                                                             const 
DataTypePtr& result_type,
-                                                             const bool 
result_is_nullable,
-                                                             const 
AggregateFunctionAttr& attr) {
-    if (argument_types.size() < 3) {
-        LOG(WARNING) << "window_funnel's argument less than 3.";
+AggregateFunctionPtr create_aggregate_function_window_funnel_v2(const 
std::string& name,
+                                                                const 
DataTypes& argument_types,
+                                                                const 
DataTypePtr& result_type,
+                                                                const bool 
result_is_nullable,
+                                                                const 
AggregateFunctionAttr& attr) {
+    if (argument_types.size() < 4) {
+        LOG(WARNING) << "window_funnel_v2 requires at least 4 arguments 
(window, mode, timestamp, "
+                        "and at least 1 boolean condition), but got "
+                     << argument_types.size() << ".";
         return nullptr;
     }
     if (argument_types[2]->get_primitive_type() == TYPE_DATETIMEV2) {
-        return creator_without_type::create<AggregateFunctionWindowFunnel>(
+        return creator_without_type::create<AggregateFunctionWindowFunnelV2>(
                 argument_types, result_is_nullable, attr);
     } else {
         LOG(WARNING) << "Only support DateTime type as window argument!";
@@ -46,10 +48,8 @@ AggregateFunctionPtr create_aggregate_function_window_funnel(const std::string&
     }
 }
 
-void register_aggregate_function_window_funnel(AggregateFunctionSimpleFactory& 
factory) {
-    factory.register_function_both("window_funnel", 
create_aggregate_function_window_funnel);
-}
-void 
register_aggregate_function_window_funnel_old(AggregateFunctionSimpleFactory& 
factory) {
-    
BeExecVersionManager::registe_restrict_function_compatibility("window_funnel");
+void 
register_aggregate_function_window_funnel_v2(AggregateFunctionSimpleFactory& 
factory) {
+    factory.register_function_both("window_funnel_v2", 
create_aggregate_function_window_funnel_v2);
 }
+
 } // namespace doris
diff --git a/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h
new file mode 100644
index 00000000000..8038c0bef47
--- /dev/null
+++ b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h
@@ -0,0 +1,613 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <iterator>
+#include <utility>
+#include <vector>
+
+#include "common/cast_set.h"
+#include "common/exception.h"
+#include "common/status.h"
+#include "core/assert_cast.h"
+#include "core/column/column_string.h"
+#include "core/data_type/data_type_number.h"
+#include "core/types.h"
+#include "core/value/vdatetime_value.h"
+#include "exprs/aggregate/aggregate_function.h"
+#include "exprs/aggregate/aggregate_function_window_funnel.h" // for 
WindowFunnelMode, string_to_window_funnel_mode
+#include "util/var_int.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class Arena;
+class BufferReadable;
+class BufferWritable;
+class IColumn;
+} // namespace doris
+
+namespace doris {
+
+/// Merge two event lists, utilizing sorted flags to optimize.
+/// After merge, all events are in `events_list` and it is sorted.
+template <typename T>
+void merge_events_list(T& events_list, size_t prefix_size, bool prefix_sorted, 
bool suffix_sorted) {
+    if (!prefix_sorted && !suffix_sorted) {
+        std::stable_sort(std::begin(events_list), std::end(events_list));
+    } else {
+        const auto begin = std::begin(events_list);
+        const auto middle = std::next(begin, prefix_size);
+        const auto end = std::end(events_list);
+
+        if (!prefix_sorted) {
+            std::stable_sort(begin, middle);
+        }
+        if (!suffix_sorted) {
+            std::stable_sort(middle, end);
+        }
+        std::inplace_merge(begin, middle, end);
+    }
+}
+
+/// V2 state: stores only matched events as (timestamp, event_index) pairs.
+/// Compared to V1 which stores all rows with N boolean columns, V2 only stores
+/// events that actually match at least one condition, dramatically reducing 
memory.
+///
+/// To prevent same-row multi-condition chain advancement (where a single row
+/// matching multiple conditions could incorrectly advance the funnel through
+/// multiple levels), we use the high bit of event_idx as a "same-row 
continuation"
+/// flag. When a row matches multiple conditions, the first event stored for 
that
+/// row has bit 7 = 0, and subsequent events from the same row have bit 7 = 1.
+/// The algorithm uses this to ensure each funnel step comes from a different 
row.
+///
+/// This approach adds ZERO storage overhead — each event remains 9 bytes 
(UInt64 + UInt8).
+struct WindowFunnelStateV2 {
+    /// (timestamp_int_val, 1-based event_index with continuation flag in bit 
7)
+    ///
+    /// Bit layout of event_idx:
+    ///   bit 7 (0x80): continuation flag — 1 means this event is from the 
same row
+    ///                  as the preceding event in events_list
+    ///   bits 0-6:     actual 1-based event index (supports up to 127 
conditions)
+    ///
+    /// Sorted by timestamp only (via operator<). stable_sort preserves 
insertion order
+    /// for equal timestamps, so same-row events remain consecutive after 
sorting.
+    struct TimestampEvent {
+        UInt64 timestamp;
+        UInt8 event_idx; // includes continuation flag in bit 7
+
+        /// Sort by timestamp only. For same timestamp, stable_sort preserves 
insertion
+        /// order, keeping same-row events consecutive.
+        bool operator<(const TimestampEvent& other) const { return timestamp < 
other.timestamp; }
+        bool operator<=(const TimestampEvent& other) const { return timestamp 
<= other.timestamp; }
+    };
+
+    static constexpr UInt8 CONTINUATION_FLAG = 0x80;
+    static constexpr UInt8 EVENT_IDX_MASK = 0x7F;
+    static constexpr int MAX_EVENT_CONDITIONS = EVENT_IDX_MASK;
+
+    static void validate_event_count(int count) {
+        if (count < 0 || count > MAX_EVENT_CONDITIONS) {
+            throw Exception(
+                    ErrorCode::INVALID_ARGUMENT,
+                    "window_funnel_v2 supports at most {} conditions because 
event indexes are "
+                    "encoded in 7 bits, but got {}",
+                    MAX_EVENT_CONDITIONS, count);
+        }
+    }
+
+    /// Extract the actual 1-based event index (stripping continuation flag).
+    static int get_event_idx(UInt8 raw) { return (raw & EVENT_IDX_MASK); }
+    /// Check if this event is a continuation of the same row as the previous 
event.
+    static bool is_continuation(UInt8 raw) { return (raw & CONTINUATION_FLAG) 
!= 0; }
+
+    static constexpr int64_t WINDOW_UNSET = -1;
+
+    int event_count = 0;
+    int64_t window = WINDOW_UNSET;
+    WindowFunnelMode window_funnel_mode = WindowFunnelMode::INVALID;
+    bool sorted = true;
+    std::vector<TimestampEvent> events_list;
+
+    WindowFunnelStateV2() = default;
+    WindowFunnelStateV2(int arg_event_count) : event_count(arg_event_count) {}
+
+    void reset() {
+        events_list.clear();
+        sorted = true;
+    }
+
+    void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, 
WindowFunnelMode mode) {
+        window = win;
+        window_funnel_mode = mode;
+
+        // get_data() returns DateV2Value<DateTimeV2ValueType>; convert to 
packed UInt64
+        auto timestamp = assert_cast<const 
ColumnVector<TYPE_DATETIMEV2>&>(*arg_columns[2])
+                                 .get_data()[row_num]
+                                 .to_date_int_val();
+
+        // Iterate from last event to first (reverse order).
+        // This ensures that after stable_sort, events with the same timestamp
+        // appear in descending event_index order, which is important for 
correct
+        // matching when one row satisfies multiple conditions.
+        //
+        // The first event stored for this row has continuation=0;
+        // subsequent events from the same row have continuation=1 (bit 7 set).
+        bool first_match = true;
+        for (int i = event_count - 1; i >= 0; --i) {
+            auto event_val =
+                    assert_cast<const ColumnUInt8&>(*arg_columns[3 + 
i]).get_data()[row_num];
+            if (event_val) {
+                UInt8 packed_idx = cast_set<UInt8>(i + 1);
+                if (!first_match) {
+                    packed_idx |= CONTINUATION_FLAG;
+                }
+                first_match = false;
+                TimestampEvent new_event {timestamp, packed_idx};
+                if (sorted && !events_list.empty()) {
+                    sorted = events_list.back() <= new_event;
+                }
+                events_list.emplace_back(new_event);
+            }
+        }
+    }
+
+    void sort() {
+        if (!sorted) {
+            std::stable_sort(std::begin(events_list), std::end(events_list));
+            sorted = true;
+        }
+    }
+
+    void merge(const WindowFunnelStateV2& other) {
+        if (other.events_list.empty()) {
+            return;
+        }
+
+        if (events_list.empty()) {
+            events_list = other.events_list;
+            sorted = other.sorted;
+        } else {
+            const auto prefix_size = events_list.size();
+            events_list.insert(std::end(events_list), 
std::begin(other.events_list),
+                               std::end(other.events_list));
+            // Both stable_sort and inplace_merge preserve relative order of 
equal elements.
+            // Since same-row events have the same timestamp (and thus compare 
equal in
+            // the primary sort key), they remain consecutive after merge — 
preserving
+            // the validity of continuation flags.
+            merge_events_list(events_list, prefix_size, sorted, other.sorted);
+            sorted = true;
+        }
+
+        event_count = event_count > 0 ? event_count : other.event_count;
+        window = window != WINDOW_UNSET ? window : other.window;
+        window_funnel_mode = window_funnel_mode == WindowFunnelMode::INVALID
+                                     ? other.window_funnel_mode
+                                     : window_funnel_mode;
+    }
+
+    void write(BufferWritable& out) const {
+        write_var_int(event_count, out);
+        write_var_int(window, out);
+        
write_var_int(static_cast<std::underlying_type_t<WindowFunnelMode>>(window_funnel_mode),
+                      out);
+        write_var_int(sorted ? 1 : 0, out);
+        write_var_int(cast_set<Int64>(events_list.size()), out);
+        for (const auto& evt : events_list) {
+            // Use fixed-size binary write for timestamp (8 bytes) and 
event_idx (1 byte).
+            // The event_idx byte includes the continuation flag in bit 7.
+            out.write(reinterpret_cast<const char*>(&evt.timestamp), 
sizeof(evt.timestamp));
+            out.write(reinterpret_cast<const char*>(&evt.event_idx), 
sizeof(evt.event_idx));
+        }
+    }
+
+    void read(BufferReadable& in) {
+        Int64 tmp = 0;
+        read_var_int(tmp, in);
+        event_count = cast_set<int>(tmp);
+
+        read_var_int(window, in);
+
+        read_var_int(tmp, in);
+        window_funnel_mode = static_cast<WindowFunnelMode>(tmp);
+
+        read_var_int(tmp, in);
+        sorted = (tmp != 0);
+
+        Int64 size = 0;
+        read_var_int(size, in);
+        events_list.clear();
+        events_list.resize(size);
+        for (Int64 i = 0; i < size; ++i) {
+            in.read(reinterpret_cast<char*>(&events_list[i].timestamp),
+                    sizeof(events_list[i].timestamp));
+            in.read(reinterpret_cast<char*>(&events_list[i].event_idx),
+                    sizeof(events_list[i].event_idx));
+        }
+    }
+
+    using DateValueType = DateV2Value<DateTimeV2ValueType>;
+
+    /// Reconstruct DateV2Value from packed UInt64.
+    static DateValueType _ts_from_int(UInt64 packed) { return 
DateValueType(packed); }
+
+    /// Check if `current_ts` is within `window` seconds of `base_ts`.
+    /// Both are packed UInt64 from DateV2Value::to_date_int_val().
+    bool _within_window(UInt64 base_ts, UInt64 current_ts) const {
+        DateValueType end_ts = _ts_from_int(base_ts);
+        TimeInterval interval(SECOND, window, false);
+        end_ts.template date_add_interval<SECOND>(interval);
+        return current_ts <= end_ts.to_date_int_val();
+    }
+
+    /// Track (first_timestamp, last_timestamp, last_event_list_idx) for each 
event level.
+    /// Uses packed UInt64 values; 0 means unset for first_ts.
+    /// last_list_idx tracks the position in events_list of the event that set 
this level,
+    /// used to check continuation flag on subsequent events to detect 
same-row advancement.
+    struct TimestampPair {
+        UInt64 first_ts = 0;
+        UInt64 last_ts = 0;
+        size_t last_list_idx = 0;
+        bool has_value() const { return first_ts != 0; }
+        void reset() {
+            first_ts = 0;
+            last_ts = 0;
+            last_list_idx = 0;
+        }
+    };
+
+    int get() const {
+        if (event_count == 0 || events_list.empty()) {
+            return 0;
+        }
+        if (window < 0) {
+            throw Exception(ErrorCode::INVALID_ARGUMENT,
+                            "the sliding time window must be a positive 
integer, but got: {}",
+                            window);
+        }
+
+        switch (window_funnel_mode) {
+        case WindowFunnelMode::DEFAULT:
+            return _get_default();
+        case WindowFunnelMode::INCREASE:
+            return _get_increase();
+        case WindowFunnelMode::DEDUPLICATION:
+            return _get_deduplication();
+        case WindowFunnelMode::FIXED:
+            return _get_fixed();
+        default:
+            throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid window_funnel 
mode");
+        }
+    }
+
+private:
+    /// DEFAULT mode: O(N) single-pass algorithm.
+    /// Uses events_timestamp array to track the (first, last) timestamps for 
each level.
+    /// For each event in sorted order:
+    ///   - If it's event 0, start a new potential chain
+    ///   - If its predecessor level has been matched, within time window, AND 
from a
+    ///     different row (checked via continuation flag), extend the chain
+    ///
+    /// In DEFAULT mode, unconditionally overwriting events_timestamp[0] when 
a new event-0
+    /// appears is safe: timestamps are monotonically non-decreasing, higher 
levels retain
+    /// the old chain's first_ts, and the <= window check still succeeds.
+    int _get_default() const {
+        std::vector<TimestampPair> events_timestamp(event_count);
+
+        for (size_t i = 0; i < events_list.size(); ++i) {
+            const auto& evt = events_list[i];
+            int event_idx = get_event_idx(evt.event_idx) - 1;
+
+            if (event_idx == 0) {
+                events_timestamp[0] = {evt.timestamp, evt.timestamp, i};
+            } else if (events_timestamp[event_idx - 1].has_value() &&
+                       !_is_same_row(events_timestamp[event_idx - 
1].last_list_idx, i)) {
+                // Must be from a DIFFERENT row than the predecessor level
+                if (_within_window(events_timestamp[event_idx - 1].first_ts, 
evt.timestamp)) {
+                    events_timestamp[event_idx] = {events_timestamp[event_idx 
- 1].first_ts,
+                                                   evt.timestamp, i};
+                    if (event_idx + 1 == event_count) {
+                        return event_count;
+                    }
+                }
+            }
+        }
+
+        for (int event = event_count; event > 0; --event) {
+            if (events_timestamp[event - 1].has_value()) {
+                return event;
+            }
+        }
+        return 0;
+    }
+
+    /// INCREASE mode: multi-pass algorithm matching V1 semantics.
+    ///
+    /// A single-pass approach cannot correctly handle INCREASE mode because 
when a new
+    /// event-0 appears, the old chain and the new chain have different 
trade-offs:
+    /// - The old chain has an earlier last_ts (better for the strict-increase 
check)
+    /// - The new chain has a later first_ts (better for the time-window check)
+    /// Neither dominates the other, so both must be tried independently.
+    ///
+    /// This method iterates over each event-0 occurrence as a potential chain 
start,
+    /// then scans forward to build the longest matching chain from that start.
+    /// The maximum chain length across all starts is returned.
+    ///
+    /// Complexity: O(M_event0 × N_matched) worst-case, where M_event0 is the 
count of
+    /// event-0 occurrences and N_matched is the total matched-event count. In 
practice
+    /// N_matched is much smaller than total rows (V2 only stores matched 
events), and
+    /// the remaining-events pruning eliminates start points that cannot 
improve max_level,
+    /// so the typical case is significantly better than worst-case.
+    int _get_increase() const {
+        int max_level = 0;
+        const size_t list_size = events_list.size();
+
+        for (size_t start = 0; start < list_size; ++start) {
+            // Remaining-events pruning: from this start point, at most
+            // (list_size - start) events remain. If that can't beat 
max_level, stop.
+            // This also prunes all subsequent start points since they have 
even fewer.
+            if (static_cast<int>(list_size - start) <= max_level) {
+                break;
+            }
+
+            int start_event = get_event_idx(events_list[start].event_idx) - 1;
+            if (start_event != 0) {
+                continue;
+            }
+
+            // Try building a chain from this event-0
+            std::vector<TimestampPair> events_timestamp(event_count);
+            events_timestamp[0] = {events_list[start].timestamp, 
events_list[start].timestamp,
+                                   start};
+            int curr_level = 0;
+
+            for (size_t i = start + 1; i < list_size; ++i) {
+                const auto& evt = events_list[i];
+                int event_idx = get_event_idx(evt.event_idx) - 1;
+
+                if (event_idx == 0) {
+                    // Another event-0: this chain's event-0 phase is done; 
skip
+                    continue;
+                }
+
+                if (events_timestamp[event_idx - 1].has_value() &&
+                    !_is_same_row(events_timestamp[event_idx - 
1].last_list_idx, i)) {
+                    bool matched =
+                            _within_window(events_timestamp[event_idx - 
1].first_ts, evt.timestamp);
+                    matched = matched && events_timestamp[event_idx - 
1].last_ts < evt.timestamp;
+                    if (matched) {
+                        events_timestamp[event_idx] = 
{events_timestamp[event_idx - 1].first_ts,
+                                                       evt.timestamp, i};
+                        if (event_idx > curr_level) {
+                            curr_level = event_idx;
+                        }
+                        if (event_idx + 1 == event_count) {
+                            return event_count;
+                        }
+                    }
+                }
+            }
+
+            if (curr_level + 1 > max_level) {
+                max_level = curr_level + 1;
+            }
+        }
+        return max_level;
+    }
+
+    /// DEDUPLICATION mode: if a previously matched event level appears again,
+    /// the current chain is terminated and max_level is updated.
+    /// This preserves V1 semantics where duplicate events break the chain.
+    int _get_deduplication() const {
+        std::vector<TimestampPair> events_timestamp(event_count);
+        int max_level = -1;
+        int curr_level = -1;
+
+        for (size_t i = 0; i < events_list.size(); ++i) {
+            const auto& evt = events_list[i];
+            int event_idx = get_event_idx(evt.event_idx) - 1;
+
+            if (event_idx == 0) {
+                // Duplicate of event 0: terminate current chain first
+                if (events_timestamp[0].has_value()) {
+                    if (curr_level > max_level) {
+                        max_level = curr_level;
+                    }
+                    _eliminate_chain(curr_level, events_timestamp);
+                }
+                // Start a new chain
+                events_timestamp[0] = {evt.timestamp, evt.timestamp, i};
+                curr_level = 0;
+            } else if (events_timestamp[event_idx].has_value()) {
+                // Duplicate event detected: this level was already matched
+                if (curr_level > max_level) {
+                    max_level = curr_level;
+                }
+                // Eliminate current chain
+                _eliminate_chain(curr_level, events_timestamp);
+            } else if (events_timestamp[event_idx - 1].has_value() &&
+                       !_is_same_row(events_timestamp[event_idx - 
1].last_list_idx, i)) {
+                // Must be from a DIFFERENT row than the predecessor level
+                if (_promote_level(events_timestamp, evt.timestamp, i, 
event_idx, curr_level,
+                                   false)) {
+                    return event_count;
+                }
+            }
+        }
+
+        if (curr_level > max_level) {
+            return curr_level + 1;
+        }
+        return max_level + 1;
+    }
+
+    /// FIXED mode (StarRocks-style semantics): if a matched event appears 
whose
+    /// predecessor level has NOT been matched, the chain is broken (event 
level jumped).
+    /// Note: V2 semantics differ from V1. V1 checks physical row adjacency;
+    /// V2 checks event level continuity (unmatched rows don't break the 
chain).
+    int _get_fixed() const {
+        std::vector<TimestampPair> events_timestamp(event_count);
+        int max_level = -1;
+        int curr_level = -1;
+        bool first_event = false;
+
+        for (size_t i = 0; i < events_list.size(); ++i) {
+            const auto& evt = events_list[i];
+            int event_idx = get_event_idx(evt.event_idx) - 1;
+
+            if (event_idx == 0) {
+                // Save current chain before starting a new one
+                if (events_timestamp[0].has_value()) {
+                    if (curr_level > max_level) {
+                        max_level = curr_level;
+                    }
+                    _eliminate_chain(curr_level, events_timestamp);
+                }
+                events_timestamp[0] = {evt.timestamp, evt.timestamp, i};
+                curr_level = 0;
+                first_event = true;
+            } else if (first_event && !events_timestamp[event_idx - 
1].has_value()) {
+                // Event level jumped: predecessor was not matched
+                if (curr_level >= 0) {
+                    if (curr_level > max_level) {
+                        max_level = curr_level;
+                    }
+                    _eliminate_chain(curr_level, events_timestamp);
+                }
+            } else if (events_timestamp[event_idx - 1].has_value() &&
+                       !_is_same_row(events_timestamp[event_idx - 
1].last_list_idx, i)) {
+                // Must be from a DIFFERENT row than the predecessor level
+                if (_promote_level(events_timestamp, evt.timestamp, i, 
event_idx, curr_level,
+                                   false)) {
+                    return event_count;
+                }
+            }
+        }
+
+        if (curr_level > max_level) {
+            return curr_level + 1;
+        }
+        return max_level + 1;
+    }
+
+    /// Clear the current event chain back to the beginning.
+    static void _eliminate_chain(int& curr_level, std::vector<TimestampPair>& 
events_timestamp) {
+        for (; curr_level >= 0; --curr_level) {
+            events_timestamp[curr_level].reset();
+        }
+    }
+
+    /// Check if the event at `current_idx` in events_list is from the same 
original row
+    /// as the event at `prev_idx`. Same-row events are consecutive in the 
sorted list
+    /// with continuation flags set. We walk backwards from current_idx 
checking if every
+    /// event between prev_idx+1 and current_idx has the continuation flag set.
+    bool _is_same_row(size_t prev_idx, size_t current_idx) const {
+        if (current_idx <= prev_idx) {
+            return false;
+        }
+        // Check that all events from prev_idx+1 to current_idx have the 
continuation flag.
+        // If any of them doesn't, there's a row boundary in between.
+        for (size_t j = prev_idx + 1; j <= current_idx; ++j) {
+            if (!is_continuation(events_list[j].event_idx)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /// Attempt to extend the chain to level `event_idx` with the event stored at `list_idx`.
+    /// The event is accepted when `_within_window` approves `ts` against the first timestamp
+    /// recorded at level `event_idx - 1`; with `increase_mode` set, `ts` must additionally be
+    /// strictly greater than that level's last timestamp.
+    /// On acceptance the slot of `event_idx` is overwritten (keeping the first timestamp of
+    /// the predecessor level) and `curr_level` is raised to `event_idx` if it was lower.
+    /// Returns true only when the accepted event is the last one of the funnel, which lets
+    /// the caller stop early.
+    bool _promote_level(std::vector<TimestampPair>& events_timestamp, UInt64 ts, size_t list_idx,
+                        int event_idx, int& curr_level, bool increase_mode) const {
+        const int prev_level = event_idx - 1;
+        if (!_within_window(events_timestamp[prev_level].first_ts, ts)) {
+            return false;
+        }
+        if (increase_mode && !(events_timestamp[prev_level].last_ts < ts)) {
+            return false;
+        }
+
+        events_timestamp[event_idx] = {events_timestamp[prev_level].first_ts, ts, list_idx};
+        if (curr_level < event_idx) {
+            curr_level = event_idx;
+        }
+        return event_idx + 1 == event_count;
+    }
+};
+
+/// Aggregate function whose get_name() is "window_funnel_v2"; its result type is Int32.
+/// As read by add(): argument 0 is the Int64 window and argument 1 the mode string; the whole
+/// column array is then forwarded to WindowFunnelStateV2::add. The number of events is taken
+/// as "argument count - 3".
+class AggregateFunctionWindowFunnelV2
+        : public IAggregateFunctionDataHelper<WindowFunnelStateV2, AggregateFunctionWindowFunnelV2>,
+          MultiExpression,
+          NullableAggregateFunction {
+public:
+    /// Validates the event count derived from the argument list at construction time.
+    AggregateFunctionWindowFunnelV2(const DataTypes& argument_types_)
+            : IAggregateFunctionDataHelper<WindowFunnelStateV2, AggregateFunctionWindowFunnelV2>(
+                      argument_types_) {
+        const auto arg_count = IAggregateFunction::get_argument_types().size();
+        WindowFunnelStateV2::validate_event_count(cast_set<int>(arg_count - 3));
+    }
+
+    /// Placement-constructs a state sized for "argument count - 3" events.
+    void create(AggregateDataPtr __restrict place) const override {
+        const auto arg_count = IAggregateFunction::get_argument_types().size();
+        new (place) WindowFunnelStateV2(cast_set<int>(arg_count - 3));
+    }
+
+    String get_name() const override {
+        return "window_funnel_v2";
+    }
+
+    DataTypePtr get_return_type() const override {
+        return std::make_shared<DataTypeInt32>();
+    }
+
+    void reset(AggregateDataPtr __restrict place) const override {
+        auto& state = this->data(place);
+        state.reset();
+    }
+
+    /// Feeds one input row into the state; window and mode are read from that same row.
+    void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
+             Arena&) const override {
+        const auto& window_column = assert_cast<const ColumnInt64&>(*columns[0]);
+        const auto& win = window_column.get_data()[row_num];
+        StringRef mode_ref = columns[1]->get_data_at(row_num);
+        this->data(place).add(columns, row_num, win,
+                              string_to_window_funnel_mode(mode_ref.to_string()));
+    }
+
+    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
+               Arena&) const override {
+        auto& target = this->data(place);
+        target.merge(this->data(rhs));
+    }
+
+    void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
+        const auto& state = this->data(place);
+        state.write(buf);
+    }
+
+    void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
+                     Arena&) const override {
+        auto& state = this->data(place);
+        state.read(buf);
+    }
+
+    /// Sorts the state in place (hence the const_cast on `place`) and then appends the value
+    /// returned by get() to the Int32 result column `to`.
+    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
+        auto& mutable_state = this->data(const_cast<AggregateDataPtr>(place));
+        mutable_state.sort();
+
+        const auto& sorted_state =
+                IAggregateFunctionDataHelper<WindowFunnelStateV2,
+                                             AggregateFunctionWindowFunnelV2>::data(place);
+        assert_cast<ColumnInt32&>(to).get_data().push_back(sorted_state.get());
+    }
+
+protected:
+    using IAggregateFunction::version;
+};
+
+} // namespace doris
+
+#include "common/compile_check_end.h"
diff --git a/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp b/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp
new file mode 100644
index 00000000000..6e82d430513
--- /dev/null
+++ b/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp
@@ -0,0 +1,1167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <stddef.h>
+
+#include <memory>
+#include <ostream>
+
+#include "core/column/column_string.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_date_or_datetime_v2.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "core/string_buffer.hpp"
+#include "core/value/vdatetime_value.h"
+#include "exprs/aggregate/aggregate_function.h"
+#include "exprs/aggregate/aggregate_function_simple_factory.h"
+#include "gtest/gtest_pred_impl.h"
+
+namespace doris {
+class IColumn;
+} // namespace doris
+
+namespace doris {
+
+void 
register_aggregate_function_window_funnel_v2(AggregateFunctionSimpleFactory& 
factory);
+
+// Fixture shared by the window_funnel_v2 tests. Before each test it looks up the aggregate
+// function for the signature (Int64 window, String mode, DateTimeV2 timestamp, 4 x UInt8 event
+// flags) and stores it in `agg_function`.
+class VWindowFunnelV2Test : public testing::Test {
+public:
+    AggregateFunctionPtr agg_function;
+
+    VWindowFunnelV2Test() = default;
+
+    void SetUp() override {
+        AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance();
+        DataTypes data_types = {
+                std::make_shared<DataTypeInt64>(),      std::make_shared<DataTypeString>(),
+                std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
+                std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>(),
+                std::make_shared<DataTypeUInt8>()};
+        agg_function = factory.get("window_funnel_v2", data_types, nullptr, false,
+                                   BeExecVersionManager::get_newest_version());
+        // Fatal on purpose: every test dereferences agg_function immediately, and gtest skips
+        // the test body once SetUp has recorded a fatal failure. A non-fatal EXPECT_NE would
+        // let the body run and crash on the null pointer instead of reporting the failure.
+        ASSERT_NE(agg_function, nullptr);
+    }
+
+    void TearDown() override {}
+
+    // Arena handed to add/merge/deserialize by the tests.
+    Arena arena;
+};
+
+// An untouched state is serialized and deserialized back into itself, then merged with a
+// second untouched state; both states are expected to report 0.
+TEST_F(VWindowFunnelV2Test, testEmpty) {
+    std::unique_ptr<char[]> first_buffer(new char[agg_function->size_of_data()]);
+    AggregateDataPtr first_state = first_buffer.get();
+    agg_function->create(first_state);
+
+    // Round-trip the empty state through its serialized form.
+    ColumnString serialized;
+    VectorBufferWriter writer(serialized);
+    agg_function->serialize(first_state, writer);
+    writer.commit();
+    LOG(INFO) << "buf size : " << serialized.size();
+    VectorBufferReader reader(serialized.get_data_at(0));
+    agg_function->deserialize(first_state, reader, arena);
+
+    std::unique_ptr<char[]> second_buffer(new char[agg_function->size_of_data()]);
+    AggregateDataPtr second_state = second_buffer.get();
+    agg_function->create(second_state);
+
+    agg_function->merge(first_state, second_state, arena);
+
+    // Check the merged state first, then the one that was only created.
+    AggregateDataPtr states[] = {first_state, second_state};
+    for (AggregateDataPtr state : states) {
+        ColumnInt32 result;
+        agg_function->insert_result_into(state, result);
+        EXPECT_EQ(result.get_data()[0], 0);
+    }
+
+    for (AggregateDataPtr state : states) {
+        agg_function->destroy(state);
+    }
+}
+
+// Four rows, one second apart, each firing exactly one event in funnel order, window = 2.
+// The result must be 3 both on the original state and on a state rebuilt from its
+// serialized form.
+TEST_F(VWindowFunnelV2Test, testSerialize) {
+    const int NUM_CONDS = 4;
+    auto column_window = ColumnInt64::create();
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    auto column_event1 = ColumnUInt8::create();
+    auto column_event2 = ColumnUInt8::create();
+    auto column_event3 = ColumnUInt8::create();
+    auto column_event4 = ColumnUInt8::create();
+
+    // Row r: window 2, mode "default", timestamp 2022-02-28 00:00:r, only event (r + 1) set.
+    for (int row = 0; row < NUM_CONDS; ++row) {
+        column_window->insert(Field::create_field<TYPE_BIGINT>(2));
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, row);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<const char*>(&dtv2), 0);
+
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(row == 0 ? 1 : 0));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(row == 1 ? 1 : 0));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(row == 2 ? 1 : 0));
+        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(row == 3 ? 1 : 0));
+    }
+    const IColumn* columns[7] = {column_window.get(), column_mode.get(),   column_timestamp.get(),
+                                 column_event1.get(), column_event2.get(), column_event3.get(),
+                                 column_event4.get()};
+
+    std::unique_ptr<char[]> source_buffer(new char[agg_function->size_of_data()]);
+    AggregateDataPtr source_state = source_buffer.get();
+    agg_function->create(source_state);
+    for (int row = 0; row < NUM_CONDS; ++row) {
+        agg_function->add(source_state, columns, row, arena);
+    }
+
+    ColumnInt32 direct_result;
+    agg_function->insert_result_into(source_state, direct_result);
+    EXPECT_EQ(direct_result.get_data()[0], 3);
+
+    // Serialize, drop the source state, and rebuild it elsewhere from the bytes.
+    ColumnString serialized;
+    VectorBufferWriter writer(serialized);
+    agg_function->serialize(source_state, writer);
+    writer.commit();
+    agg_function->destroy(source_state);
+
+    std::unique_ptr<char[]> restored_buffer(new char[agg_function->size_of_data()]);
+    AggregateDataPtr restored_state = restored_buffer.get();
+    agg_function->create(restored_state);
+
+    VectorBufferReader reader(serialized.get_data_at(0));
+    agg_function->deserialize(restored_state, reader, arena);
+
+    ColumnInt32 restored_result;
+    agg_function->insert_result_into(restored_state, restored_result);
+    EXPECT_EQ(restored_result.get_data()[0], 3);
+    agg_function->destroy(restored_state);
+}
+
+// DEFAULT mode, rows added in ascending timestamp order, result read directly from the state.
+// Row r fires only event (r + 1) at second r. For every window 0..NUM_CONDS the expected
+// level is win + 1, capped at NUM_CONDS.
+TEST_F(VWindowFunnelV2Test, testDefaultSortedNoMerge) {
+    const int NUM_CONDS = 4;
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    auto column_event1 = ColumnUInt8::create();
+    auto column_event2 = ColumnUInt8::create();
+    auto column_event3 = ColumnUInt8::create();
+    auto column_event4 = ColumnUInt8::create();
+    for (int row = 0; row < NUM_CONDS; ++row) {
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, row);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<const char*>(&dtv2), 0);
+
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(row == 0 ? 1 : 0));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(row == 1 ? 1 : 0));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(row == 2 ? 1 : 0));
+        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(row == 3 ? 1 : 0));
+    }
+
+    for (int win = 0; win < NUM_CONDS + 1; win++) {
+        auto column_window = ColumnInt64::create();
+        for (int row = 0; row < NUM_CONDS; ++row) {
+            column_window->insert(Field::create_field<TYPE_BIGINT>(win));
+        }
+        const IColumn* columns[7] = {column_window.get(),    column_mode.get(),
+                                     column_timestamp.get(), column_event1.get(),
+                                     column_event2.get(),    column_event3.get(),
+                                     column_event4.get()};
+
+        std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
+        AggregateDataPtr place = memory.get();
+        agg_function->create(place);
+        for (int row = 0; row < NUM_CONDS; ++row) {
+            agg_function->add(place, columns, row, arena);
+        }
+
+        ColumnInt32 column_result;
+        agg_function->insert_result_into(place, column_result);
+        // `win` starts at 0, so no negative-window case needs to be covered here.
+        const int expected_level = win < NUM_CONDS ? win + 1 : NUM_CONDS;
+        EXPECT_EQ(column_result.get_data()[0], expected_level);
+        agg_function->destroy(place);
+    }
+}
+
+// DEFAULT mode, rows added in ascending timestamp order; the filled state is merged into a
+// fresh one and the result is read from the merge target.
+// Row r fires only event (r + 1) at second r. For every window 0..NUM_CONDS the expected
+// level is win + 1, capped at NUM_CONDS.
+TEST_F(VWindowFunnelV2Test, testDefaultSortedMerge) {
+    const int NUM_CONDS = 4;
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    auto column_event1 = ColumnUInt8::create();
+    auto column_event2 = ColumnUInt8::create();
+    auto column_event3 = ColumnUInt8::create();
+    auto column_event4 = ColumnUInt8::create();
+    for (int row = 0; row < NUM_CONDS; ++row) {
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, row);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<const char*>(&dtv2), 0);
+
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(row == 0 ? 1 : 0));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(row == 1 ? 1 : 0));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(row == 2 ? 1 : 0));
+        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(row == 3 ? 1 : 0));
+    }
+
+    for (int win = 0; win < NUM_CONDS + 1; win++) {
+        auto column_window = ColumnInt64::create();
+        for (int row = 0; row < NUM_CONDS; ++row) {
+            column_window->insert(Field::create_field<TYPE_BIGINT>(win));
+        }
+        const IColumn* columns[7] = {column_window.get(),    column_mode.get(),
+                                     column_timestamp.get(), column_event1.get(),
+                                     column_event2.get(),    column_event3.get(),
+                                     column_event4.get()};
+
+        std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
+        AggregateDataPtr place = memory.get();
+        agg_function->create(place);
+        for (int row = 0; row < NUM_CONDS; ++row) {
+            agg_function->add(place, columns, row, arena);
+        }
+
+        std::unique_ptr<char[]> memory2(new char[agg_function->size_of_data()]);
+        AggregateDataPtr place2 = memory2.get();
+        agg_function->create(place2);
+
+        agg_function->merge(place2, place, arena);
+        ColumnInt32 column_result;
+        agg_function->insert_result_into(place2, column_result);
+        // `win` starts at 0, so no negative-window case needs to be covered here.
+        const int expected_level = win < NUM_CONDS ? win + 1 : NUM_CONDS;
+        EXPECT_EQ(column_result.get_data()[0], expected_level);
+        agg_function->destroy(place);
+        agg_function->destroy(place2);
+    }
+}
+
+// DEFAULT mode, rows added in descending timestamp order, result read directly from the
+// state. Row r carries second (NUM_CONDS - r) and fires only event (NUM_CONDS - r), so in
+// time order the events still run event1..event4. For every window 0..NUM_CONDS the expected
+// level is win + 1, capped at NUM_CONDS.
+TEST_F(VWindowFunnelV2Test, testDefaultReverseSortedNoMerge) {
+    const int NUM_CONDS = 4;
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    auto column_event1 = ColumnUInt8::create();
+    auto column_event2 = ColumnUInt8::create();
+    auto column_event3 = ColumnUInt8::create();
+    auto column_event4 = ColumnUInt8::create();
+    for (int row = 0; row < NUM_CONDS; ++row) {
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, NUM_CONDS - row);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<const char*>(&dtv2), 0);
+
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(row == 3 ? 1 : 0));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(row == 2 ? 1 : 0));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(row == 1 ? 1 : 0));
+        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(row == 0 ? 1 : 0));
+    }
+
+    for (int win = 0; win < NUM_CONDS + 1; win++) {
+        auto column_window = ColumnInt64::create();
+        for (int row = 0; row < NUM_CONDS; ++row) {
+            column_window->insert(Field::create_field<TYPE_BIGINT>(win));
+        }
+        const IColumn* columns[7] = {column_window.get(),    column_mode.get(),
+                                     column_timestamp.get(), column_event1.get(),
+                                     column_event2.get(),    column_event3.get(),
+                                     column_event4.get()};
+
+        std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
+        AggregateDataPtr place = memory.get();
+        agg_function->create(place);
+        for (int row = 0; row < NUM_CONDS; ++row) {
+            agg_function->add(place, columns, row, arena);
+        }
+
+        LOG(INFO) << "win " << win;
+        ColumnInt32 column_result;
+        agg_function->insert_result_into(place, column_result);
+        // `win` starts at 0, so no negative-window case needs to be covered here.
+        const int expected_level = win < NUM_CONDS ? win + 1 : NUM_CONDS;
+        EXPECT_EQ(column_result.get_data()[0], expected_level);
+        agg_function->destroy(place);
+    }
+}
+
+// DEFAULT mode, rows added in descending timestamp order; the filled state is merged into a
+// fresh one and the result is read from the merge target. Row r carries second
+// (NUM_CONDS - r) and fires only event (NUM_CONDS - r). For every window 0..NUM_CONDS the
+// expected level is win + 1, capped at NUM_CONDS.
+TEST_F(VWindowFunnelV2Test, testDefaultReverseSortedMerge) {
+    const int NUM_CONDS = 4;
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    auto column_event1 = ColumnUInt8::create();
+    auto column_event2 = ColumnUInt8::create();
+    auto column_event3 = ColumnUInt8::create();
+    auto column_event4 = ColumnUInt8::create();
+    for (int row = 0; row < NUM_CONDS; ++row) {
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, NUM_CONDS - row);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<const char*>(&dtv2), 0);
+
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(row == 3 ? 1 : 0));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(row == 2 ? 1 : 0));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(row == 1 ? 1 : 0));
+        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(row == 0 ? 1 : 0));
+    }
+
+    for (int win = 0; win < NUM_CONDS + 1; win++) {
+        auto column_window = ColumnInt64::create();
+        for (int row = 0; row < NUM_CONDS; ++row) {
+            column_window->insert(Field::create_field<TYPE_BIGINT>(win));
+        }
+        const IColumn* columns[7] = {column_window.get(),    column_mode.get(),
+                                     column_timestamp.get(), column_event1.get(),
+                                     column_event2.get(),    column_event3.get(),
+                                     column_event4.get()};
+
+        std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
+        AggregateDataPtr place = memory.get();
+        agg_function->create(place);
+        for (int row = 0; row < NUM_CONDS; ++row) {
+            agg_function->add(place, columns, row, arena);
+        }
+
+        std::unique_ptr<char[]> memory2(new char[agg_function->size_of_data()]);
+        AggregateDataPtr place2 = memory2.get();
+        agg_function->create(place2);
+
+        agg_function->merge(place2, place, arena);
+        ColumnInt32 column_result;
+        agg_function->insert_result_into(place2, column_result);
+        // `win` starts at 0, so no negative-window case needs to be covered here.
+        const int expected_level = win < NUM_CONDS ? win + 1 : NUM_CONDS;
+        EXPECT_EQ(column_result.get_data()[0], expected_level);
+        agg_function->destroy(place);
+        agg_function->destroy(place2);
+    }
+}
+
+// V2 is meant to keep only rows that match at least one event; rows matching nothing are
+// not stored. This test exercises that path: two of the six rows match no event, and the
+// full funnel must still be found.
+TEST_F(VWindowFunnelV2Test, testOnlyMatchedEventsStored) {
+    const int NUM_ROWS = 6;
+    auto column_window = ColumnInt64::create();
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    auto column_event1 = ColumnUInt8::create();
+    auto column_event2 = ColumnUInt8::create();
+    auto column_event3 = ColumnUInt8::create();
+    auto column_event4 = ColumnUInt8::create();
+
+    // Row r sits at second r with window 10. Event flags per row:
+    //   row 0: event1    row 1: event2    row 2: none
+    //   row 3: event3    row 4: none      row 5: event4
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        column_window->insert(Field::create_field<TYPE_BIGINT>(10));
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, row);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<const char*>(&dtv2), 0);
+
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(row == 0 ? 1 : 0));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(row == 1 ? 1 : 0));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(row == 3 ? 1 : 0));
+        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(row == 5 ? 1 : 0));
+    }
+    const IColumn* columns[7] = {column_window.get(), column_mode.get(),   column_timestamp.get(),
+                                 column_event1.get(), column_event2.get(), column_event3.get(),
+                                 column_event4.get()};
+
+    std::unique_ptr<char[]> buffer(new char[agg_function->size_of_data()]);
+    AggregateDataPtr state = buffer.get();
+    agg_function->create(state);
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        agg_function->add(state, columns, row, arena);
+    }
+
+    ColumnInt32 result;
+    agg_function->insert_result_into(state, result);
+    // All four events occur in order inside the 10-second window.
+    EXPECT_EQ(result.get_data()[0], 4);
+    agg_function->destroy(state);
+}
+
+// INCREASE mode: consecutive funnel steps must have strictly increasing timestamps.
+TEST_F(VWindowFunnelV2Test, testIncreaseMode) {
+    const int NUM_ROWS = 4;
+    // Second of each row; rows 1 and 2 (event2 and event3) share the same timestamp.
+    const int seconds[NUM_ROWS] = {0, 1, 1, 3};
+
+    auto column_window = ColumnInt64::create();
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    auto column_event1 = ColumnUInt8::create();
+    auto column_event2 = ColumnUInt8::create();
+    auto column_event3 = ColumnUInt8::create();
+    auto column_event4 = ColumnUInt8::create();
+
+    // Row r fires only event (r + 1); window is 10 for every row.
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        column_window->insert(Field::create_field<TYPE_BIGINT>(10));
+        column_mode->insert(Field::create_field<TYPE_STRING>("increase"));
+
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, seconds[row]);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<const char*>(&dtv2), 0);
+
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(row == 0 ? 1 : 0));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(row == 1 ? 1 : 0));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(row == 2 ? 1 : 0));
+        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(row == 3 ? 1 : 0));
+    }
+    const IColumn* columns[7] = {column_window.get(), column_mode.get(),   column_timestamp.get(),
+                                 column_event1.get(), column_event2.get(), column_event3.get(),
+                                 column_event4.get()};
+
+    std::unique_ptr<char[]> buffer(new char[agg_function->size_of_data()]);
+    AggregateDataPtr state = buffer.get();
+    agg_function->create(state);
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        agg_function->add(state, columns, row, arena);
+    }
+
+    ColumnInt32 result;
+    agg_function->insert_result_into(state, result);
+    // Chain: event1(t=0) -> event2(t=1). event3 also sits at t=1, which is not strictly
+    // later than event2, so the chain stops there and the result is 2.
+    EXPECT_EQ(result.get_data()[0], 2);
+    agg_function->destroy(state);
+}
+
+// DEDUPLICATION mode: a repeated event breaks the chain that is being built.
+TEST_F(VWindowFunnelV2Test, testDeduplicationMode) {
+    const int NUM_ROWS = 5;
+    auto column_window = ColumnInt64::create();
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    auto column_event1 = ColumnUInt8::create();
+    auto column_event2 = ColumnUInt8::create();
+    auto column_event3 = ColumnUInt8::create();
+    auto column_event4 = ColumnUInt8::create();
+
+    // Row r sits at second r with window 10. Event sequence by row:
+    //   event1, event2, event1 (duplicate), event3, event4
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        column_window->insert(Field::create_field<TYPE_BIGINT>(10));
+        column_mode->insert(Field::create_field<TYPE_STRING>("deduplication"));
+
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, row);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<const char*>(&dtv2), 0);
+
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>((row == 0 || row == 2) ? 1 : 0));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(row == 1 ? 1 : 0));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(row == 3 ? 1 : 0));
+        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(row == 4 ? 1 : 0));
+    }
+    const IColumn* columns[7] = {column_window.get(), column_mode.get(),   column_timestamp.get(),
+                                 column_event1.get(), column_event2.get(), column_event3.get(),
+                                 column_event4.get()};
+
+    std::unique_ptr<char[]> buffer(new char[agg_function->size_of_data()]);
+    AggregateDataPtr state = buffer.get();
+    agg_function->create(state);
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        agg_function->add(state, columns, row, arena);
+    }
+
+    ColumnInt32 result;
+    agg_function->insert_result_into(state, result);
+    // First chain: event1(t=0) -> event2(t=1); the duplicate event1 at t=2 breaks it after
+    // two matched events.
+    // Second chain: starts at event1(t=2) but cannot advance, because event3(t=3) and
+    // event4(t=4) both require event2 first; it stays at one matched event.
+    // Expected result: max(2, 1) = 2.
+    EXPECT_EQ(result.get_data()[0], 2);
+    agg_function->destroy(state);
+}
+
+// FIXED mode (StarRocks-style): a row may only extend the chain to the level directly
+// after the one already reached, so a row that jumps a level ends the chain.
+TEST_F(VWindowFunnelV2Test, testFixedMode) {
+    const int NUM_ROWS = 5;
+
+    // Constant arguments: a 10 second window and the "fixed" mode on every row.
+    auto column_window = ColumnInt64::create();
+    auto column_mode = ColumnString::create();
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        column_window->insert(Field::create_field<TYPE_BIGINT>(10));
+        column_mode->insert(Field::create_field<TYPE_STRING>("fixed"));
+    }
+
+    // One row per second: 2022-02-28 00:00:00 through 00:00:04.
+    auto column_timestamp = ColumnDateTimeV2::create();
+    for (int second = 0; second < NUM_ROWS; ++second) {
+        VecDateTimeValue tv;
+        tv.unchecked_set_time(2022, 2, 28, 0, 0, second);
+        auto dtv2 = tv.to_datetime_v2();
+        column_timestamp->insert_data((char*)&dtv2, 0);
+    }
+
+    // Condition flags indexed by row:
+    //   row 0 -> event1, row 1 -> event2, row 2 -> event4 (jumps over event3),
+    //   row 3 -> event3, row 4 -> event4
+    const int event1_flags[NUM_ROWS] = {1, 0, 0, 0, 0};
+    const int event2_flags[NUM_ROWS] = {0, 1, 0, 0, 0};
+    const int event3_flags[NUM_ROWS] = {0, 0, 0, 1, 0};
+    const int event4_flags[NUM_ROWS] = {0, 0, 1, 0, 1};
+    auto column_event1 = ColumnUInt8::create();
+    auto column_event2 = ColumnUInt8::create();
+    auto column_event3 = ColumnUInt8::create();
+    auto column_event4 = ColumnUInt8::create();
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(event1_flags[row]));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(event2_flags[row]));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(event3_flags[row]));
+        column_event4->insert(Field::create_field<TYPE_BOOLEAN>(event4_flags[row]));
+    }
+
+    // Aggregate state lives in a raw buffer sized by the function itself.
+    std::unique_ptr<char[]> state_buffer(new char[agg_function->size_of_data()]);
+    AggregateDataPtr place = state_buffer.get();
+    agg_function->create(place);
+
+    // Argument order: window, mode, timestamp, then the four event conditions.
+    const IColumn* args[7] = {column_window.get(), column_mode.get(),   column_timestamp.get(),
+                              column_event1.get(), column_event2.get(), column_event3.get(),
+                              column_event4.get()};
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        agg_function->add(place, args, row, arena);
+    }
+
+    ColumnInt32 funnel_level;
+    agg_function->insert_result_into(place, funnel_level);
+    // event1@0s -> event2@1s reaches level 2. event4@2s has no matched event3 before it,
+    // so the chain ends there; the later event3@3s / event4@4s do not yield a longer chain.
+    EXPECT_EQ(funnel_level.get_data()[0], 2);
+    agg_function->destroy(place);
+}
+
+// DEFAULT mode: several conditions matching on the SAME row must advance the chain by at
+// most one level (same-row continuation handling).
+// Scenario: window_funnel(86400, 'default', ts, xwhat=1, xwhat!=2, xwhat=3)
+//   Row 0 (xwhat=1): cond0=T, cond1=T (1!=2), cond2=F
+//   Row 1 (xwhat=2): no condition matches
+//   Row 2 (xwhat=3): cond0=F, cond1=T (3!=2), cond2=T
+//   Row 3 (xwhat=4): cond1=T only
+// Expected: 2 (cond0 taken from row 0, cond1 from row 2), NOT 3. If row 0 were allowed to
+// consume cond0 and cond1 together, row 2 would then supply cond2 and wrongly give 3.
+TEST_F(VWindowFunnelV2Test, testSameRowMultiConditionDefault) {
+    // The shared fixture function takes 4 conditions; this scenario needs a 3-condition one.
+    auto factory = AggregateFunctionSimpleFactory::instance();
+    DataTypes data_types_3 = {
+            std::make_shared<DataTypeInt64>(),      std::make_shared<DataTypeString>(),
+            std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
+            std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>()};
+    auto funnel_3 = factory.get("window_funnel_v2", data_types_3, nullptr, false,
+                                BeExecVersionManager::get_newest_version());
+    ASSERT_NE(funnel_3, nullptr);
+
+    const int NUM_ROWS = 4;
+
+    // Constant arguments: an 86400 second (24 hour) window and the "default" mode.
+    auto column_window = ColumnInt64::create();
+    auto column_mode = ColumnString::create();
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        column_window->insert(Field::create_field<TYPE_BIGINT>(86400));
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+    }
+
+    // Timestamps on 2022-03-12, given as {hour, minute, second} per row:
+    //   10:41:00, 13:28:02, 16:15:01, 19:05:04
+    const int row_clock[NUM_ROWS][3] = {{10, 41, 0}, {13, 28, 2}, {16, 15, 1}, {19, 5, 4}};
+    auto column_timestamp = ColumnDateTimeV2::create();
+    for (const auto& hms : row_clock) {
+        VecDateTimeValue tv;
+        tv.unchecked_set_time(2022, 3, 12, hms[0], hms[1], hms[2]);
+        auto dtv2 = tv.to_datetime_v2();
+        column_timestamp->insert_data((char*)&dtv2, 0);
+    }
+
+    // Condition results indexed by row (xwhat = 1, 2, 3, 4).
+    const int cond0_flags[NUM_ROWS] = {1, 0, 0, 0}; // xwhat = 1
+    const int cond1_flags[NUM_ROWS] = {1, 0, 1, 1}; // xwhat != 2
+    const int cond2_flags[NUM_ROWS] = {0, 0, 1, 0}; // xwhat = 3
+    auto column_cond0 = ColumnUInt8::create();
+    auto column_cond1 = ColumnUInt8::create();
+    auto column_cond2 = ColumnUInt8::create();
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        column_cond0->insert(Field::create_field<TYPE_BOOLEAN>(cond0_flags[row]));
+        column_cond1->insert(Field::create_field<TYPE_BOOLEAN>(cond1_flags[row]));
+        column_cond2->insert(Field::create_field<TYPE_BOOLEAN>(cond2_flags[row]));
+    }
+
+    std::unique_ptr<char[]> state_buffer(new char[funnel_3->size_of_data()]);
+    AggregateDataPtr place = state_buffer.get();
+    funnel_3->create(place);
+
+    // Argument order: window, mode, timestamp, then the three conditions.
+    const IColumn* args[6] = {column_window.get(), column_mode.get(),  column_timestamp.get(),
+                              column_cond0.get(),  column_cond1.get(), column_cond2.get()};
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        funnel_3->add(place, args, row, arena);
+    }
+
+    ColumnInt32 funnel_level;
+    funnel_3->insert_result_into(place, funnel_level);
+    // Row 0 contributes cond0 only (its cond1 hit is a same-row continuation); row 2's
+    // cond1 extends the chain, but row 2's own cond2 is again same-row -> stops at 2.
+    EXPECT_EQ(funnel_level.get_data()[0], 2);
+    funnel_3->destroy(place);
+}
+
+// DEFAULT mode with every condition testing the same event name, e.g.
+//   window_funnel(big_window, 'default', ts, event='login', event='login', event='login', ...)
+// A single row that satisfies all 4 conditions must count as level 1 only, not 4.
+TEST_F(VWindowFunnelV2Test, testSameRowAllConditionsMatch) {
+    // Builds a one-row boolean column whose only value is true.
+    auto make_true_flag_column = [] {
+        auto flag_column = ColumnUInt8::create();
+        flag_column->insert(Field::create_field<TYPE_BOOLEAN>(1));
+        return flag_column;
+    };
+
+    // The single input row: 86400 second window, "default" mode, 2022-02-28 00:00:00.
+    auto column_window = ColumnInt64::create();
+    column_window->insert(Field::create_field<TYPE_BIGINT>(86400));
+
+    auto column_mode = ColumnString::create();
+    column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+
+    auto column_timestamp = ColumnDateTimeV2::create();
+    VecDateTimeValue tv;
+    tv.unchecked_set_time(2022, 2, 28, 0, 0, 0);
+    auto dtv2 = tv.to_datetime_v2();
+    column_timestamp->insert_data((char*)&dtv2, 0);
+
+    // All 4 conditions are true on that row.
+    auto column_event1 = make_true_flag_column();
+    auto column_event2 = make_true_flag_column();
+    auto column_event3 = make_true_flag_column();
+    auto column_event4 = make_true_flag_column();
+
+    std::unique_ptr<char[]> state_buffer(new char[agg_function->size_of_data()]);
+    AggregateDataPtr place = state_buffer.get();
+    agg_function->create(place);
+
+    // Argument order: window, mode, timestamp, then the four event conditions.
+    const IColumn* args[7] = {column_window.get(), column_mode.get(),   column_timestamp.get(),
+                              column_event1.get(), column_event2.get(), column_event3.get(),
+                              column_event4.get()};
+    agg_function->add(place, args, 0, arena);
+
+    ColumnInt32 funnel_level;
+    agg_function->insert_result_into(place, funnel_level);
+    // Each funnel step has to come from a different row, so one row can reach level 1 only.
+    EXPECT_EQ(funnel_level.get_data()[0], 1);
+    agg_function->destroy(place);
+}
+
+// INCREASE mode: a re-occurring event-0 must not break a chain that is still valid.
+// Counterexample raised in code review (3 conditions, window=100s):
+//   Row 0 (t=0s):  event1 only
+//   Row 1 (t=50s): event1 only  (a second event-0)
+//   Row 2 (t=50s): event2 only
+//   Row 3 (t=60s): event3 only
+// Expected: 3, via the chain that starts at t=0 (e1@0 -> e2@50 -> e3@60).
+// The earlier bug returned 1: replacing events_timestamp[0] with t=50 made the strict
+// INCREASE comparison (50 < 50) fail for e2.
+TEST_F(VWindowFunnelV2Test, testIncreaseModeEvent0Overwrite) {
+    auto factory = AggregateFunctionSimpleFactory::instance();
+    DataTypes data_types_3 = {
+            std::make_shared<DataTypeInt64>(),      std::make_shared<DataTypeString>(),
+            std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
+            std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>()};
+    auto funnel_3 = factory.get("window_funnel_v2", data_types_3, nullptr, false,
+                                BeExecVersionManager::get_newest_version());
+    ASSERT_NE(funnel_3, nullptr);
+
+    const int NUM_ROWS = 4;
+
+    // Constant arguments: a 100 second window and the "increase" mode on every row.
+    auto column_window = ColumnInt64::create();
+    auto column_mode = ColumnString::create();
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        column_window->insert(Field::create_field<TYPE_BIGINT>(100));
+        column_mode->insert(Field::create_field<TYPE_STRING>("increase"));
+    }
+
+    // Timestamps on 2022-02-28 00:mm:ss, given as {minute, second} per row:
+    //   t=0s, t=50s, t=50s, t=60s
+    const int row_time[NUM_ROWS][2] = {{0, 0}, {0, 50}, {0, 50}, {1, 0}};
+    auto column_timestamp = ColumnDateTimeV2::create();
+    for (const auto& ms : row_time) {
+        VecDateTimeValue tv;
+        tv.unchecked_set_time(2022, 2, 28, 0, ms[0], ms[1]);
+        auto dtv2 = tv.to_datetime_v2();
+        column_timestamp->insert_data((char*)&dtv2, 0);
+    }
+
+    // Condition flags indexed by row; rows 0 and 1 both carry event1.
+    const int event1_flags[NUM_ROWS] = {1, 1, 0, 0};
+    const int event2_flags[NUM_ROWS] = {0, 0, 1, 0};
+    const int event3_flags[NUM_ROWS] = {0, 0, 0, 1};
+    auto column_event1 = ColumnUInt8::create();
+    auto column_event2 = ColumnUInt8::create();
+    auto column_event3 = ColumnUInt8::create();
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(event1_flags[row]));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(event2_flags[row]));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(event3_flags[row]));
+    }
+
+    std::unique_ptr<char[]> state_buffer(new char[funnel_3->size_of_data()]);
+    AggregateDataPtr place = state_buffer.get();
+    funnel_3->create(place);
+
+    // Argument order: window, mode, timestamp, then the three event conditions.
+    const IColumn* args[6] = {column_window.get(), column_mode.get(),   column_timestamp.get(),
+                              column_event1.get(), column_event2.get(), column_event3.get()};
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        funnel_3->add(place, args, row, arena);
+    }
+
+    ColumnInt32 funnel_level;
+    funnel_3->insert_result_into(place, funnel_level);
+    // Chain from t=0: e1@0 -> e2@50 (50 > 0) -> e3@60 (60 > 50) = 3
+    EXPECT_EQ(funnel_level.get_data()[0], 3);
+    funnel_3->destroy(place);
+}
+
+// INCREASE mode: a later event-0 must be able to start a better chain when the earlier
+// one cannot progress. 3 conditions, window=5s:
+//   Row 0 (t=0s):  event1
+//   Row 1 (t=50s): event1  (new start; the chain from t=0 cannot reach e2 within 5s)
+//   Row 2 (t=51s): event2
+//   Row 3 (t=52s): event3
+// Expected: 3, via the chain that starts at t=50 (e1@50 -> e2@51 -> e3@52).
+TEST_F(VWindowFunnelV2Test, testIncreaseModeNewChainBetter) {
+    auto factory = AggregateFunctionSimpleFactory::instance();
+    DataTypes data_types_3 = {
+            std::make_shared<DataTypeInt64>(),      std::make_shared<DataTypeString>(),
+            std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
+            std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>()};
+    auto funnel_3 = factory.get("window_funnel_v2", data_types_3, nullptr, false,
+                                BeExecVersionManager::get_newest_version());
+    ASSERT_NE(funnel_3, nullptr);
+
+    const int NUM_ROWS = 4;
+
+    // Constant arguments: a 5 second window and the "increase" mode on every row.
+    auto column_window = ColumnInt64::create();
+    auto column_mode = ColumnString::create();
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        column_window->insert(Field::create_field<TYPE_BIGINT>(5));
+        column_mode->insert(Field::create_field<TYPE_STRING>("increase"));
+    }
+
+    // Timestamps are 2022-02-28 00:00:ss with the per-row second listed below.
+    const int row_seconds[NUM_ROWS] = {0, 50, 51, 52};
+    auto column_timestamp = ColumnDateTimeV2::create();
+    for (int second : row_seconds) {
+        VecDateTimeValue tv;
+        tv.unchecked_set_time(2022, 2, 28, 0, 0, second);
+        auto dtv2 = tv.to_datetime_v2();
+        column_timestamp->insert_data((char*)&dtv2, 0);
+    }
+
+    // Condition flags indexed by row; rows 0 and 1 both carry event1.
+    const int event1_flags[NUM_ROWS] = {1, 1, 0, 0};
+    const int event2_flags[NUM_ROWS] = {0, 0, 1, 0};
+    const int event3_flags[NUM_ROWS] = {0, 0, 0, 1};
+    auto column_event1 = ColumnUInt8::create();
+    auto column_event2 = ColumnUInt8::create();
+    auto column_event3 = ColumnUInt8::create();
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(event1_flags[row]));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(event2_flags[row]));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(event3_flags[row]));
+    }
+
+    std::unique_ptr<char[]> state_buffer(new char[funnel_3->size_of_data()]);
+    AggregateDataPtr place = state_buffer.get();
+    funnel_3->create(place);
+
+    // Argument order: window, mode, timestamp, then the three event conditions.
+    const IColumn* args[6] = {column_window.get(), column_mode.get(),   column_timestamp.get(),
+                              column_event1.get(), column_event2.get(), column_event3.get()};
+    for (int row = 0; row < NUM_ROWS; ++row) {
+        funnel_3->add(place, args, row, arena);
+    }
+
+    ColumnInt32 funnel_level;
+    funnel_3->insert_result_into(place, funnel_level);
+    // Chain from t=0 stays at level 1 (e2@51 is 51s away, outside the 5s window).
+    // Chain from t=50: e1@50 -> e2@51 (51 > 50, within 5s) -> e3@52 (52 > 51) = 3
+    EXPECT_EQ(funnel_level.get_data()[0], 3);
+    funnel_3->destroy(place);
+}
+
+// Test INCREASE mode: the level reached by an older chain is preserved when a later
+// event-0 restarts the chain and the new chain gets no further.
+// 3 conditions, window=100s, INCREASE mode.
+//   Scenario 1 (3 rows): e1@0s, e2@10s, e1@50s
+//     Old chain e1@0 -> e2@10 reaches level 2; the chain restarted at t=50 reaches level 1.
+//   Scenario 2 (4 rows): scenario 1 plus e3@50s
+//     The restarted chain has no matched e2, so e3@50 cannot extend it.
+// Expected result in both scenarios: 2.
+//
+// Every per-row column (mode and window included) is built inside its scenario and sized
+// by that scenario's own row count, so each declared row is actually aggregated.
+TEST_F(VWindowFunnelV2Test, testIncreaseModeOldChainBetter) {
+    AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance();
+    DataTypes data_types_3 = {
+            std::make_shared<DataTypeInt64>(),      std::make_shared<DataTypeString>(),
+            std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
+            std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>()};
+    auto agg_func_3 = factory.get("window_funnel_v2", data_types_3, nullptr, false,
+                                  BeExecVersionManager::get_newest_version());
+    ASSERT_NE(agg_func_3, nullptr);
+
+    // Scenario 1: e1@0s, e2@10s, e1@50s.
+    {
+        const int NUM_ROWS = 3;
+        auto column_mode = ColumnString::create();
+        for (int i = 0; i < NUM_ROWS; i++) {
+            column_mode->insert(Field::create_field<TYPE_STRING>("increase"));
+        }
+
+        auto column_timestamp = ColumnDateTimeV2::create();
+        VecDateTimeValue tv0, tv1, tv2;
+        tv0.unchecked_set_time(2022, 2, 28, 0, 0, 0);
+        tv1.unchecked_set_time(2022, 2, 28, 0, 0, 10);
+        tv2.unchecked_set_time(2022, 2, 28, 0, 0, 50);
+        auto dtv2_0 = tv0.to_datetime_v2();
+        auto dtv2_1 = tv1.to_datetime_v2();
+        auto dtv2_2 = tv2.to_datetime_v2();
+        column_timestamp->insert_data((char*)&dtv2_0, 0);
+        column_timestamp->insert_data((char*)&dtv2_1, 0);
+        column_timestamp->insert_data((char*)&dtv2_2, 0);
+
+        // Row 0: e1=T, Row 1: e2=T, Row 2: e1=T (restart)
+        auto column_event1 = ColumnUInt8::create();
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+
+        auto column_event2 = ColumnUInt8::create();
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(1));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+        auto column_event3 = ColumnUInt8::create();
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+        auto column_window = ColumnInt64::create();
+        for (int i = 0; i < NUM_ROWS; i++) {
+            column_window->insert(Field::create_field<TYPE_BIGINT>(100));
+        }
+
+        std::unique_ptr<char[]> memory(new char[agg_func_3->size_of_data()]);
+        AggregateDataPtr place = memory.get();
+        agg_func_3->create(place);
+        const IColumn* column[6] = {column_window.get(),    column_mode.get(),
+                                    column_timestamp.get(), column_event1.get(),
+                                    column_event2.get(),    column_event3.get()};
+        for (int i = 0; i < NUM_ROWS; i++) {
+            agg_func_3->add(place, column, i, arena);
+        }
+
+        ColumnInt32 column_result;
+        agg_func_3->insert_result_into(place, column_result);
+        // Old chain: e1@0 -> e2@10 = level 2
+        // New chain from t=50: only e1@50, no further events = level 1
+        // max(2, 1) = 2
+        EXPECT_EQ(column_result.get_data()[0], 2);
+        agg_func_3->destroy(place);
+    }
+
+    // Scenario 2: e1@0s, e2@10s, e1@50s, e3@50s.
+    {
+        // Four rows here: the mode and window columns and the add() loop below must all
+        // cover the fourth (e3@50s) row, otherwise that row is silently ignored.
+        const int NUM_ROWS = 4;
+        auto column_mode = ColumnString::create();
+        for (int i = 0; i < NUM_ROWS; i++) {
+            column_mode->insert(Field::create_field<TYPE_STRING>("increase"));
+        }
+
+        auto column_timestamp = ColumnDateTimeV2::create();
+        VecDateTimeValue tv0, tv1, tv2, tv3;
+        tv0.unchecked_set_time(2022, 2, 28, 0, 0, 0);
+        tv1.unchecked_set_time(2022, 2, 28, 0, 0, 10);
+        tv2.unchecked_set_time(2022, 2, 28, 0, 0, 50);
+        tv3.unchecked_set_time(2022, 2, 28, 0, 0, 50);
+        auto dtv2_0 = tv0.to_datetime_v2();
+        auto dtv2_1 = tv1.to_datetime_v2();
+        auto dtv2_2 = tv2.to_datetime_v2();
+        auto dtv2_3 = tv3.to_datetime_v2();
+        column_timestamp->insert_data((char*)&dtv2_0, 0);
+        column_timestamp->insert_data((char*)&dtv2_1, 0);
+        column_timestamp->insert_data((char*)&dtv2_2, 0);
+        column_timestamp->insert_data((char*)&dtv2_3, 0);
+
+        // Row 0: e1=T, Row 1: e2=T, Row 2: e1=T (restart), Row 3: e3=T
+        auto column_event1 = ColumnUInt8::create();
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+        column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+        auto column_event2 = ColumnUInt8::create();
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(1));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+        column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+        auto column_event3 = ColumnUInt8::create();
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+        column_event3->insert(Field::create_field<TYPE_BOOLEAN>(1));
+
+        auto column_window = ColumnInt64::create();
+        for (int i = 0; i < NUM_ROWS; i++) {
+            column_window->insert(Field::create_field<TYPE_BIGINT>(100));
+        }
+
+        std::unique_ptr<char[]> memory(new char[agg_func_3->size_of_data()]);
+        AggregateDataPtr place = memory.get();
+        agg_func_3->create(place);
+        const IColumn* column[6] = {column_window.get(),    column_mode.get(),
+                                    column_timestamp.get(), column_event1.get(),
+                                    column_event2.get(),    column_event3.get()};
+        for (int i = 0; i < NUM_ROWS; i++) {
+            agg_func_3->add(place, column, i, arena);
+        }
+
+        ColumnInt32 column_result;
+        agg_func_3->insert_result_into(place, column_result);
+        // Old chain: e1@0 -> e2@10 = level 2
+        // New chain from t=50: only e1@50, e3@50 can't advance (no e2 matched)
+        // max(2, 1) = 2
+        // NOTE(review): this expectation relies on e1@50 ending the older chain's progress.
+        // If the implementation lets e1@0 -> e2@10 continue to e3@50 (50 > 10, inside the
+        // 100s window) the result would be 3 -- confirm against the implementation.
+        EXPECT_EQ(column_result.get_data()[0], 2);
+        agg_func_3->destroy(place);
+    }
+}
+
+} // namespace doris
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
index f7f01eaeda0..f98c3e14df8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
@@ -92,6 +92,7 @@ import 
org.apache.doris.nereids.trees.expressions.functions.agg.TopNWeighted;
 import org.apache.doris.nereids.trees.expressions.functions.agg.Variance;
 import org.apache.doris.nereids.trees.expressions.functions.agg.VarianceSamp;
 import org.apache.doris.nereids.trees.expressions.functions.agg.WindowFunnel;
+import org.apache.doris.nereids.trees.expressions.functions.agg.WindowFunnelV2;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -187,7 +188,8 @@ public class BuiltinAggregateFunctions implements 
FunctionHelper {
                 agg(TopNWeighted.class, "topn_weighted"),
                 agg(Variance.class, "var_pop", "variance_pop", "variance"),
                 agg(VarianceSamp.class, "var_samp", "variance_samp"),
-                agg(WindowFunnel.class, "window_funnel")
+                agg(WindowFunnel.class, "window_funnel_v1"),
+                agg(WindowFunnelV2.class, "window_funnel_v2", "window_funnel")
         );
 
         ImmutableMap.Builder<String, Boolean> aggFuncNameNullableMapBuilder
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
index fe68663ff5f..848d3cfd6d5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
@@ -65,7 +65,7 @@ public class WindowFunnel extends NullableAggregateFunction
 
     public WindowFunnel(boolean distinct, boolean alwaysNullable, Expression 
arg0, Expression arg1, Expression arg2,
             Expression arg3, Expression... varArgs) {
-        super("window_funnel", distinct, alwaysNullable,
+        super("window_funnel_v1", distinct, alwaysNullable,
                 ExpressionUtils.mergeArguments(arg0, arg1, arg2, arg3, 
varArgs));
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnelV2.java
similarity index 74%
copy from 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnelV2.java
index fe68663ff5f..fa4f5d8e469 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnelV2.java
@@ -36,11 +36,14 @@ import com.google.common.collect.ImmutableList;
 import java.util.List;
 
 /**
- * AggregateFunction 'window_funnel'. This class is generated by 
GenerateFunction.
+ * AggregateFunction 'window_funnel_v2'. V2 implementation that only stores 
matched events
+ * as (timestamp, event_index) pairs, dramatically reducing memory usage 
compared to V1.
  */
-public class WindowFunnel extends NullableAggregateFunction
+public class WindowFunnelV2 extends NullableAggregateFunction
         implements ExplicitlyCastableSignature {
 
+    public static final int MAX_EVENT_CONDITIONS = 127;
+
     public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
             FunctionSignature.ret(IntegerType.INSTANCE)
                     .varArgs(BigIntType.INSTANCE, StringType.INSTANCE, 
DateTimeV2Type.WILDCARD,
@@ -51,32 +54,38 @@ public class WindowFunnel extends NullableAggregateFunction
     /**
      * constructor with 4 or more arguments.
      */
-    public WindowFunnel(Expression arg0, Expression arg1, Expression arg2, 
Expression arg3, Expression... varArgs) {
+    public WindowFunnelV2(Expression arg0, Expression arg1, Expression arg2, 
Expression arg3,
+            Expression... varArgs) {
         this(false, arg0, arg1, arg2, arg3, varArgs);
     }
 
     /**
      * constructor with 4 or more arguments.
      */
-    public WindowFunnel(boolean distinct, Expression arg0, Expression arg1, 
Expression arg2,
+    public WindowFunnelV2(boolean distinct, Expression arg0, Expression arg1, 
Expression arg2,
             Expression arg3, Expression... varArgs) {
         this(distinct, false, arg0, arg1, arg2, arg3, varArgs);
     }
 
-    public WindowFunnel(boolean distinct, boolean alwaysNullable, Expression 
arg0, Expression arg1, Expression arg2,
+    public WindowFunnelV2(boolean distinct, boolean alwaysNullable, Expression 
arg0, Expression arg1, Expression arg2,
             Expression arg3, Expression... varArgs) {
-        super("window_funnel", distinct, alwaysNullable,
+        super("window_funnel_v2", distinct, alwaysNullable,
                 ExpressionUtils.mergeArguments(arg0, arg1, arg2, arg3, 
varArgs));
     }
 
     /** constructor for withChildren and reuse signature */
-    private WindowFunnel(NullableAggregateFunctionParams functionParams) {
+    private WindowFunnelV2(NullableAggregateFunctionParams functionParams) {
         super(functionParams);
     }
 
     @Override
     public void checkLegalityBeforeTypeCoercion() {
         String functionName = getName();
+        int eventCount = arity() - 3;
+        if (eventCount > MAX_EVENT_CONDITIONS) {
+            throw new AnalysisException("The " + functionName + " function 
supports at most "
+                    + MAX_EVENT_CONDITIONS + " event conditions, but got " + 
eventCount);
+        }
         if (!getArgumentType(0).isIntegerLikeType()) {
             throw new AnalysisException("The window params of " + functionName 
+ " function must be integer");
         }
@@ -109,19 +118,19 @@ public class WindowFunnel extends 
NullableAggregateFunction
      * withDistinctAndChildren.
      */
     @Override
-    public WindowFunnel withDistinctAndChildren(boolean distinct, 
List<Expression> children) {
+    public WindowFunnelV2 withDistinctAndChildren(boolean distinct, 
List<Expression> children) {
         Preconditions.checkArgument(children.size() >= 4);
-        return new WindowFunnel(getFunctionParams(distinct, children));
+        return new WindowFunnelV2(getFunctionParams(distinct, children));
     }
 
     @Override
-    public WindowFunnel withAlwaysNullable(boolean alwaysNullable) {
-        return new 
WindowFunnel(getAlwaysNullableFunctionParams(alwaysNullable));
+    public WindowFunnelV2 withAlwaysNullable(boolean alwaysNullable) {
+        return new 
WindowFunnelV2(getAlwaysNullableFunctionParams(alwaysNullable));
     }
 
     @Override
     public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
-        return visitor.visitWindowFunnel(this, context);
+        return visitor.visitWindowFunnelV2(this, context);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
index 5493bdc44f9..e8b3ae193db 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
@@ -90,6 +90,7 @@ import 
org.apache.doris.nereids.trees.expressions.functions.agg.TopNWeighted;
 import org.apache.doris.nereids.trees.expressions.functions.agg.Variance;
 import org.apache.doris.nereids.trees.expressions.functions.agg.VarianceSamp;
 import org.apache.doris.nereids.trees.expressions.functions.agg.WindowFunnel;
+import org.apache.doris.nereids.trees.expressions.functions.agg.WindowFunnelV2;
 import 
org.apache.doris.nereids.trees.expressions.functions.combinator.ForEachCombinator;
 import 
org.apache.doris.nereids.trees.expressions.functions.combinator.MergeCombinator;
 import 
org.apache.doris.nereids.trees.expressions.functions.combinator.UnionCombinator;
@@ -388,6 +389,10 @@ public interface AggregateFunctionVisitor<R, C> {
         return visitNullableAggregateFunction(windowFunnel, context);
     }
 
+    default R visitWindowFunnelV2(WindowFunnelV2 windowFunnelV2, C context) {
+        return visitNullableAggregateFunction(windowFunnelV2, context);
+    }
+
     default R visitMergeCombinator(MergeCombinator combinator, C context) {
         return visitAggregateFunction(combinator, context);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/AggStateType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/AggStateType.java
index d70364e4f83..dd159db263b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/AggStateType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/AggStateType.java
@@ -60,6 +60,7 @@ public class AggStateType extends DataType {
             .put("var_samp", "variance_samp")
             .put("hist", "histogram")
             .put("map_agg", "map_agg_v2")
+            .put("window_funnel", "window_funnel_v2")
             .build();
 
     private final List<DataType> subTypes;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/CheckExpressionLegalityTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/CheckExpressionLegalityTest.java
index 51ef79d2874..c70f3844e59 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/CheckExpressionLegalityTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/CheckExpressionLegalityTest.java
@@ -21,15 +21,24 @@ import org.apache.doris.common.ExceptionChecker;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.rules.expression.CheckLegalityAfterRewrite;
 import org.apache.doris.nereids.rules.expression.ExpressionRewrite;
+import org.apache.doris.nereids.trees.expressions.Expression;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.BitmapUnionCount;
 import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.functions.agg.WindowFunnelV2;
+import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.StringLiteral;
 import org.apache.doris.nereids.util.MemoPatternMatchSupported;
 import org.apache.doris.nereids.util.MemoTestUtils;
 import org.apache.doris.nereids.util.PlanChecker;
 import org.apache.doris.qe.ConnectContext;
 
+import com.google.common.collect.ImmutableList;
 import org.junit.jupiter.api.Test;
 
+import java.util.List;
+
 public class CheckExpressionLegalityTest implements MemoPatternMatchSupported {
     @Test
     public void testAvg() {
@@ -71,4 +80,22 @@ public class CheckExpressionLegalityTest implements 
MemoPatternMatchSupported {
                                 .applyBottomUp(new 
ExpressionRewrite(CheckLegalityAfterRewrite.INSTANCE))
         );
     }
+
+    @Test
+    public void testWindowFunnelV2TooManyConditions() {
+        List<Expression> arguments = ImmutableList.<Expression>builder()
+                .add(new IntegerLiteral(10))
+                .add(new StringLiteral("default"))
+                .add(new DateTimeLiteral("2022-02-28 00:00:00"))
+                .addAll(ImmutableList.copyOf(java.util.Collections.nCopies(
+                        WindowFunnelV2.MAX_EVENT_CONDITIONS + 1, 
BooleanLiteral.TRUE)))
+                .build();
+        WindowFunnelV2 expression = new WindowFunnelV2(arguments.get(0), 
arguments.get(1),
+                arguments.get(2), arguments.get(3), arguments.subList(4, 
arguments.size())
+                        .toArray(new Expression[0]));
+
+        ExceptionChecker.expectThrowsWithMsg(AnalysisException.class,
+                "supports at most " + WindowFunnelV2.MAX_EVENT_CONDITIONS + " 
event conditions",
+                expression::checkLegalityBeforeTypeCoercion);
+    }
 }
diff --git a/regression-test/data/nereids_p0/aggregate/window_funnel.out 
b/regression-test/data/nereids_p0/aggregate/window_funnel.out
index f9c24637999..4c8c09490bb 100644
--- a/regression-test/data/nereids_p0/aggregate/window_funnel.out
+++ b/regression-test/data/nereids_p0/aggregate/window_funnel.out
@@ -117,7 +117,7 @@
 100127 2
 
 -- !window_funnel_fixed1 --
-100123 2
+100123 4
 100125 3
 100126 2
 100127 2
diff --git a/regression-test/data/nereids_p0/aggregate/window_funnel_v2.out 
b/regression-test/data/nereids_p0/aggregate/window_funnel_v2.out
new file mode 100644
index 00000000000..5d91239ad73
--- /dev/null
+++ b/regression-test/data/nereids_p0/aggregate/window_funnel_v2.out
@@ -0,0 +1,88 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !v2_default_small_window --
+1
+
+-- !v2_default_large_window --
+2
+
+-- !v2_datetimev2_small --
+1
+
+-- !v2_datetimev2_large --
+2
+
+-- !v2_multi_user_default0 --
+100123 4
+100125 3
+100126 2
+100127 2
+
+-- !v2_multi_user_default1 --
+100123 3
+100125 3
+100126 2
+100127 2
+
+-- !v2_multi_user_default2 --
+100123 1
+100125 1
+100126 1
+100127 1
+
+-- !v2_default_neq --
+100123 4
+100125 3
+100126 2
+100127 2
+
+-- !v2_default_complex --
+100123 4
+100125 2
+100126 0
+100127 1
+
+-- !v2_deduplication0 --
+100123 3
+100125 3
+100126 2
+100127 2
+
+-- !v2_deduplication1 --
+100123 3
+100125 3
+100126 2
+100127 2
+
+-- !v2_fixed0 --
+100123 4
+100125 3
+100126 2
+100127 2
+
+-- !v2_fixed_reorder --
+2
+
+-- !v2_increase0 --
+100123 3
+100125 3
+100126 2
+100127 2
+
+-- !v2_increase_same_ts --
+2
+
+-- !v2_fixed_vs_v1 --
+100123 4
+
+-- !v2_explicit_name --
+100123 4
+
+-- !v2_increase_event0_overwrite --
+3
+
+-- !v2_increase_new_chain_better --
+3
+
+-- !v2_increase_old_chain_better --
+2
+
diff --git 
a/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_window_functions.out
 
b/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_window_functions.out
index eb048bacbb6..3c3f674ce5f 100644
--- 
a/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_window_functions.out
+++ 
b/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_window_functions.out
@@ -807,11 +807,11 @@ shanxi    windows 1
 2      5
 
 -- !agg_window_window_funnel --
-100123 2
-100123 2
-100123 2
-100123 2
-100123 2
+100123 4
+100123 4
+100123 4
+100123 4
+100123 4
 100125 3
 100125 3
 100125 3
diff --git 
a/regression-test/suites/nereids_p0/aggregate/window_funnel_v2.groovy 
b/regression-test/suites/nereids_p0/aggregate/window_funnel_v2.groovy
new file mode 100644
index 00000000000..b43b2584337
--- /dev/null
+++ b/regression-test/suites/nereids_p0/aggregate/window_funnel_v2.groovy
@@ -0,0 +1,492 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("window_funnel_v2") {
+    sql "SET enable_nereids_planner=true"
+    sql "SET enable_fallback_to_original_planner=false"
+
+    // ==================== Basic DEFAULT mode tests ====================
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+    sql """
+        CREATE TABLE IF NOT EXISTS windowfunnel_v2_test (
+            xwho varchar(50) NULL COMMENT 'xwho',
+            xwhen datetime COMMENT 'xwhen',
+            xwhat int NULL COMMENT 'xwhat'
+        )
+        DUPLICATE KEY(xwho)
+        DISTRIBUTED BY HASH(xwho) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "1"
+        );
+    """
+    sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:41:00', 1)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 13:28:02', 2)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 16:15:01', 3)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 19:05:04', 4)"
+
+    // window=1 second, only event1 matches (events too far apart)
+    order_qt_v2_default_small_window """
+        select
+            window_funnel(
+                1,
+                'default',
+                t.xwhen,
+                t.xwhat = 1,
+                t.xwhat = 2
+            ) AS level
+        from windowfunnel_v2_test t;
+    """
+    // window=20000 seconds, both events match
+    order_qt_v2_default_large_window """
+        select
+            window_funnel(
+                20000,
+                'default',
+                t.xwhen,
+                t.xwhat = 1,
+                t.xwhat = 2
+            ) AS level
+        from windowfunnel_v2_test t;
+    """
+
+    // ==================== DateTimeV2 precision test ====================
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+    sql """
+        CREATE TABLE IF NOT EXISTS windowfunnel_v2_test (
+            xwho varchar(50) NULL COMMENT 'xwho',
+            xwhen datetimev2(3) COMMENT 'xwhen',
+            xwhat int NULL COMMENT 'xwhat'
+        )
+        DUPLICATE KEY(xwho)
+        DISTRIBUTED BY HASH(xwho) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "1"
+        );
+    """
+    sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:41:00.111111', 1)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 13:28:02.111111', 2)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 16:15:01.111111', 3)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 19:05:04.111111', 4)"
+
+    order_qt_v2_datetimev2_small """
+        select
+            window_funnel(
+                1,
+                'default',
+                t.xwhen,
+                t.xwhat = 1,
+                t.xwhat = 2
+            ) AS level
+        from windowfunnel_v2_test t;
+    """
+    order_qt_v2_datetimev2_large """
+        select
+            window_funnel(
+                20000,
+                'default',
+                t.xwhen,
+                t.xwhat = 1,
+                t.xwhat = 2
+            ) AS level
+        from windowfunnel_v2_test t;
+    """
+
+    // ==================== Multi-user default mode tests ====================
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+    sql """
+        CREATE TABLE windowfunnel_v2_test(
+            user_id BIGINT,
+            event_name VARCHAR(64),
+            event_timestamp datetime,
+            phone_brand varchar(64),
+            tab_num int
+        ) distributed by hash(user_id) buckets 3 
properties("replication_num"="1");
+    """
+    sql """
+        INSERT INTO windowfunnel_v2_test VALUES
+            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 3),
+            (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4),
+            (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
+            (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
+            (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
+            (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
+            (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
+            (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
+            (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
+    """
+    // 3 hour window
+    order_qt_v2_multi_user_default0 """
+        SELECT
+            user_id,
+            window_funnel(3600 * 3, "default", event_timestamp, event_name = 
'登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id
+    """
+    // 5 minute window
+    order_qt_v2_multi_user_default1 """
+        SELECT
+            user_id,
+            window_funnel(300, "default", event_timestamp, event_name = '登录', 
event_name = '访问', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id
+    """
+    // 30 second window
+    order_qt_v2_multi_user_default2 """
+        SELECT
+            user_id,
+            window_funnel(30, "default", event_timestamp, event_name = '登录', 
event_name = '访问', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id
+    """
+    // complicate expressions with != condition
+    order_qt_v2_default_neq """
+        SELECT
+            user_id,
+            window_funnel(3600000000, "default", event_timestamp, event_name = 
'登录', event_name != '登陆', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id;
+    """
+    // Complex filter conditions
+    order_qt_v2_default_complex """
+        SELECT
+            user_id,
+            window_funnel(3600000000, "default", event_timestamp,
+                          event_name = '登录' AND phone_brand in ('HONOR', 
'XIAOMI', 'VIVO') AND tab_num not in (4, 5),
+                          event_name = '访问' AND tab_num not in (4, 5),
+                          event_name = '下单' AND tab_num not in (6, 7),
+                          event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id;
+    """
+
+    // ==================== DEDUPLICATION mode tests ====================
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+    sql """
+        CREATE TABLE windowfunnel_v2_test(
+            user_id BIGINT,
+            event_name VARCHAR(64),
+            event_timestamp datetime,
+            phone_brand varchar(64),
+            tab_num int
+        ) distributed by hash(user_id) buckets 3 
properties("replication_num"="1");
+    """
+    sql """
+        INSERT INTO windowfunnel_v2_test VALUES
+            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
+            (100123, '登录', '2022-05-14 10:04:00', 'HONOR', 3),
+            (100123, '登录1', '2022-05-14 10:04:00', 'HONOR', 3),
+            (100123, '登录2', '2022-05-14 10:04:00', 'HONOR', 3),
+            (100123, '登录3', '2022-05-14 10:04:00', 'HONOR', 3),
+            (100123, '登录4', '2022-05-14 10:04:00', 'HONOR', 3),
+            (100123, '登录5', '2022-05-14 10:04:00', 'HONOR', 3),
+            (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4),
+            (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
+            (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
+            (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
+            (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
+            (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
+            (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
+            (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
+    """
+    order_qt_v2_deduplication0 """
+        SELECT
+            user_id,
+            window_funnel(3600, "deduplication", event_timestamp, event_name = 
'登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id
+    """
+
+    // Test dedup with duplicate event2 (访问)
+    sql """ truncate table windowfunnel_v2_test; """
+    sql """
+        INSERT INTO windowfunnel_v2_test VALUES
+            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
+            (100123, '登录1', '2022-05-14 10:04:00', 'HONOR', 3),
+            (100123, '访问', '2022-05-14 10:04:00', 'HONOR', 3),
+            (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4),
+            (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
+            (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
+            (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
+            (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
+            (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
+            (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
+            (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
+    """
+    order_qt_v2_deduplication1 """
+        SELECT
+            user_id,
+            window_funnel(3600, "deduplication", event_timestamp, event_name = 
'登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id
+    """
+
+    // ==================== FIXED mode tests (StarRocks-style semantics) 
====================
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+    sql """
+        CREATE TABLE windowfunnel_v2_test(
+            user_id BIGINT,
+            event_name VARCHAR(64),
+            event_timestamp datetime,
+            phone_brand varchar(64),
+            tab_num int
+        ) distributed by hash(user_id) buckets 3 
properties("replication_num"="1");
+    """
+    sql """
+        INSERT INTO windowfunnel_v2_test VALUES
+            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
+            (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4),
+            (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
+            (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
+            (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
+            (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
+            (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
+            (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
+            (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
+    """
+    // Note: In V2 fixed mode (StarRocks-style), unmatched rows don't break 
the chain.
+    // The chain only breaks when a matched event's predecessor level hasn't 
been matched.
+    order_qt_v2_fixed0 """
+        SELECT
+            user_id,
+            window_funnel(3600, "fixed", event_timestamp, event_name = '登录', 
event_name = '访问', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id
+    """
+
+    // Test fixed mode where event order in conditions doesn't match data order
+    sql """ truncate table windowfunnel_v2_test; """
+    sql """
+        INSERT INTO windowfunnel_v2_test VALUES
+            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
+            (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4);
+    """
+    order_qt_v2_fixed_reorder """
+        select
+            window_funnel(
+                20000,
+                'fixed',
+                t.event_timestamp,
+                t.event_name = '登录',
+                t.event_name = '访问',
+                t.event_name = '付款',
+                t.event_name = '下单'
+            ) AS level
+        from windowfunnel_v2_test t;
+    """
+
+    // ==================== INCREASE mode tests ====================
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+    sql """
+        CREATE TABLE windowfunnel_v2_test(
+            user_id BIGINT,
+            event_name VARCHAR(64),
+            event_timestamp datetime,
+            phone_brand varchar(64),
+            tab_num int
+        ) distributed by hash(user_id) buckets 3 
properties("replication_num"="1");
+    """
+    sql """
+        INSERT INTO windowfunnel_v2_test VALUES
+            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
+            (100123, '付款', '2022-05-14 10:04:00', 'HONOR', 4),
+            (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
+            (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
+            (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
+            (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
+            (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
+            (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
+            (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
+    """
+    order_qt_v2_increase0 """
+        SELECT
+            user_id,
+            window_funnel(3600, "increase", event_timestamp, event_name = 
'登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id
+    """
+
+    // Test increase mode with same-timestamp events
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+    sql """
+        CREATE TABLE IF NOT EXISTS windowfunnel_v2_test (
+            xwho varchar(50) NULL COMMENT 'xwho',
+            xwhen datetimev2(3) COMMENT 'xwhen',
+            xwhat int NULL COMMENT 'xwhat'
+        )
+        DUPLICATE KEY(xwho)
+        DISTRIBUTED BY HASH(xwho) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "1"
+        );
+    """
+    sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:41:00.111111', 1)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 13:28:02.111111', 2)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 13:28:02.111111', 3)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 15:05:04.111111', 4)"
+    order_qt_v2_increase_same_ts """
+        select
+            window_funnel(
+                20000,
+                'increase',
+                t.xwhen,
+                t.xwhat = 1,
+                t.xwhat = 2,
+                t.xwhat = 3,
+                t.xwhat = 4
+            ) AS level
+        from windowfunnel_v2_test t;
+    """
+
+    // ==================== V2 FIXED mode key difference from V1 
====================
+    // In V1, unmatched rows (rows that match no event condition) break the 
chain in FIXED mode.
+    // In V2, unmatched rows are not stored, so only matched events with level 
jumps break the chain.
+    // This test shows the behavioral difference.
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+    sql """
+        CREATE TABLE windowfunnel_v2_test(
+            user_id BIGINT,
+            event_name VARCHAR(64),
+            event_timestamp datetime,
+            phone_brand varchar(64),
+            tab_num int
+        ) distributed by hash(user_id) buckets 3 
properties("replication_num"="1");
+    """
+    sql """
+        INSERT INTO windowfunnel_v2_test VALUES
+            (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+            (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+            (100123, '登录2', '2022-05-14 10:03:00', 'HONOR', 3),
+            (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
+            (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4);
+    """
+    // V2 fixed mode: 登录2 doesn't match any condition, so it's not stored.
+    // The chain 登录->访问->下单->付款 is unbroken because there are no level jumps.
+    // V1 would return 2 here (登录2 physically breaks adjacency), V2 returns 4.
+    order_qt_v2_fixed_vs_v1 """
+        SELECT
+            user_id,
+            window_funnel(3600, "fixed", event_timestamp, event_name = '登录', 
event_name = '访问', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id
+    """
+
+    // ==================== Test using window_funnel_v2 explicit name 
====================
+    order_qt_v2_explicit_name """
+        SELECT
+            user_id,
+            window_funnel_v2(3600, "fixed", event_timestamp, event_name = 
'登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
+        FROM windowfunnel_v2_test
+        GROUP BY user_id
+        order BY user_id
+    """
+
+    // ==================== INCREASE mode: event-0 re-occurrence bug fix 
====================
+    // Regression test for the bug where a later event-0 overwrites 
events_timestamp[0]
+    // and breaks the INCREASE mode strict-increase check for an already-valid 
chain.
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+    sql """
+        CREATE TABLE IF NOT EXISTS windowfunnel_v2_test (
+            xwho varchar(50) NULL COMMENT 'xwho',
+            xwhen datetimev2(3) COMMENT 'xwhen',
+            xwhat int NULL COMMENT 'xwhat'
+        )
+        DUPLICATE KEY(xwho)
+        DISTRIBUTED BY HASH(xwho) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "1"
+        );
+    """
+    // Case 1: Old chain (from t=0) is valid and completes all 3 levels.
+    // The duplicate event-0 at t=50 should not destroy it.
+    sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:00.000', 1)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:50.000', 1)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:50.000', 2)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:01:00.000', 3)"
+    order_qt_v2_increase_event0_overwrite """
+        select
+            window_funnel(
+                100,
+                'increase',
+                t.xwhen,
+                t.xwhat = 1,
+                t.xwhat = 2,
+                t.xwhat = 3
+            ) AS level
+        from windowfunnel_v2_test t;
+    """
+
+    // Case 2: Old chain can't complete (window too small), new chain is 
better.
+    sql """ truncate table windowfunnel_v2_test; """
+    sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:00.000', 1)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:50.000', 1)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:51.000', 2)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:52.000', 3)"
+    order_qt_v2_increase_new_chain_better """
+        select
+            window_funnel(
+                5,
+                'increase',
+                t.xwhen,
+                t.xwhat = 1,
+                t.xwhat = 2,
+                t.xwhat = 3
+            ) AS level
+        from windowfunnel_v2_test t;
+    """
+
+    // Case 3: Old chain is better (reached level 2), new chain only reaches 
level 1.
+    sql """ truncate table windowfunnel_v2_test; """
+    sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:00.000', 1)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:10.000', 2)"
+    sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:00:50.000', 1)"
+    order_qt_v2_increase_old_chain_better """
+        select
+            window_funnel(
+                100,
+                'increase',
+                t.xwhen,
+                t.xwhat = 1,
+                t.xwhat = 2,
+                t.xwhat = 3
+            ) AS level
+        from windowfunnel_v2_test t;
+    """
+
+    sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to