This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch cp_0618_2
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 1f74c13ba2ffb3b4cd84dbce9c61c12911ca4713
Author: Pxl <pxl...@qq.com>
AuthorDate: Tue Jun 18 11:13:07 2024 +0800

    [Bug](information-schema) fix some bugs in information_schema.PROCESSLIST 
(#36409)
    
    1. information_schema.PROCESSLIST's schema did not match
    FrontendServiceImpl::showProcessList
    2. The RPC failed when cloudCluster was null
    3. SchemaScanner::fill_dest_column_for_range used uint32_t to store
    datetimev2 data, truncating the 64-bit value
    ```cpp
    case TYPE_DATETIMEV2: {
                uint32_t num = *reinterpret_cast<uint64_t*>(data);
    ```
---
 be/src/exec/schema_scanner.cpp                     | 61 ++++++++------------
 .../schema_scanner/schema_processlist_scanner.cpp  | 67 ++++++++++------------
 .../java/org/apache/doris/catalog/SchemaTable.java | 24 ++++----
 .../java/org/apache/doris/qe/ConnectContext.java   |  2 +-
 4 files changed, 70 insertions(+), 84 deletions(-)

diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index 75896f1ef66..dc57bfe8ace 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -62,15 +62,12 @@ namespace doris {
 class ObjectPool;
 
 SchemaScanner::SchemaScanner(const std::vector<ColumnDesc>& columns)
-        : _is_init(false),
-          _param(nullptr),
-          _columns(columns),
-          _schema_table_type(TSchemaTableType::SCH_INVALID) {}
+        : _is_init(false), _columns(columns), 
_schema_table_type(TSchemaTableType::SCH_INVALID) {}
 
 SchemaScanner::SchemaScanner(const std::vector<ColumnDesc>& columns, 
TSchemaTableType::type type)
-        : _is_init(false), _param(nullptr), _columns(columns), 
_schema_table_type(type) {}
+        : _is_init(false), _columns(columns), _schema_table_type(type) {}
 
-SchemaScanner::~SchemaScanner() {}
+SchemaScanner::~SchemaScanner() = default;
 
 Status SchemaScanner::start(RuntimeState* state) {
     if (!_is_init) {
@@ -170,7 +167,7 @@ Status 
SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_
     size_t fill_num = datas.size();
     col_ptr = &nullable_column->get_nested_column();
     for (int i = 0; i < fill_num; ++i) {
-        auto data = datas[i];
+        auto* data = datas[i];
         if (data == nullptr) {
             // For nested column need not insert default.
             nullable_column->insert_data(nullptr, 0);
@@ -180,99 +177,89 @@ Status 
SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_
         }
         switch (col_desc.type) {
         case TYPE_HLL: {
-            HyperLogLog* hll_slot = reinterpret_cast<HyperLogLog*>(data);
-            
reinterpret_cast<vectorized::ColumnHLL*>(col_ptr)->get_data().emplace_back(*hll_slot);
+            auto* hll_slot = reinterpret_cast<HyperLogLog*>(data);
+            
assert_cast<vectorized::ColumnHLL*>(col_ptr)->get_data().emplace_back(*hll_slot);
             break;
         }
         case TYPE_VARCHAR:
         case TYPE_CHAR:
         case TYPE_STRING: {
-            StringRef* str_slot = reinterpret_cast<StringRef*>(data);
-            
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_slot->data,
-                                                                              
str_slot->size);
+            auto* str_slot = reinterpret_cast<StringRef*>(data);
+            
assert_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_slot->data,
+                                                                         
str_slot->size);
             break;
         }
 
         case TYPE_BOOLEAN: {
             uint8_t num = *reinterpret_cast<bool*>(data);
-            
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value(
-                    num);
+            
assert_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value(num);
             break;
         }
 
         case TYPE_TINYINT: {
             int8_t num = *reinterpret_cast<int8_t*>(data);
-            
reinterpret_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)->insert_value(
-                    num);
+            
assert_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)->insert_value(num);
             break;
         }
 
         case TYPE_SMALLINT: {
             int16_t num = *reinterpret_cast<int16_t*>(data);
-            
reinterpret_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)->insert_value(
-                    num);
+            
assert_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)->insert_value(num);
             break;
         }
 
         case TYPE_INT: {
             int32_t num = *reinterpret_cast<int32_t*>(data);
-            
reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value(
-                    num);
+            
assert_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value(num);
             break;
         }
 
         case TYPE_BIGINT: {
             int64_t num = *reinterpret_cast<int64_t*>(data);
-            
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(
-                    num);
+            
assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(num);
             break;
         }
 
         case TYPE_LARGEINT: {
             __int128 num;
             memcpy(&num, data, sizeof(__int128));
-            
reinterpret_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)->insert_value(
-                    num);
+            
assert_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)->insert_value(num);
             break;
         }
 
         case TYPE_FLOAT: {
             float num = *reinterpret_cast<float*>(data);
-            
reinterpret_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)->insert_value(
-                    num);
+            
assert_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)->insert_value(num);
             break;
         }
 
         case TYPE_DOUBLE: {
             double num = *reinterpret_cast<double*>(data);
-            
reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(
-                    num);
+            
assert_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(num);
             break;
         }
 
         case TYPE_DATE: {
-            
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
+            
assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
                     reinterpret_cast<char*>(data), 0);
             break;
         }
 
         case TYPE_DATEV2: {
             uint32_t num = *reinterpret_cast<uint32_t*>(data);
-            
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt32>*>(col_ptr)->insert_value(
-                    num);
+            assert_cast<vectorized::ColumnDateV2*>(col_ptr)->insert_value(num);
             break;
         }
 
         case TYPE_DATETIME: {
-            
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
+            
assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
                     reinterpret_cast<char*>(data), 0);
             break;
         }
 
         case TYPE_DATETIMEV2: {
-            uint32_t num = *reinterpret_cast<uint64_t*>(data);
-            
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt64>*>(col_ptr)->insert_value(
-                    num);
+            uint64_t num = *reinterpret_cast<uint64_t*>(data);
+            
assert_cast<vectorized::ColumnDateTimeV2*>(col_ptr)->insert_value(num);
             break;
         }
 
