This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 573b594df39 [improvement](Variant Type) Support displaying subcolumns expanded for the variant column (#27764) 573b594df39 is described below commit 573b594df39913e186b1e86e39ea23be32578f2c Author: Sun Chenyang <csun5...@gmail.com> AuthorDate: Fri Dec 8 20:34:58 2023 +0800 [improvement](Variant Type) Support displaying subcolumns expanded for the variant column (#27764) --- be/src/olap/rowset_builder.cpp | 14 +- be/src/olap/tablet_schema.cpp | 27 +++ be/src/olap/tablet_schema.h | 6 + be/src/service/internal_service.cpp | 112 +++++++++++ be/src/service/internal_service.h | 4 + docs/en/docs/advanced/variables.md | 4 + docs/zh-CN/docs/advanced/variables.md | 4 + .../main/java/org/apache/doris/catalog/Type.java | 42 ++++ fe/fe-core/src/main/cup/sql_parser.cup | 4 +- .../org/apache/doris/analysis/DescribeStmt.java | 29 ++- .../org/apache/doris/catalog/AggregateType.java | 20 ++ .../java/org/apache/doris/catalog/OlapTable.java | 10 + .../apache/doris/common/proc/IndexInfoProcDir.java | 5 + .../doris/common/proc/IndexSchemaProcNode.java | 8 +- .../common/proc/RemoteIndexSchemaProcDir.java | 113 +++++++++++ .../common/proc/RemoteIndexSchemaProcNode.java | 72 +++++++ .../common/util/FetchRemoteTabletSchemaUtil.java | 220 ++++++++++++++++++++ .../java/org/apache/doris/qe/SessionVariable.java | 21 ++ .../org/apache/doris/rpc/BackendServiceClient.java | 5 + .../org/apache/doris/rpc/BackendServiceProxy.java | 12 ++ gensrc/proto/internal_service.proto | 18 ++ regression-test/data/variant_p0/desc.out | 196 ++++++++++++++++++ regression-test/suites/variant_p0/desc.groovy | 224 +++++++++++++++++++++ 23 files changed, 1163 insertions(+), 7 deletions(-) diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 4675d668f41..c9da7359a31 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -341,7 +341,19 @@ void 
BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, indexes[i], ori_tablet_schema); } if (_tablet_schema->schema_version() > ori_tablet_schema.schema_version()) { - _tablet->update_max_version_schema(_tablet_schema); + // After schema change, should include extracted column + // For example: a table has two columns, k and v + // After adding a column v2, the schema version increases, max_version_schema needs to be updated. + // _tablet_schema includes k, v, and v2 + // if v is a variant, need to add the columns decomposed from the v to the _tablet_schema. + if (_tablet_schema->num_variant_columns() > 0) { + TabletSchemaSPtr max_version_schema = std::make_shared<TabletSchema>(); + max_version_schema->copy_from(*_tablet_schema); + max_version_schema->copy_extracted_columns(ori_tablet_schema); + _tablet->update_max_version_schema(max_version_schema); + } else { + _tablet->update_max_version_schema(_tablet_schema); + } } _tablet_schema->set_table_id(table_schema_param->table_id()); diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index e8f9d5d52b8..7d1abd8aa91 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -1031,6 +1031,33 @@ bool TabletSchema::is_dropped_column(const TabletColumn& col) const { column(col.name()).unique_id() != col.unique_id(); } +void TabletSchema::copy_extracted_columns(const TabletSchema& src_schema) { + std::unordered_set<int32_t> variant_columns; + for (const auto& col : columns()) { + if (col.is_variant_type()) { + variant_columns.insert(col.unique_id()); + } + } + for (const TabletColumn& col : src_schema.columns()) { + if (col.is_extracted_column() && variant_columns.contains(col.parent_unique_id())) { + ColumnPB col_pb; + col.to_schema_pb(&col_pb); + TabletColumn new_col(col_pb); + append_column(new_col, ColumnType::VARIANT); + } + } +} + +void TabletSchema::reserve_extracted_columns() { + for (auto it = _cols.begin(); it != _cols.end();) { + if 
(!it->is_extracted_column()) { + it = _cols.erase(it); + } else { + ++it; + } + } +} + void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_schema_pb) const { for (const auto& i : _cluster_key_idxes) { tablet_schema_pb->add_cluster_key_idxes(i); diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index 072bebd95ad..b0de3912982 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -331,6 +331,12 @@ public: bool is_dropped_column(const TabletColumn& col) const; + // copy extracted columns from src_schema + void copy_extracted_columns(const TabletSchema& src_schema); + + // only reserve extracted columns + void reserve_extracted_columns(); + string get_all_field_names() const { string str = "["; for (auto p : _field_name_to_index) { diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index e541c738be5..987a2106894 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -25,6 +25,7 @@ #include <butil/errno.h> #include <butil/iobuf.h> #include <fcntl.h> +#include <gen_cpp/MasterService_types.h> #include <gen_cpp/PaloInternalService_types.h> #include <gen_cpp/PlanNodes_types.h> #include <gen_cpp/Status_types.h> @@ -95,6 +96,7 @@ #include "runtime/stream_load/stream_load_context.h" #include "runtime/thread_context.h" #include "runtime/types.h" +#include "service/backend_options.h" #include "service/point_query_executor.h" #include "util/arrow/row_batch.h" #include "util/async_io.h" @@ -113,6 +115,7 @@ #include "util/uid_util.h" #include "vec/columns/column.h" #include "vec/columns/column_string.h" +#include "vec/common/schema_util.h" #include "vec/core/block.h" #include "vec/core/column_with_type_and_name.h" #include "vec/data_types/data_type.h" @@ -852,6 +855,115 @@ void PInternalServiceImpl::_get_column_ids_by_tablet_ids( response->mutable_status()->set_status_code(TStatusCode::OK); } +template <class RPCResponse> +struct AsyncRPCContext { + RPCResponse 
response; + brpc::Controller cntl; + brpc::CallId cid; +}; + +void PInternalServiceImpl::fetch_remote_tablet_schema(google::protobuf::RpcController* controller, + const PFetchRemoteSchemaRequest* request, + PFetchRemoteSchemaResponse* response, + google::protobuf::Closure* done) { + bool ret = _heavy_work_pool.try_offer([request, response, done]() { + brpc::ClosureGuard closure_guard(done); + Status st = Status::OK(); + if (request->is_coordinator()) { + // Spawn rpc request to none coordinator nodes, and finally merge them all + PFetchRemoteSchemaRequest remote_request(*request); + // set it none coordinator to get merged schema + remote_request.set_is_coordinator(false); + using PFetchRemoteTabletSchemaRpcContext = AsyncRPCContext<PFetchRemoteSchemaResponse>; + std::vector<PFetchRemoteTabletSchemaRpcContext> rpc_contexts( + request->tablet_location_size()); + for (int i = 0; i < request->tablet_location_size(); ++i) { + std::string host = request->tablet_location(i).host(); + int32_t brpc_port = request->tablet_location(i).brpc_port(); + std::shared_ptr<PBackendService_Stub> stub( + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( + host, brpc_port)); + rpc_contexts[i].cid = rpc_contexts[i].cntl.call_id(); + stub->fetch_remote_tablet_schema(&rpc_contexts[i].cntl, &remote_request, + &rpc_contexts[i].response, brpc::DoNothing()); + } + std::vector<TabletSchemaSPtr> schemas; + for (auto& rpc_context : rpc_contexts) { + brpc::Join(rpc_context.cid); + if (!st.ok()) { + // make sure all flying rpc request is joined + continue; + } + if (rpc_context.cntl.Failed()) { + LOG(WARNING) << "fetch_remote_tablet_schema rpc err:" + << rpc_context.cntl.ErrorText(); + ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( + rpc_context.cntl.remote_side()); + st = Status::InternalError("fetch_remote_tablet_schema rpc err: {}", + rpc_context.cntl.ErrorText()); + } + if (rpc_context.response.status().status_code() != 0) { + st = 
Status::create(rpc_context.response.status()); + } + if (rpc_context.response.has_merged_schema()) { + TabletSchemaSPtr schema = std::make_shared<TabletSchema>(); + schema->init_from_pb(rpc_context.response.merged_schema()); + schemas.push_back(schema); + } + } + if (!schemas.empty() && st.ok()) { + // merge all + TabletSchemaSPtr merged_schema; + static_cast<void>(vectorized::schema_util::get_least_common_schema(schemas, nullptr, + merged_schema)); + VLOG_DEBUG << "dump schema:" << merged_schema->dump_structure(); + merged_schema->reserve_extracted_columns(); + merged_schema->to_schema_pb(response->mutable_merged_schema()); + } + st.to_protobuf(response->mutable_status()); + return; + } else { + // This is not a coordinator, get it's tablet and merge schema + std::vector<int64_t> target_tablets; + for (int i = 0; i < request->tablet_location_size(); ++i) { + const auto& location = request->tablet_location(i); + auto backend = BackendOptions::get_local_backend(); + // If this is the target backend + if (backend.host == location.host() && config::brpc_port == location.brpc_port()) { + target_tablets.assign(location.tablet_id().begin(), location.tablet_id().end()); + break; + } + } + if (!target_tablets.empty()) { + std::vector<TabletSchemaSPtr> tablet_schemas; + for (int64_t tablet_id : target_tablets) { + TabletSharedPtr tablet = + StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, + false); + if (tablet == nullptr) { + // just ignore + LOG(WARNING) << "tablet does not exist, tablet id is " << tablet_id; + continue; + } + tablet_schemas.push_back(tablet->tablet_schema()); + } + if (!tablet_schemas.empty()) { + // merge all + TabletSchemaSPtr merged_schema; + static_cast<void>(vectorized::schema_util::get_least_common_schema( + tablet_schemas, nullptr, merged_schema)); + merged_schema->to_schema_pb(response->mutable_merged_schema()); + VLOG_DEBUG << "dump schema:" << merged_schema->dump_structure(); + } + } + 
st.to_protobuf(response->mutable_status()); + } + }); + if (!ret) { + offer_failed(response, done, _heavy_work_pool); + } +} + void PInternalServiceImpl::report_stream_load_status(google::protobuf::RpcController* controller, const PReportStreamLoadStatusRequest* request, PReportStreamLoadStatusResponse* response, diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 5713faababc..15d121f2f1b 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -204,6 +204,10 @@ public: const PGroupCommitInsertRequest* request, PGroupCommitInsertResponse* response, google::protobuf::Closure* done) override; + void fetch_remote_tablet_schema(google::protobuf::RpcController* controller, + const PFetchRemoteSchemaRequest* request, + PFetchRemoteSchemaResponse* response, + google::protobuf::Closure* done) override; void get_wal_queue_size(google::protobuf::RpcController* controller, const PGetWalQueueSizeRequest* request, diff --git a/docs/en/docs/advanced/variables.md b/docs/en/docs/advanced/variables.md index 871a81e4021..030e228f523 100644 --- a/docs/en/docs/advanced/variables.md +++ b/docs/en/docs/advanced/variables.md @@ -701,6 +701,10 @@ Note that the comment must start with /*+ and can only follow the SELECT. Whether to enable partial columns update semantics for native insert into statement, default is false. Please note that the default value of the session variable `enable_insert_strict`, which controls whether the insert statement operates in strict mode, is true. In other words, the insert statement is in strict mode by default, and in this mode, updating non-existing keys in partial column updates is not allowed. Therefore, when using the insert statement for partial columns update an [...] </version> +* `describe_extend_variant_column` + + Controls whether to extend variant column in desc table_name. The default value is false. 
+ *** #### Supplementary instructions on statement execution timeout control diff --git a/docs/zh-CN/docs/advanced/variables.md b/docs/zh-CN/docs/advanced/variables.md index 43daa14af82..2a697b01bf7 100644 --- a/docs/zh-CN/docs/advanced/variables.md +++ b/docs/zh-CN/docs/advanced/variables.md @@ -689,6 +689,10 @@ try (Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:9030/ 是否在对insert into语句启用部分列更新的语义,默认为 false。需要注意的是,控制insert语句是否开启严格模式的会话变量`enable_insert_strict`的默认值为true,即insert语句默认开启严格模式,而在严格模式下进行部分列更新不允许更新不存在的key。所以,在使用insert语句进行部分列更新的时候如果希望能插入不存在的key,需要在`enable_unique_key_partial_update`设置为true的基础上同时将`enable_insert_strict`设置为false。 </version> +* `describe_extend_variant_column` + + 是否展示 variant 的拆解列。默认为 false。 + *** #### 关于语句执行超时控制的补充说明 diff --git a/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java b/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java index b9dc9260dd3..981f5bb61e6 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java +++ b/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java @@ -123,6 +123,7 @@ public abstract class Type { public static final ScalarType VARIANT = new ScalarType(PrimitiveType.VARIANT); public static final AnyType ANY_STRUCT_TYPE = new AnyStructType(); public static final AnyType ANY_ELEMENT_TYPE = new AnyElementType(); + private static final Map<String, Type> typeMap = new HashMap<>(); private static final Logger LOG = LogManager.getLogger(Type.class); private static final ArrayList<ScalarType> integerTypes; @@ -135,6 +136,43 @@ public abstract class Type { private static final ArrayList<Type> structSubTypes; private static final ArrayList<ScalarType> trivialTypes; + static { + typeMap.put("TINYINT", Type.TINYINT); + typeMap.put("SMALLINT", Type.SMALLINT); + typeMap.put("INT", Type.INT); + typeMap.put("BIGINT", Type.BIGINT); + typeMap.put("LARGEINT", Type.LARGEINT); + typeMap.put("UNSIGNED_TINYINT", Type.UNSUPPORTED); + typeMap.put("UNSIGNED_SMALLINT", 
Type.UNSUPPORTED); + typeMap.put("UNSIGNED_INT", Type.UNSUPPORTED); + typeMap.put("UNSIGNED_BIGINT", Type.UNSUPPORTED); + typeMap.put("FLOAT", Type.FLOAT); + typeMap.put("DISCRETE_DOUBLE", Type.DOUBLE); + typeMap.put("DOUBLE", Type.DOUBLE); + typeMap.put("CHAR", Type.CHAR); + typeMap.put("DATE", Type.DATE); + typeMap.put("DATEV2", Type.DATEV2); + typeMap.put("DATETIMEV2", Type.DATETIMEV2); + typeMap.put("DATETIME", Type.DATETIME); + typeMap.put("DECIMAL32", Type.DECIMAL32); + typeMap.put("DECIMAL64", Type.DECIMAL64); + typeMap.put("DECIMAL128I", Type.DECIMAL128); + typeMap.put("DECIMAL", Type.DECIMALV2); + typeMap.put("VARCHAR", Type.VARCHAR); + typeMap.put("STRING", Type.STRING); + typeMap.put("JSONB", Type.JSONB); + typeMap.put("VARIANT", Type.VARIANT); + typeMap.put("BOOLEAN", Type.BOOLEAN); + typeMap.put("HLL", Type.HLL); + typeMap.put("STRUCT", Type.STRUCT); + typeMap.put("LIST", Type.UNSUPPORTED); + typeMap.put("MAP", Type.MAP); + typeMap.put("OBJECT", Type.UNSUPPORTED); + typeMap.put("ARRAY", Type.ARRAY); + typeMap.put("QUANTILE_STATE", Type.QUANTILE_STATE); + typeMap.put("AGG_STATE", Type.AGG_STATE); + } + static { integerTypes = Lists.newArrayList(); integerTypes.add(TINYINT); @@ -2251,5 +2289,9 @@ public abstract class Type { } return false; } + + public static Type getTypeFromTypeName(String typeName) { + return typeMap.getOrDefault(typeName, Type.UNSUPPORTED); + } } diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 1be6d99e25b..a81f8cf93c4 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -4345,9 +4345,9 @@ opt_explain_options ::= // Describe statement describe_stmt ::= - describe_command table_name:table + describe_command table_name:table opt_partition_names:partitionNames {: - RESULT = new DescribeStmt(table, false); + RESULT = new DescribeStmt(table, false, partitionNames); :} | KW_SHOW KW_FIELDS KW_FROM table_name:table {: diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java index f632c00f9c3..3325a736870 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java @@ -92,6 +92,7 @@ public class DescribeStmt extends ShowStmt { private TableName dbTableName; private ProcNodeInterface node; + private PartitionNames partitionNames; List<List<String>> totalRows = new LinkedList<List<String>>(); @@ -106,6 +107,12 @@ public class DescribeStmt extends ShowStmt { this.isAllTables = isAllTables; } + public DescribeStmt(TableName dbTableName, boolean isAllTables, PartitionNames partitionNames) { + this.dbTableName = dbTableName; + this.isAllTables = isAllTables; + this.partitionNames = partitionNames; + } + public DescribeStmt(TableValuedFunctionRef tableValuedFunctionRef) { this.tableValuedFunctionRef = tableValuedFunctionRef; this.isTableValuedFunction = true; @@ -156,6 +163,13 @@ public class DescribeStmt extends ShowStmt { return; } + if (partitionNames != null) { + partitionNames.analyze(analyzer); + if (partitionNames.isTemp()) { + throw new AnalysisException("Do not support temp partitions"); + } + } + dbTableName.analyze(analyzer); if (!Env.getCurrentEnv().getAccessManager() @@ -178,9 +192,22 @@ public class DescribeStmt extends ShowStmt { if (table.getType() == TableType.OLAP) { procString += ((OlapTable) table).getBaseIndexId(); } else { + if (partitionNames != null) { + throw new AnalysisException(dbTableName.getTbl() + + " is not a OLAP table, describe table failed"); + } procString += table.getId(); } - + if (partitionNames != null) { + procString += "/"; + StringBuilder builder = new StringBuilder(); + for (String str : partitionNames.getPartitionNames()) { + builder.append(str); + builder.append(","); + } + builder.deleteCharAt(builder.length() - 1); + procString += builder.toString(); + } node = 
ProcService.getInstance().open(procString); if (node == null) { throw new AnalysisException("Describe table[" + dbTableName.getTbl() + "] failed"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateType.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateType.java index 1d6b8628690..f86ea7855eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateType.java @@ -23,7 +23,9 @@ import com.google.common.collect.Lists; import java.util.EnumMap; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; public enum AggregateType { SUM("SUM"), @@ -39,6 +41,20 @@ public enum AggregateType { private static EnumMap<AggregateType, EnumSet<PrimitiveType>> compatibilityMap; + private static final Map<String, AggregateType> aggTypeMap = new HashMap<>(); + + static { + aggTypeMap.put("NONE", AggregateType.NONE); + aggTypeMap.put("SUM", AggregateType.SUM); + aggTypeMap.put("MIN", AggregateType.MIN); + aggTypeMap.put("MAX", AggregateType.MAX); + aggTypeMap.put("REPLACE", AggregateType.REPLACE); + aggTypeMap.put("REPLACE_IF_NOT_NULL", AggregateType.REPLACE_IF_NOT_NULL); + aggTypeMap.put("HLL_UNION", AggregateType.HLL_UNION); + aggTypeMap.put("BITMAP_UNION", AggregateType.BITMAP_UNION); + aggTypeMap.put("QUANTILE_UNION", AggregateType.QUANTILE_UNION); + } + static { compatibilityMap = new EnumMap<>(AggregateType.class); List<PrimitiveType> primitiveTypeList = Lists.newArrayList(); @@ -181,4 +197,8 @@ public enum AggregateType { return null; } } + + public static AggregateType getAggTypeFromAggName(String typeName) { + return aggTypeMap.get(typeName); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 2284164363d..adfa20fe1c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -2423,4 +2423,14 @@ public class OlapTable extends Table { } return false; } + + public List<Tablet> getAllTablets() throws AnalysisException { + List<Tablet> tablets = Lists.newArrayList(); + for (Partition partition : getPartitions()) { + for (Tablet tablet : partition.getBaseIndex().getTablets()) { + tablets.add(tablet); + } + } + return tablets; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java index 4775ba23ef8..de91b59dce1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.AnalysisException; +import org.apache.doris.qe.SessionVariable; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -128,6 +129,10 @@ public class IndexInfoProcDir implements ProcDirInterface { throw new AnalysisException("Index " + idxId + " does not exist"); } bfColumns = olapTable.getCopiedBfColumns(); + if (olapTable.hasVariantColumns() + && SessionVariable.enableDescribeExtendVariantColumn()) { + return new RemoteIndexSchemaProcDir(table, schema, bfColumns); + } } else { schema = table.getBaseSchema(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java index 47da7a9d53b..6f125217ee1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java @@ -49,10 +49,8 @@ public class IndexSchemaProcNode implements ProcNodeInterface { 
this.bfColumns = bfColumns; } - @Override - public ProcResult fetchResult() throws AnalysisException { + public static ProcResult createResult(List<Column> schema, Set<String> bfColumns) throws AnalysisException { Preconditions.checkNotNull(schema); - BaseProcResult result = new BaseProcResult(); result.setNames(TITLE_NAMES); @@ -105,4 +103,8 @@ public class IndexSchemaProcNode implements ProcNodeInterface { return result; } + @Override + public ProcResult fetchResult() throws AnalysisException { + return createResult(this.schema, this.bfColumns); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java new file mode 100644 index 00000000000..7852bd6b1c0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.common.proc; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +/* + * SHOW PROC /dbs/dbId/tableId/index_schema/indexId" + * show index schema + */ +public class RemoteIndexSchemaProcDir implements ProcDirInterface { + public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>() + .add("Field").add("Type").add("Null").add("Key") + .add("Default").add("Extra") + .build(); + + private List<Column> schema; + private Set<String> bfColumns; + private TableIf table; + + public RemoteIndexSchemaProcDir(TableIf table, List<Column> schema, Set<String> bfColumns) { + this.table = table; + this.schema = schema; + this.bfColumns = bfColumns; + } + + @Override + public ProcResult fetchResult() throws AnalysisException { + Preconditions.checkNotNull(table); + Preconditions.checkNotNull(schema); + List<Tablet> tablets = null; + table.readLock(); + try { + OlapTable olapTable = (OlapTable) table; + tablets = olapTable.getAllTablets(); + } finally { + table.readUnlock(); + } + List<Column> remoteSchema = new FetchRemoteTabletSchemaUtil(tablets).fetch(); + if (remoteSchema == null || remoteSchema.isEmpty()) { + throw new AnalysisException("fetch remote tablet schema failed"); + } + this.schema.addAll(remoteSchema); + return IndexSchemaProcNode.createResult(this.schema, this.bfColumns); + } + + @Override + public boolean register(String name, ProcNodeInterface 
node) { + return false; + } + + @Override + public ProcNodeInterface lookup(String partitionString) throws AnalysisException { + Preconditions.checkNotNull(table); + + List<String> partitionNameList = new ArrayList<String>(Arrays.asList(partitionString.split(","))); + if (partitionNameList == null || partitionNameList.isEmpty()) { + throw new AnalysisException("Describe table[" + table.getName() + "] failed"); + } + List<Partition> partitions = Lists.newArrayList(); + table.readLock(); + try { + if (table.getType() == TableType.OLAP) { + OlapTable olapTable = (OlapTable) table; + for (String partitionName : partitionNameList) { + Partition partition = olapTable.getPartition(partitionName); + if (partition == null) { + throw new AnalysisException("Partition " + partitionName + " does not exist"); + } + partitions.add(partition); + } + } else { + throw new AnalysisException(table.getName() + " is not a OLAP table, describe table failed"); + } + } catch (Throwable t) { + throw new AnalysisException("Describe table[" + table.getName() + "] failed"); + } finally { + table.readUnlock(); + } + return new RemoteIndexSchemaProcNode(partitions, this.schema, this.bfColumns); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java new file mode 100644 index 00000000000..d5b3d463223 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.proc; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import java.util.List; +import java.util.Set; + +/* + * SHOW PROC /dbs/dbId/tableId/index_schema/indexId/partitionName" + * show index schema + */ +public class RemoteIndexSchemaProcNode implements ProcNodeInterface { + public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>() + .add("Field").add("Type").add("Null").add("Key") + .add("Default").add("Extra") + .build(); + + private List<Partition> partitions; + private List<Column> schema; + private Set<String> bfColumns; + + public RemoteIndexSchemaProcNode(List<Partition> partitions, List<Column> schema, Set<String> bfColumns) { + this.partitions = partitions; + this.schema = schema; + this.bfColumns = bfColumns; + } + + @Override + public ProcResult fetchResult() throws AnalysisException { + Preconditions.checkNotNull(schema); + Preconditions.checkNotNull(partitions); + List<Tablet> tablets = Lists.newArrayList(); + for (Partition partition : partitions) { + MaterializedIndex idx = partition.getBaseIndex(); + for (Tablet tablet : idx.getTablets()) { + tablets.add(tablet); + } + } + 
List<Column> remoteSchema = new FetchRemoteTabletSchemaUtil(tablets).fetch(); + if (remoteSchema == null || remoteSchema.isEmpty()) { + throw new AnalysisException("fetch remote tablet schema failed"); + } + this.schema.addAll(remoteSchema); + return IndexSchemaProcNode.createResult(this.schema, this.bfColumns); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java new file mode 100644 index 00000000000..808b4b2a552 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.common.util; + +import org.apache.doris.catalog.AggregateType; +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.proto.InternalService.PFetchRemoteSchemaRequest; +import org.apache.doris.proto.InternalService.PFetchRemoteSchemaResponse; +import org.apache.doris.proto.InternalService.PTabletsLocation; +import org.apache.doris.proto.OlapFile.ColumnPB; +import org.apache.doris.proto.OlapFile.TabletSchemaPB; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TStatusCode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +// This class is used to pull the specified tablets' columns existing on the Backend (BE) +// including regular columns and columns decomposed by variants +public class FetchRemoteTabletSchemaUtil { + private static final Logger LOG = LogManager.getLogger(FetchRemoteTabletSchemaUtil.class); + + private List<Tablet> remoteTablets; + private List<Column> tableColumns; + + public FetchRemoteTabletSchemaUtil(List<Tablet> tablets) { + this.remoteTablets = tablets; 
+ this.tableColumns = Lists.newArrayList(); + } + + public List<Column> fetch() { + // 1. Find which Backend (BE) servers the tablets are on + Preconditions.checkNotNull(remoteTablets); + Map<Long, Set<Long>> beIdToTabletId = Maps.newHashMap(); + for (Tablet tablet : remoteTablets) { + for (Replica replica : tablet.getReplicas()) { + // only need alive replica + if (replica.isAlive()) { + Set<Long> tabletIds = beIdToTabletId.computeIfAbsent( + replica.getBackendId(), k -> Sets.newHashSet()); + tabletIds.add(tablet.getId()); + } + } + } + + // 2. Randomly select 2 Backend (BE) servers to act as coordinators. + // Coordinator BE is responsible for collecting all table columns and returning to the FE. + // Two BE provide a retry opportunity with the second one in case the first attempt fails. + List<PTabletsLocation> locations = Lists.newArrayList(); + List<Backend> coordinatorBackend = Lists.newArrayList(); + for (Map.Entry<Long, Set<Long>> entry : beIdToTabletId.entrySet()) { + Long backendId = entry.getKey(); + Set<Long> tabletIds = entry.getValue(); + Backend backend = Env.getCurrentEnv().getCurrentSystemInfo().getBackend(backendId); + // only need alive be + if (!backend.isAlive()) { + continue; + } + // need 2 be to provide a retry + if (coordinatorBackend.size() < 2) { + coordinatorBackend.add(backend); + } + PTabletsLocation.Builder locationBuilder = PTabletsLocation.newBuilder() + .setHost(backend.getHost()) + .setBrpcPort(backend.getBrpcPort()); + PTabletsLocation location = locationBuilder.addAllTabletId(tabletIds).build(); + locations.add(location); + } + PFetchRemoteSchemaRequest.Builder requestBuilder = PFetchRemoteSchemaRequest.newBuilder() + .addAllTabletLocation(locations) + .setIsCoordinator(true); + // 3. 
Send rpc to coordinatorBackend util succeed or retry + for (Backend be : coordinatorBackend) { + try { + PFetchRemoteSchemaRequest request = requestBuilder.build(); + Future<PFetchRemoteSchemaResponse> future = BackendServiceProxy.getInstance() + .fetchRemoteTabletSchemaAsync(be.getBrpcAddress(), request); + PFetchRemoteSchemaResponse response = null; + try { + response = future.get(60, TimeUnit.SECONDS); + TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); + String errMsg; + if (code != TStatusCode.OK) { + if (!response.getStatus().getErrorMsgsList().isEmpty()) { + errMsg = response.getStatus().getErrorMsgsList().get(0); + } else { + errMsg = "fetchRemoteTabletSchemaAsync failed. backend address: " + + be.getHost() + " : " + be.getBrpcPort(); + } + throw new RpcException(be.getHost(), errMsg); + } + fillColumns(response); + return tableColumns; + } catch (AnalysisException e) { + // continue to get result + LOG.warn(e); + } catch (InterruptedException e) { + // continue to get result + LOG.warn("fetch remote schema future get interrupted Exception"); + } catch (TimeoutException e) { + future.cancel(true); + // continue to get result + LOG.warn("fetch remote schema result timeout, addr {}", be.getBrpcAddress()); + } + } catch (RpcException e) { + LOG.warn("fetch remote schema result rpc exception {}, e {}", be.getBrpcAddress(), e); + } catch (ExecutionException e) { + LOG.warn("fetch remote schema ExecutionException, addr {}, e {}", be.getBrpcAddress(), e); + } + } + return tableColumns; + } + + private void fillColumns(PFetchRemoteSchemaResponse response) throws AnalysisException { + TabletSchemaPB schemaPB = response.getMergedSchema(); + for (ColumnPB columnPB : schemaPB.getColumnList()) { + try { + Column remoteColumn = initColumnFromPB(columnPB); + tableColumns.add(remoteColumn); + } catch (Exception e) { + throw new AnalysisException("column default value to string failed"); + } + } + // sort the columns + 
Collections.sort(tableColumns, new Comparator<Column>() { + @Override + public int compare(Column c1, Column c2) { + return c1.getName().compareTo(c2.getName()); + } + }); + } + + private Column initColumnFromPB(ColumnPB column) throws AnalysisException { + try { + AggregateType aggType = AggregateType.getAggTypeFromAggName(column.getAggregation()); + Type type = Type.getTypeFromTypeName(column.getType()); + String columnName = column.getName(); + boolean isKey = column.getIsKey(); + boolean isNullable = column.getIsNullable(); + String defaultValue = column.getDefaultValue().toString("UTF-8"); + if (defaultValue.equals("")) { + defaultValue = null; + } + if (isKey) { + aggType = null; + } + do { + if (type.isArrayType()) { + List<ColumnPB> childColumn = column.getChildrenColumnsList(); + if (childColumn == null || childColumn.size() != 1) { + break; + } + Column child = initColumnFromPB(childColumn.get(0)); + type = new ArrayType(child.getType()); + } else if (type.isMapType()) { + List<ColumnPB> childColumn = column.getChildrenColumnsList(); + if (childColumn == null || childColumn.size() != 2) { + break; + } + Column keyChild = initColumnFromPB(childColumn.get(0)); + Column valueChild = initColumnFromPB(childColumn.get(1)); + type = new MapType(keyChild.getType(), valueChild.getType()); + } else if (type.isStructType()) { + List<ColumnPB> childColumn = column.getChildrenColumnsList(); + if (childColumn == null) { + break; + } + List<Type> childTypes = Lists.newArrayList(); + for (ColumnPB childPB : childColumn) { + childTypes.add(initColumnFromPB(childPB).getType()); + } + type = new StructType(childTypes); + } + } while (false); + return new Column(columnName, type, isKey, aggType, isNullable, + defaultValue, "remote schema"); + } catch (Exception e) { + throw new AnalysisException("default value to string failed"); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index ebb5741671d..33603dd78a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -474,6 +474,8 @@ public class SessionVariable implements Serializable, Writable { public static final String WAIT_FULL_BLOCK_SCHEDULE_TIMES = "wait_full_block_schedule_times"; + public static final String DESCRIBE_EXTEND_VARIANT_COLUMN = "describe_extend_variant_column"; + public static final List<String> DEBUG_VARIABLES = ImmutableList.of( SKIP_DELETE_PREDICATE, SKIP_DELETE_BITMAP, @@ -857,6 +859,9 @@ public class SessionVariable implements Serializable, Writable { return beNumberForTest; } + @VariableMgr.VarAttr(name = DESCRIBE_EXTEND_VARIANT_COLUMN, needForward = true) + public boolean enableDescribeExtendVariantColumn = false; + @VariableMgr.VarAttr(name = PROFILLING) public boolean profiling = false; @@ -3057,6 +3062,22 @@ public class SessionVariable implements Serializable, Writable { } } + public boolean getEnableDescribeExtendVariantColumn() { + return enableDescribeExtendVariantColumn; + } + + public void setEnableDescribeExtendVariantColumn(boolean enableDescribeExtendVariantColumn) { + this.enableDescribeExtendVariantColumn = enableDescribeExtendVariantColumn; + } + + public static boolean enableDescribeExtendVariantColumn() { + ConnectContext connectContext = ConnectContext.get(); + if (connectContext == null) { + return false; + } + return connectContext.getSessionVariable().enableDescribeExtendVariantColumn; + } + public int getProfileLevel() { return this.profileLevel; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index 9535d075e24..f3f4440e038 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -152,6 +152,11 @@ public class BackendServiceClient { return stub.reportStreamLoadStatus(request); } + public Future<InternalService.PFetchRemoteSchemaResponse> fetchRemoteTabletSchemaAsync( + InternalService.PFetchRemoteSchemaRequest request) { + return stub.fetchRemoteTabletSchema(request); + } + public Future<InternalService.PGlobResponse> glob(InternalService.PGlobRequest request) { return stub.glob(request); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 80767acecfd..02245c83ced 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -466,5 +466,17 @@ public class BackendServiceProxy { } } + public Future<InternalService.PFetchRemoteSchemaResponse> fetchRemoteTabletSchemaAsync( + TNetworkAddress address, InternalService.PFetchRemoteSchemaRequest request) throws RpcException { + try { + final BackendServiceClient client = getProxy(address); + return client.fetchRemoteTabletSchemaAsync(request); + } catch (Throwable e) { + LOG.warn("fetch remote tablet schema catch a exception, address={}:{}", + address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index ad66efc62a8..3676d854a94 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -803,6 +803,23 @@ message PGetWalQueueSizeResponse{ optional int64 size = 2; } +message PTabletsLocation { + required string host = 1; + required int32 brpc_port = 2; + repeated int64 tablet_id = 3; +} + +message PFetchRemoteSchemaRequest { + repeated PTabletsLocation tablet_location = 1; + required bool is_coordinator = 2; +} + +message 
PFetchRemoteSchemaResponse { + optional PStatus status = 1; + // intermediate merged schema + optional TabletSchemaPB merged_schema = 2; +} + service PBackendService { rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult); rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult); @@ -846,5 +863,6 @@ service PBackendService { rpc group_commit_insert(PGroupCommitInsertRequest) returns (PGroupCommitInsertResponse); rpc get_wal_queue_size(PGetWalQueueSizeRequest) returns(PGetWalQueueSizeResponse); rpc fetch_arrow_flight_schema(PFetchArrowFlightSchemaRequest) returns (PFetchArrowFlightSchemaResult); + rpc fetch_remote_tablet_schema(PFetchRemoteSchemaRequest) returns (PFetchRemoteSchemaResponse); }; diff --git a/regression-test/data/variant_p0/desc.out b/regression-test/data/variant_p0/desc.out new file mode 100644 index 00000000000..f6f3d91048f --- /dev/null +++ b/regression-test/data/variant_p0/desc.out @@ -0,0 +1,196 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql_1 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.xxxx TEXT Yes false \N + +-- !sql_2 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.ddd.aaa TINYINT Yes false \N +v.ddd.mxmxm JSON Yes false \N +v.xxxx TEXT Yes false \N + +-- !sql_3 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N +v.xxxx TEXT Yes false \N + +-- !sql_6_1 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.ddd.aaa TINYINT Yes false \N +v.ddd.mxmxm JSON Yes false \N +v.xxxx TEXT Yes false \N + +-- !sql_6_2 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.xxxx TEXT Yes false \N + +-- !sql_6_3 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N + +-- !sql_6 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N +v.ddd.aaa TINYINT Yes false \N +v.ddd.mxmxm JSON Yes false \N +v.xxxx TEXT Yes false \N + +-- !sql_7 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N +v.xxxx TEXT Yes false \N + +-- !sql_7_1 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.xxxx TEXT Yes false \N + +-- !sql_7_2 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N + +-- !sql_7_3 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N +v.xxxx TEXT Yes false \N + 
+-- !sql_8 -- +k BIGINT Yes true \N +v1 VARIANT Yes false \N NONE +v2 VARIANT Yes false \N NONE +v3 VARIANT Yes false \N NONE +v1.a SMALLINT Yes false \N +v1.b JSON Yes false \N +v1.c.c SMALLINT Yes false \N +v1.c.e DOUBLE Yes false \N +v1.oooo.xxxx.xxx TINYINT Yes false \N +v2.a SMALLINT Yes false \N +v2.xxxx TEXT Yes false \N +v3.a SMALLINT Yes false \N +v3.b JSON Yes false \N +v3.c.c SMALLINT Yes false \N +v3.c.e DOUBLE Yes false \N + +-- !sql_9 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE + +-- !sql_9_1 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N +v.oooo.xxxx.xxx TINYINT Yes false \N + +-- !sql_10 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.k1 TINYINT Yes false \N +v.k2 TEXT Yes false \N +v.k3 ARRAY<SMALLINT> Yes false [] +v.k4 DOUBLE Yes false \N +v.k5 JSON Yes false \N + +-- !sql_10_1 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v2 VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N +v.k1 TINYINT Yes false \N +v.k2 TEXT Yes false \N +v.k3 ARRAY<SMALLINT> Yes false [] +v.k4 DOUBLE Yes false \N +v.k5 JSON Yes false \N +v.oooo.xxxx.xxx TINYINT Yes false \N +v2.a SMALLINT Yes false \N +v2.b JSON Yes false \N +v2.c.c SMALLINT Yes false \N +v2.c.e DOUBLE Yes false \N +v2.oooo.xxxx.xxx TINYINT Yes false \N + +-- !sql_10_2 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N +v.k1 TINYINT Yes false \N +v.k2 TEXT Yes false \N +v.k3 ARRAY<SMALLINT> Yes false [] +v.k4 DOUBLE Yes false \N +v.k5 JSON Yes false \N +v.oooo.xxxx.xxx TINYINT Yes false \N +v2.a SMALLINT Yes false \N +v2.b JSON Yes false \N +v2.c.c SMALLINT Yes false \N +v2.c.e DOUBLE Yes false \N +v2.oooo.xxxx.xxx TINYINT Yes false \N + +-- !sql_10_3 -- +k BIGINT Yes true \N +v 
VARIANT Yes false \N NONE +v3 VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N +v.k1 TINYINT Yes false \N +v.k2 TEXT Yes false \N +v.k3 ARRAY<SMALLINT> Yes false [] +v.k4 DOUBLE Yes false \N +v.k5 JSON Yes false \N +v.oooo.xxxx.xxx TINYINT Yes false \N +v3.a SMALLINT Yes false \N +v3.b JSON Yes false \N +v3.c.c SMALLINT Yes false \N +v3.c.e DOUBLE Yes false \N +v3.oooo.xxxx.xxx TINYINT Yes false \N + +-- !sql_11 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.!@#^&*() TEXT Yes false \N +v.名字 TEXT Yes false \N +v.画像.丬文 TEXT Yes false \N +v.画像.地址 TEXT Yes false \N +v.金额 SMALLINT Yes false \N + diff --git a/regression-test/suites/variant_p0/desc.groovy b/regression-test/suites/variant_p0/desc.groovy new file mode 100644 index 00000000000..fc9332acc59 --- /dev/null +++ b/regression-test/suites/variant_p0/desc.groovy @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+// Regression suite for `desc <table>` on tables with VARIANT columns.
+// With the session variable describe_extend_variant_column = true the expected
+// output lists the variant sub-columns (e.g. v.a, v.c.e) below the regular columns.
+suite("regression_test_variant_desc", "nonConcurrent"){
+
+    // Stream-loads a JSON-by-line file into the given table and checks that the load succeeded.
+    def load_json_data = {table_name, file_name ->
+        // load the json data
+        streamLoad {
+            table "${table_name}"
+
+            // set http request header params
+            set 'read_json_by_line', 'true'
+            set 'format', 'json'
+            set 'max_filter_ratio', '0.1'
+            file file_name // import json file
+            time 10000 // limit inflight 10s
+
+            // If a check callback is declared, the default check condition is ignored,
+            // so every condition must be checked here.
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                logger.info("Stream load ${file_name} result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+    }
+
+    // Drops and re-creates a duplicate-key table (k bigint, v variant) with the given bucket number.
+    def create_table = { table_name, buckets="auto" ->
+        sql "DROP TABLE IF EXISTS ${table_name}"
+        sql """
+            CREATE TABLE IF NOT EXISTS ${table_name} (
+                k bigint,
+                v variant
+            )
+            DUPLICATE KEY(`k`)
+            DISTRIBUTED BY HASH(k) BUCKETS ${buckets}
+            properties("replication_num" = "1", "disable_auto_compaction" = "false");
+        """
+    }
+
+    // Same as create_table, but range-partitioned on k into p1 (< 3000), p2 (< 50000) and p3 (< 100000).
+    def create_table_partition = { table_name, buckets="auto" ->
+        sql "DROP TABLE IF EXISTS ${table_name}"
+        sql """
+            CREATE TABLE IF NOT EXISTS ${table_name} (
+                k bigint,
+                v variant
+            )
+            DUPLICATE KEY(`k`)
+            PARTITION BY RANGE(k)
+            (
+                PARTITION p1 VALUES LESS THAN (3000),
+                PARTITION p2 VALUES LESS THAN (50000),
+                PARTITION p3 VALUES LESS THAN (100000)
+            )
+            DISTRIBUTED BY HASH(k) BUCKETS ${buckets}
+            properties("replication_num" = "1", "disable_auto_compaction" = "false");
+        """
+    }
+
+    // Updates a BE config entry. Only the first backend returned by getBackendIpHttpPort is updated.
+    def set_be_config = { key, value ->
+        String backend_id;
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+        backend_id = backendId_to_backendIP.keySet()[0]
+        def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
+        logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
+    }
+
+    try {
+        // sparse columns
+        def table_name = "sparse_columns"
+        create_table table_name
+        set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95")
+        sql """set describe_extend_variant_column = true"""
+        sql """insert into sparse_columns select 0, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str
+            union all select 0, '{"a": 1123}' as json_str union all select 0, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") limit 4096 ;"""
+        qt_sql_1 """desc ${table_name}"""
+        sql "truncate table sparse_columns"
+        sql """insert into sparse_columns select 0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}' as json_str
+            union all select 0, '{"a" : 1234, "xxxx" : "kaana", "ddd" : {"aaa" : 123, "mxmxm" : [456, "789"]}}' as json_str from numbers("number" = "4096") limit 4096 ;"""
+        qt_sql_2 """desc ${table_name}"""
+        sql "truncate table sparse_columns"
+
+        // no sparse columns
+        table_name = "no_sparse_columns"
+        create_table.call(table_name, "4")
+        sql "set enable_two_phase_read_opt = false;"
+        set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "1")
+        sql """insert into ${table_name} select 0, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str
+            union all select 0, '{"a": 1123}' as json_str union all select 0, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") limit 4096 ;"""
+        qt_sql_3 """desc ${table_name}"""
+        sql "truncate table ${table_name}"
+
+        // partition
+        table_name = "partition_data"
+        create_table_partition.call(table_name, "4")
+        sql "set enable_two_phase_read_opt = false;"
+        set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95")
+        sql """insert into ${table_name} select 2500, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}' as json_str
+            union all select 2500, '{"a" : 1234, "xxxx" : "kaana", "ddd" : {"aaa" : 123, "mxmxm" : [456, "789"]}}' as json_str from numbers("number" = "4096") limit 4096 ;"""
+        sql """insert into ${table_name} select 45000, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str
+            union all select 45000, '{"a": 1123}' as json_str union all select 45000, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") limit 4096 ;"""
+        sql """insert into ${table_name} values(95000, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
+        qt_sql_6_1 """desc ${table_name} partition p1"""
+        qt_sql_6_2 """desc ${table_name} partition p2"""
+        qt_sql_6_3 """desc ${table_name} partition p3"""
+        qt_sql_6 """desc ${table_name}"""
+        sql "truncate table ${table_name}"
+
+        // drop partition
+        table_name = "drop_partition"
+        create_table_partition.call(table_name, "4")
+        // insert into partition p1
+        sql """insert into ${table_name} values(2500, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
+        // insert into partition p2
+        sql """insert into ${table_name} values(45000, '{"a": 11245, "xxxx" : "kaana"}')"""
+        // insert into partition p3
+        sql """insert into ${table_name} values(95000, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
+        // drop p1
+        sql """alter table ${table_name} drop partition p1"""
+        qt_sql_7 """desc ${table_name}"""
+        qt_sql_7_1 """desc ${table_name} partition p2"""
+        qt_sql_7_2 """desc ${table_name} partition p3"""
+        qt_sql_7_3 """desc ${table_name} partition (p2, p3)"""
+        sql "truncate table ${table_name}"
+
+        // more variant
+        table_name = "more_variant_table"
+        sql """
+            CREATE TABLE IF NOT EXISTS ${table_name} (
+                k bigint,
+                v1 variant,
+                v2 variant,
+                v3 variant
+            )
+            DUPLICATE KEY(`k`)
+            DISTRIBUTED BY HASH(k) BUCKETS 5
+            properties("replication_num" = "1", "disable_auto_compaction" = "false");
+        """
+        sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}', '{"a": 11245, "xxxx" : "kaana"}', '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
+        qt_sql_8 """desc ${table_name}"""
+        sql "truncate table ${table_name}"
+
+        // describe_extend_variant_column = false
+        sql """set describe_extend_variant_column = false"""
+        table_name = "no_extend_variant_column"
+        sql """
+            CREATE TABLE IF NOT EXISTS ${table_name} (
+                k bigint,
+                v variant
+            )
+            DUPLICATE KEY(`k`)
+            DISTRIBUTED BY HASH(k) BUCKETS 5
+            properties("replication_num" = "1", "disable_auto_compaction" = "false");
+        """
+        sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}')"""
+        qt_sql_9 """desc ${table_name}"""
+        sql """set describe_extend_variant_column = true"""
+        qt_sql_9_1 """desc ${table_name}"""
+        sql "truncate table ${table_name}"
+
+        // schema change: add variant
+        table_name = "schema_change_table"
+        create_table.call(table_name, "5")
+        // add, drop columns
+        sql """INSERT INTO ${table_name} values(0, '{"k1":1, "k2": "hello world", "k3" : [1234], "k4" : 1.10000, "k5" : [[123]]}')"""
+        sql """set describe_extend_variant_column = true"""
+        qt_sql_10 """desc ${table_name}"""
+        // add column
+        sql "alter table ${table_name} add column v2 variant default null"
+        sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}',
+                 '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}')"""
+        qt_sql_10_1 """desc ${table_name}"""
+        // drop column
+        sql "alter table ${table_name} drop column v2"
+        qt_sql_10_2 """desc ${table_name}"""
+        // add column
+        sql "alter table ${table_name} add column v3 variant default null"
+        sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}',
+                 '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}')"""
+        qt_sql_10_3 """desc ${table_name}"""
+        //sql "truncate table ${table_name}"
+
+        // variant column name: chinese name, unicode
+        table_name = "chinese_table"
+        sql """
+            CREATE TABLE IF NOT EXISTS ${table_name} (
+                k bigint,
+                v variant
+            )
+            DUPLICATE KEY(`k`)
+            DISTRIBUTED BY HASH(k) BUCKETS 5
+            properties("replication_num" = "1", "disable_auto_compaction" = "false");
+        """
+        sql """ insert into ${table_name} values (0, '{"名字" : "jack", "!@#^&*()": "11111", "金额" : 200, "画像" : {"地址" : "北京", "\\\u4E2C\\\u6587": "unicode"}}')"""
+        sql """set describe_extend_variant_column = true"""
+        qt_sql_11 """desc ${table_name}"""
+    } finally {
+        // reset flags
+        set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95")
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org