This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch cp_0618_2 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 1f74c13ba2ffb3b4cd84dbce9c61c12911ca4713 Author: Pxl <pxl...@qq.com> AuthorDate: Tue Jun 18 11:13:07 2024 +0800 [Bug](information-schema) fix some bugs in information_schema.PROCESSLIST (#36409) 1. information_schema.PROCESSLIST's schema did not match FrontendServiceImpl::showProcessList 2. rpc failed when cloudCluster is null 3. SchemaScanner::fill_dest_column_for_range used uint32_t to store datetimev2 data ```cpp case TYPE_DATETIMEV2: { uint32_t num = *reinterpret_cast<uint64_t*>(data); ``` --- be/src/exec/schema_scanner.cpp | 61 ++++++++------------ .../schema_scanner/schema_processlist_scanner.cpp | 67 ++++++++++------------ .../java/org/apache/doris/catalog/SchemaTable.java | 24 ++++---- .../java/org/apache/doris/qe/ConnectContext.java | 2 +- 4 files changed, 70 insertions(+), 84 deletions(-) diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp index 75896f1ef66..dc57bfe8ace 100644 --- a/be/src/exec/schema_scanner.cpp +++ b/be/src/exec/schema_scanner.cpp @@ -62,15 +62,12 @@ namespace doris { class ObjectPool; SchemaScanner::SchemaScanner(const std::vector<ColumnDesc>& columns) - : _is_init(false), - _param(nullptr), - _columns(columns), - _schema_table_type(TSchemaTableType::SCH_INVALID) {} + : _is_init(false), _columns(columns), _schema_table_type(TSchemaTableType::SCH_INVALID) {} SchemaScanner::SchemaScanner(const std::vector<ColumnDesc>& columns, TSchemaTableType::type type) - : _is_init(false), _param(nullptr), _columns(columns), _schema_table_type(type) {} + : _is_init(false), _columns(columns), _schema_table_type(type) {} -SchemaScanner::~SchemaScanner() {} +SchemaScanner::~SchemaScanner() = default; Status SchemaScanner::start(RuntimeState* state) { if (!_is_init) { @@ -170,7 +167,7 @@ Status SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_ size_t fill_num = datas.size(); col_ptr = &nullable_column->get_nested_column(); for (int i = 0; i < fill_num; ++i) { - auto data = datas[i]; + auto* data
= datas[i]; if (data == nullptr) { // For nested column need not insert default. nullable_column->insert_data(nullptr, 0); @@ -180,99 +177,89 @@ Status SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_ } switch (col_desc.type) { case TYPE_HLL: { - HyperLogLog* hll_slot = reinterpret_cast<HyperLogLog*>(data); - reinterpret_cast<vectorized::ColumnHLL*>(col_ptr)->get_data().emplace_back(*hll_slot); + auto* hll_slot = reinterpret_cast<HyperLogLog*>(data); + assert_cast<vectorized::ColumnHLL*>(col_ptr)->get_data().emplace_back(*hll_slot); break; } case TYPE_VARCHAR: case TYPE_CHAR: case TYPE_STRING: { - StringRef* str_slot = reinterpret_cast<StringRef*>(data); - reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_slot->data, - str_slot->size); + auto* str_slot = reinterpret_cast<StringRef*>(data); + assert_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_slot->data, + str_slot->size); break; } case TYPE_BOOLEAN: { uint8_t num = *reinterpret_cast<bool*>(data); - reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value( - num); + assert_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value(num); break; } case TYPE_TINYINT: { int8_t num = *reinterpret_cast<int8_t*>(data); - reinterpret_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)->insert_value( - num); + assert_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)->insert_value(num); break; } case TYPE_SMALLINT: { int16_t num = *reinterpret_cast<int16_t*>(data); - reinterpret_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)->insert_value( - num); + assert_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)->insert_value(num); break; } case TYPE_INT: { int32_t num = *reinterpret_cast<int32_t*>(data); - reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value( - num); + assert_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value(num); 
break; } case TYPE_BIGINT: { int64_t num = *reinterpret_cast<int64_t*>(data); - reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value( - num); + assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(num); break; } case TYPE_LARGEINT: { __int128 num; memcpy(&num, data, sizeof(__int128)); - reinterpret_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)->insert_value( - num); + assert_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)->insert_value(num); break; } case TYPE_FLOAT: { float num = *reinterpret_cast<float*>(data); - reinterpret_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)->insert_value( - num); + assert_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)->insert_value(num); break; } case TYPE_DOUBLE: { double num = *reinterpret_cast<double*>(data); - reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value( - num); + assert_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(num); break; } case TYPE_DATE: { - reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data( + assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data( reinterpret_cast<char*>(data), 0); break; } case TYPE_DATEV2: { uint32_t num = *reinterpret_cast<uint32_t*>(data); - reinterpret_cast<vectorized::ColumnVector<vectorized::UInt32>*>(col_ptr)->insert_value( - num); + assert_cast<vectorized::ColumnDateV2*>(col_ptr)->insert_value(num); break; } case TYPE_DATETIME: { - reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data( + assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data( reinterpret_cast<char*>(data), 0); break; } case TYPE_DATETIMEV2: { - uint32_t num = *reinterpret_cast<uint64_t*>(data); - reinterpret_cast<vectorized::ColumnVector<vectorized::UInt64>*>(col_ptr)->insert_value( - num); + uint64_t num = 
*reinterpret_cast<uint64_t*>(data); + assert_cast<vectorized::ColumnDateTimeV2*>(col_ptr)->insert_value(num); break; } @@ -291,14 +278,14 @@ Status SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_ case TYPE_DECIMAL32: { const int32_t num = *reinterpret_cast<int32_t*>(data); - reinterpret_cast<vectorized::ColumnDecimal32*>(col_ptr)->insert_data( + assert_cast<vectorized::ColumnDecimal32*>(col_ptr)->insert_data( reinterpret_cast<const char*>(&num), 0); break; } case TYPE_DECIMAL64: { const int64_t num = *reinterpret_cast<int64_t*>(data); - reinterpret_cast<vectorized::ColumnDecimal64*>(col_ptr)->insert_data( + assert_cast<vectorized::ColumnDecimal64*>(col_ptr)->insert_data( reinterpret_cast<const char*>(&num), 0); break; } diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp index d9cbd0f3fb7..d6b3af4d04c 100644 --- a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp @@ -19,9 +19,11 @@ #include <gen_cpp/FrontendService_types.h> +#include <exception> #include <vector> #include "exec/schema_scanner/schema_helper.h" +#include "runtime/define_primitive_type.h" #include "runtime/runtime_state.h" #include "vec/common/string_ref.h" #include "vec/core/block.h" @@ -30,15 +32,20 @@ namespace doris { std::vector<SchemaScanner::ColumnDesc> SchemaProcessListScanner::_s_processlist_columns = { + {"CURRENT_CONNECTED", TYPE_VARCHAR, sizeof(StringRef), false}, {"ID", TYPE_LARGEINT, sizeof(int128_t), false}, {"USER", TYPE_VARCHAR, sizeof(StringRef), false}, {"HOST", TYPE_VARCHAR, sizeof(StringRef), false}, + {"LOGIN_TIME", TYPE_DATETIMEV2, sizeof(DateTimeV2ValueType), false}, {"CATALOG", TYPE_VARCHAR, sizeof(StringRef), false}, {"DB", TYPE_VARCHAR, sizeof(StringRef), false}, {"COMMAND", TYPE_VARCHAR, sizeof(StringRef), false}, {"TIME", TYPE_INT, sizeof(int32_t), false}, {"STATE", TYPE_VARCHAR, sizeof(StringRef), 
false}, - {"INFO", TYPE_VARCHAR, sizeof(StringRef), false}}; + {"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false}, + {"INFO", TYPE_VARCHAR, sizeof(StringRef), false}, + {"FE", TYPE_VARCHAR, sizeof(StringRef), false}, + {"CLOUD_CLUSTER", TYPE_VARCHAR, sizeof(StringRef), false}}; SchemaProcessListScanner::SchemaProcessListScanner() : SchemaScanner(_s_processlist_columns, TSchemaTableType::SCH_PROCESSLIST) {} @@ -89,48 +96,36 @@ Status SchemaProcessListScanner::_fill_block_impl(vectorized::Block* block) { for (size_t row_idx = 0; row_idx < row_num; ++row_idx) { const auto& row = process_list[row_idx]; + if (row.size() != _s_processlist_columns.size()) { + return Status::InternalError( + "process list meet invalid schema, schema_size={}, input_data_size={}", + _s_processlist_columns.size(), row.size()); + } // Fetch and store the column value based on its index std::string& column_value = column_values[row_idx]; // Reference to the actual string in the vector - - switch (col_idx) { - case 0: - column_value = row.size() > 1 ? row[1] : ""; - break; // ID - case 1: - column_value = row.size() > 2 ? row[2] : ""; - break; // USER - case 2: - column_value = row.size() > 3 ? row[3] : ""; - break; // HOST - case 3: - column_value = row.size() > 5 ? row[5] : ""; - break; // CATALOG - case 4: - column_value = row.size() > 6 ? row[6] : ""; - break; // DB - case 5: - column_value = row.size() > 7 ? row[7] : ""; - break; // COMMAND - case 6: - column_value = row.size() > 8 ? row[8] : ""; - break; // TIME - case 7: - column_value = row.size() > 9 ? row[9] : ""; - break; // STATE - case 8: - column_value = row.size() > 11 ? row[11] : ""; - break; // INFO - default: - column_value = ""; - break; - } + column_value = row[col_idx]; if (_s_processlist_columns[col_idx].type == TYPE_LARGEINT || _s_processlist_columns[col_idx].type == TYPE_INT) { - int128_t val = !column_value.empty() ? 
std::stoll(column_value) : 0; - int_vals[row_idx] = val; + try { + int128_t val = !column_value.empty() ? std::stoll(column_value) : 0; + int_vals[row_idx] = val; + } catch (const std::exception& e) { + return Status::InternalError( + "process list meet invalid data, column={}, data={}, reason={}", + _s_processlist_columns[col_idx].name, column_value, e.what()); + } + datas[row_idx] = &int_vals[row_idx]; + } else if (_s_processlist_columns[col_idx].type == TYPE_DATETIMEV2) { + auto* dv = reinterpret_cast<DateV2Value<DateTimeV2ValueType>*>(&int_vals[row_idx]); + if (!dv->from_date_str(column_value.data(), column_value.size(), -1, + config::allow_zero_date)) { + return Status::InternalError( + "process list meet invalid data, column={}, data={}, reason={}", + _s_processlist_columns[col_idx].name, column_value); + } datas[row_idx] = &int_vals[row_idx]; } else { str_refs[row_idx] = diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 164495c4896..b8212259153 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -446,16 +446,20 @@ public class SchemaTable extends Table { .column("SOURCE_LINE", ScalarType.createType(PrimitiveType.INT)) .build())) .put("processlist", new SchemaTable(SystemIdGenerator.getNextId(), "processlist", TableType.SCHEMA, - builder().column("ID", ScalarType.createType(PrimitiveType.LARGEINT)) - .column("USER", ScalarType.createVarchar(32)) - .column("HOST", ScalarType.createVarchar(261)) - .column("CATALOG", ScalarType.createVarchar(64)) - .column("DB", ScalarType.createVarchar(64)) - .column("COMMAND", ScalarType.createVarchar(16)) - .column("TIME", ScalarType.createType(PrimitiveType.INT)) - .column("STATE", ScalarType.createVarchar(64)) - .column("INFO", ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)) - .build())) + 
builder().column("CURRENT_CONNECTED", ScalarType.createVarchar(16)) + .column("ID", ScalarType.createType(PrimitiveType.LARGEINT)) + .column("USER", ScalarType.createVarchar(32)) + .column("HOST", ScalarType.createVarchar(261)) + .column("LOGIN_TIME", ScalarType.createType(PrimitiveType.DATETIMEV2)) + .column("CATALOG", ScalarType.createVarchar(64)) + .column("DB", ScalarType.createVarchar(64)) + .column("COMMAND", ScalarType.createVarchar(16)) + .column("TIME", ScalarType.createType(PrimitiveType.INT)) + .column("STATE", ScalarType.createVarchar(64)) + .column("QUERY_ID", ScalarType.createVarchar(256)) + .column("INFO", ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)) + .column("FE", + ScalarType.createVarchar(64)))) .build(); protected SchemaTable(long id, String name, TableType type, List<Column> baseSchema) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 2840021aea9..508691f29a3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -830,7 +830,7 @@ public class ConnectContext { if (connId == connectionId) { row.add("Yes"); } else { - row.add(""); + row.add("No"); } row.add("" + connectionId); row.add(ClusterNamespace.getNameFromFullName(qualifiedUser)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org