This is an automated email from the ASF dual-hosted git repository.
zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d4d556851da [improve](analytic) support window function execute with
incremental mode (#52138)
d4d556851da is described below
commit d4d556851dad1cfb57d77f7d57855340830d9a4d
Author: zhangstar333 <[email protected]>
AuthorDate: Mon Jul 7 10:03:25 2025 +0800
[improve](analytic) support window function execute with incremental mode
(#52138)
### What problem does this PR solve?
Problem Summary:
```
Some aggregate functions used as window functions can support an incremental
mode,
e.g. sum(col) over (rows between 3 preceding and 3 following).
Previously, computing sum[i] required a loop over every row in the frame
(current - 3 through current + 3) that summed all of the data.
Now the previous result can be reused: sum[i] = sum[i-1] - col[x] + col[y],
where col[x] is the row leaving the frame and col[y] is the row entering it,
so only one subtraction and one addition are applied to the sum[i-1] result.
Performance test results will be added soon.
```
---
be/src/pipeline/exec/analytic_sink_operator.cpp | 50 ++--
be/src/pipeline/exec/analytic_sink_operator.h | 5 +-
.../vec/aggregate_functions/aggregate_function.h | 52 +++-
.../aggregate_functions/aggregate_function_avg.h | 77 +++++-
.../aggregate_functions/aggregate_function_count.h | 46 ++++
.../aggregate_function_java_udaf.h | 11 +-
.../aggregate_function_min_max.h | 98 ++++++++
.../aggregate_functions/aggregate_function_null.h | 106 +++++++-
.../aggregate_function_reader_first_last.h | 2 +-
.../aggregate_functions/aggregate_function_sum.h | 58 +++++
.../aggregate_function_window.h | 14 +-
.../window_functions/test_window_fn.out | Bin 7822 -> 16313 bytes
.../window_functions/test_window_fn.groovy | 273 ++++++++++++++++++++-
13 files changed, 752 insertions(+), 40 deletions(-)
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 455f41fcd51..bf9bb08aca0 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -70,7 +70,7 @@ Status AnalyticSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
_executor.get_next_impl =
&AnalyticSinkLocalState::_get_next_for_sliding_rows;
}
_streaming_mode = true;
-
+ _support_incremental_calculate = (p._has_window_start &&
p._has_window_end);
if (p._has_window_start) { //calculate start boundary
TAnalyticWindowBoundary b = p._window.window_start;
if (b.__isset.rows_offset_value) { //[offset , ]
@@ -114,6 +114,8 @@ Status AnalyticSinkLocalState::open(RuntimeState* state) {
_offsets_of_aggregate_states.resize(_agg_functions_size);
_result_column_nullable_flags.resize(_agg_functions_size);
_result_column_could_resize.resize(_agg_functions_size);
+ _use_null_result.resize(_agg_functions_size, 0);
+ _could_use_previous_result.resize(_agg_functions_size, 0);
for (int i = 0; i < _agg_functions_size; ++i) {
_agg_functions[i] = p._agg_functions[i]->clone(state,
state->obj_pool());
@@ -132,6 +134,8 @@ Status AnalyticSinkLocalState::open(RuntimeState* state) {
if
(PARTITION_FUNCTION_SET.contains(_agg_functions[i]->function()->get_name())) {
_streaming_mode = false;
}
+ _support_incremental_calculate &=
+ _agg_functions[i]->function()->supported_incremental_mode();
}
_partition_exprs_size = p._partition_by_eq_expr_ctxs.size();
@@ -185,8 +189,6 @@ Status AnalyticSinkLocalState::close(RuntimeState* state,
Status exec_status) {
return PipelineXSinkLocalState<AnalyticSharedState>::close(state,
exec_status);
}
-//TODO: eg sum/avg/count/min/max ROWS BETWEEN N PRECEDING AND M FOLLOWING
-//maybe could be optimized caculate at cumulative mode
bool AnalyticSinkLocalState::_get_next_for_sliding_rows(int64_t
current_block_rows,
int64_t
current_block_base_pos) {
const bool is_n_following_frame = _rows_end_offset > 0;
@@ -199,13 +201,18 @@ bool
AnalyticSinkLocalState::_get_next_for_sliding_rows(int64_t current_block_ro
_need_more_data = true;
break;
}
- _reset_agg_status();
+ if (_support_incremental_calculate) {
+ _execute_for_function<true>(_partition_by_pose.start,
_partition_by_pose.end,
+ current_row_start, current_row_end);
+ } else {
+ _reset_agg_status();
+ // Eg: rows between unbounded preceding and 10 preceding
+ // Make sure range_start <= range_end
+ current_row_start = std::min(current_row_start, current_row_end);
+ _execute_for_function(_partition_by_pose.start,
_partition_by_pose.end,
+ current_row_start, current_row_end);
+ }
- // Eg: rows between unbounded preceding and 10 preceding
- // Make sure range_start <= range_end
- current_row_start = std::min(current_row_start, current_row_end);
- _execute_for_function(_partition_by_pose.start,
_partition_by_pose.end, current_row_start,
- current_row_end);
int64_t pos = current_pos_in_block();
_insert_result_info(pos, pos + 1);
_current_row_position++;
@@ -366,21 +373,27 @@ Status AnalyticSinkLocalState::_execute_impl() {
return Status::OK();
}
+template <bool incremental>
void AnalyticSinkLocalState::_execute_for_function(int64_t partition_start,
int64_t partition_end,
int64_t frame_start,
int64_t frame_end) {
// here is the core function, should not add timer
for (size_t i = 0; i < _agg_functions_size; ++i) {
- if (_result_column_nullable_flags[i] && _current_window_empty) {
- continue;
- }
std::vector<const vectorized::IColumn*> agg_columns;
for (int j = 0; j < _agg_input_columns[i].size(); ++j) {
agg_columns.push_back(_agg_input_columns[i][j].get());
}
- _agg_functions[i]->function()->add_range_single_place(
- partition_start, partition_end, frame_start, frame_end,
- _fn_place_ptr + _offsets_of_aggregate_states[i],
agg_columns.data(),
- _agg_arena_pool.get());
+ if constexpr (incremental) {
+ _agg_functions[i]->function()->execute_function_with_incremental(
+ partition_start, partition_end, frame_start, frame_end,
+ _fn_place_ptr + _offsets_of_aggregate_states[i],
agg_columns.data(),
+ _agg_arena_pool.get(), false, false, false,
&_use_null_result[i],
+ &_could_use_previous_result[i]);
+ } else {
+ _agg_functions[i]->function()->add_range_single_place(
+ partition_start, partition_end, frame_start, frame_end,
+ _fn_place_ptr + _offsets_of_aggregate_states[i],
agg_columns.data(),
+ _agg_arena_pool.get(), &(_use_null_result[i]),
&_could_use_previous_result[i]);
+ }
}
}
@@ -388,8 +401,7 @@ void AnalyticSinkLocalState::_insert_result_info(int64_t
start, int64_t end) {
// here is the core function, should not add timer
for (size_t i = 0; i < _agg_functions_size; ++i) {
if (_result_column_nullable_flags[i]) {
- if (_current_window_empty) {
- //TODO need check this logical???
+ if (_use_null_result[i]) {
_result_window_columns[i]->insert_many_defaults(end - start);
} else {
auto* dst =
@@ -898,6 +910,8 @@ Status
AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block,
}
void AnalyticSinkLocalState::_reset_agg_status() {
+ _use_null_result.assign(_agg_functions_size, 0);
+ _could_use_previous_result.assign(_agg_functions_size, 0);
for (size_t i = 0; i < _agg_functions_size; ++i) {
_agg_functions[i]->reset(_fn_place_ptr +
_offsets_of_aggregate_states[i]);
}
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h
b/be/src/pipeline/exec/analytic_sink_operator.h
index 45b870eacfb..73fb1ef386d 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -88,6 +88,7 @@ private:
bool _get_next_for_sliding_rows(int64_t current_block_rows, int64_t
current_block_base_pos);
void _init_result_columns();
+ template <bool incremental = false>
void _execute_for_function(int64_t partition_start, int64_t partition_end,
int64_t frame_start,
int64_t frame_end);
void _insert_result_info(int64_t start, int64_t end);
@@ -144,8 +145,10 @@ private:
};
executor _executor;
- bool _current_window_empty = false;
+ std::vector<uint8_t> _use_null_result;
+ std::vector<uint8_t> _could_use_previous_result;
bool _streaming_mode = false;
+ bool _support_incremental_calculate = true;
bool _need_more_data = false;
int64_t _current_row_position = 0;
int64_t _output_block_index = 0;
diff --git a/be/src/vec/aggregate_functions/aggregate_function.h
b/be/src/vec/aggregate_functions/aggregate_function.h
index f4b98052e29..b6304212c49 100644
--- a/be/src/vec/aggregate_functions/aggregate_function.h
+++ b/be/src/vec/aggregate_functions/aggregate_function.h
@@ -211,7 +211,8 @@ public:
virtual void add_range_single_place(int64_t partition_start, int64_t
partition_end,
int64_t frame_start, int64_t frame_end,
AggregateDataPtr place, const
IColumn** columns,
- Arena*) const = 0;
+ Arena* arena, UInt8* use_null_result,
+ UInt8* could_use_previous_result)
const = 0;
virtual void streaming_agg_serialize(const IColumn** columns,
BufferWritable& buf,
const size_t num_rows, Arena*) const
= 0;
@@ -246,6 +247,41 @@ public:
}
}
+ /// some agg function like sum/count/avg/min/max could support incremental
mode,
+ /// eg sum(col) over (rows between 3 preceding and 3 following), could
resue the previous result
+ /// sum[i] = sum[i-1] - col[x] + col[y]
+ virtual bool supported_incremental_mode() const { return false; }
+
+ /**
+ * Executes the aggregate function in incremental mode.
+ * This is a virtual function that should be overridden by aggregate
functions supporting incremental calculation.
+ *
+ * @param partition_start Start position of the current partition
(inclusive)
+ * @param partition_end End position of the current partition (exclusive)
+ * @param frame_start Start position of the current window frame (inclusive)
+ * @param frame_end End position of the current window frame (exclusive)
+ * @param place Memory location to store aggregation results
+ * @param columns Input columns for aggregation
+ * @param arena Memory pool for allocations
+ * @param previous_is_nul Whether previous value is NULL, if true, no need
to subtract previous value
+ * @param end_is_nul Whether the end boundary is NULL, if true, no need to
add end value
+ * @param has_null Whether the current column contains NULL values
+ * @param use_null_result Output: whether to use NULL as result when the
frame is empty
+ * @param could_use_previous_result Output: whether previous result can be
reused
+ * @throws doris::Exception when called on a function that doesn't support
incremental mode
+ */
+ virtual void execute_function_with_incremental(int64_t partition_start,
int64_t partition_end,
+ int64_t frame_start,
int64_t frame_end,
+ AggregateDataPtr place,
const IColumn** columns,
+ Arena* arena, bool
previous_is_nul,
+ bool end_is_nul, bool
has_null,
+ UInt8* use_null_result,
+ UInt8*
could_use_previous_result) const {
+ throw doris::Exception(Status::FatalError(
+ "Aggregate function " + get_name() +
+ " does not support cumulative mode, but it is called in
cumulative mode"));
+ }
+
protected:
DataTypes argument_types;
int version {};
@@ -318,17 +354,25 @@ public:
derived->add(place, columns, i, arena);
}
}
- //now this is use for sum/count/avg/min/max win function, other win
function should override this function in class
-
//stddev_pop/stddev_samp/variance_pop/variance_samp/hll_union_agg/group_concat
+
void add_range_single_place(int64_t partition_start, int64_t
partition_end, int64_t frame_start,
int64_t frame_end, AggregateDataPtr place,
const IColumn** columns,
- Arena* arena) const override {
+ Arena* arena, UInt8* use_null_result,
+ UInt8* could_use_previous_result) const
override {
const Derived* derived = assert_cast<const Derived*>(this);
frame_start = std::max<int64_t>(frame_start, partition_start);
frame_end = std::min<int64_t>(frame_end, partition_end);
for (int64_t i = frame_start; i < frame_end; ++i) {
derived->add(place, columns, i, arena);
}
+ if (frame_start >= frame_end) {
+ if (!*could_use_previous_result) {
+ *use_null_result = true;
+ }
+ } else {
+ *use_null_result = false;
+ *could_use_previous_result = true;
+ }
}
void add_batch_range(size_t batch_begin, size_t batch_end,
AggregateDataPtr place,
diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg.h
b/be/src/vec/aggregate_functions/aggregate_function_avg.h
index facef6ac874..14b7e2bea27 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_avg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_avg.h
@@ -141,19 +141,34 @@ public:
}
}
- void add(AggregateDataPtr __restrict place, const IColumn** columns,
ssize_t row_num,
- Arena*) const override {
+ template <bool is_add>
+ void update_value(AggregateDataPtr __restrict place, const IColumn**
columns,
+ ssize_t row_num) const {
#ifdef __clang__
#pragma clang fp reassociate(on)
#endif
const auto& column =
assert_cast<const ColVecType&,
TypeCheckOnRelease::DISABLE>(*columns[0]);
- if constexpr (is_decimal(T)) {
- this->data(place).sum +=
(DataType)column.get_data()[row_num].value;
+ if constexpr (is_add) {
+ if constexpr (is_decimal(T)) {
+ this->data(place).sum +=
(DataType)column.get_data()[row_num].value;
+ } else {
+ this->data(place).sum += (DataType)column.get_data()[row_num];
+ }
+ ++this->data(place).count;
} else {
- this->data(place).sum += (DataType)column.get_data()[row_num];
+ if constexpr (is_decimal(T)) {
+ this->data(place).sum -=
(DataType)column.get_data()[row_num].value;
+ } else {
+ this->data(place).sum -= (DataType)column.get_data()[row_num];
+ }
+ --this->data(place).count;
}
- ++this->data(place).count;
+ }
+
+ void add(AggregateDataPtr __restrict place, const IColumn** columns,
ssize_t row_num,
+ Arena*) const override {
+ update_value<true>(place, columns, row_num);
}
void reset(AggregateDataPtr place) const override {
@@ -278,6 +293,56 @@ public:
return std::make_shared<DataTypeFixedLengthObject>();
}
+ bool supported_incremental_mode() const override { return true; }
+
+ void execute_function_with_incremental(int64_t partition_start, int64_t
partition_end,
+ int64_t frame_start, int64_t
frame_end,
+ AggregateDataPtr place, const
IColumn** columns,
+ Arena* arena, bool previous_is_nul,
bool end_is_nul,
+ bool has_null, UInt8*
use_null_result,
+ UInt8* could_use_previous_result)
const override {
+ int64_t current_frame_start = std::max<int64_t>(frame_start,
partition_start);
+ int64_t current_frame_end = std::min<int64_t>(frame_end,
partition_end);
+ if (current_frame_start >= current_frame_end) {
+ *use_null_result = true;
+ return;
+ }
+ if (*could_use_previous_result) {
+ auto outcoming_pos = frame_start - 1;
+ auto incoming_pos = frame_end - 1;
+ if (!previous_is_nul && outcoming_pos >= partition_start &&
+ outcoming_pos < partition_end) {
+ update_value<false>(place, columns, outcoming_pos);
+ }
+ if (!end_is_nul && incoming_pos >= partition_start && incoming_pos
< partition_end) {
+ update_value<true>(place, columns, incoming_pos);
+ }
+ } else {
+ this->add_range_single_place(partition_start, partition_end,
frame_start, frame_end,
+ place, columns, arena,
use_null_result,
+ could_use_previous_result);
+ }
+ }
+
+ void add_range_single_place(int64_t partition_start, int64_t
partition_end, int64_t frame_start,
+ int64_t frame_end, AggregateDataPtr place,
const IColumn** columns,
+ Arena* arena, UInt8* use_null_result,
+ UInt8* could_use_previous_result) const
override {
+ auto current_frame_start = std::max<int64_t>(frame_start,
partition_start);
+ auto current_frame_end = std::min<int64_t>(frame_end, partition_end);
+ if (current_frame_start >= current_frame_end) {
+ if (!*could_use_previous_result) {
+ *use_null_result = true;
+ }
+ } else {
+ for (size_t row_num = current_frame_start; row_num <
current_frame_end; ++row_num) {
+ update_value<true>(place, columns, row_num);
+ }
+ *use_null_result = false;
+ *could_use_previous_result = true;
+ }
+ }
+
private:
UInt32 scale;
};
diff --git a/be/src/vec/aggregate_functions/aggregate_function_count.h
b/be/src/vec/aggregate_functions/aggregate_function_count.h
index 8915b88ea3b..99e7422b108 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_count.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_count.h
@@ -174,6 +174,23 @@ public:
DataTypePtr get_serialized_type() const override {
return std::make_shared<DataTypeFixedLengthObject>();
}
+
+ void add_range_single_place(int64_t partition_start, int64_t
partition_end, int64_t frame_start,
+ int64_t frame_end, AggregateDataPtr place,
const IColumn** columns,
+ Arena* arena, UInt8* use_null_result,
+ UInt8* could_use_previous_result) const
override {
+ frame_start = std::max<int64_t>(frame_start, partition_start);
+ frame_end = std::min<int64_t>(frame_end, partition_end);
+ if (frame_start >= frame_end) {
+ if (!*could_use_previous_result) {
+ *use_null_result = true;
+ }
+ } else {
+ AggregateFunctionCount::data(place).count += frame_end -
frame_start;
+ *use_null_result = false;
+ *could_use_previous_result = true;
+ }
+ }
};
// TODO: Maybe AggregateFunctionCountNotNullUnary should be a subclass of
AggregateFunctionCount
@@ -313,6 +330,35 @@ public:
DataTypePtr get_serialized_type() const override {
return std::make_shared<DataTypeFixedLengthObject>();
}
+
+ void add_range_single_place(int64_t partition_start, int64_t
partition_end, int64_t frame_start,
+ int64_t frame_end, AggregateDataPtr place,
const IColumn** columns,
+ Arena* arena, UInt8* use_null_result,
+ UInt8* could_use_previous_result) const
override {
+ frame_start = std::max<int64_t>(frame_start, partition_start);
+ frame_end = std::min<int64_t>(frame_end, partition_end);
+ if (frame_start >= frame_end) {
+ if (!*could_use_previous_result) {
+ *use_null_result = true;
+ }
+ } else {
+ const auto& nullable_column =
+ assert_cast<const ColumnNullable&,
TypeCheckOnRelease::DISABLE>(*columns[0]);
+ size_t count = 0;
+ if (nullable_column.has_null()) {
+ for (int64_t i = frame_start; i < frame_end; ++i) {
+ if (!nullable_column.is_null_at(i)) {
+ ++count;
+ }
+ }
+ } else {
+ count = frame_end - frame_start;
+ }
+ *use_null_result = false;
+ *could_use_previous_result = true;
+ AggregateFunctionCountNotNullUnary::data(place).count += count;
+ }
+ }
};
} // namespace doris::vectorized
diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
index 63c0f03cdfc..f8815c30d70 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
@@ -354,13 +354,22 @@ public:
void add_range_single_place(int64_t partition_start, int64_t
partition_end, int64_t frame_start,
int64_t frame_end, AggregateDataPtr place,
const IColumn** columns,
- Arena*) const override {
+ Arena*, UInt8* current_window_empty,
+ UInt8* current_window_has_inited) const
override {
frame_start = std::max<int64_t>(frame_start, partition_start);
frame_end = std::min<int64_t>(frame_end, partition_end);
int64_t places_address = reinterpret_cast<int64_t>(place);
Status st = this->data(_exec_place)
.add(places_address, true, columns, frame_start,
frame_end,
argument_types, 0);
+ if (frame_start >= frame_end) {
+ if (!*current_window_has_inited) {
+ *current_window_empty = true;
+ }
+ } else {
+ *current_window_empty = false;
+ *current_window_has_inited = true;
+ }
if (UNLIKELY(!st.ok())) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, st.to_string());
}
diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max.h
b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
index a139e0e6a47..fafbc2b36c0 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_min_max.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
@@ -31,6 +31,7 @@
#include "common/cast_set.h"
#include "common/logging.h"
+#include "runtime/primitive_type.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/columns/column_fixed_length_object.h"
@@ -165,6 +166,15 @@ public:
}
}
+ bool check_if_equal(const IColumn& column, size_t row_num) const {
+ if (!has()) {
+ return false;
+ }
+ return assert_cast<const typename PrimitiveTypeTraits<T>::ColumnType&,
+ TypeCheckOnRelease::DISABLE>(column)
+ .get_data()[row_num] == value;
+ }
+
bool change_if_greater(const Self& to, Arena*) {
if (to.has() && (!has() || to.value > value)) {
change(to, nullptr);
@@ -304,6 +314,15 @@ public:
}
}
+ bool check_if_equal(const IColumn& column, size_t row_num) const {
+ if (!has()) {
+ return false;
+ }
+ return assert_cast<const typename PrimitiveTypeTraits<T>::ColumnType&,
+ TypeCheckOnRelease::DISABLE>(column)
+ .get_data()[row_num] == value;
+ }
+
void change_first_time(const IColumn& column, size_t row_num, Arena*) {
if (UNLIKELY(!has())) {
change(column, row_num, nullptr);
@@ -472,6 +491,14 @@ public:
}
}
+ bool check_if_equal(const IColumn& column, size_t row_num) const {
+ if (!has()) {
+ return false;
+ }
+ return assert_cast<const ColumnString&,
TypeCheckOnRelease::DISABLE>(column).get_data_at(
+ row_num) == get_string_ref();
+ }
+
void change_first_time(const IColumn& column, size_t row_num, Arena*) {
if (UNLIKELY(!has())) {
change(column, row_num, nullptr);
@@ -633,6 +660,8 @@ struct SingleValueDataComplexType {
void change_if_better(const Self& to, Arena* arena) {
this->change_first_time(to, nullptr); }
+ bool check_if_equal(const IColumn& column, size_t row_num) const { return
false; }
+
private:
bool has_value = false;
MutableColumnPtr column_data;
@@ -825,6 +854,75 @@ public:
return std::make_shared<DataTypeString>();
}
}
+
+ bool supported_incremental_mode() const override { return !(Data::IS_ANY);
}
+
+ void execute_function_with_incremental(int64_t partition_start, int64_t
partition_end,
+ int64_t frame_start, int64_t
frame_end,
+ AggregateDataPtr place, const
IColumn** columns,
+ Arena* arena, bool previous_is_nul,
bool end_is_nul,
+ bool has_null, UInt8*
use_null_result,
+ UInt8* could_use_previous_result)
const override {
+ int64_t current_frame_start = std::max<int64_t>(frame_start,
partition_start);
+ int64_t current_frame_end = std::min<int64_t>(frame_end,
partition_end);
+ if (current_frame_start >= current_frame_end) {
+ *use_null_result = true;
+ return;
+ }
+ if (*could_use_previous_result) {
+ auto outcoming_pos = frame_start - 1;
+ auto incoming_pos = frame_end - 1;
+ if (!previous_is_nul && outcoming_pos >= partition_start &&
+ outcoming_pos < partition_end) {
+ if (this->data(place).check_if_equal(*columns[0],
outcoming_pos)) {
+ this->data(place).reset();
+ if (has_null) {
+ const auto& null_map_data =
+ assert_cast<const
ColumnUInt8*>(columns[1])->get_data();
+ for (size_t i = current_frame_start; i <
current_frame_end; ++i) {
+ if (null_map_data[i] == 0) {
+
this->data(place).change_if_better(*columns[0], i, arena);
+ }
+ }
+ } else {
+ this->add_range_single_place(partition_start,
partition_end,
+ current_frame_start,
current_frame_end, place,
+ columns, arena,
use_null_result,
+
could_use_previous_result);
+ }
+ return;
+ }
+ }
+ if (!end_is_nul && incoming_pos >= partition_start && incoming_pos
< partition_end) {
+ this->data(place).change_if_better(*columns[0], incoming_pos,
arena);
+ }
+
+ } else {
+ this->add_range_single_place(partition_start, partition_end,
frame_start, frame_end,
+ place, columns, arena,
use_null_result,
+ could_use_previous_result);
+ }
+ }
+
+ void add_range_single_place(int64_t partition_start, int64_t
partition_end, int64_t frame_start,
+ int64_t frame_end, AggregateDataPtr place,
const IColumn** columns,
+ Arena* arena, UInt8* use_null_result,
+ UInt8* could_use_previous_result) const
override {
+ auto current_frame_start = std::max<int64_t>(frame_start,
partition_start);
+ auto current_frame_end = std::min<int64_t>(frame_end, partition_end);
+
+ if (current_frame_start >= current_frame_end) {
+ if (!*could_use_previous_result) {
+ *use_null_result = true;
+ }
+ } else {
+ for (size_t row_num = current_frame_start; row_num <
current_frame_end; ++row_num) {
+ this->data(place).change_if_better(*columns[0], row_num,
nullptr);
+ }
+ *use_null_result = false;
+ *could_use_previous_result = true;
+ }
+ }
};
template <template <typename> class Data>
diff --git a/be/src/vec/aggregate_functions/aggregate_function_null.h
b/be/src/vec/aggregate_functions/aggregate_function_null.h
index b46bcbe3537..274a39d8409 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_null.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_null.h
@@ -28,6 +28,7 @@
#include "vec/aggregate_functions/aggregate_function_distinct.h"
#include "vec/columns/column_nullable.h"
#include "vec/common/assert_cast.h"
+#include "vec/core/types.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/io/io_helper.h"
@@ -39,6 +40,7 @@ class AggregateFunctionNullBaseInline : public
IAggregateFunctionHelper<Derived>
protected:
std::unique_ptr<NestFunction> nested_function;
size_t prefix_size;
+ mutable int64_t null_count = 0;
/** In addition to data for nested aggregate function, we keep a flag
* indicating - was there at least one non-NULL value accumulated.
@@ -91,7 +93,7 @@ public:
String get_name() const override {
/// This is just a wrapper. The function for Nullable arguments is
named the same as the nested function itself.
- return nested_function->get_name();
+ return "Nullable(" + nested_function->get_name() + ")";
}
DataTypePtr get_return_type() const override {
@@ -110,6 +112,7 @@ public:
void reset(AggregateDataPtr place) const override {
init_flag(place);
nested_function->reset(nested_place(place));
+ null_count = 0;
}
bool has_trivial_destructor() const override {
@@ -281,6 +284,107 @@ public:
false);
}
}
+
+ void add_range_single_place(int64_t partition_start, int64_t
partition_end, int64_t frame_start,
+ int64_t frame_end, AggregateDataPtr place,
const IColumn** columns,
+ Arena* arena, UInt8* use_null_result,
+ UInt8* could_use_previous_result) const
override {
+ auto current_frame_start = std::max<int64_t>(frame_start,
partition_start);
+ auto current_frame_end = std::min<int64_t>(frame_end, partition_end);
+ if (current_frame_start >= current_frame_end) {
+ if (!*could_use_previous_result) {
+ this->init_flag(place);
+ *use_null_result = true;
+ return;
+ }
+ } else {
+ *use_null_result = false;
+ *could_use_previous_result = true;
+ }
+ const auto* column = assert_cast<const ColumnNullable*>(columns[0]);
+ bool has_null = column->has_null();
+ if (has_null) {
+ for (size_t i = current_frame_start; i < current_frame_end; ++i) {
+ this->add(place, columns, i, arena);
+ }
+ } else {
+ const IColumn* nested_column = &(column->get_nested_column());
+ this->set_flag(place);
+ this->nested_function->add_range_single_place(
+ partition_start, partition_end, frame_start, frame_end,
+ this->nested_place(place), &nested_column, arena,
use_null_result,
+ could_use_previous_result);
+ }
+ }
+
+ bool supported_incremental_mode() const override {
+ return this->nested_function->supported_incremental_mode();
+ }
+
+ void execute_function_with_incremental(int64_t partition_start, int64_t
partition_end,
+ int64_t frame_start, int64_t
frame_end,
+ AggregateDataPtr place, const
IColumn** columns,
+ Arena* arena, bool previous_is_nul,
bool end_is_nul,
+ bool has_null, UInt8*
use_null_result,
+ UInt8* could_use_previous_result)
const override {
+ int64_t current_frame_start = std::max<int64_t>(frame_start,
partition_start);
+ int64_t current_frame_end = std::min<int64_t>(frame_end,
partition_end);
+ if (current_frame_start >= current_frame_end) {
+ *use_null_result = true;
+ this->init_flag(place);
+ return;
+ }
+
+ DCHECK(columns[0]->is_nullable()) << columns[0]->get_name();
+ const auto* column = assert_cast<const ColumnNullable*>(columns[0]);
+ const IColumn* nested_column = &column->get_nested_column();
+
+ if (!column->has_null()) {
+ if (*could_use_previous_result) {
+ this->nested_function->execute_function_with_incremental(
+ partition_start, partition_end, frame_start, frame_end,
+ this->nested_place(place), &nested_column, arena,
previous_is_nul,
+ end_is_nul, false, use_null_result,
could_use_previous_result);
+ } else {
+ this->nested_function->add_range_single_place(
+ partition_start, partition_end, frame_start, frame_end,
+ this->nested_place(place), &nested_column, arena,
use_null_result,
+ could_use_previous_result);
+ }
+ this->set_flag(place);
+ return;
+ }
+
+ const auto* __restrict null_map_data =
column->get_null_map_data().data();
+ if (*could_use_previous_result) {
+ auto outcoming_pos = frame_start - 1;
+ auto incoming_pos = frame_end - 1;
+ bool is_previous_frame_start_null = false;
+ if (outcoming_pos >= partition_start && outcoming_pos <
partition_end &&
+ null_map_data[outcoming_pos] == 1) {
+ is_previous_frame_start_null = true;
+ this->null_count--;
+ }
+ bool is_current_frame_end_null = false;
+ if (incoming_pos >= partition_start && incoming_pos <
partition_end &&
+ null_map_data[incoming_pos] == 1) {
+ is_current_frame_end_null = true;
+ this->null_count++;
+ }
+ const IColumn* columns_tmp[2] {nested_column,
&(*column->get_null_map_column_ptr())};
+ this->nested_function->execute_function_with_incremental(
+ partition_start, partition_end, frame_start, frame_end,
+ this->nested_place(place), columns_tmp, arena,
is_previous_frame_start_null,
+ is_current_frame_end_null, true, use_null_result,
could_use_previous_result);
+ if (current_frame_end - current_frame_start != this->null_count) {
+ this->set_flag(place);
+ }
+ } else {
+ this->add_range_single_place(partition_start, partition_end,
frame_start, frame_end,
+ place, columns, arena,
use_null_result,
+ could_use_previous_result);
+ }
+ }
};
template <typename NestFuction, bool result_is_nullable>
diff --git
a/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h
b/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h
index 6daa0f99fef..e7b77494bd2 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h
@@ -237,7 +237,7 @@ public:
void add_range_single_place(int64_t partition_start, int64_t
partition_end, int64_t frame_start,
int64_t frame_end, AggregateDataPtr place,
const IColumn** columns,
- Arena*) const override {
+ Arena* arena, UInt8*, UInt8*) const override {
throw doris::Exception(
Status::FatalError("ReaderFunctionData do not support
add_range_single_place"));
}
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sum.h
b/be/src/vec/aggregate_functions/aggregate_function_sum.h
index 608d0f1dc1d..8e4d647b20c 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sum.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sum.h
@@ -212,6 +212,64 @@ public:
return std::make_shared<DataTypeFixedLengthObject>();
}
+ bool supported_incremental_mode() const override { return true; }
+
+ void execute_function_with_incremental(int64_t partition_start, int64_t
partition_end,
+ int64_t frame_start, int64_t
frame_end,
+ AggregateDataPtr place, const
IColumn** columns,
+ Arena* arena, bool previous_is_nul,
bool end_is_nul,
+ bool has_null, UInt8*
use_null_result,
+ UInt8* could_use_previous_result)
const override {
+ int64_t current_frame_start = std::max<int64_t>(frame_start,
partition_start);
+ int64_t current_frame_end = std::min<int64_t>(frame_end,
partition_end);
+
+ if (current_frame_start >= current_frame_end) {
+ *use_null_result = true;
+ return;
+ }
+ if (*could_use_previous_result) {
+ const auto& column =
+ assert_cast<const ColVecType&,
TypeCheckOnRelease::DISABLE>(*columns[0]);
+ const auto* data = column.get_data().data();
+ auto outcoming_pos = frame_start - 1;
+ auto incoming_pos = frame_end - 1;
+ if (!previous_is_nul && outcoming_pos >= partition_start &&
+ outcoming_pos < partition_end) {
+ this->data(place).sum -= data[outcoming_pos];
+ }
+ if (!end_is_nul && incoming_pos >= partition_start && incoming_pos
< partition_end) {
+ this->data(place).sum += data[incoming_pos];
+ }
+ } else {
+ this->add_range_single_place(partition_start, partition_end,
frame_start, frame_end,
+ place, columns, arena,
use_null_result,
+ could_use_previous_result);
+ }
+ }
+
+    // Adds every row of one window frame to the SUM state at `place`.
+    //
+    // The frame [frame_start, frame_end) is clamped to the partition
+    // [partition_start, partition_end). If the clamped range is empty,
+    // *use_null_result is set, but only while *could_use_previous_result is
+    // still false; nothing else changes. Otherwise each row of columns[0] in
+    // the clamped range is added on top of whatever the state already holds
+    // (no reset happens here), *use_null_result is cleared and
+    // *could_use_previous_result is set.
+    // `arena` is unused.
+    void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
+                                int64_t frame_end, AggregateDataPtr place, const IColumn** columns,
+                                Arena* arena, UInt8* use_null_result,
+                                UInt8* could_use_previous_result) const override {
+        const int64_t current_frame_start = std::max<int64_t>(frame_start, partition_start);
+        const int64_t current_frame_end = std::min<int64_t>(frame_end, partition_end);
+
+        if (current_frame_start >= current_frame_end) {
+            if (!*could_use_previous_result) {
+                *use_null_result = true;
+            }
+            return;
+        }
+
+        const auto& column =
+                assert_cast<const ColVecType&, TypeCheckOnRelease::DISABLE>(*columns[0]);
+        const auto* values = column.get_data().data();
+        // Signed index: both bounds are int64_t, so the loop no longer mixes
+        // size_t with int64_t in its initialisation and comparison.
+        for (int64_t row_num = current_frame_start; row_num < current_frame_end; ++row_num) {
+            this->data(place).add(
+                    typename PrimitiveTypeTraits<TResult>::ColumnItemType(values[row_num]));
+        }
+        *use_null_result = false;
+        *could_use_previous_result = true;
+    }
+
private:
UInt32 scale;
};
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.h
b/be/src/vec/aggregate_functions/aggregate_function_window.h
index 811d5d2a175..d0288941554 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_window.h
@@ -63,7 +63,7 @@ public:
void add_range_single_place(int64_t partition_start, int64_t
partition_end, int64_t frame_start,
int64_t frame_end, AggregateDataPtr place,
const IColumn** columns,
- Arena*) const override {
+ Arena*, UInt8*, UInt8*) const override {
++data(place).count;
}
@@ -112,7 +112,7 @@ public:
void add_range_single_place(int64_t partition_start, int64_t
partition_end, int64_t frame_start,
int64_t frame_end, AggregateDataPtr place,
const IColumn** columns,
- Arena*) const override {
+ Arena*, UInt8*, UInt8*) const override {
int64_t peer_group_count = frame_end - frame_start;
if (WindowFunctionRank::data(place).peer_group_start != frame_start) {
WindowFunctionRank::data(place).peer_group_start = frame_start;
@@ -168,7 +168,7 @@ public:
void add_range_single_place(int64_t partition_start, int64_t
partition_end, int64_t frame_start,
int64_t frame_end, AggregateDataPtr place,
const IColumn** columns,
- Arena*) const override {
+ Arena*, UInt8*, UInt8*) const override {
if (WindowFunctionDenseRank::data(place).peer_group_start !=
frame_start) {
WindowFunctionDenseRank::data(place).peer_group_start =
frame_start;
WindowFunctionDenseRank::data(place).rank++;
@@ -226,7 +226,7 @@ public:
void add_range_single_place(int64_t partition_start, int64_t
partition_end, int64_t frame_start,
int64_t frame_end, AggregateDataPtr place,
const IColumn** columns,
- Arena*) const override {
+ Arena*, UInt8*, UInt8*) const override {
int64_t peer_group_count = frame_end - frame_start;
if (WindowFunctionPercentRank::data(place).peer_group_start !=
frame_start) {
WindowFunctionPercentRank::data(place).peer_group_start =
frame_start;
@@ -296,7 +296,7 @@ public:
void add_range_single_place(int64_t partition_start, int64_t
partition_end, int64_t frame_start,
int64_t frame_end, AggregateDataPtr place,
const IColumn** columns,
- Arena*) const override {
+ Arena*, UInt8*, UInt8*) const override {
check_default(place, partition_start, partition_end);
int64_t peer_group_count = frame_end - frame_start;
if (WindowFunctionCumeDist::data(place).peer_group_start !=
frame_start) {
@@ -352,7 +352,7 @@ public:
void add_range_single_place(int64_t partition_start, int64_t
partition_end, int64_t frame_start,
int64_t frame_end, AggregateDataPtr place,
const IColumn** columns,
- Arena*) const override {
+ Arena*, UInt8*, UInt8*) const override {
// some variables are partition related, but there is no chance to
init them
// when the new partition arrives, so we calculate them every time now.
// Partition = big_bucket_num * big_bucket_size + small_bucket_num *
small_bucket_size
@@ -655,7 +655,7 @@ public:
void add_range_single_place(int64_t partition_start, int64_t
partition_end, int64_t frame_start,
int64_t frame_end, AggregateDataPtr place,
const IColumn** columns,
- Arena*) const override {
+ Arena*, UInt8*, UInt8*) const override {
this->data(place).add_range_single_place(partition_start,
partition_end, frame_start,
frame_end, columns);
}
diff --git
a/regression-test/data/query_p0/sql_functions/window_functions/test_window_fn.out
b/regression-test/data/query_p0/sql_functions/window_functions/test_window_fn.out
index 016d3a299dc..c4ea6bc9c2c 100644
Binary files
a/regression-test/data/query_p0/sql_functions/window_functions/test_window_fn.out
and
b/regression-test/data/query_p0/sql_functions/window_functions/test_window_fn.out
differ
diff --git
a/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy
b/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy
index dd875749c91..111a85a5701 100644
---
a/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy
+++
b/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy
@@ -399,7 +399,278 @@ suite("test_window_fn") {
sql """set enable_nereids_planner=true;"""
sql """SELECT SUM(MAX(c1) OVER (PARTITION BY c2, c3)) FROM
test_window_in_agg;"""
- sql "DROP TABLE IF EXISTS test_window_in_agg;"
+ sql "DROP TABLE IF EXISTS test2;"
+
+ sql """ DROP TABLE IF EXISTS test2; """
+
+ sql """ CREATE TABLE IF NOT EXISTS test2 (
+ `pk` int NULL,
+ `col_datetime_3__undef_signed_not_null` datetime(3) not
null
+ )
+ DUPLICATE KEY(pk)
+ DISTRIBUTED BY HASH(pk) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ sql """ INSERT into test2 (pk, col_datetime_3__undef_signed_not_null)
values
+ ('0', '2005-01-11
03:43:25.000'),
+ ('1', '2000-05-27
10:52:55.000'),
+ ('2', '2003-07-22
04:04:57.000'),
+ ('3', '2024-07-01
00:00:00.000'),
+ ('4', '9999-12-31
00:00:00.000'),
+ ('5', '2022-03-13
01:30:00'),
+ ('6', '2022-03-13
04:45:00'),
+ ('7', '2022-03-13
07:15:00'),
+ ('8', '2022-03-13
10:05:00'),
+ ('9', '2022-03-13
12:50:00');
+
+ """
+
+ qt_sql_window_null """
+ select col_datetime_3__undef_signed_not_null,pk,
max(col_datetime_3__undef_signed_not_null) over (order by pk rows between 4
preceding and 2 preceding) as res from test2 order by pk;
+ """
+
+
+ sql "DROP TABLE IF EXISTS test_baseall_null"
+ sql """
+ CREATE TABLE IF NOT EXISTS `test_baseall_null` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(10, 6) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" =
"1")
+ """
+
+ streamLoad {
+ table "test_baseall_null"
+ db 'regression_test_query_p0_sql_functions_window_functions'
+ set 'column_separator', ','
+ file "../../baseall.txt"
+ }
+ sql "sync"
+
+ sql "DROP TABLE IF EXISTS test_baseall_not"
+ sql """
+ CREATE TABLE IF NOT EXISTS `test_baseall_not` (
+ `k0` boolean not null comment "",
+ `k1` tinyint(4) not null comment "",
+ `k2` smallint(6) not null comment "",
+ `k3` int(11) not null comment "",
+ `k4` bigint(20) not null comment "",
+ `k5` decimal(10, 6) not null comment "",
+ `k6` char(5) not null comment "",
+ `k10` date not null comment "",
+ `k11` datetime not null comment "",
+ `k7` varchar(20) not null comment "",
+ `k8` double max not null comment "",
+ `k9` float sum not null comment "",
+ `k12` string replace not null comment "",
+ `k13` largeint(40) replace not null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" =
"1")
+ """
+
+ qt_sql_table_count """
+ select count(1) from test_baseall_null;
+ """
+
+ sql """
+ insert into test_baseall_not select * from test_baseall_null where k1
is not null;
+ """
+
+ qt_sql_table_count2 """
+ select count(1) from test_baseall_not;
+ """
+
+ qt_sql_test_1 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows between
unbounded preceding and 2 following) from test_baseall_not order by k6,k1;
+ """
+
+ qt_sql_test_2 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 range between
unbounded preceding and unbounded following) from test_baseall_not order by
k6,k1;
+ """
+
+ qt_sql_test_3 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows between 2
following and 3 following) from test_baseall_not order by k6,k1;
+ """
+
+ qt_sql_test_4 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows between 4
preceding and 2 preceding) from test_baseall_not order by k6,k1;
+ """
+
+ qt_sql_test_5 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows between
unbounded preceding and 2 following) from test_baseall_null order by k6,k1;
+ """
+
+ qt_sql_test_6 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 range between
unbounded preceding and unbounded following) from test_baseall_null order by
k6,k1;
+ """
+
+ qt_sql_test_7 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows between 2
following and 3 following) from test_baseall_null order by k6,k1;
+ """
+
+ qt_sql_test_8 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows between 4
preceding and 2 preceding) from test_baseall_null order by k6,k1;
+ """
+
+ qt_sql_1 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows 2
preceding) from test_baseall_not order by k6,k1;
+ """
+
+ qt_sql_2 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows current
row) from test_baseall_not order by k6,k1;
+ """
+
+ qt_sql_3 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN
CURRENT ROW AND UNBOUNDED FOLLOWING) from test_baseall_not order by k6,k1;
+ """
+
+ qt_sql_4 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows unbounded
preceding) from test_baseall_not order by k6,k1;
+ """
+
+ qt_sql_5 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 range unbounded
preceding) from test_baseall_not order by k6,k1;
+ """
+
+ qt_sql_6 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 range BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW) from test_baseall_not order by k6,k1;
+ """
+
+ qt_sql_7 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW) from test_baseall_not order by k6,k1;
+ """
+
+ qt_sql_8 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) from test_baseall_not order by
k6,k1;
+ """
+
+ qt_sql_9 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 range BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) from test_baseall_not order by
k6,k1;
+ """
+
+ qt_sql_10 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 2
PRECEDING AND 3 FOLLOWING) from test_baseall_not order by k6,k1;
+ """
+
+ qt_sql_11 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 3
PRECEDING AND 2 PRECEDING) from test_baseall_not order by k6,k1;
+ """
+
+ qt_sql_12 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 2
FOLLOWING AND 3 FOLLOWING) from test_baseall_not order by k6,k1;
+ """
+
+ qt_sql_13 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN
UNBOUNDED PRECEDING AND 3 FOLLOWING) from test_baseall_not order by k6,k1;
+ """
+
+ qt_sql_14 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN
UNBOUNDED PRECEDING AND 2 PRECEDING) from test_baseall_not order by k6,k1;
+ """
+
+ qt_sql_15 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 2
PRECEDING AND UNBOUNDED FOLLOWING) from test_baseall_not order by k6,k1;
+ """
+
+ qt_sql_16 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 2
FOLLOWING AND UNBOUNDED FOLLOWING) from test_baseall_not order by k6,k1;
+ """
+
+ qt_sql_17 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 1
PRECEDING AND CURRENT ROW) from test_baseall_not order by k6,k1;
+ """
+
+ qt_sql_18 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN
CURRENT ROW AND 1 FOLLOWING) from test_baseall_not order by k6,k1;
+ """
+
+
+ qt_sql_test_1 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows 2
preceding) from test_baseall_null order by k6,k1;
+ """
+
+ qt_sql_test_2 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows current
row) from test_baseall_null order by k6,k1;
+ """
+
+ qt_sql_test_3 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN
CURRENT ROW AND UNBOUNDED FOLLOWING) from test_baseall_null order by k6,k1;
+ """
+
+ qt_sql_test_4 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows unbounded
preceding) from test_baseall_null order by k6,k1;
+ """
+
+ qt_sql_test_5 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 range unbounded
preceding) from test_baseall_null order by k6,k1;
+ """
+
+ qt_sql_test_6 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 range BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW) from test_baseall_null order by k6,k1;
+ """
+
+ qt_sql_test_7 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW) from test_baseall_null order by k6,k1;
+ """
+
+ qt_sql_test_8 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) from test_baseall_null order by
k6,k1;
+ """
+
+ qt_sql_test_9 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 range BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) from test_baseall_null order by
k6,k1;
+ """
+
+ qt_sql_test_10 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 2
PRECEDING AND 3 FOLLOWING) from test_baseall_null order by k6,k1;
+ """
+
+ qt_sql_test_11 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 3
PRECEDING AND 2 PRECEDING) from test_baseall_null order by k6,k1;
+ """
+
+ qt_sql_test_12 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 2
FOLLOWING AND 3 FOLLOWING) from test_baseall_null order by k6,k1;
+ """
+
+ qt_sql_test_13 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN
UNBOUNDED PRECEDING AND 3 FOLLOWING) from test_baseall_null order by k6,k1;
+ """
+
+ qt_sql_test_14 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN
UNBOUNDED PRECEDING AND 2 PRECEDING) from test_baseall_null order by k6,k1;
+ """
+
+ qt_sql_test_15 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 2
PRECEDING AND UNBOUNDED FOLLOWING) from test_baseall_null order by k6,k1;
+ """
+
+ qt_sql_test_16 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 2
FOLLOWING AND UNBOUNDED FOLLOWING) from test_baseall_null order by k6,k1;
+ """
+
+ qt_sql_test_17 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN 1
PRECEDING AND CURRENT ROW) from test_baseall_null order by k6,k1;
+ """
+
+ qt_sql_test_18 """
+ select k6,k1,sum(k1) over(partition by k6 order by k1 rows BETWEEN
CURRENT ROW AND 1 FOLLOWING) from test_baseall_null order by k6,k1;
+ """
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]