This is an automated email from the ASF dual-hosted git repository.

zhangstar333 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 8644573b8f7 branch-3.0: [bug](function) fix first/last value return 
error with ignore null #44996 (#45344)
8644573b8f7 is described below

commit 8644573b8f7ee18a8d9517207fdd8783a3876160
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Dec 16 19:56:45 2024 +0800

    branch-3.0: [bug](function) fix first/last value return error with ignore 
null #44996 (#45344)
    
    Cherry-picked from #44996
    
    Co-authored-by: zhangstar333 <zhangs...@selectdb.com>
---
 be/src/pipeline/exec/analytic_source_operator.cpp  |  14 +-
 .../aggregate_function_reader_first_last.h         |   2 +
 .../aggregate_function_window.cpp                  |   2 +
 .../aggregate_function_window.h                    |  49 +++---
 .../correctness_p0/test_first_value_window.out     | 100 +++++++++++
 .../correctness_p0/test_first_value_window.groovy  | 189 +++++++++++++++++++++
 6 files changed, 328 insertions(+), 28 deletions(-)

diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp 
b/be/src/pipeline/exec/analytic_source_operator.cpp
index 643dbe82185..06a6374bbae 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -348,17 +348,17 @@ Status AnalyticLocalState::_get_next_for_rows(size_t 
current_block_rows) {
         int64_t range_start, range_end;
         if 
(!_parent->cast<AnalyticSourceOperatorX>()._window.__isset.window_start &&
             _parent->cast<AnalyticSourceOperatorX>()._window.window_end.type ==
-                    TAnalyticWindowBoundaryType::
-                            CURRENT_ROW) { //[preceding, 
current_row],[current_row, following]
+                    TAnalyticWindowBoundaryType::CURRENT_ROW) {
+            // [preceding, current_row] and [current_row, following] are rewritten to the same
+            // form, so the previous result can be reused: do not call _reset_agg_status(),
+            // just keep accumulating data on top of the existing state.
             range_start = _shared_state->current_row_position;
-            range_end = _shared_state->current_row_position +
-                        1; //going on calculate,add up data, no need to reset 
state
+            range_end = _shared_state->current_row_position + 1;
         } else {
             _reset_agg_status();
             range_end = _shared_state->current_row_position + _rows_end_offset 
+ 1;
-            if (!_parent->cast<AnalyticSourceOperatorX>()
-                         ._window.__isset
-                         .window_start) { //[preceding, offset]        
--unbound: [preceding, following]
+            //[preceding, offset]        --unbound: [preceding, following]
+            if 
(!_parent->cast<AnalyticSourceOperatorX>()._window.__isset.window_start) {
                 range_start = _partition_by_start.pos;
             } else {
                 range_start = _shared_state->current_row_position + 
_rows_start_offset;
diff --git 
a/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h 
b/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h
index 1a6ac288583..cc97581d799 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h
@@ -144,6 +144,8 @@ public:
 
     bool has_set_value() { return _has_value; }
 
+    bool is_null() { return _data_value.is_null(); }
+
 protected:
     StoreType _data_value;
     bool _has_value = false;
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.cpp 
b/be/src/vec/aggregate_functions/aggregate_function_window.cpp
index 9da838a6b90..2d10083488b 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_window.cpp
@@ -40,6 +40,8 @@ AggregateFunctionPtr 
create_function_lead_lag_first_last(const String& name,
     WhichDataType which(*type);
 
     bool arg_ignore_null_value = false;
+    // FE has already rewritten first_value(k1, false) to first_value(k1),
+    // so when there are 2 arguments, arg_ignore_null_value must be true.
     if (argument_types.size() == 2) {
         DCHECK(name == "first_value" || name == "last_value") << "invalid 
function name: " << name;
         arg_ignore_null_value = true;
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.h 
b/be/src/vec/aggregate_functions/aggregate_function_window.h
index cb038fe3116..10eb3866ee3 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_window.h
@@ -454,31 +454,28 @@ struct WindowFunctionLagImpl : Data {
     static const char* name() { return "lag"; }
 };
 
-// TODO: first_value && last_value in some corner case will be core,
-// if need to simply change it, should set them to always nullable insert into 
null value, and register in cpp maybe be change
-// But it's may be another better way to handle it
 template <typename Data, bool arg_ignore_null = false>
 struct WindowFunctionFirstImpl : Data {
     void add_range_single_place(int64_t partition_start, int64_t 
partition_end, int64_t frame_start,
                                 int64_t frame_end, const IColumn** columns) {
-        if (this->has_set_value()) {
+        // case 1: (has_set_value() = true && arg_ignore_null = false)
+        // case 2: (has_set_value() = true && arg_ignore_null = true && 
is_null() = false)
+        if ((this->has_set_value()) &&
+            (!arg_ignore_null || (arg_ignore_null && !this->is_null()))) {
             return;
         }
-        if (frame_start <= frame_end &&
-            frame_end <= partition_start) { //rewrite last_value when under 
partition
-            this->set_is_null();            //so no need more judge
+        DCHECK_LE(frame_start, frame_end);
+        if (frame_start >= partition_end || frame_end <= partition_start) {
+            this->set_is_null();
             return;
         }
         frame_start = std::max<int64_t>(frame_start, partition_start);
 
         if constexpr (arg_ignore_null) {
             frame_end = std::min<int64_t>(frame_end, partition_end);
-
-            auto& second_arg = assert_cast<const 
ColumnVector<UInt8>&>(*columns[1]);
-            auto ignore_null_value = second_arg.get_data()[0];
-
-            if (ignore_null_value && columns[0]->is_nullable()) {
-                auto& arg_nullable = assert_cast<const 
ColumnNullable&>(*columns[0]);
+            if (columns[0]->is_nullable()) {
+                const auto& arg_nullable = assert_cast<const 
ColumnNullable&>(*columns[0]);
+                // the valid range is: [frame_start, frame_end)
                 while (frame_start < frame_end - 1 && 
arg_nullable.is_null_at(frame_start)) {
                     frame_start++;
                 }
@@ -504,15 +501,25 @@ struct WindowFunctionLastImpl : Data {
 
         if constexpr (arg_ignore_null) {
             frame_start = std::max<int64_t>(frame_start, partition_start);
-
-            auto& second_arg = assert_cast<const 
ColumnVector<UInt8>&>(*columns[1]);
-            auto ignore_null_value = second_arg.get_data()[0];
-
-            if (ignore_null_value && columns[0]->is_nullable()) {
-                auto& arg_nullable = assert_cast<const 
ColumnNullable&>(*columns[0]);
-                while (frame_start < (frame_end - 1) && 
arg_nullable.is_null_at(frame_end - 1)) {
-                    frame_end--;
+            if (columns[0]->is_nullable()) {
+                const auto& arg_nullable = assert_cast<const 
ColumnNullable&>(*columns[0]);
+                // Look for a non-null value in [frame_start, frame_end), scanning from the end.
+                // If found: call set_value and return directly.
+                // If not found: the while loop runs to completion, then
+                //     case 1: has_set_value() is true: a previous window had a value, reuse it.
+                //     case 2: has_set_value() is false: there is no value, set the result to NULL.
+                while (frame_start < frame_end) {
+                    if (arg_nullable.is_null_at(frame_end - 1)) {
+                        frame_end--;
+                    } else {
+                        this->set_value(columns, frame_end - 1);
+                        return;
+                    }
                 }
+                if (!this->has_set_value()) {
+                    this->set_is_null();
+                }
+                return;
             }
         }
 
diff --git a/regression-test/data/correctness_p0/test_first_value_window.out 
b/regression-test/data/correctness_p0/test_first_value_window.out
index 9951ad95c60..73dbcf3ed34 100644
--- a/regression-test/data/correctness_p0/test_first_value_window.out
+++ b/regression-test/data/correctness_p0/test_first_value_window.out
@@ -41,3 +41,103 @@
 11     23      04-23-13        \N      10      10      10
 12     24      02-24-10-21     \N      \N      \N      \N
 
+-- !select_default4 --
+a      1       1       1       0
+a      \N      1       \N      1
+a      \N      1       \N      2
+a      \N      1       \N      3
+b      \N      \N      \N      4
+b      3       3       3       5
+b      \N      3       \N      6
+b      2       2       2       7
+
+-- !select_default5 --
+a      \N      \N      \N      0
+a      1       1       \N      1
+a      \N      1       \N      2
+a      \N      1       \N      3
+b      \N      \N      \N      4
+b      3       3       \N      5
+b      \N      3       \N      6
+b      2       3       \N      7
+
+-- !select_default_desc --
+a      2       3
+a      \N      2
+a      \N      1
+a      1       0
+b      2       7
+b      \N      6
+b      3       5
+b      \N      4
+
+-- !select_default_asc --
+a      1       0
+a      \N      1
+a      \N      2
+a      2       3
+b      \N      4
+b      3       5
+b      \N      6
+b      2       7
+
+-- !select_default_last_rewrite_first --
+a      1       1       0
+a      \N      1       1
+a      \N      1       2
+a      2       1       3
+b      \N      \N      4
+b      3       3       5
+b      \N      3       6
+b      2       3       7
+
+-- !select_default6 --
+a      \N      2       \N      0
+a      1       2       1       1
+a      2       2       2       2
+a      \N      2       2       3
+b      \N      2       \N      4
+b      3       2       3       5
+b      \N      2       3       6
+b      2       2       2       7
+
+-- !select_default_last_rewrite_first2 --
+a      1       1       0
+a      \N      1       1
+a      \N      1       2
+a      2       2       3
+b      \N      \N      4
+b      3       3       5
+b      \N      3       6
+b      2       2       7
+
+-- !select_default7 --
+a      1       1       1       1       1       0
+a      \N      1       1       1       1       1
+a      \N      1       1       1       1       2
+a      2       2       2       2       1       3
+b      \N      \N      \N      \N      \N      4
+b      3       3       3       3       3       5
+b      \N      3       3       3       3       6
+b      2       2       2       2       3       7
+
+-- !select_default8 --
+a      1       2       0
+a      \N      \N      1
+a      \N      \N      2
+a      2       \N      3
+b      \N      2       4
+b      3       \N      5
+b      \N      \N      6
+b      2       \N      7
+
+-- !select_default9 --
+a      1       2       0
+a      \N      \N      1
+a      \N      \N      2
+a      2       \N      3
+b      \N      2       4
+b      3       \N      5
+b      \N      \N      6
+b      2       \N      7
+
diff --git 
a/regression-test/suites/correctness_p0/test_first_value_window.groovy 
b/regression-test/suites/correctness_p0/test_first_value_window.groovy
index 8d0a3097056..7c1582e0e61 100644
--- a/regression-test/suites/correctness_p0/test_first_value_window.groovy
+++ b/regression-test/suites/correctness_p0/test_first_value_window.groovy
@@ -159,4 +159,193 @@ suite("test_first_value_window") {
             ,first_value(`state`, 1) over(partition by `myday` order by 
`time_col` rows between 1 preceding and 1 following) v3
         from ${tableName3} order by `id`, `myday`, `time_col`;
     """
+
+    qt_select_default4 """
+        SELECT uid
+            ,amt
+            ,LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s ASC 
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) amt1
+            ,LAST_VALUE(amt, false) OVER(PARTITION BY uid ORDER BY time_s ASC 
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) amt2
+            ,time_s
+        FROM (
+            SELECT 'a' AS uid, 1    AS amt, 0 AS time_s UNION ALL
+            SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL
+            SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL
+            SELECT 'a' AS uid, null    AS amt, 3 AS time_s UNION ALL
+            SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+            SELECT 'b' AS uid, 3    AS amt, 5 AS time_s UNION ALL
+            SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+            SELECT 'b' AS uid, 2    AS amt, 7 AS time_s 
+            ) t
+        ORDER BY uid, time_s
+        ;
+    """
+
+    qt_select_default5 """
+        SELECT uid
+            ,amt
+            ,FIRST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s ASC 
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) amt1
+            ,FIRST_VALUE(amt, false) OVER(PARTITION BY uid ORDER BY time_s ASC 
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) amt2
+            ,time_s
+        FROM (
+            SELECT 'a' AS uid, NULL    AS amt, 0 AS time_s UNION ALL
+            SELECT 'a' AS uid, 1 AS amt, 1 AS time_s UNION ALL
+            SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL
+            SELECT 'a' AS uid, null    AS amt, 3 AS time_s UNION ALL
+            SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+            SELECT 'b' AS uid, 3    AS amt, 5 AS time_s UNION ALL
+            SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+            SELECT 'b' AS uid, 2    AS amt, 7 AS time_s 
+            ) t
+        ORDER BY uid, time_s
+        ;
+    """
+    qt_select_default_desc """
+        SELECT uid
+            ,amt
+            ,time_s
+        FROM (
+            SELECT 'a' AS uid, 1    AS amt, 0 AS time_s UNION ALL
+            SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL
+            SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL
+            SELECT 'a' AS uid, 2    AS amt, 3 AS time_s UNION ALL
+            SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+            SELECT 'b' AS uid, 3    AS amt, 5 AS time_s UNION ALL
+            SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+            SELECT 'b' AS uid, 2    AS amt, 7 AS time_s 
+            ) t
+            order by uid,time_s desc;
+    """
+
+    qt_select_default_asc """
+        SELECT uid
+            ,amt
+            ,time_s
+        FROM (
+            SELECT 'a' AS uid, 1    AS amt, 0 AS time_s UNION ALL
+            SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL
+            SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL
+            SELECT 'a' AS uid, 2    AS amt, 3 AS time_s UNION ALL
+            SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+            SELECT 'b' AS uid, 3    AS amt, 5 AS time_s UNION ALL
+            SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+            SELECT 'b' AS uid, 2    AS amt, 7 AS time_s 
+            ) t
+            order by uid,time_s ASC;
+    """
+
+    // FIRST_VALUE: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+    qt_select_default_last_rewrite_first """ 
+            SELECT uid
+        ,amt
+        ,(LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s DESC 
ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) amt3
+        ,time_s
+    FROM (
+        SELECT 'a' AS uid, 1    AS amt, 0 AS time_s UNION ALL
+        SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL
+        SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL
+        SELECT 'a' AS uid, 2    AS amt, 3 AS time_s UNION ALL
+        SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+        SELECT 'b' AS uid, 3    AS amt, 5 AS time_s UNION ALL
+        SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+        SELECT 'b' AS uid, 2    AS amt, 7 AS time_s 
+        ) t
+    ORDER BY uid, time_s;
+    """
+
+    qt_select_default6 """
+        SELECT uid
+        ,amt
+        ,LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s ASC ROWS 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED following) amt1
+        ,LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s ASC ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) amt2
+        ,time_s
+    FROM (
+        SELECT 'a' AS uid, null    AS amt, 0 AS time_s UNION ALL
+        SELECT 'a' AS uid, 1 AS amt, 1 AS time_s UNION ALL
+        SELECT 'a' AS uid, 2 AS amt, 2 AS time_s UNION ALL
+        SELECT 'a' AS uid, null    AS amt, 3 AS time_s UNION ALL
+        SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+        SELECT 'b' AS uid, 3    AS amt, 5 AS time_s UNION ALL
+        SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+        SELECT 'b' AS uid, 2    AS amt, 7 AS time_s 
+        ) t
+    ORDER BY uid, time_s
+    ;
+    """
+
+    //last value: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+    qt_select_default_last_rewrite_first2 """
+            SELECT uid
+        ,amt
+        ,(FIRST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s DESC 
ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) amt3
+        ,time_s
+        FROM (
+            SELECT 'a' AS uid, 1    AS amt, 0 AS time_s UNION ALL
+            SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL
+            SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL
+            SELECT 'a' AS uid, 2    AS amt, 3 AS time_s UNION ALL
+            SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+            SELECT 'b' AS uid, 3    AS amt, 5 AS time_s UNION ALL
+            SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+            SELECT 'b' AS uid, 2    AS amt, 7 AS time_s 
+            ) t
+        ORDER BY uid, time_s;
+    """
+
+    qt_select_default7 """
+    SELECT uid
+        ,amt
+        ,COALESCE(LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s 
ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) amt1
+        ,COALESCE(LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s 
ASC ROWS BETWEEN 100 PRECEDING AND CURRENT ROW)) amt_not
+        ,COALESCE(FIRST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s 
DESC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) amt2
+        ,COALESCE(LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s 
DESC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) amt3
+        ,time_s
+    FROM (
+        SELECT 'a' AS uid, 1    AS amt, 0 AS time_s UNION ALL
+        SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL
+        SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL
+        SELECT 'a' AS uid, 2    AS amt, 3 AS time_s UNION ALL
+        SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+        SELECT 'b' AS uid, 3    AS amt, 5 AS time_s UNION ALL
+        SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+        SELECT 'b' AS uid, 2    AS amt, 7 AS time_s 
+        ) t
+    ORDER BY uid, time_s
+    ;
+    """
+
+    qt_select_default8 """
+            SELECT uid
+        ,amt
+        ,(FIRST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s ROWS 
between 3 following AND 6 FOLLOWING)) amt3
+        ,time_s
+        FROM (
+            SELECT 'a' AS uid, 1    AS amt, 0 AS time_s UNION ALL
+            SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL
+            SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL
+            SELECT 'a' AS uid, 2    AS amt, 3 AS time_s UNION ALL
+            SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+            SELECT 'b' AS uid, 3    AS amt, 5 AS time_s UNION ALL
+            SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+            SELECT 'b' AS uid, 2    AS amt, 7 AS time_s 
+            ) t
+        ORDER BY uid, time_s;
+    """
+
+    qt_select_default9 """
+            SELECT uid
+        ,amt
+        ,(FIRST_VALUE(amt) OVER(PARTITION BY uid ORDER BY time_s ROWS between 
3 following AND 6 FOLLOWING)) amt3
+        ,time_s
+        FROM (
+            SELECT 'a' AS uid, 1    AS amt, 0 AS time_s UNION ALL
+            SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL
+            SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL
+            SELECT 'a' AS uid, 2    AS amt, 3 AS time_s UNION ALL
+            SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+            SELECT 'b' AS uid, 3    AS amt, 5 AS time_s UNION ALL
+            SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+            SELECT 'b' AS uid, 2    AS amt, 7 AS time_s 
+            ) t
+        ORDER BY uid, time_s;
+    """
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to