This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 2917124ebd7 [Bug](information-schema) fix some bug of information_schema.PROCESSLIST (#36409) 2917124ebd7 is described below commit 2917124ebd7a46ad905bb8c4950a63376ac0da38 Author: Pxl <pxl...@qq.com> AuthorDate: Tue Jun 18 11:13:07 2024 +0800 [Bug](information-schema) fix some bug of information_schema.PROCESSLIST (#36409) ## Proposed changes 1. information_schema.PROCESSLIST's schema did not match FrontendServiceImpl::showProcessList 2. the RPC failed when cloudCluster is null 3. SchemaScanner::fill_dest_column_for_range used uint32_t to store datetimev2 data, truncating the 64-bit value ```cpp case TYPE_DATETIMEV2: { uint32_t num = *reinterpret_cast<uint64_t*>(data); ``` --- be/src/exec/schema_scanner.cpp | 65 +++++++++------------ .../schema_scanner/schema_processlist_scanner.cpp | 67 ++++++++++------------ be/src/pipeline/exec/schema_scan_operator.cpp | 27 ++++----- .../java/org/apache/doris/catalog/SchemaTable.java | 31 +++++----- .../java/org/apache/doris/qe/ConnectContext.java | 8 ++- .../info_schema_db/test_info_schema_db.groovy | 2 + 6 files changed, 95 insertions(+), 105 deletions(-) diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp index 5f39ff6657d..de9857bad2c 100644 --- a/be/src/exec/schema_scanner.cpp +++ b/be/src/exec/schema_scanner.cpp @@ -70,15 +70,12 @@ namespace doris { class ObjectPool; SchemaScanner::SchemaScanner(const std::vector<ColumnDesc>& columns) - : _is_init(false), - _param(nullptr), - _columns(columns), - _schema_table_type(TSchemaTableType::SCH_INVALID) {} + : _is_init(false), _columns(columns), _schema_table_type(TSchemaTableType::SCH_INVALID) {} SchemaScanner::SchemaScanner(const std::vector<ColumnDesc>& columns, TSchemaTableType::type type) - : _is_init(false), _param(nullptr), _columns(columns), _schema_table_type(type) {} + : _is_init(false), _columns(columns), _schema_table_type(type) {} -SchemaScanner::~SchemaScanner() {} 
+SchemaScanner::~SchemaScanner() = default; Status SchemaScanner::start(RuntimeState* state) { if (!_is_init) { @@ -192,7 +189,7 @@ Status SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_ size_t fill_num = datas.size(); col_ptr = &nullable_column->get_nested_column(); for (int i = 0; i < fill_num; ++i) { - auto data = datas[i]; + auto* data = datas[i]; if (data == nullptr) { // For nested column need not insert default. nullable_column->insert_data(nullptr, 0); @@ -202,125 +199,115 @@ Status SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_ } switch (col_desc.type) { case TYPE_HLL: { - HyperLogLog* hll_slot = reinterpret_cast<HyperLogLog*>(data); - reinterpret_cast<vectorized::ColumnHLL*>(col_ptr)->get_data().emplace_back(*hll_slot); + auto* hll_slot = reinterpret_cast<HyperLogLog*>(data); + assert_cast<vectorized::ColumnHLL*>(col_ptr)->get_data().emplace_back(*hll_slot); break; } case TYPE_VARCHAR: case TYPE_CHAR: case TYPE_STRING: { - StringRef* str_slot = reinterpret_cast<StringRef*>(data); - reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_slot->data, - str_slot->size); + auto* str_slot = reinterpret_cast<StringRef*>(data); + assert_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_slot->data, + str_slot->size); break; } case TYPE_BOOLEAN: { uint8_t num = *reinterpret_cast<bool*>(data); - reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value( - num); + assert_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value(num); break; } case TYPE_TINYINT: { int8_t num = *reinterpret_cast<int8_t*>(data); - reinterpret_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)->insert_value( - num); + assert_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)->insert_value(num); break; } case TYPE_SMALLINT: { int16_t num = *reinterpret_cast<int16_t*>(data); - 
reinterpret_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)->insert_value( - num); + assert_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)->insert_value(num); break; } case TYPE_INT: { int32_t num = *reinterpret_cast<int32_t*>(data); - reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value( - num); + assert_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value(num); break; } case TYPE_BIGINT: { int64_t num = *reinterpret_cast<int64_t*>(data); - reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value( - num); + assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(num); break; } case TYPE_LARGEINT: { __int128 num; memcpy(&num, data, sizeof(__int128)); - reinterpret_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)->insert_value( - num); + assert_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)->insert_value(num); break; } case TYPE_FLOAT: { float num = *reinterpret_cast<float*>(data); - reinterpret_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)->insert_value( - num); + assert_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)->insert_value(num); break; } case TYPE_DOUBLE: { double num = *reinterpret_cast<double*>(data); - reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value( - num); + assert_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(num); break; } case TYPE_DATE: { - reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data( + assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data( reinterpret_cast<char*>(data), 0); break; } case TYPE_DATEV2: { uint32_t num = *reinterpret_cast<uint32_t*>(data); - reinterpret_cast<vectorized::ColumnVector<vectorized::UInt32>*>(col_ptr)->insert_value( - num); + assert_cast<vectorized::ColumnDateV2*>(col_ptr)->insert_value(num); break; } 
case TYPE_DATETIME: { - reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data( + assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data( reinterpret_cast<char*>(data), 0); break; } case TYPE_DATETIMEV2: { - uint32_t num = *reinterpret_cast<uint64_t*>(data); - reinterpret_cast<vectorized::ColumnVector<vectorized::UInt64>*>(col_ptr)->insert_value( - num); + uint64_t num = *reinterpret_cast<uint64_t*>(data); + assert_cast<vectorized::ColumnDateTimeV2*>(col_ptr)->insert_value(num); break; } case TYPE_DECIMALV2: { const vectorized::Int128 num = (reinterpret_cast<PackedInt128*>(data))->value; - reinterpret_cast<vectorized::ColumnDecimal128V2*>(col_ptr)->insert_data( + assert_cast<vectorized::ColumnDecimal128V2*>(col_ptr)->insert_data( reinterpret_cast<const char*>(&num), 0); break; } case TYPE_DECIMAL128I: { const vectorized::Int128 num = (reinterpret_cast<PackedInt128*>(data))->value; - reinterpret_cast<vectorized::ColumnDecimal128V3*>(col_ptr)->insert_data( + assert_cast<vectorized::ColumnDecimal128V3*>(col_ptr)->insert_data( reinterpret_cast<const char*>(&num), 0); break; } case TYPE_DECIMAL32: { const int32_t num = *reinterpret_cast<int32_t*>(data); - reinterpret_cast<vectorized::ColumnDecimal32*>(col_ptr)->insert_data( + assert_cast<vectorized::ColumnDecimal32*>(col_ptr)->insert_data( reinterpret_cast<const char*>(&num), 0); break; } case TYPE_DECIMAL64: { const int64_t num = *reinterpret_cast<int64_t*>(data); - reinterpret_cast<vectorized::ColumnDecimal64*>(col_ptr)->insert_data( + assert_cast<vectorized::ColumnDecimal64*>(col_ptr)->insert_data( reinterpret_cast<const char*>(&num), 0); break; } diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp index 2ecc2be9e01..0f270a6a8c1 100644 --- a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp @@ -19,9 +19,11 @@ 
#include <gen_cpp/FrontendService_types.h> +#include <exception> #include <vector> #include "exec/schema_scanner/schema_helper.h" +#include "runtime/define_primitive_type.h" #include "runtime/runtime_state.h" #include "vec/common/string_ref.h" #include "vec/core/block.h" @@ -30,15 +32,20 @@ namespace doris { std::vector<SchemaScanner::ColumnDesc> SchemaProcessListScanner::_s_processlist_columns = { + {"CURRENT_CONNECTED", TYPE_VARCHAR, sizeof(StringRef), false}, {"ID", TYPE_LARGEINT, sizeof(int128_t), false}, {"USER", TYPE_VARCHAR, sizeof(StringRef), false}, {"HOST", TYPE_VARCHAR, sizeof(StringRef), false}, + {"LOGIN_TIME", TYPE_DATETIMEV2, sizeof(DateTimeV2ValueType), false}, {"CATALOG", TYPE_VARCHAR, sizeof(StringRef), false}, {"DB", TYPE_VARCHAR, sizeof(StringRef), false}, {"COMMAND", TYPE_VARCHAR, sizeof(StringRef), false}, {"TIME", TYPE_INT, sizeof(int32_t), false}, {"STATE", TYPE_VARCHAR, sizeof(StringRef), false}, - {"INFO", TYPE_VARCHAR, sizeof(StringRef), false}}; + {"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false}, + {"INFO", TYPE_VARCHAR, sizeof(StringRef), false}, + {"FE", TYPE_VARCHAR, sizeof(StringRef), false}, + {"CLOUD_CLUSTER", TYPE_VARCHAR, sizeof(StringRef), false}}; SchemaProcessListScanner::SchemaProcessListScanner() : SchemaScanner(_s_processlist_columns, TSchemaTableType::SCH_PROCESSLIST) {} @@ -90,48 +97,36 @@ Status SchemaProcessListScanner::_fill_block_impl(vectorized::Block* block) { for (size_t row_idx = 0; row_idx < row_num; ++row_idx) { const auto& row = process_list[row_idx]; + if (row.size() != _s_processlist_columns.size()) { + return Status::InternalError( + "process list meet invalid schema, schema_size={}, input_data_size={}", + _s_processlist_columns.size(), row.size()); + } // Fetch and store the column value based on its index std::string& column_value = column_values[row_idx]; // Reference to the actual string in the vector - - switch (col_idx) { - case 0: - column_value = row.size() > 1 ? 
row[1] : ""; - break; // ID - case 1: - column_value = row.size() > 2 ? row[2] : ""; - break; // USER - case 2: - column_value = row.size() > 3 ? row[3] : ""; - break; // HOST - case 3: - column_value = row.size() > 5 ? row[5] : ""; - break; // CATALOG - case 4: - column_value = row.size() > 6 ? row[6] : ""; - break; // DB - case 5: - column_value = row.size() > 7 ? row[7] : ""; - break; // COMMAND - case 6: - column_value = row.size() > 8 ? row[8] : ""; - break; // TIME - case 7: - column_value = row.size() > 9 ? row[9] : ""; - break; // STATE - case 8: - column_value = row.size() > 11 ? row[11] : ""; - break; // INFO - default: - column_value = ""; - break; - } + column_value = row[col_idx]; if (_s_processlist_columns[col_idx].type == TYPE_LARGEINT || _s_processlist_columns[col_idx].type == TYPE_INT) { - int128_t val = !column_value.empty() ? std::stoll(column_value) : 0; - int_vals[row_idx] = val; + try { + int128_t val = !column_value.empty() ? std::stoll(column_value) : 0; + int_vals[row_idx] = val; + } catch (const std::exception& e) { + return Status::InternalError( + "process list meet invalid data, column={}, data={}, reason={}", + _s_processlist_columns[col_idx].name, column_value, e.what()); + } + datas[row_idx] = &int_vals[row_idx]; + } else if (_s_processlist_columns[col_idx].type == TYPE_DATETIMEV2) { + auto* dv = reinterpret_cast<DateV2Value<DateTimeV2ValueType>*>(&int_vals[row_idx]); + if (!dv->from_date_str(column_value.data(), column_value.size(), -1, + config::allow_zero_date)) { + return Status::InternalError( + "process list meet invalid data, column={}, data={}, reason={}", + _s_processlist_columns[col_idx].name, column_value); + } datas[row_idx] = &int_vals[row_idx]; } else { str_refs[row_idx] = diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp index e4a3a8c1ca2..f4658988095 100644 --- a/be/src/pipeline/exec/schema_scan_operator.cpp +++ b/be/src/pipeline/exec/schema_scan_operator.cpp @@ 
-39,11 +39,11 @@ Status SchemaScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { auto& p = _parent->cast<SchemaScanOperatorX>(); _scanner_param.common_param = p._common_scanner_param; // init schema scanner profile - _scanner_param.profile.reset(new RuntimeProfile("SchemaScanner")); + _scanner_param.profile = std::make_unique<RuntimeProfile>("SchemaScanner"); profile()->add_child(_scanner_param.profile.get(), true, nullptr); // get src tuple desc - const SchemaTableDescriptor* schema_table = + const auto* schema_table = static_cast<const SchemaTableDescriptor*>(p._dest_tuple_desc->table_desc()); // new one scanner _schema_scanner = SchemaScanner::create(schema_table->schema_table_type()); @@ -68,7 +68,6 @@ SchemaScanOperatorX::SchemaScanOperatorX(ObjectPool* pool, const TPlanNode& tnod _table_name(tnode.schema_scan_node.table_name), _common_scanner_param(new SchemaScannerCommonParam()), _tuple_id(tnode.schema_scan_node.tuple_id), - _dest_tuple_desc(nullptr), _tuple_idx(0), _slot_num(0) {} @@ -149,7 +148,7 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) { _slot_num = _dest_tuple_desc->slots().size(); // get src tuple desc - const SchemaTableDescriptor* schema_table = + const auto* schema_table = static_cast<const SchemaTableDescriptor*>(_dest_tuple_desc->table_desc()); if (nullptr == schema_table) { @@ -166,7 +165,7 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) { const std::vector<SchemaScanner::ColumnDesc>& columns_desc(_schema_scanner->get_column_desc()); // if src columns size is zero, it's the dummy slots. 
- if (0 == columns_desc.size()) { + if (columns_desc.empty()) { _slot_num = 0; } @@ -180,17 +179,15 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) { } if (j >= columns_desc.size()) { - LOG(WARNING) << "no match column for this column(" - << _dest_tuple_desc->slots()[i]->col_name() << ")"; - return Status::InternalError("no match column for this column."); + return Status::InternalError("no match column for this column({})", + _dest_tuple_desc->slots()[i]->col_name()); } if (columns_desc[j].type != _dest_tuple_desc->slots()[i]->type().type) { - LOG(WARNING) << "schema not match. input is " << columns_desc[j].name << "(" - << columns_desc[j].type << ") and output is " - << _dest_tuple_desc->slots()[i]->col_name() << "(" - << _dest_tuple_desc->slots()[i]->type() << ")"; - return Status::InternalError("schema not match."); + return Status::InternalError("schema not match. input is {}({}) and output is {}({})", + columns_desc[j].name, type_to_string(columns_desc[j].type), + _dest_tuple_desc->slots()[i]->col_name(), + type_to_string(_dest_tuple_desc->slots()[i]->type().type)); } } @@ -211,7 +208,7 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl do { block->clear(); for (int i = 0; i < _slot_num; ++i) { - auto dest_slot_desc = _dest_tuple_desc->slots()[i]; + auto* dest_slot_desc = _dest_tuple_desc->slots()[i]; block->insert(vectorized::ColumnWithTypeAndName( dest_slot_desc->get_empty_mutable_column(), dest_slot_desc->get_data_type_ptr(), dest_slot_desc->col_name())); @@ -245,7 +242,7 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl if (src_block.rows()) { // block->check_number_of_rows(); for (int i = 0; i < _slot_num; ++i) { - auto dest_slot_desc = _dest_tuple_desc->slots()[i]; + auto* dest_slot_desc = _dest_tuple_desc->slots()[i]; vectorized::MutableColumnPtr column_ptr = std::move(*block->get_by_position(i).column).mutate(); column_ptr->insert_range_from( diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 6484d12eb15..cdcd02a7c56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -490,17 +490,23 @@ public class SchemaTable extends Table { .column("SPILL_THRESHOLD_HIGH_WATERMARK", ScalarType.createVarchar(256)) .column("TAG", ScalarType.createVarchar(256)) .build())) - .put("processlist", new SchemaTable(SystemIdGenerator.getNextId(), "processlist", TableType.SCHEMA, - builder().column("ID", ScalarType.createType(PrimitiveType.LARGEINT)) - .column("USER", ScalarType.createVarchar(32)) - .column("HOST", ScalarType.createVarchar(261)) - .column("CATALOG", ScalarType.createVarchar(64)) - .column("DB", ScalarType.createVarchar(64)) - .column("COMMAND", ScalarType.createVarchar(16)) - .column("TIME", ScalarType.createType(PrimitiveType.INT)) - .column("STATE", ScalarType.createVarchar(64)) - .column("INFO", ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)) - .build())) + .put("processlist", + new SchemaTable(SystemIdGenerator.getNextId(), "processlist", TableType.SCHEMA, + builder().column("CURRENT_CONNECTED", ScalarType.createVarchar(16)) + .column("ID", ScalarType.createType(PrimitiveType.LARGEINT)) + .column("USER", ScalarType.createVarchar(32)) + .column("HOST", ScalarType.createVarchar(261)) + .column("LOGIN_TIME", ScalarType.createType(PrimitiveType.DATETIMEV2)) + .column("CATALOG", ScalarType.createVarchar(64)) + .column("DB", ScalarType.createVarchar(64)) + .column("COMMAND", ScalarType.createVarchar(16)) + .column("TIME", ScalarType.createType(PrimitiveType.INT)) + .column("STATE", ScalarType.createVarchar(64)) + .column("QUERY_ID", ScalarType.createVarchar(256)) + .column("INFO", ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)) + .column("FE", + ScalarType.createVarchar(64)) + .column("CLOUD_CLUSTER", 
ScalarType.createVarchar(64)).build())) .put("workload_policy", new SchemaTable(SystemIdGenerator.getNextId(), "workload_policy", TableType.SCHEMA, builder().column("ID", ScalarType.createType(PrimitiveType.BIGINT)) @@ -510,8 +516,7 @@ public class SchemaTable extends Table { .column("PRIORITY", ScalarType.createType(PrimitiveType.INT)) .column("ENABLED", ScalarType.createType(PrimitiveType.BOOLEAN)) .column("VERSION", ScalarType.createType(PrimitiveType.INT)) - .column("WORKLOAD_GROUP", ScalarType.createStringType()) - .build())) + .column("WORKLOAD_GROUP", ScalarType.createStringType()).build())) .put("table_options", new SchemaTable(SystemIdGenerator.getNextId(), "table_options", TableType.SCHEMA, builder().column("TABLE_NAME", ScalarType.createVarchar(NAME_CHAR_LEN)) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 81be929bd93..b02d895ff2d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -1030,7 +1030,7 @@ public class ConnectContext { if (connId == connectionId) { row.add("Yes"); } else { - row.add(""); + row.add("No"); } row.add("" + connectionId); row.add(ClusterNamespace.getNameFromFullName(qualifiedUser)); @@ -1055,7 +1055,11 @@ public class ConnectContext { } row.add(Env.getCurrentEnv().getSelfNode().getHost()); - row.add(cloudCluster); + if (cloudCluster == null) { + row.add("NULL"); + } else { + row.add(cloudCluster); + } return row; } } diff --git a/regression-test/suites/external_table_p0/info_schema_db/test_info_schema_db.groovy b/regression-test/suites/external_table_p0/info_schema_db/test_info_schema_db.groovy index ac85abe8480..1ad8fc40687 100644 --- a/regression-test/suites/external_table_p0/info_schema_db/test_info_schema_db.groovy +++ b/regression-test/suites/external_table_p0/info_schema_db/test_info_schema_db.groovy @@ -131,4 +131,6 @@ 
suite("test_info_schema_db", "p0,external,hive,external_docker,external_docker_h qt_sql116 """select table_catalog, table_schema, table_name from information_schema.tables where table_schema='${innerdb}'""" qt_sql117 """select table_catalog, table_schema, table_name from ${catalog_name}.information_schema.columns where table_schema='tpch1_parquet'""" qt_sql118 """select table_catalog, table_schema, table_name from ${catalog_name}.INFORMATION_SCHEMA.COLUMNS where TABLE_SCHEMA='tpch1_parquet'""" + + sql "select * from information_schema.PROCESSLIST;" } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org