@@ -291,14 +278,14 @@ Status 
SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_
 
         case TYPE_DECIMAL32: {
             const int32_t num = *reinterpret_cast<int32_t*>(data);
-            
reinterpret_cast<vectorized::ColumnDecimal32*>(col_ptr)->insert_data(
+            assert_cast<vectorized::ColumnDecimal32*>(col_ptr)->insert_data(
                     reinterpret_cast<const char*>(&num), 0);
             break;
         }
 
         case TYPE_DECIMAL64: {
             const int64_t num = *reinterpret_cast<int64_t*>(data);
-            
reinterpret_cast<vectorized::ColumnDecimal64*>(col_ptr)->insert_data(
+            assert_cast<vectorized::ColumnDecimal64*>(col_ptr)->insert_data(
                     reinterpret_cast<const char*>(&num), 0);
             break;
         }
diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp 
b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
index d9cbd0f3fb7..d6b3af4d04c 100644
--- a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
@@ -19,9 +19,11 @@
 
 #include <gen_cpp/FrontendService_types.h>
 
+#include <exception>
 #include <vector>
 
 #include "exec/schema_scanner/schema_helper.h"
+#include "runtime/define_primitive_type.h"
 #include "runtime/runtime_state.h"
 #include "vec/common/string_ref.h"
 #include "vec/core/block.h"
@@ -30,15 +32,20 @@
 namespace doris {
 
 std::vector<SchemaScanner::ColumnDesc> 
SchemaProcessListScanner::_s_processlist_columns = {
+        {"CURRENT_CONNECTED", TYPE_VARCHAR, sizeof(StringRef), false},
         {"ID", TYPE_LARGEINT, sizeof(int128_t), false},
         {"USER", TYPE_VARCHAR, sizeof(StringRef), false},
         {"HOST", TYPE_VARCHAR, sizeof(StringRef), false},
+        {"LOGIN_TIME", TYPE_DATETIMEV2, sizeof(DateTimeV2ValueType), false},
         {"CATALOG", TYPE_VARCHAR, sizeof(StringRef), false},
         {"DB", TYPE_VARCHAR, sizeof(StringRef), false},
         {"COMMAND", TYPE_VARCHAR, sizeof(StringRef), false},
         {"TIME", TYPE_INT, sizeof(int32_t), false},
         {"STATE", TYPE_VARCHAR, sizeof(StringRef), false},
-        {"INFO", TYPE_VARCHAR, sizeof(StringRef), false}};
+        {"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false},
+        {"INFO", TYPE_VARCHAR, sizeof(StringRef), false},
+        {"FE", TYPE_VARCHAR, sizeof(StringRef), false},
+        {"CLOUD_CLUSTER", TYPE_VARCHAR, sizeof(StringRef), false}};
 
 SchemaProcessListScanner::SchemaProcessListScanner()
         : SchemaScanner(_s_processlist_columns, 
TSchemaTableType::SCH_PROCESSLIST) {}
@@ -89,48 +96,36 @@ Status 
SchemaProcessListScanner::_fill_block_impl(vectorized::Block* block) {
 
         for (size_t row_idx = 0; row_idx < row_num; ++row_idx) {
             const auto& row = process_list[row_idx];
+            if (row.size() != _s_processlist_columns.size()) {
+                return Status::InternalError(
+                        "process list meet invalid schema, schema_size={}, 
input_data_size={}",
+                        _s_processlist_columns.size(), row.size());
+            }
 
             // Fetch and store the column value based on its index
             std::string& column_value =
                     column_values[row_idx]; // Reference to the actual string 
in the vector
-
-            switch (col_idx) {
-            case 0:
-                column_value = row.size() > 1 ? row[1] : "";
-                break; // ID
-            case 1:
-                column_value = row.size() > 2 ? row[2] : "";
-                break; // USER
-            case 2:
-                column_value = row.size() > 3 ? row[3] : "";
-                break; // HOST
-            case 3:
-                column_value = row.size() > 5 ? row[5] : "";
-                break; // CATALOG
-            case 4:
-                column_value = row.size() > 6 ? row[6] : "";
-                break; // DB
-            case 5:
-                column_value = row.size() > 7 ? row[7] : "";
-                break; // COMMAND
-            case 6:
-                column_value = row.size() > 8 ? row[8] : "";
-                break; // TIME
-            case 7:
-                column_value = row.size() > 9 ? row[9] : "";
-                break; // STATE
-            case 8:
-                column_value = row.size() > 11 ? row[11] : "";
-                break; // INFO
-            default:
-                column_value = "";
-                break;
-            }
+            column_value = row[col_idx];
 
             if (_s_processlist_columns[col_idx].type == TYPE_LARGEINT ||
                 _s_processlist_columns[col_idx].type == TYPE_INT) {
-                int128_t val = !column_value.empty() ? 
std::stoll(column_value) : 0;
-                int_vals[row_idx] = val;
+                try {
+                    int128_t val = !column_value.empty() ? 
std::stoll(column_value) : 0;
+                    int_vals[row_idx] = val;
+                } catch (const std::exception& e) {
+                    return Status::InternalError(
+                            "process list meet invalid data, column={}, 
data={}, reason={}",
+                            _s_processlist_columns[col_idx].name, 
column_value, e.what());
+                }
+                datas[row_idx] = &int_vals[row_idx];
+            } else if (_s_processlist_columns[col_idx].type == 
TYPE_DATETIMEV2) {
+                auto* dv = 
reinterpret_cast<DateV2Value<DateTimeV2ValueType>*>(&int_vals[row_idx]);
+                if (!dv->from_date_str(column_value.data(), 
column_value.size(), -1,
+                                       config::allow_zero_date)) {
+                    return Status::InternalError(
+                            "process list meet invalid data, column={}, 
data={}, reason={}",
+                            _s_processlist_columns[col_idx].name, 
column_value);
+                }
                 datas[row_idx] = &int_vals[row_idx];
             } else {
                 str_refs[row_idx] =
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 164495c4896..b8212259153 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -446,16 +446,20 @@ public class SchemaTable extends Table {
                             .column("SOURCE_LINE", 
ScalarType.createType(PrimitiveType.INT))
                             .build()))
             .put("processlist", new SchemaTable(SystemIdGenerator.getNextId(), 
"processlist", TableType.SCHEMA,
-                    builder().column("ID", 
ScalarType.createType(PrimitiveType.LARGEINT))
-                            .column("USER", ScalarType.createVarchar(32))
-                            .column("HOST", ScalarType.createVarchar(261))
-                            .column("CATALOG", ScalarType.createVarchar(64))
-                            .column("DB", ScalarType.createVarchar(64))
-                            .column("COMMAND", ScalarType.createVarchar(16))
-                            .column("TIME", 
ScalarType.createType(PrimitiveType.INT))
-                            .column("STATE", ScalarType.createVarchar(64))
-                            .column("INFO", 
ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
-                            .build()))
+                    builder().column("CURRENT_CONNECTED", 
ScalarType.createVarchar(16))
+                    .column("ID", 
ScalarType.createType(PrimitiveType.LARGEINT))
+                    .column("USER", ScalarType.createVarchar(32))
+                    .column("HOST", ScalarType.createVarchar(261))
+                    .column("LOGIN_TIME", 
ScalarType.createType(PrimitiveType.DATETIMEV2))
+                    .column("CATALOG", ScalarType.createVarchar(64))
+                    .column("DB", ScalarType.createVarchar(64))
+                    .column("COMMAND", ScalarType.createVarchar(16))
+                    .column("TIME", ScalarType.createType(PrimitiveType.INT))
+                    .column("STATE", ScalarType.createVarchar(64))
+                    .column("QUERY_ID", ScalarType.createVarchar(256))
+                    .column("INFO", 
ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
+                    .column("FE",
+                            ScalarType.createVarchar(64))))
             .build();
 
     protected SchemaTable(long id, String name, TableType type, List<Column> 
baseSchema) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 2840021aea9..508691f29a3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -830,7 +830,7 @@ public class ConnectContext {
             if (connId == connectionId) {
                 row.add("Yes");
             } else {
-                row.add("");
+                row.add("No");
             }
             row.add("" + connectionId);
             row.add(ClusterNamespace.getNameFromFullName(qualifiedUser));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to