This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.0.1 by this push:
     new cedf2f240a [bug] fix window function nullable type bug
cedf2f240a is described below

commit cedf2f240a7f6fad3067f052db56760b0178e94b
Author: starocean999 <[email protected]>
AuthorDate: Tue Jun 21 18:42:21 2022 +0800

    [bug] fix window function nullable type bug
    
    The nullable side of an outer join should always produce nullable values.
---
 be/src/vec/exec/join/vhash_join_node.cpp           |  42 ++-
 be/src/vec/exec/vaggregation_node.cpp              |  60 ++++-
 be/src/vec/exec/vaggregation_node.h                |   3 +
 be/src/vec/exec/vanalytic_eval_node.cpp            |  11 +-
 .../java/org/apache/doris/analysis/Analyzer.java   |   6 -
 .../test_outer_join_with_window_function.out       |   4 +
 .../test_outer_join_with_window_function.groovy    | 289 +++++++++++++++++++++
 7 files changed, 392 insertions(+), 23 deletions(-)

diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index d91ff2d22b..aefe93e89f 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -190,10 +190,15 @@ struct ProcessHashTableProbe {
         if constexpr (!is_semi_anti_join || have_other_join_conjunct) {
             if (_build_blocks.size() == 1) {
                 for (int i = 0; i < column_length; i++) {
-                    auto& column = *_build_blocks[0].get_by_position(i).column;
                     if (output_slot_flags[i]) {
-                        mcol[i + column_offset]->insert_indices_from(column, 
_build_block_rows.data(),
-                                                                     
_build_block_rows.data() + size);
+                        auto column = 
_build_blocks[0].get_by_position(i).column;
+                        if (mcol[i + column_offset]->is_nullable() xor 
column->is_nullable()) {
+                            DCHECK(mcol[i + column_offset]->is_nullable() &&
+                                   !column->is_nullable());
+                            column = make_nullable(column);
+                        }
+                        mcol[i + column_offset]->insert_indices_from(
+                                *column, _build_block_rows.data(), 
_build_block_rows.data() + size);
                     } else {
                         mcol[i + column_offset]->resize(size);
                     }
@@ -208,12 +213,29 @@ struct ProcessHashTableProbe {
                                     assert_cast<ColumnNullable *>(
                                             mcol[i + 
column_offset].get())->insert_join_null_data();
                                 } else {
-                                    auto &column = 
*_build_blocks[_build_block_offsets[j]].get_by_position(i).column;
-                                    mcol[i + 
column_offset]->insert_from(column, _build_block_rows[j]);
+                                    auto column = 
_build_blocks[_build_block_offsets[j]]
+                                                          .get_by_position(i)
+                                                          .column;
+                                    if (mcol[i + column_offset]->is_nullable() 
xor
+                                        column->is_nullable()) {
+                                        DCHECK(mcol[i + 
column_offset]->is_nullable() &&
+                                               !column->is_nullable());
+                                        column = make_nullable(column);
+                                    }
+                                    mcol[i + 
column_offset]->insert_from(*column,
+                                                                         
_build_block_rows[j]);
                                 }
                             } else {
-                                auto &column = 
*_build_blocks[_build_block_offsets[j]].get_by_position(i).column;
-                                mcol[i + column_offset]->insert_from(column, 
_build_block_rows[j]);
+                                auto column = 
_build_blocks[_build_block_offsets[j]]
+                                                      .get_by_position(i)
+                                                      .column;
+                                if (mcol[i + column_offset]->is_nullable() xor
+                                    column->is_nullable()) {
+                                    DCHECK(mcol[i + 
column_offset]->is_nullable() &&
+                                           !column->is_nullable());
+                                    column = make_nullable(column);
+                                }
+                                mcol[i + column_offset]->insert_from(*column, 
_build_block_rows[j]);
                             }
                         }
                     } else {
@@ -228,7 +250,11 @@ struct ProcessHashTableProbe {
     void probe_side_output_column(MutableColumns& mcol, const 
std::vector<bool>& output_slot_flags, int size) {
         for (int i = 0; i < output_slot_flags.size(); ++i) {
             if (output_slot_flags[i]) {
-                auto& column = _probe_block.get_by_position(i).column;
+                auto column = _probe_block.get_by_position(i).column;
+                if (mcol[i]->is_nullable() xor column->is_nullable()) {
+                    DCHECK(mcol[i]->is_nullable() && !column->is_nullable());
+                    column = make_nullable(column);
+                }
                 column->replicate(&_items_counts[0], size, *mcol[i]);
             } else {
                 mcol[i]->resize(size);
diff --git a/be/src/vec/exec/vaggregation_node.cpp 
b/be/src/vec/exec/vaggregation_node.cpp
index 8230d4d697..def5f56ff7 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -851,6 +851,10 @@ Status 
AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
             
key_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate());
         }
     }
+
+    MutableColumns temp_key_columns = _create_temp_key_columns();
+    DCHECK(temp_key_columns.size() == key_size);
+
     MutableColumns value_columns;
     for (int i = key_size; i < column_withschema.size(); ++i) {
         if (!mem_reuse) {
@@ -860,19 +864,24 @@ Status 
AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
         }
     }
 
+    MutableColumns temp_value_columns = _create_temp_value_columns();
+    DCHECK(temp_value_columns.size() == _aggregate_evaluators.size() &&
+           _aggregate_evaluators.size() == column_withschema.size() - 
key_size);
+
     SCOPED_TIMER(_get_results_timer);
     std::visit(
             [&](auto&& agg_method) -> void {
                 auto& data = agg_method.data;
                 auto& iter = agg_method.iterator;
                 agg_method.init_once();
-                while (iter != data.end() && key_columns[0]->size() < 
state->batch_size()) {
+                while (iter != data.end() && temp_key_columns[0]->size() < 
state->batch_size()) {
                     const auto& key = iter->get_first();
                     auto& mapped = iter->get_second();
-                    agg_method.insert_key_into_columns(key, key_columns, 
_probe_key_sz);
+                    agg_method.insert_key_into_columns(key, temp_key_columns, 
_probe_key_sz);
                     for (size_t i = 0; i < _aggregate_evaluators.size(); ++i)
                         _aggregate_evaluators[i]->insert_result_info(
-                                mapped + _offsets_of_aggregate_states[i], 
value_columns[i].get());
+                                mapped + _offsets_of_aggregate_states[i],
+                                temp_value_columns[i].get());
 
                     ++iter;
                 }
@@ -880,15 +889,15 @@ Status 
AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
                     if (agg_method.data.has_null_key_data()) {
                         // only one key of group by support wrap null key
                         // here need additional processing logic on the null 
key / value
-                        DCHECK(key_columns.size() == 1);
-                        DCHECK(key_columns[0]->is_nullable());
-                        if (key_columns[0]->size() < state->batch_size()) {
-                            key_columns[0]->insert_data(nullptr, 0);
+                        DCHECK(temp_key_columns.size() == 1);
+                        DCHECK(temp_key_columns[0]->is_nullable());
+                        if (temp_key_columns[0]->size() < state->batch_size()) 
{
+                            temp_key_columns[0]->insert_data(nullptr, 0);
                             auto mapped = agg_method.data.get_null_key_data();
                             for (size_t i = 0; i < 
_aggregate_evaluators.size(); ++i)
                                 _aggregate_evaluators[i]->insert_result_info(
                                         mapped + 
_offsets_of_aggregate_states[i],
-                                        value_columns[i].get());
+                                        temp_value_columns[i].get());
                             *eos = true;
                         }
                     } else {
@@ -898,6 +907,25 @@ Status 
AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
             },
             _agg_data._aggregated_method_variant);
 
+    for (int i = 0; i < key_size; ++i) {
+        if (key_columns[i]->is_nullable() xor 
temp_key_columns[i]->is_nullable()) {
+            DCHECK(key_columns[i]->is_nullable() && 
!temp_key_columns[i]->is_nullable());
+            key_columns[i] = 
(*std::move(make_nullable(std::move(temp_key_columns[i])))).mutate();
+        } else {
+            key_columns[i] = std::move(temp_key_columns[i]);
+        }
+    }
+
+    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+        if (value_columns[i]->is_nullable() xor 
temp_value_columns[i]->is_nullable()) {
+            DCHECK(value_columns[i]->is_nullable() && 
!temp_value_columns[i]->is_nullable());
+            value_columns[i] =
+                    
(*std::move(make_nullable(std::move(temp_value_columns[i])))).mutate();
+        } else {
+            value_columns[i] = std::move(temp_value_columns[i]);
+        }
+    }
+
     if (!mem_reuse) {
         *block = column_withschema;
         MutableColumns columns(block->columns());
@@ -1115,4 +1143,20 @@ void AggregationNode::release_tracker() {
     mem_tracker()->Release(_mem_usage_record.used_in_state + 
_mem_usage_record.used_in_arena);
 }
 
+MutableColumns AggregationNode::_create_temp_key_columns() {
+    // One scratch column per probe (group-by key) expression, typed by that expression's root.
+    MutableColumns scratch;
+    for (const auto& probe_ctx : _probe_expr_ctxs)
+        scratch.emplace_back(probe_ctx->root()->data_type()->create_column());
+    return scratch;
+}
+
+MutableColumns AggregationNode::_create_temp_value_columns() {
+    // One scratch column per aggregate evaluator, created from that evaluator's data_type().
+    MutableColumns scratch;
+    for (const auto& evaluator : _aggregate_evaluators)
+        scratch.emplace_back(evaluator->data_type()->create_column());
+    return scratch;
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vaggregation_node.h 
b/be/src/vec/exec/vaggregation_node.h
index f020b90a6e..d2f580d327 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -484,6 +484,9 @@ private:
 
     void release_tracker();
 
+    MutableColumns _create_temp_key_columns();
+    MutableColumns _create_temp_value_columns();
+
     using vectorized_execute = std::function<Status(Block* block)>;
     using vectorized_pre_agg = std::function<Status(Block* in_block, Block* 
out_block)>;
     using vectorized_get_result =
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp 
b/be/src/vec/exec/vanalytic_eval_node.cpp
index 65765d15d3..858e330efe 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -544,7 +544,16 @@ Status VAnalyticEvalNode::_output_current_block(Block* 
block) {
     }
 
     for (size_t i = 0; i < _result_window_columns.size(); ++i) {
-        block->insert({std::move(_result_window_columns[i]), 
_agg_functions[i]->data_type(), ""});
+        SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i];
+        if (output_slot_desc->is_nullable() xor 
_agg_functions[i]->data_type()->is_nullable()) {
+            DCHECK(output_slot_desc->is_nullable() &&
+                   !_agg_functions[i]->data_type()->is_nullable());
+            block->insert({make_nullable(std::move(_result_window_columns[i])),
+                           make_nullable(_agg_functions[i]->data_type()), ""});
+        } else {
+            block->insert(
+                    {std::move(_result_window_columns[i]), 
_agg_functions[i]->data_type(), ""});
+        }
     }
 
     _output_block_index++;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index c3e4e66984..23fc032a95 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -945,12 +945,6 @@ public class Analyzer {
             slotDescriptor.setIsNullable(true);
             return;
         }
-        for (Expr sourceExpr : slotDescriptor.getSourceExprs()) {
-            if (!sourceExpr.isNullable()) {
-                throw new VecNotImplException("The slot (" + 
slotDescriptor.toString()
-                        + ") could not be changed to nullable");
-            }
-        }
     }
 
     /**
diff --git 
a/regression-test/data/correctness/test_outer_join_with_window_function.out 
b/regression-test/data/correctness/test_outer_join_with_window_function.out
new file mode 100644
index 0000000000..e0d7861228
--- /dev/null
+++ b/regression-test/data/correctness/test_outer_join_with_window_function.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select --
+abc    xyz     1577946288488507        1492704224      421001  421001  
2020-01-19T11:15:21     9999-12-30 00:00:00     9999-12-30T00:00        -       
-       -
+
diff --git 
a/regression-test/suites/correctness/test_outer_join_with_window_function.groovy
 
b/regression-test/suites/correctness/test_outer_join_with_window_function.groovy
new file mode 100644
index 0000000000..ce6f79edf7
--- /dev/null
+++ 
b/regression-test/suites/correctness/test_outer_join_with_window_function.groovy
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_outer_join_with_with_window_function") {
+    sql """
+        drop table if exists dwd_online_detail;
+    """
+
+    sql """
+        CREATE TABLE `dwd_online_detail` (
+        `logout_time` datetime NOT NULL DEFAULT "9999-12-30 00:00:00",
+        `login_time` datetime NOT NULL DEFAULT "9999-12-30 00:00:00",
+        `game_code` varchar(50) NOT NULL DEFAULT "-",
+        `plat_code` varchar(50) NOT NULL DEFAULT "-",
+        `account` varchar(255) NOT NULL DEFAULT "-",
+        `playerid` varchar(255) NOT NULL DEFAULT "-",
+        `userid` varchar(255) NOT NULL DEFAULT "-",
+        `pid_code` varchar(50) NOT NULL DEFAULT "-",
+        `gid_code` varchar(50) NOT NULL DEFAULT "-",
+        `org_sid` int(11) NOT NULL DEFAULT "0",
+        `ct_sid` int(11) NOT NULL DEFAULT "0",
+        `next_login_time` datetime NOT NULL DEFAULT "9999-12-30 00:00:00"
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`logout_time`, `login_time`, `game_code`, `plat_code`, 
`account`, `playerid`, `userid`)
+        PARTITION BY RANGE(`logout_time`)
+        (PARTITION p99991230 VALUES [('9999-12-30 00:00:00'), ('9999-12-31 
00:00:00')))
+        DISTRIBUTED BY HASH(`game_code`, `plat_code`) BUCKETS 4
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "colocate_with" = "gp_group"
+        );
+    """
+    
+    sql """
+        drop table if exists ods_logout;
+    """
+
+    sql """
+        CREATE TABLE `ods_logout` (
+        `day` date NULL COMMENT "",
+        `game` varchar(500) NULL COMMENT "",
+        `plat` varchar(500) NULL COMMENT "",
+        `dt` datetime NULL COMMENT "",
+        `time` bigint(20) NULL COMMENT "",
+        `sid` int(11) NULL COMMENT "",
+        `pid` varchar(500) NULL COMMENT "",
+        `gid` varchar(500) NULL COMMENT "",
+        `account` varchar(500) NULL COMMENT "",
+        `playerid` varchar(500) NULL COMMENT "",
+        `prop` varchar(500) NULL COMMENT "",
+        `p01` varchar(500) NULL COMMENT "",
+        `p02` varchar(500) NULL COMMENT "",
+        `p03` varchar(500) NULL COMMENT "",
+        `p04` varchar(500) NULL COMMENT "",
+        `p05` varchar(500) NULL COMMENT "",
+        `p06` varchar(500) NULL COMMENT "",
+        `p07` varchar(500) NULL COMMENT "",
+        `p08` varchar(500) NULL COMMENT "",
+        `p09` varchar(500) NULL COMMENT "",
+        `p10` varchar(500) NULL COMMENT "",
+        `p11` varchar(500) NULL COMMENT "",
+        `p12` varchar(500) NULL COMMENT "",
+        `p13` varchar(500) NULL COMMENT "",
+        `p14` varchar(500) NULL COMMENT "",
+        `p15` varchar(500) NULL COMMENT ""
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`day`, `game`, `plat`)
+        PARTITION BY RANGE(`day`)
+        (PARTITION p201907 VALUES [('2019-07-01'), ('2019-08-01')))
+        DISTRIBUTED BY HASH(`game`, `plat`) BUCKETS 4
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+
+    sql """
+        drop table if exists dim_account_userid_mapping;
+    """
+
+    sql """
+        CREATE TABLE `dim_account_userid_mapping` (
+        `end_time` datetime NOT NULL DEFAULT "9999-12-30 00:00:00",
+        `start_time` datetime NOT NULL DEFAULT "9999-12-30 00:00:00",
+        `game_code` varchar(50) NOT NULL,
+        `plat_code` varchar(50) NOT NULL,
+        `userkey` varchar(255) NOT NULL,
+        `userid` varchar(255) NOT NULL,
+        `account` varchar(255) NOT NULL,
+        `pid_code` varchar(50) NOT NULL DEFAULT "-",
+        `gid_code` varchar(50) NOT NULL DEFAULT "-",
+        `region` varchar(50) NOT NULL DEFAULT "-"
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`end_time`, `start_time`, `game_code`, `plat_code`, 
`userkey`)
+        PARTITION BY RANGE(`end_time`)
+        (PARTITION p20190705 VALUES [('2019-07-05 00:00:00'), ('2019-07-06 
00:00:00')))
+        DISTRIBUTED BY HASH(`game_code`, `plat_code`) BUCKETS 4
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "colocate_with" = "gp_group"
+        );
+    """
+
+    sql """
+        drop table if exists ods_login;
+    """
+
+    sql """
+        CREATE TABLE `ods_login` (
+        `day` date NULL COMMENT "",
+        `game` varchar(500) NULL COMMENT "",
+        `plat` varchar(500) NULL COMMENT "",
+        `dt` datetime NULL COMMENT "",
+        `time` bigint(20) NULL COMMENT "",
+        `sid` int(11) NULL COMMENT "",
+        `pid` varchar(500) NULL COMMENT "",
+        `gid` varchar(500) NULL COMMENT "",
+        `account` varchar(500) NULL COMMENT "",
+        `playerid` varchar(500) NULL COMMENT "",
+        `prop` varchar(500) NULL COMMENT "",
+        `p01` varchar(500) NULL COMMENT "",
+        `p02` varchar(500) NULL COMMENT "",
+        `p03` varchar(500) NULL COMMENT "",
+        `p04` varchar(500) NULL COMMENT "",
+        `p05` varchar(500) NULL COMMENT "",
+        `p06` varchar(500) NULL COMMENT "",
+        `p07` varchar(500) NULL COMMENT "",
+        `p08` varchar(500) NULL COMMENT "",
+        `p09` varchar(500) NULL COMMENT "",
+        `p10` varchar(500) NULL COMMENT "",
+        `p11` varchar(500) NULL COMMENT "",
+        `p12` varchar(500) NULL COMMENT "",
+        `p13` varchar(500) NULL COMMENT "",
+        `p14` varchar(500) NULL COMMENT "",
+        `p15` varchar(500) NULL COMMENT ""
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`day`, `game`, `plat`)
+        COMMENT "登录ods"
+        PARTITION BY RANGE(`day`)
+        (PARTITION p201803 VALUES [('2018-03-01'), ('2018-04-01')),
+        PARTITION p201804 VALUES [('2018-04-01'), ('2018-05-01')),
+        PARTITION p201805 VALUES [('2018-05-01'), ('2018-06-01')),
+        PARTITION p201806 VALUES [('2018-06-01'), ('2018-07-01')),
+        PARTITION p201807 VALUES [('2018-07-01'), ('2018-08-01')),
+        PARTITION p201808 VALUES [('2018-08-01'), ('2018-09-01')),
+        PARTITION p201809 VALUES [('2018-09-01'), ('2018-10-01')),
+        PARTITION p201810 VALUES [('2018-10-01'), ('2018-11-01')),
+        PARTITION p201811 VALUES [('2018-11-01'), ('2018-12-01')),
+        PARTITION p201812 VALUES [('2018-12-01'), ('2019-01-01')),
+        PARTITION p201901 VALUES [('2019-01-01'), ('2019-02-01')),
+        PARTITION p201902 VALUES [('2019-02-01'), ('2019-03-01')),
+        PARTITION p201903 VALUES [('2019-03-01'), ('2019-04-01')),
+        PARTITION p201904 VALUES [('2019-04-01'), ('2019-05-01')),
+        PARTITION p201905 VALUES [('2019-05-01'), ('2019-06-01')),
+        PARTITION p201906 VALUES [('2019-06-01'), ('2019-07-01')),
+        PARTITION p201907 VALUES [('2019-07-01'), ('2019-08-01')),
+        PARTITION p201908 VALUES [('2019-08-01'), ('2019-09-01')),
+        PARTITION p201909 VALUES [('2019-09-01'), ('2019-10-01')),
+        PARTITION p201910 VALUES [('2019-10-01'), ('2019-11-01')),
+        PARTITION p201911 VALUES [('2019-11-01'), ('2019-12-01')),
+        PARTITION p201912 VALUES [('2019-12-01'), ('2020-01-01')),
+        PARTITION p202001 VALUES [('2020-01-01'), ('2020-02-01')),
+        PARTITION p202002 VALUES [('2020-02-01'), ('2020-03-01')),
+        PARTITION p202003 VALUES [('2020-03-01'), ('2020-04-01')),
+        PARTITION p202004 VALUES [('2020-04-01'), ('2020-05-01')),
+        PARTITION p202005 VALUES [('2020-05-01'), ('2020-06-01')),
+        PARTITION p202006 VALUES [('2020-06-01'), ('2020-07-01')),
+        PARTITION p202007 VALUES [('2020-07-01'), ('2020-08-01')),
+        PARTITION p202008 VALUES [('2020-08-01'), ('2020-09-01')),
+        PARTITION p202009 VALUES [('2020-09-01'), ('2020-10-01')),
+        PARTITION p202010 VALUES [('2020-10-01'), ('2020-11-01')),
+        PARTITION p202011 VALUES [('2020-11-01'), ('2020-12-01')),
+        PARTITION p202012 VALUES [('2020-12-01'), ('2021-01-01')),
+        PARTITION p202101 VALUES [('2021-01-01'), ('2021-02-01')),
+        PARTITION p202102 VALUES [('2021-02-01'), ('2021-03-01')),
+        PARTITION p202103 VALUES [('2021-03-01'), ('2021-04-01')),
+        PARTITION p202104 VALUES [('2021-04-01'), ('2021-05-01')),
+        PARTITION p202105 VALUES [('2021-05-01'), ('2021-06-01')),
+        PARTITION p202106 VALUES [('2021-06-01'), ('2021-07-01')),
+        PARTITION p202107 VALUES [('2021-07-01'), ('2021-08-01')),
+        PARTITION p202108 VALUES [('2021-08-01'), ('2021-09-01')),
+        PARTITION p202109 VALUES [('2021-09-01'), ('2021-10-01')),
+        PARTITION p202110 VALUES [('2021-10-01'), ('2021-11-01')),
+        PARTITION p202111 VALUES [('2021-11-01'), ('2021-12-01')),
+        PARTITION p202112 VALUES [('2021-12-01'), ('2022-01-01')),
+        PARTITION p202201 VALUES [('2022-01-01'), ('2022-02-01')),
+        PARTITION p202202 VALUES [('2022-02-01'), ('2022-03-01')),
+        PARTITION p202203 VALUES [('2022-03-01'), ('2022-04-01')),
+        PARTITION p202204 VALUES [('2022-04-01'), ('2022-05-01')),
+        PARTITION p202205 VALUES [('2022-05-01'), ('2022-06-01')),
+        PARTITION p202206 VALUES [('2022-06-01'), ('2022-07-01')),
+        PARTITION p202207 VALUES [('2022-07-01'), ('2022-08-01')),
+        PARTITION p202208 VALUES [('2022-08-01'), ('2022-09-01')),
+        PARTITION p202209 VALUES [('2022-09-01'), ('2022-10-01')))
+        DISTRIBUTED BY HASH(`game`, `plat`) BUCKETS 4
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "dynamic_partition.enable" = "true",
+        "dynamic_partition.time_unit" = "MONTH",
+        "dynamic_partition.time_zone" = "Asia/Shanghai",
+        "dynamic_partition.start" = "-2147483648",
+        "dynamic_partition.end" = "3",
+        "dynamic_partition.prefix" = "p",
+        "dynamic_partition.replication_allocation" = "tag.location.default: 1",
+        "dynamic_partition.buckets" = "4",
+        "dynamic_partition.create_history_partition" = "true",
+        "dynamic_partition.history_partition_num" = "50",
+        "dynamic_partition.hot_partition_num" = "2",
+        "dynamic_partition.reserved_history_periods" = "NULL",
+        "dynamic_partition.start_day_of_month" = "1",
+        "in_memory" = "false",
+        "storage_format" = "V2");
+    """
+
+    sql """
+        insert into ods_logout(day, game, plat, playerid, dt) 
values('2019-07-05', 'abc', 'xyz', '1136638398824557', '2019-07-05 00:00:00');
+    """
+
+    sql """
+        insert into dwd_online_detail(game_code, plat_code, playerid, account, 
org_sid, ct_sid, login_time, logout_time, pid_code,gid_code)
+        values('abc', 'xyz', '1577946288488507', '1492704224', '421001', 
'421001', '2020-01-19 11:15:21', '9999-12-30 00:00:00', '-', '-');
+    """
+
+    qt_select """
+        SELECT 
online_detail.game_code,online_detail.plat_code,online_detail.playerid,online_detail.account,online_detail.org_sid
 , online_detail.ct_sid ,
+        online_detail.login_time,if(online_detail.logout_time='9999-12-30 
00:00:00',coalesce(logout.dt,online_detail.next_login_time),online_detail.logout_time)
 logout_time ,online_detail.next_login_time,online_detail.userid
+        ,online_detail.pid_code,online_detail.gid_code
+        from
+                (select
+                        
tmp.game_code,tmp.plat_code,tmp.playerid,tmp.account,tmp.org_sid,tmp.ct_sid,tmp.login_time,tmp.logout_time,
+                        LEAD(tmp.login_time,1, '9999-12-30 00:00:00') over 
(partition by tmp.game_code,tmp.plat_code,tmp.playerid order by tmp.login_time) 
next_login_time,
+                        COALESCE (mp.userid,'-') userid,COALESCE 
(mp.pid_code,'-') pid_code,COALESCE (mp.gid_code,'-') gid_code
+                from
+                        (select * from dim_account_userid_mapping
+                        where   start_time < convert_tz(date_add('2019-07-05 
00:00:00',INTERVAL 1 day),'Asia/Shanghai','Asia/Shanghai')
+                        and end_time >= convert_tz('2019-07-05 
00:00:00','Asia/Shanghai','Asia/Shanghai')
+                        and game_code ='abc'  and plat_code='xyz'
+                        ) mp
+                right join
+                    (
+                    select *,concat_ws('_',pid_code,gid_code,account) userkey 
from
+                    (select 
game_code,plat_code,playerid,account,org_sid,ct_sid,login_time,logout_time,pid_code,gid_code
+                    from dwd_online_detail where logout_time='9999-12-30 
00:00:00' and game_code='abc' and plat_code ='xyz'
+                    union all
+                    select game game_code,plat plat_code,playerid,account,sid 
org_sid,cast(p08 as int) ct_sid,dt login_time,'9999-12-30 00:00:00' 
logout_time,pid pid_code,gid gid_code
+                    from ods_login
+                    where game='abc' and `plat` = 'xyz'
+                    AND  dt BETWEEN convert_tz('2019-07-05 
00:00:00','Asia/Shanghai','Asia/Shanghai')
+                        and convert_tz('2019-07-05 
23:59:59','Asia/Shanghai','Asia/Shanghai')
+                        and day BETWEEN date_sub('2019-07-05',INTERVAL 1 DAY ) 
and date_add('2019-07-05',INTERVAL 1 DAY )
+                    group by 1,2,3,4,5,6,7,8,9,10
+                    ) t
+                    ) tmp
+                    on mp.game_code=tmp.game_code and mp.plat_code = 
tmp.plat_code and mp.userkey = tmp.userkey
+                        and tmp.login_time >= mp.start_time and tmp.login_time 
< mp.end_time
+                ) online_detail
+                left JOIN
+                        (select  day,game game_code,plat plat_code,playerid, dt
+                        from  ods_logout dlt
+                        where game='abc' and `plat` = 'xyz'
+                        and dt BETWEEN convert_tz('2019-07-05 
00:00:00','Asia/Shanghai','Asia/Shanghai')
+                                and convert_tz('2019-07-05 
23:59:59','Asia/Shanghai','Asia/Shanghai')
+                                and day BETWEEN date_sub('2019-07-05',INTERVAL 
1 DAY ) and date_add('2019-07-05',INTERVAL 1 DAY )
+                        group by 1,2,3,4,5
+                        ) logout
+                on  online_detail.game_code=logout.game_code and  
online_detail.plat_code=logout.plat_code
+                and online_detail.playerid=logout.playerid
+                and logout.dt>online_detail.login_time and logout.dt < 
online_detail.next_login_time
+                union all
+                select 
game_code,plat_code,playerid,account,org_sid,ct_sid,login_time,logout_time,next_login_time,userid,pid_code,gid_code
+                from dwd_online_detail
+                where logout_time BETWEEN convert_tz('2019-07-05 
00:00:00','Asia/Shanghai','Asia/Shanghai')
+                                and convert_tz('2019-07-05 
23:59:59','Asia/Shanghai','Asia/Shanghai')
+                        and not (game_code='abc' and `plat_code` = 'xyz'  );
+    """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to