This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch variant
in repository https://gitbox.apache.org/repos/asf/doris.git
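Before the diff itself, a minimal sketch of the behavior this push adds, assuming a partitioned table with a VARIANT column; the table shape, the DESC PARTITION syntax, and the subcolumn output are borrowed from the regression tests included below (names and values are illustrative):

    -- a partitioned table with a VARIANT column, as in desc.groovy
    CREATE TABLE partition_data (
        k bigint,
        v variant
    )
    DUPLICATE KEY(`k`)
    PARTITION BY RANGE(k)
    (
        PARTITION p1 VALUES LESS THAN (3000),
        PARTITION p2 VALUES LESS THAN (50000)
    )
    DISTRIBUTED BY HASH(k) BUCKETS 4
    PROPERTIES("replication_num" = "1");

    INSERT INTO partition_data VALUES (2500, '{"a" : 1234, "xxxx" : "kaana"}');

    -- DESC now accepts an optional partition clause (see the sql_parser.cup
    -- change); for tables with VARIANT columns it fetches and merges the
    -- tablet schemas from the backends, so dynamically created subcolumns
    -- such as v.a and v.xxxx appear alongside the declared columns:
    DESC partition_data PARTITION p1;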
The following commit(s) were added to refs/heads/variant by this push:
     new a0c28dc7d6 [improve](variant)Desc variant (#23111)
a0c28dc7d6 is described below

commit a0c28dc7d61711e23fa7fa264c009f430b307aa2
Author: Chenyang Sun <csun5...@gmail.com>
AuthorDate: Thu Aug 17 17:32:59 2023 +0800

    [improve](variant)Desc variant (#23111)

    support desc variant with remote fetch

    ---------

    Co-authored-by: eldenmoon <15605149...@163.com>
---
 be/src/olap/rowset/segment_v2/column_reader.h      |   5 +-
 be/src/olap/rowset/segment_v2/segment.cpp          |   6 +-
 be/src/service/internal_service.cpp                | 106 +++++
 be/src/service/internal_service.h                  |   5 +
 fe/fe-core/src/main/cup/sql_parser.cup             |   4 +-
 .../org/apache/doris/analysis/DescribeStmt.java    |  28 +-
 .../java/org/apache/doris/catalog/OlapTable.java   |  19 +
 .../apache/doris/common/proc/IndexInfoProcDir.java |   3 +
 .../doris/common/proc/IndexSchemaProcNode.java     |   8 +-
 .../common/proc/RemoteIndexSchemaProcDir.java      | 113 ++++++
 .../common/proc/RemoteIndexSchemaProcNode.java     |  72 ++++
 .../common/util/FetchRemoteTabletSchemaUtil.java   | 320 ++++++++++++++++
 .../org/apache/doris/rpc/BackendServiceClient.java |   5 +
 .../org/apache/doris/rpc/BackendServiceProxy.java  |  12 +
 gensrc/proto/internal_service.proto                |  18 +
 regression-test/data/variant_p0/desc.out           | 101 +++++
 regression-test/suites/variant_p0/desc.groovy      | 174 +++++++++
 regression-test/suites/variant_p0/load.groovy      | 426 ++++++++++-----------
 18 files changed, 1200 insertions(+), 225 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h
index 1d21c2ac91..c27daa43ec 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -493,8 +493,8 @@ private:
     // Cache the sub column iterators and columns to reduce data read amplification
     class CachedStreamIterator : public ColumnIterator {
     public:
-        CachedStreamIterator(SubstreamCache* stream_cache, const vectorized::PathInData& path)
-                : _stream_cache(stream_cache), _path(path) {}
+        CachedStreamIterator(const vectorized::PathInData& path)
+                : _path(path) {}

         ~CachedStreamIterator() override = default;
@@ -527,7 +527,6 @@ private:
         _rows_read += nrows;
         return Status::OK();
     }
-    SubstreamCache* _stream_cache;
     vectorized::PathInData _path;
     size_t _rows_read = 0;
    // could duplicate with nodes under _path, since node_func is idempotent
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index eef524bfa3..3aa0ce2e65 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -416,12 +416,12 @@ Status Segment::new_iterator_with_path(const TabletColumn& tablet_column,
     if (node != nullptr && node->is_scalar() && node->children.empty()) {
         // Direct read extracted columns
         const auto* node = _sub_column_tree.find_leaf(tablet_column.path_info());
-        auto cache_iter = new CachedStreamIterator(stream_cache, tablet_column.path_info());
+        auto cache_iter = new CachedStreamIterator(tablet_column.path_info());
         RETURN_IF_ERROR(add_stream(cache_iter, node));
         iter->reset(cache_iter);
     } else if (node != nullptr && !node->children.empty()) {
         // Non-leaf nodes need to merge with the root
-        auto* stream_iter = new CachedStreamIterator(stream_cache, tablet_column.path_info());
+        auto* stream_iter = new CachedStreamIterator(tablet_column.path_info());
         std::vector<const SubcolumnColumnReaders::Node*> leaves;
         vectorized::PathsInData leaves_paths;
         SubcolumnColumnReaders::get_leaves_of_node(node, leaves, leaves_paths);
@@ -446,7 +446,7 @@ Status Segment::new_iterator_with_path(const TabletColumn& tablet_column,
         RETURN_IF_ERROR(new_default_iterator(tablet_column, iter));
         return Status::OK();
     }
-    auto cache_iter = new CachedStreamIterator(stream_cache, tablet_column.path_info());
+    auto cache_iter = new CachedStreamIterator(tablet_column.path_info());
     RETURN_IF_ERROR(add_stream(cache_iter, node));
     iter->reset(cache_iter);
 }
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 610c45aa4d..ac692472fa 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -25,6 +25,7 @@
 #include <butil/errno.h>
 #include <butil/iobuf.h>
 #include <fcntl.h>
+#include <gen_cpp/MasterService_types.h>
 #include <gen_cpp/PaloInternalService_types.h>
 #include <gen_cpp/PlanNodes_types.h>
 #include <gen_cpp/Status_types.h>
@@ -93,6 +94,7 @@
 #include "runtime/stream_load/stream_load_context.h"
 #include "runtime/thread_context.h"
 #include "runtime/types.h"
+#include "service/backend_options.h"
 #include "service/point_query_executor.h"
 #include "util/async_io.h"
 #include "util/brpc_client_cache.h"
@@ -112,6 +114,7 @@
 #include "util/uid_util.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_string.h"
+#include "vec/common/schema_util.h"
 #include "vec/core/block.h"
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/data_types/data_type.h"
@@ -720,6 +723,109 @@ void PInternalServiceImpl::_get_column_ids_by_tablet_ids(
     response->mutable_status()->set_status_code(TStatusCode::OK);
 }

+template <class RPCResponse>
+struct AsyncRPCContext {
+    RPCResponse response;
+    brpc::Controller cntl;
+    brpc::CallId cid;
+};
+
+void PInternalServiceImpl::fetch_remote_tablet_schema(google::protobuf::RpcController* controller,
+                                                      const PFetchRemoteSchemaRequest* request,
+                                                      PFetchRemoteSchemaResponse* response,
+                                                      google::protobuf::Closure* done) {
+    bool ret = _heavy_work_pool.try_offer([request, response, done]() {
+        auto merge_schema =
+                [](const std::vector<TabletSchemaSPtr>& input_schema) -> TabletSchemaSPtr {
+            TabletSchemaSPtr merged_schema = std::make_shared<TabletSchema>();
+            vectorized::schema_util::get_least_common_schema(input_schema, merged_schema);
+            return merged_schema;
+        };
+
+        brpc::ClosureGuard closure_guard(done);
+        Status st = Status::OK();
+        if (request->is_coordinator()) {
+            // Spawn rpc requests to non-coordinator nodes, and finally merge them all
+            PFetchRemoteSchemaRequest remote_request(*request);
+            // set it to non-coordinator to get the merged schema
+            remote_request.set_is_coordinator(false);
+            using PFetchRemoteTabletSchemaRpcContext = AsyncRPCContext<PFetchRemoteSchemaResponse>;
+            std::vector<PFetchRemoteTabletSchemaRpcContext> rpc_contexts(
+                    request->tablet_location_size());
+            for (int i = 0; i < request->tablet_location_size(); ++i) {
+                std::string host = request->tablet_location(i).host();
+                int32_t brpc_port = request->tablet_location(i).brpc_port();
+                std::shared_ptr<PBackendService_Stub> stub(
+                        ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
+                                host, brpc_port));
+                stub->fetch_remote_tablet_schema(&rpc_contexts[i].cntl, &remote_request,
+                                                 &rpc_contexts[i].response, brpc::DoNothing());
+            }
+            std::vector<TabletSchemaSPtr> schemas;
+            for (auto& rpc_context : rpc_contexts) {
+                brpc::Join(rpc_context.cntl.call_id());
+                if (rpc_context.cntl.Failed()) {
+                    LOG(WARNING) << "fetch_remote_tablet_schema rpc err:"
+                                 << rpc_context.cntl.ErrorText();
+                    ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
+                            rpc_context.cntl.remote_side());
+                    st = Status::InternalError("fetch_remote_tablet_schema rpc err: {}",
+                                               rpc_context.cntl.ErrorText());
+                    break;
+                }
+                if (rpc_context.response.status().status_code() != 0) {
+                    st = Status::create(rpc_context.response.status());
+                    break;
+                }
+                if (rpc_context.response.has_merged_schema()) {
+                    TabletSchemaSPtr schema = std::make_shared<TabletSchema>();
+                    schema->init_from_pb(rpc_context.response.merged_schema());
+                    schemas.push_back(schema);
+                }
+            }
+            if (!schemas.empty()) {
+                // merge all
+                TabletSchemaSPtr merged_schema = merge_schema(schemas);
+                merged_schema->to_schema_pb(response->mutable_merged_schema());
+            }
+            st.to_protobuf(response->mutable_status());
+            return;
+        }
+
+        // This is not a coordinator; get its tablets and merge their schemas
+        std::vector<int64_t> target_tablets;
+        for (int i = 0; i < request->tablet_location_size(); ++i) {
+            const auto& location = request->tablet_location(i);
+            auto backend = BackendOptions::get_local_backend();
+            // If this is the target backend
+            if (backend.host == location.host() && config::brpc_port == location.brpc_port()) {
+                target_tablets.assign(location.tablet_id().begin(), location.tablet_id().end());
+                break;
+            }
+        }
+        if (!target_tablets.empty()) {
+            std::vector<TabletSchemaSPtr> tablet_schemas;
+            for (int64_t tablet_id : target_tablets) {
+                TabletSharedPtr tablet =
+                        StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, false);
+                if (tablet == nullptr) {
+                    // just ignore
+                    continue;
+                }
+                tablet_schemas.push_back(tablet->tablet_schema());
+            }
+
+            // merge all
+            TabletSchemaSPtr merged_schema = merge_schema(tablet_schemas);
+            merged_schema->to_schema_pb(response->mutable_merged_schema());
+        }
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        offer_failed(response, done, _heavy_work_pool);
+    }
+}
+
 void PInternalServiceImpl::report_stream_load_status(google::protobuf::RpcController* controller,
                                                      const PReportStreamLoadStatusRequest* request,
                                                      PReportStreamLoadStatusResponse* response,
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index 55b51cf40a..799038a834 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -189,6 +189,11 @@ public:
     void glob(google::protobuf::RpcController* controller, const PGlobRequest* request,
               PGlobResponse* response, google::protobuf::Closure* done) override;

+    void fetch_remote_tablet_schema(google::protobuf::RpcController* controller,
+                                    const PFetchRemoteSchemaRequest* request,
+                                    PFetchRemoteSchemaResponse* response,
+                                    google::protobuf::Closure* done) override;
+
 private:
     void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller,
                                         const PExecPlanFragmentRequest* request,
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index add419fa14..5466ca7f80 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -4423,9 +4423,9 @@ opt_explain_options ::=
 // Describe statement
 describe_stmt ::=
-    describe_command table_name:table
+    describe_command table_name:table opt_partition_names:partitionNames
     {:
-        RESULT = new DescribeStmt(table, false);
+        RESULT = new DescribeStmt(table, false, partitionNames);
     :}
     | KW_SHOW KW_FIELDS KW_FROM table_name:table
     {:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java
index f632c00f9c..2aa6b47c46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java
@@ -92,6 +92,7 @@ public class DescribeStmt extends ShowStmt {
     private TableName dbTableName;
     private ProcNodeInterface node;
+    private PartitionNames partitionNames;

     List<List<String>> totalRows = new LinkedList<List<String>>();
@@ -106,6 +107,12 @@ public class DescribeStmt extends ShowStmt {
         this.isAllTables = isAllTables;
     }

+    public DescribeStmt(TableName dbTableName, boolean isAllTables, PartitionNames partitionNames) {
+        this.dbTableName = dbTableName;
+        this.isAllTables = isAllTables;
+        this.partitionNames = partitionNames;
+    }
+
     public DescribeStmt(TableValuedFunctionRef tableValuedFunctionRef) {
         this.tableValuedFunctionRef = tableValuedFunctionRef;
         this.isTableValuedFunction = true;
@@ -156,6 +163,13 @@ public class DescribeStmt extends ShowStmt {
             return;
         }

+        if (partitionNames != null) {
+            partitionNames.analyze(analyzer);
+            if (partitionNames.isTemp()) {
+                throw new AnalysisException("Do not support temp partitions");
+            }
+        }
+
         dbTableName.analyze(analyzer);

         if (!Env.getCurrentEnv().getAccessManager()
@@ -178,9 +192,21 @@ public class DescribeStmt extends ShowStmt {
             if (table.getType() == TableType.OLAP) {
                 procString += ((OlapTable) table).getBaseIndexId();
             } else {
+                if (partitionNames != null) {
+                    throw new AnalysisException("Describe table[" + dbTableName.getTbl() + "] failed");
+                }
                 procString += table.getId();
             }
-
+            if (partitionNames != null) {
+                procString += "/";
+                StringBuilder builder = new StringBuilder();
+                for (String str : partitionNames.getPartitionNames()) {
+                    builder.append(str);
+                    builder.append(",");
+                }
+                builder.deleteCharAt(builder.length() - 1);
+                procString += builder.toString();
+            }
             node = ProcService.getInstance().open(procString);
             if (node == null) {
                 throw new AnalysisException("Describe table[" + dbTableName.getTbl() + "] failed");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index aec2c4d997..1735df7358 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -2276,4 +2276,23 @@ public class OlapTable extends Table {
             }
         }
     }
+
+    public List<Tablet> getAllTablets() throws AnalysisException {
+        List<Tablet> tablets = Lists.newArrayList();
+        for (Partition partition : getPartitions()) {
+            for (Tablet tablet : partition.getBaseIndex().getTablets()) {
+                tablets.add(tablet);
+            }
+        }
+        return tablets;
+    }
+
+    public boolean hasVariantColumns() {
+        for (Column column : getBaseSchema()) {
+            if (column.getType().isVariantType()) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java
index 4775ba23ef..24882a3e97 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java
@@ -128,6 +128,9 @@ public class IndexInfoProcDir implements ProcDirInterface {
                 throw new AnalysisException("Index " + idxId + " does not exist");
             }
             bfColumns = olapTable.getCopiedBfColumns();
+            if (olapTable.hasVariantColumns()) {
+                return new RemoteIndexSchemaProcDir(table, schema, bfColumns);
+            }
         } else {
             schema = table.getBaseSchema();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java
index 47da7a9d53..6f125217ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java
@@ -49,10 +49,8 @@ public class IndexSchemaProcNode implements ProcNodeInterface {
         this.bfColumns = bfColumns;
     }

-    @Override
-    public ProcResult fetchResult() throws AnalysisException {
+    public static ProcResult createResult(List<Column> schema, Set<String> bfColumns) throws AnalysisException {
         Preconditions.checkNotNull(schema);
-
         BaseProcResult result = new BaseProcResult();
         result.setNames(TITLE_NAMES);
@@ -105,4 +103,8 @@ public class IndexSchemaProcNode implements ProcNodeInterface {
         return result;
     }

+    @Override
+    public ProcResult fetchResult() throws AnalysisException {
+        return createResult(this.schema, this.bfColumns);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java
new file mode 100644
index 0000000000..43dfe89d73
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.proc;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+/*
+ * SHOW PROC /dbs/dbId/tableId/index_schema/indexId"
+ * show index schema
+ */
+public class RemoteIndexSchemaProcDir implements ProcDirInterface {
+    public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
+            .add("Field").add("Type").add("Null").add("Key")
+            .add("Default").add("Extra")
+            .build();
+
+    private List<Column> schema;
+    private Set<String> bfColumns;
+    private TableIf table;
+
+    public RemoteIndexSchemaProcDir(TableIf table, List<Column> schema, Set<String> bfColumns) {
+        this.table = table;
+        this.schema = schema;
+        this.bfColumns = bfColumns;
+    }
+
+    @Override
+    public ProcResult fetchResult() throws AnalysisException {
+        Preconditions.checkNotNull(table);
+        Preconditions.checkNotNull(schema);
+        List<Tablet> tablets = null;
+        table.readLock();
+        try {
+            OlapTable olapTable = (OlapTable) table;
+            tablets = olapTable.getAllTablets();
+        } finally {
+            table.readUnlock();
+        }
+        List<Column> remoteSchema = new FetchRemoteTabletSchemaUtil(tablets).fetch();
+        if (remoteSchema == null || remoteSchema.isEmpty()) {
+            throw new AnalysisException("fetch remote tablet schema failed");
+        }
+        this.schema = remoteSchema;
+        return IndexSchemaProcNode.createResult(this.schema, this.bfColumns);
+    }
+
+    @Override
+    public boolean register(String name, ProcNodeInterface node) {
+        return false;
+    }
+
+    @Override
+    public ProcNodeInterface lookup(String partitionString) throws AnalysisException {
+        Preconditions.checkNotNull(table);
+
+        List<String> partitionNameList = new ArrayList<String>(Arrays.asList(partitionString.split(",")));
+        if (partitionNameList == null || partitionNameList.isEmpty()) {
+            throw new AnalysisException("Describe table[" + table.getName() + "] failed");
+        }
+        List<Partition> partitions = Lists.newArrayList();
+        table.readLock();
+        try {
+            if (table.getType() == TableType.OLAP) {
+                OlapTable olapTable = (OlapTable) table;
+                for (String partitionName : partitionNameList) {
+                    Partition partition = olapTable.getPartition(partitionName);
+                    if (partition == null) {
+                        throw new AnalysisException("Partition " + partitionName + " does not exist");
+                    }
+                    partitions.add(partition);
+                }
+            } else {
+                throw new AnalysisException("Describe table[" + table.getName() + "] failed");
+            }
+        } catch (Throwable t) {
+            throw new AnalysisException("Describe table[" + table.getName() + "] failed");
+        } finally {
+            table.readUnlock();
+        }
+        return new RemoteIndexSchemaProcNode(partitions, this.schema, this.bfColumns);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java
new file mode 100644
index 0000000000..f3a5760ade
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.proc;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Set;
+
+/*
+ * SHOW PROC /dbs/dbId/tableId/index_schema/indexId/partitionName"
+ * show index schema
+ */
+public class RemoteIndexSchemaProcNode implements ProcNodeInterface {
+    public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
+            .add("Field").add("Type").add("Null").add("Key")
+            .add("Default").add("Extra")
+            .build();
+
+    private List<Partition> partitions;
+    private List<Column> schema;
+    private Set<String> bfColumns;
+
+    public RemoteIndexSchemaProcNode(List<Partition> partitions, List<Column> schema, Set<String> bfColumns) {
+        this.partitions = partitions;
+        this.schema = schema;
+        this.bfColumns = bfColumns;
+    }
+
+    @Override
+    public ProcResult fetchResult() throws AnalysisException {
+        Preconditions.checkNotNull(schema);
+        Preconditions.checkNotNull(partitions);
+        List<Tablet> tablets = Lists.newArrayList();
+        for (Partition partition : partitions) {
+            MaterializedIndex idx = partition.getBaseIndex();
+            for (Tablet tablet : idx.getTablets()) {
+                tablets.add(tablet);
+            }
+        }
+        List<Column> remoteSchema = new FetchRemoteTabletSchemaUtil(tablets).fetch();
+        if (remoteSchema == null || remoteSchema.isEmpty()) {
+            throw new AnalysisException("fetch remote tablet schema failed");
+        }
+        this.schema = remoteSchema;
+        return IndexSchemaProcNode.createResult(this.schema, this.bfColumns);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java
new file mode 100644
index 0000000000..7145ac1acf
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.catalog.AggregateType;
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MapType;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.StructType;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.proto.InternalService.PFetchRemoteSchemaRequest;
+import org.apache.doris.proto.InternalService.PFetchRemoteSchemaResponse;
+import org.apache.doris.proto.InternalService.PTabletsLocation;
+import org.apache.doris.proto.OlapFile.ColumnPB;
+import org.apache.doris.proto.OlapFile.TabletSchemaPB;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+
+public class FetchRemoteTabletSchemaUtil {
+    private static final Logger LOG = LogManager.getLogger(FetchRemoteTabletSchemaUtil.class);
+
+    private List<Tablet> tablets;
+    private List<Column> columns;
+
+    public FetchRemoteTabletSchemaUtil(List<Tablet> tablets) {
+        this.tablets = tablets;
+        this.columns = Lists.newArrayList();
+    }
+
+    public List<Column> fetch() {
+        // find be
+        Preconditions.checkNotNull(tablets);
+        Map<Long, Set<Long>> beIdToTabletId = Maps.newHashMap();
+        for (Tablet tablet : tablets) {
+            for (Replica replica : tablet.getReplicas()) {
+                Set<Long> tabletIds = beIdToTabletId.computeIfAbsent(
+                        replica.getBackendId(), k -> Sets.newHashSet());
+                tabletIds.add(tablet.getId());
+            }
+        }
+
+        // build PTabletsLocation
+        List<PTabletsLocation> locations = Lists.newArrayList();
+        List<Backend> coordinatorBackend = Lists.newArrayList();
+        for (Map.Entry<Long, Set<Long>> entry : beIdToTabletId.entrySet()) {
+            Long backendId = entry.getKey();
+            Set<Long> tabletIds = entry.getValue();
+            Backend backend = Env.getCurrentEnv().getCurrentSystemInfo().getBackend(backendId);
+            if (coordinatorBackend.size() < 2) {
+                coordinatorBackend.add(backend);
+            }
+            PTabletsLocation.Builder locationBuilder = PTabletsLocation.newBuilder()
+                    .setHost(backend.getHost())
+                    .setBrpcPort(backend.getBrpcPort());
+            PTabletsLocation location = locationBuilder.addAllTabletId(tabletIds).build();
+            locations.add(location);
+        }
+        PFetchRemoteSchemaRequest.Builder requestBuilder = PFetchRemoteSchemaRequest.newBuilder()
+                .addAllTabletLocation(locations)
+                .setIsCoordinator(true);
+        // send rpc to coordinatorBackend until it succeeds, trying at most 2 backends
+        for (Backend be : coordinatorBackend) {
+            try {
+                PFetchRemoteSchemaRequest request = requestBuilder.build();
+                Future<PFetchRemoteSchemaResponse> future = BackendServiceProxy.getInstance()
+                        .fetchRemoteTabletSchemaAsync(be.getBrpcAdress(), request);
+                PFetchRemoteSchemaResponse response = null;
+                try {
+                    response = future.get(60, TimeUnit.SECONDS);
+                    TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
+                    String errMsg;
+                    if (code != TStatusCode.OK) {
+                        if (!response.getStatus().getErrorMsgsList().isEmpty()) {
+                            errMsg = response.getStatus().getErrorMsgsList().get(0);
+                        } else {
+                            errMsg = "fetchRemoteTabletSchemaAsync failed. backend address: "
+                                    + be.getHost() + " : " + be.getBrpcPort();
+                        }
+                        throw new AnalysisException(errMsg);
+                    }
+                    fillColumns(response);
+                    break;
+                } catch (AnalysisException e) {
+                    // continue to get result
+                    LOG.warn(e);
+                } catch (InterruptedException e) {
+                    // continue to get result
+                    LOG.warn("fetch remote schema future get interrupted Exception");
+                } catch (TimeoutException e) {
+                    future.cancel(true);
+                    // continue to get result
+                    LOG.warn("fetch remote schema result timeout, addr {}", be.getBrpcAdress());
+                }
+            } catch (RpcException e) {
+                LOG.warn("fetch remote schema result rpc exception {}, e {}", be.getBrpcAdress(), e);
+            } catch (ExecutionException e) {
+                LOG.warn("fetch remote schema result execution exception {}, addr {}", e, be.getBrpcAdress());
+            }
+        }
+        return columns;
+    }
+
+    private void fillColumns(PFetchRemoteSchemaResponse response) throws AnalysisException {
+        TabletSchemaPB schemaPB = response.getMergedSchema();
+        for (ColumnPB columnPB : schemaPB.getColumnList()) {
+            try {
+                Column remoteColumn = initColumnFromPB(columnPB);
+                columns.add(remoteColumn);
+            } catch (Exception e) {
+                throw new AnalysisException("default value to string failed");
+            }
+        }
+        int variantColumnIdx = 0;
+        for (Column column : columns) {
+            variantColumnIdx++;
+            if (column.getType().isVariantType()) {
+                break;
+            }
+        }
+        if (variantColumnIdx == columns.size()) {
+            return;
+        }
+        List<Column> subList = columns.subList(variantColumnIdx, columns.size());
+        Collections.sort(subList, new Comparator<Column>() {
+            @Override
+            public int compare(Column c1, Column c2) {
+                return c1.getName().compareTo(c2.getName());
+            }
+        });
+    }
+
+    private Column initColumnFromPB(ColumnPB column) throws AnalysisException {
+        try {
+            AggregateType aggType = getAggTypeFromAggName(column.getAggregation());
+            Type type = getTypeFromTypeName(column.getType());
+            String columnName = column.getName();
+            boolean isKey = column.getIsKey();
+            boolean isNullable = column.getIsNullable();
+            String defaultValue = column.getDefaultValue().toString("UTF-8");
+            if (defaultValue.equals("")) {
+                defaultValue = "NULL";
+            }
+            do {
+                if (type.isArrayType()) {
+                    List<ColumnPB> childColumn = column.getChildrenColumnsList();
+                    if (childColumn == null || childColumn.size() != 1) {
+                        break;
+                    }
+                    Column child = initColumnFromPB(childColumn.get(0));
+                    type = new ArrayType(child.getType());
+                } else if (type.isMapType()) {
+                    List<ColumnPB> childColumn = column.getChildrenColumnsList();
+                    if (childColumn == null || childColumn.size() != 2) {
+                        break;
+                    }
+                    Column keyChild = initColumnFromPB(childColumn.get(0));
+                    Column valueChild = initColumnFromPB(childColumn.get(1));
+                    type = new MapType(keyChild.getType(), valueChild.getType());
+                } else if (type.isStructType()) {
+                    List<ColumnPB> childColumn = column.getChildrenColumnsList();
+                    if (childColumn == null) {
+                        break;
+                    }
+                    List<Type> childTypes = Lists.newArrayList();
+                    for (ColumnPB childPB : childColumn) {
+                        childTypes.add(initColumnFromPB(childPB).getType());
+                    }
+                    type = new StructType(childTypes);
+                }
+            } while (false);
+            return new Column(columnName, type, isKey, aggType, isNullable,
+                    defaultValue, "remote schema");
+        } catch (Exception e) {
+            throw new AnalysisException("default value to string failed");
+        }
+    }
+
+    private Type getTypeFromTypeName(String typeName) {
+        Type type;
+        if (typeName.equals("TINYINT")) {
+            type = Type.TINYINT;
+        } else if (typeName.equals("SMALLINT")) {
+            type = Type.SMALLINT;
+        } else if (typeName.equals("INT")) {
+            type = Type.INT;
+        } else if (typeName.equals("BIGINT")) {
+            type = Type.BIGINT;
+        } else if (typeName.equals("LARGEINT")) {
+            type = Type.LARGEINT;
+        } else if (typeName.equals("UNSIGNED_TINYINT")) {
+            type = Type.BIGINT;
+        } else if (typeName.equals("UNSIGNED_SMALLINT")) {
+            type = Type.UNSUPPORTED;
+        } else if (typeName.equals("UNSIGNED_INT")) {
+            type = Type.UNSUPPORTED;
+        } else if (typeName.equals("UNSIGNED_BIGINT")) {
+            type = Type.UNSUPPORTED;
+        } else if (typeName.equals("FLOAT")) {
+            type = Type.FLOAT;
+        } else if (typeName.equals("DISCRETE_DOUBLE")) {
+            type = Type.DOUBLE;
+        } else if (typeName.equals("DOUBLE")) {
+            type = Type.DOUBLE;
+        } else if (typeName.equals("CHAR")) {
+            type = Type.CHAR;
+        } else if (typeName.equals("DATE")) {
+            type = Type.DATE;
+        } else if (typeName.equals("DATEV2")) {
+            type = Type.DATEV2;
+        } else if (typeName.equals("DATETIMEV2")) {
+            type = Type.DATETIMEV2;
+        } else if (typeName.equals("DATETIME")) {
+            type = Type.DATETIME;
+        } else if (typeName.equals("DECIMAL32")) {
+            type = Type.DECIMAL32;
+        } else if (typeName.equals("DECIMAL64")) {
+            type = Type.DECIMAL64;
+        } else if (typeName.equals("DECIMAL128I")) {
+            type = Type.DECIMAL128;
+        } else if (typeName.equals("DECIMAL")) {
+            type = Type.DECIMALV2;
+        } else if (typeName.equals("VARCHAR")) {
+            type = Type.VARCHAR;
+        } else if (typeName.equals("STRING")) {
+            type = Type.STRING;
+        } else if (typeName.equals("JSONB")) {
+            type = Type.JSONB;
+        } else if (typeName.equals("VARIANT")) {
+            type = Type.VARIANT;
+        } else if (typeName.equals("BOOLEAN")) {
+            type = Type.BOOLEAN;
+        } else if (typeName.equals("HLL")) {
+            type = Type.HLL;
+        } else if (typeName.equals("STRUCT")) {
+            type = Type.STRUCT;
+        } else if (typeName.equals("LIST")) {
+            type = Type.UNSUPPORTED;
+        } else if (typeName.equals("MAP")) {
+            type = Type.MAP;
+        } else if (typeName.equals("OBJECT")) {
+            type = Type.UNSUPPORTED;
+        } else if (typeName.equals("ARRAY")) {
+            type = Type.ARRAY;
+        } else if (typeName.equals("QUANTILE_STATE")) {
+            type = Type.QUANTILE_STATE;
+        } else if (typeName.equals("AGG_STATE")) {
+            type = Type.AGG_STATE;
+        } else {
+            type = Type.UNSUPPORTED;
+        }
+        return type;
+    }
+
+    private AggregateType getAggTypeFromAggName(String aggName) {
+        AggregateType aggType;
+        if (aggName.equals("NONE")) {
+            aggType = AggregateType.NONE;
+        } else if (aggName.equals("SUM")) {
+            aggType = AggregateType.SUM;
+        } else if (aggName.equals("MIN")) {
+            aggType = AggregateType.MIN;
+        } else if (aggName.equals("MAX")) {
+            aggType = AggregateType.MAX;
+        } else if (aggName.equals("REPLACE")) {
+            aggType = AggregateType.REPLACE;
+        } else if (aggName.equals("REPLACE_IF_NOT_NULL")) {
+            aggType = AggregateType.REPLACE_IF_NOT_NULL;
+        } else if (aggName.equals("HLL_UNION")) {
+            aggType = AggregateType.HLL_UNION;
+        } else if (aggName.equals("BITMAP_UNION")) {
+            aggType = AggregateType.BITMAP_UNION;
+        } else if (aggName.equals("QUANTILE_UNION")) {
+            aggType = AggregateType.QUANTILE_UNION;
+        } else if (!aggName.isEmpty()) {
+            aggType = AggregateType.GENERIC_AGGREGATION;
+        } else {
+            aggType = AggregateType.NONE;
+        }
+        return aggType;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index d012e757ef..a0a4fbaa83 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -147,6 +147,11 @@ public class BackendServiceClient {
         return stub.reportStreamLoadStatus(request);
     }

+    public Future<InternalService.PFetchRemoteSchemaResponse> fetchRemoteTabletSchemaAsync(
+            InternalService.PFetchRemoteSchemaRequest request) {
+        return stub.fetchRemoteTabletSchema(request);
+    }
+
     public Future<InternalService.PGlobResponse> glob(InternalService.PGlobRequest request) {
         return stub.glob(request);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index d9fae1daf9..dedc832879 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -411,4 +411,16 @@ public class BackendServiceProxy {
         }
     }

+    public Future<InternalService.PFetchRemoteSchemaResponse> fetchRemoteTabletSchemaAsync(
+            TNetworkAddress address, InternalService.PFetchRemoteSchemaRequest request) throws RpcException {
+        try {
+            final BackendServiceClient client = getProxy(address);
+            return client.fetchRemoteTabletSchemaAsync(request);
+        } catch (Throwable e) {
+            LOG.warn("fetch remote tablet schema catch a exception, address={}:{}",
+                    address.getHostname(), address.getPort(), e);
+            throw new RpcException(address.hostname, e.getMessage());
+        }
+    }
+
 }
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 830ed3c41a..819f835a90 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -692,6 +692,23 @@ message PGlobResponse {
     repeated PFileInfo files = 2;
 }

+message PTabletsLocation {
+    required string host = 1;
+    required int32 brpc_port = 2;
+    repeated int64 tablet_id = 3;
+}
+
+message PFetchRemoteSchemaRequest {
+    repeated PTabletsLocation tablet_location = 1;
+    required bool is_coordinator = 2;
+}
+
+message PFetchRemoteSchemaResponse {
+    optional PStatus status = 1;
+    // intermediate merged schema
+    optional TabletSchemaPB merged_schema = 2;
+}
+
 service PBackendService {
     rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
     rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult);
@@ -731,5 +748,6 @@ service PBackendService {
     rpc get_tablet_rowset_versions(PGetTabletVersionsRequest) returns (PGetTabletVersionsResponse);
     rpc report_stream_load_status(PReportStreamLoadStatusRequest) returns (PReportStreamLoadStatusResponse);
     rpc glob(PGlobRequest) returns (PGlobResponse);
+    rpc fetch_remote_tablet_schema(PFetchRemoteSchemaRequest) returns (PFetchRemoteSchemaResponse);
 };
diff --git a/regression-test/data/variant_p0/desc.out b/regression-test/data/variant_p0/desc.out
new file mode 100644
index 0000000000..91a6f95609
--- /dev/null
+++ b/regression-test/data/variant_p0/desc.out
@@ -0,0 +1,101 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_1 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.xxxx TEXT Yes false NULL NONE
+
+-- !sql_2 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.ddd.aaa TINYINT Yes false NULL NONE
+v.ddd.mxmxm JSON Yes false NULL NONE
+v.xxxx TEXT Yes false NULL NONE
+
+-- !sql_3 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.b JSON Yes false NULL NONE
+v.c.c SMALLINT Yes false NULL NONE
+v.c.e DOUBLE Yes false NULL NONE
+v.xxxx TEXT Yes false NULL NONE
+
+-- !sql_5 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+
+-- !sql_6_1 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.ddd.aaa TINYINT Yes false NULL NONE
+v.ddd.mxmxm JSON Yes false NULL NONE
+v.xxxx TEXT Yes false NULL NONE
+
+-- !sql_6_2 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.xxxx TEXT Yes false NULL NONE
+
+-- !sql_6_3 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.b JSON Yes false NULL NONE
+v.c.c SMALLINT Yes false NULL NONE
+v.c.e DOUBLE Yes false NULL NONE
+
+-- !sql_6 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.b JSON Yes false NULL NONE
+v.c.c SMALLINT Yes false NULL NONE
+v.c.e DOUBLE Yes false NULL NONE
+v.ddd.aaa TINYINT Yes false NULL NONE
+v.ddd.mxmxm JSON Yes false NULL NONE
+v.xxxx TEXT Yes false NULL NONE
+
+-- !sql_7 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.b JSON Yes false NULL NONE
+v.c.c SMALLINT Yes false NULL NONE
+v.c.e DOUBLE Yes false NULL NONE
+v.xxxx TEXT Yes false NULL NONE
+
+-- !sql_7_1 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.xxxx TEXT Yes false NULL NONE
+
+-- !sql_7_2 --
+k BIGINT Yes true NULL NONE
+v VARIANT Yes false NULL NONE
+v.a SMALLINT Yes false NULL NONE
+v.b JSON Yes false NULL NONE
+v.c.c SMALLINT Yes false NULL NONE
+v.c.e DOUBLE Yes false NULL NONE
+
+-- !sql_8 --
+k BIGINT Yes true NULL NONE
+v1 VARIANT Yes false NULL NONE
+v1.a SMALLINT Yes false NULL NONE
+v1.b JSON Yes false NULL NONE
+v1.c.c SMALLINT Yes false NULL NONE
+v1.c.e DOUBLE Yes false NULL NONE
+v1.oooo.xxxx.xxx TINYINT Yes false NULL NONE
+v2 VARIANT Yes false NULL NONE
+v2.a SMALLINT Yes false NULL NONE
+v2.xxxx TEXT Yes false NULL NONE
+v3 VARIANT Yes false NULL NONE
+v3.a SMALLINT Yes false NULL NONE
+v3.b JSON Yes false NULL NONE
+v3.c.c SMALLINT Yes false NULL NONE
+v3.c.e DOUBLE Yes false NULL NONE
+
diff --git a/regression-test/suites/variant_p0/desc.groovy b/regression-test/suites/variant_p0/desc.groovy
new file mode 100644
index 0000000000..1058390f6e
--- /dev/null
+++ b/regression-test/suites/variant_p0/desc.groovy
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("regression_test_variant_desc", "variant_type_desc"){
+
+    def load_json_data = {table_name, file_name ->
+        // load the json data
+        streamLoad {
+            table "${table_name}"
+
+            // set http request header params
+            set 'read_json_by_line', 'true'
+            set 'format', 'json'
+            set 'max_filter_ratio', '0.1'
+            file file_name // import json file
+            time 10000 // limit inflight 10s
+
+            // if declared a check callback, the default check condition will ignore.
+            // So you must check all condition
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                logger.info("Stream load ${file_name} result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+    }
+
+    def create_table = { table_name, buckets="auto" ->
+        sql "DROP TABLE IF EXISTS ${table_name}"
+        sql """
+            CREATE TABLE IF NOT EXISTS ${table_name} (
+                k bigint,
+                v variant
+            )
+            DUPLICATE KEY(`k`)
+            DISTRIBUTED BY HASH(k) BUCKETS ${buckets}
+            properties("replication_num" = "1", "disable_auto_compaction" = "false");
+        """
+    }
+
+    def create_table_partition = { table_name, buckets="auto" ->
+        sql "DROP TABLE IF EXISTS ${table_name}"
+        sql """
+            CREATE TABLE IF NOT EXISTS ${table_name} (
+                k bigint,
+                v variant
+            )
+            DUPLICATE KEY(`k`)
+            PARTITION BY RANGE(k)
+            (
+                PARTITION p1 VALUES LESS THAN (3000),
+                PARTITION p2 VALUES LESS THAN (50000),
+                PARTITION p3 VALUES LESS THAN (100000)
+            )
+            DISTRIBUTED BY HASH(k) BUCKETS ${buckets}
+            properties("replication_num" = "1", "disable_auto_compaction" = "false");
+        """
+    }
+
+    def set_be_config = { key, value ->
+        String backend_id;
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+        backend_id = backendId_to_backendIP.keySet()[0]
+        def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
+        logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
+    }
+
+    try {
+        // sparse columns
+        def table_name = "sparse_columns"
+        create_table table_name
+        set_be_config.call("ratio_of_defaults_as_sparse_column", "0.95")
+        sql """insert into sparse_columns select 0, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}'  as json_str
+            union  all select 0, '{"a": 1123}' as json_str union all select 0, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") limit 4096 ;"""
+        qt_sql_1 """desc ${table_name}"""
+        sql "truncate table sparse_columns"
+        sql """insert into sparse_columns select 0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}'  as json_str
+            union  all select 0, '{"a" : 1234, "xxxx" : "kaana", "ddd" : {"aaa" : 123, "mxmxm" : [456, "789"]}}' as json_str from numbers("number" = "4096") limit 4096 ;"""
+        qt_sql_2 """desc ${table_name}"""
+        sql "truncate table sparse_columns"
+
+        // no sparse columns
+        table_name = "no_sparse_columns"
+        create_table.call(table_name, "4")
+        sql "set enable_two_phase_read_opt = false;"
+        set_be_config.call("ratio_of_defaults_as_sparse_column", "1")
+        sql """insert into ${table_name} select 0, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}'  as json_str
+            union  all select 0, '{"a": 1123}' as json_str union all select 0, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") limit 4096 ;"""
+        qt_sql_3 """desc ${table_name}"""
+        sql "truncate table ${table_name}"
+
+        // always sparse column
+        set_be_config.call("ratio_of_defaults_as_sparse_column", "0")
+        sql """insert into ${table_name} select 0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}'  as json_str
+            union  all select 0, '{"a" : 1234, "xxxx" : "kaana", "ddd" : {"aaa" : 123, "mxmxm" : [456, "789"]}}' as json_str from numbers("number" = "4096") limit 4096 ;"""
+        qt_sql_5 """desc ${table_name}"""
+        sql "truncate table ${table_name}"
+
+        // partition
+        table_name = "partition_data"
+        create_table_partition.call(table_name, "4")
+        sql "set enable_two_phase_read_opt = false;"
+        set_be_config.call("ratio_of_defaults_as_sparse_column", "0.95")
+        sql """insert into ${table_name} select 2500, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}'  as json_str
+            union  all select 2500, '{"a" : 1234, "xxxx" : "kaana", "ddd" : {"aaa" : 123, "mxmxm" : [456, "789"]}}' as json_str from numbers("number" = "4096") limit 4096 ;"""
+        sql """insert into ${table_name} select 45000, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}'  as json_str
+            union  all select 45000, '{"a": 1123}' as json_str union all select 45000, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") limit 4096 ;"""
+        sql """insert into ${table_name} values(95000, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
+        qt_sql_6_1 """desc ${table_name} partition p1"""
+        qt_sql_6_2 """desc ${table_name} partition p2"""
+        qt_sql_6_3 """desc ${table_name} partition p3"""
+        qt_sql_6 """desc ${table_name}"""
+        sql "truncate table ${table_name}"
+
+        // drop partition
+        table_name = "drop_partition"
+        create_table_partition.call(table_name, "4")
+        // insert into partition p1
+        sql """insert into ${table_name} values(2500, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
+        // insert into partition p2
+        sql """insert into ${table_name} values(45000, '{"a": 11245, "xxxx" : "kaana"}')"""
+        // insert into partition p3
+        sql """insert into ${table_name} values(95000, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
+        // drop p1
+        sql """alter table ${table_name} drop partition p1"""
+        qt_sql_7 """desc ${table_name}"""
+        qt_sql_7_1 """desc ${table_name} partition p2"""
+        qt_sql_7_2 """desc ${table_name} partition p3"""
+        sql "truncate table ${table_name}"
+
+        // more variant
+        table_name = "more_variant_table"
+        sql """
+            CREATE TABLE IF NOT EXISTS ${table_name} (
+                k bigint,
+                v1 variant,
+                v2 variant,
+                v3 variant
+            )
+            DUPLICATE KEY(`k`)
+            DISTRIBUTED BY HASH(k) BUCKETS 5
+            properties("replication_num" = "1", "disable_auto_compaction" = "false");
+        """
+        sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}', '{"a": 11245, "xxxx" : "kaana"}', '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
+        qt_sql_8 """desc ${table_name}"""
+        sql "truncate table ${table_name}"
+    } finally {
+        // reset flags
+        set_be_config.call("ratio_of_defaults_as_sparse_column", "0.95")
+    }
+}
diff --git a/regression-test/suites/variant_p0/load.groovy b/regression-test/suites/variant_p0/load.groovy
index 472cdcbcdc..67165e1371 100644
--- a/regression-test/suites/variant_p0/load.groovy
+++ b/regression-test/suites/variant_p0/load.groovy
@@ -77,222 +77,222 @@ suite("regression_test_variant", "variant_type"){
     try {
         def table_name = "simple_variant"
         // // // 1. simple cases
+        create_table table_name
+        sql """insert into ${table_name} values (1, '[1]'),(1, '{"a" : 1}');"""
+        sql """insert into ${table_name} values (2, '[2]'),(1, '{"a" : [[[1]]]}');"""
+        sql """insert into ${table_name} values (3, '3'),(1, '{"a" : 1}'), (1, '{"a" : [1]}');"""
+        sql """insert into ${table_name} values (4, '"4"'),(1, '{"a" : "1223"}');"""
+        sql """insert into ${table_name} values (5, '5.0'),(1, '{"a" : [1]}');"""
+        sql """insert into ${table_name} values (6, '"[6]"'),(1, '{"a" : ["1", 2, 1.1]}');"""
+        sql """insert into ${table_name} values (7, '7'),(1, '{"a" : 1, "b" : {"c" : 1}}');"""
+        sql """insert into ${table_name} values (8, '8.11111'),(1, '{"a" : 1, "b" : {"c" : [{"a" : 1}]}}');"""
+        sql """insert into ${table_name} values (9, '"9999"'),(1, '{"a" : 1, "b" : {"c" : [{"a" : 1}]}}');"""
+        sql """insert into ${table_name} values (10, '1000000'),(1, '{"a" : 1, "b" : {"c" : [{"a" : 1}]}}');"""
+        sql """insert into ${table_name} values (11, '[123.0]'),(1, '{"a" : 1, "b" : {"c" : 1}}'),(1, '{"a" : 1, "b" : 10}');"""
+        sql """insert into ${table_name} values (12, '[123.2]'),(1, '{"a" : 1, "b" : 10}'),(1, '{"a" : 1, "b" : {"c" : 1}}');"""
+        qt_sql_1 "select k, v from simple_variant order by k, cast(v as string)"
+        qt_sql_1_1 "select k, v, cast(v:b as string) from simple_variant where length(cast(v:b as string)) > 4 order by k, cast(v as string)"
+        verify table_name
+
+        // 2. type conflict cases
+        table_name = "type_conflict_resolution"
+        create_table table_name
+        sql """insert into ${table_name} values (1, '{"c" : "123"}');"""
+        sql """insert into ${table_name} values (2, '{"c" : 123}');"""
+        sql """insert into ${table_name} values (3, '{"cc" : [123]}');"""
+        sql """insert into ${table_name} values (4, '{"cc" : [123.1]}');"""
+        sql """insert into ${table_name} values (5, '{"ccc" : 123}');"""
+        sql """insert into ${table_name} values (6, '{"ccc" : 123321}');"""
+        sql """insert into ${table_name} values (7, '{"cccc" : 123.0}');"""
+        sql """insert into ${table_name} values (8, '{"cccc" : 123.11}');"""
+        sql """insert into ${table_name} values (9, '{"ccccc" : [123]}');"""
+        sql """insert into ${table_name} values (10, '{"ccccc" : [123456789]}');"""
+        sql """insert into ${table_name} values (11, '{"b" : 1111111111111111}');"""
+        sql """insert into ${table_name} values (12, '{"b" : 1.222222}');"""
+        sql """insert into ${table_name} values (13, '{"bb" : 1}');"""
+        sql """insert into ${table_name} values (14, '{"bb" : 214748364711}');"""
+        sql """insert into ${table_name} values (15, '{"A" : 1}');"""
+        qt_sql """select v from type_conflict_resolution order by k;"""
+        verify table_name
+
+        // 3. simple variant sub column select
simple variant sub column select + table_name = "simple_select_variant" + create_table table_name + sql """insert into ${table_name} values (1, '{"A" : 123}');""" + sql """insert into ${table_name} values (2, '{"A" : 1}');""" + sql """insert into ${table_name} values (4, '{"A" : 123456}');""" + sql """insert into ${table_name} values (8, '{"A" : 123456789101112}');""" + qt_sql_2 "select v:A from ${table_name} order by cast(v:A as int)" + sql """insert into ${table_name} values (12, '{"AA" : [123456]}');""" + sql """insert into ${table_name} values (14, '{"AA" : [123456789101112]}');""" + // qt_sql_3 "select v:AA from ${table_name} where size(v:AA) > 0 order by k" + qt_sql_4 "select v:A, v:AA, v from ${table_name} order by k" + qt_sql_5 "select v:A, v:AA, v, v from ${table_name} where cast(v:A as bigint) > 123 order by k" + + sql """insert into ${table_name} values (16, '{"a" : 123.0, "A" : 191191, "c": 123}');""" + sql """insert into ${table_name} values (18, '{"a" : "123", "c" : 123456}');""" + sql """insert into ${table_name} values (20, '{"a" : 1.10111, "A" : 1800, "c" : [12345]}');""" + // sql """insert into ${table_name} values (12, '{"a" : [123]}, "c": "123456"');""" + sql """insert into ${table_name} values (22, '{"a" : 1.1111, "A" : 17211, "c" : 111111}');""" + sql "sync" + qt_sql_6 "select v:a, v:A from ${table_name} order by cast(v:A as bigint), k" + qt_sql_7 "select k, v:A from ${table_name} where cast(v:A as bigint) >= 1 order by cast(v:A as bigint), k" + + // TODO: if not cast, then v:a could return "123" or 123 which is none determinately + qt_sql_8 "select cast(v:a as string), v:A from ${table_name} where cast(v:a as json) is null order by k" + // qt_sql_9 "select cast(v:a as string), v:A from ${table_name} where cast(v:A as json) is null order by k" + + // !!! Not found cast function String to Float64 + // qt_sql_10 "select v:a, v:A from ${table_name} where cast(v:a as double) > 0 order by k" + qt_sql_11 "select v:A from ${table_name} where cast(v:A as bigint) > 1 order by k" + + // ----%%---- + qt_sql_12 "select v:A, v from ${table_name} where cast(v:A as bigint) > 1 order by k" + // ----%%---- + qt_sql_13 "select v:a, v:A from simple_select_variant where 1=1 and cast(v:a as json) is null and cast(v:A as bigint) >= 1 order by k;" + qt_sql_14 """select v:a, v:A, v from simple_select_variant where cast(v:A as bigint) > 0 and cast(v:A as bigint) = 123456 limit 1;""" + + // !!! Not found cast function String to Float64 + // qt_sql_15 "select v:a, v:A from ${table_name} where 1=1 and cast(v:a as double) > 0 and v:A is not null order by k" + // qt_sql_16 "select v:a, v:A, v:c from ${table_name} where 1=1 and cast(v:a as double) > 0 and v:A is not null order by k" + + // TODO: if not cast, then v:a could return "123" or 123 which is none determinately + qt_sql_17 "select cast(v:a as json), v:A, v, v:AA from simple_select_variant where cast(v:A as bigint) is null order by k;" + + sql """insert into simple_select_variant values (12, '{"oamama": 1.1}')""" + qt_sql_18 "select v:a, v:A, v, v:oamama from simple_select_variant where cast(v:oamama as double) is null order by k;" + qt_sql_19 """select v:a, v:A, v, v:oamama from simple_select_variant where cast(v:oamama as double) is not null order by k""" + qt_sql_20 """select v:A from simple_select_variant where cast(v:A as bigint) > 0 and cast(v:A as bigint) = 123456 limit 1;""" + + // !!! 
Not found cast function String to Float64 + // qt_sql_21 """select v:A, v:a, v from simple_select_variant where cast(v:A as bigint) > 0 and cast(v:a as double) > 1 order by cast(v:A as bigint);""" + + sql "truncate table simple_select_variant" + sql """insert into simple_select_variant values (11, '{"x": [123456]}');""" + sql """insert into simple_select_variant values (12, '{"x": [123456789101112]}');""" + sql """insert into simple_select_variant values (12, '{"xxx" : 123, "yyy" : 456}');""" + qt_sql_21_1 """select * from simple_select_variant where cast(v:x as json) is null""" + qt_sql_21_2 """select cast(v:x as json) from simple_select_variant where cast(v:x as json) is not null order by k;""" + + // 4. multi variant in single table + table_name = "multi_variant" + sql "DROP TABLE IF EXISTS ${table_name}" + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + k bigint, + v1 variant, + v2 variant, + v3 variant + + ) + DUPLICATE KEY(`k`) + DISTRIBUTED BY RANDOM BUCKETS 5 + properties("replication_num" = "1", "disable_auto_compaction" = "false"); + """ + sql """insert into ${table_name} values (1, '{"A" : 123}', '{"B" : 123}', '{"C" : 456}');""" + sql """insert into ${table_name} values (2, '{"C" : "123"}', '{"D" : [123]}', '{"E" : 789}');""" + sql """insert into ${table_name} values (3, '{"C" : "123"}', '{"C" : [123]}', '{"E" : "789"}');""" + sql "sync" + verify table_name + qt_sql_22 "select v1:A from multi_variant order by k;" + qt_sql_23 "select v2:D from multi_variant order by k;" + qt_sql_24 "select v2:C from multi_variant order by k;" + + // 5. multi tablets concurrent load + table_name = "t_json_parallel" + create_table table_name + sql """INSERT INTO t_json_parallel SELECT *, '{"k1":1, "k2": "some", "k3" : [1234], "k4" : 1.10000, "k5" : [[123]]}' FROM numbers("number" = "50000");""" + qt_sql_25 """ SELECT sum(cast(v:k1 as int)), sum(cast(v:k4 as double)), sum(cast(json_extract(v:k5, "\$.[0].[0]") as int)) from t_json_parallel; """ + //50000 61700000 55000.00000000374 6150000 + // 7. gh data + table_name = "ghdata" + create_table table_name + load_json_data.call(table_name, """${getS3Url() + '/load/ghdata_sample.json'}""") + qt_sql_26 "select count() from ${table_name}" + + // 8. 
+ // 8. json empty string
+ // table_name = "empty_string"
  // create_table table_name
- // sql """insert into ${table_name} values (1, '[1]'),(1, '{"a" : 1}');"""
- // sql """insert into ${table_name} values (2, '[2]'),(1, '{"a" : [[[1]]]}');"""
- // sql """insert into ${table_name} values (3, '3'),(1, '{"a" : 1}'), (1, '{"a" : [1]}');"""
- // sql """insert into ${table_name} values (4, '"4"'),(1, '{"a" : "1223"}');"""
- // sql """insert into ${table_name} values (5, '5.0'),(1, '{"a" : [1]}');"""
- // sql """insert into ${table_name} values (6, '"[6]"'),(1, '{"a" : ["1", 2, 1.1]}');"""
- // sql """insert into ${table_name} values (7, '7'),(1, '{"a" : 1, "b" : {"c" : 1}}');"""
- // sql """insert into ${table_name} values (8, '8.11111'),(1, '{"a" : 1, "b" : {"c" : [{"a" : 1}]}}');"""
- // sql """insert into ${table_name} values (9, '"9999"'),(1, '{"a" : 1, "b" : {"c" : [{"a" : 1}]}}');"""
- // sql """insert into ${table_name} values (10, '1000000'),(1, '{"a" : 1, "b" : {"c" : [{"a" : 1}]}}');"""
- // sql """insert into ${table_name} values (11, '[123.0]'),(1, '{"a" : 1, "b" : {"c" : 1}}'),(1, '{"a" : 1, "b" : 10}');"""
- // sql """insert into ${table_name} values (12, '[123.2]'),(1, '{"a" : 1, "b" : 10}'),(1, '{"a" : 1, "b" : {"c" : 1}}');"""
- // qt_sql_1 "select k, v from simple_variant order by k, cast(v as string)"
- // qt_sql_1_1 "select k, v, cast(v:b as string) from simple_variant where length(cast(v:b as string)) > 4 order by k, cast(v as string)"
- // verify table_name
-
- // // 2. type confilct cases
- // table_name = "type_conflict_resolution"
- // create_table table_name
- // sql """insert into ${table_name} values (1, '{"c" : "123"}');"""
- // sql """insert into ${table_name} values (2, '{"c" : 123}');"""
- // sql """insert into ${table_name} values (3, '{"cc" : [123]}');"""
- // sql """insert into ${table_name} values (4, '{"cc" : [123.1]}');"""
- // sql """insert into ${table_name} values (5, '{"ccc" : 123}');"""
- // sql """insert into ${table_name} values (6, '{"ccc" : 123321}');"""
- // sql """insert into ${table_name} values (7, '{"cccc" : 123.0}');"""
- // sql """insert into ${table_name} values (8, '{"cccc" : 123.11}');"""
- // sql """insert into ${table_name} values (9, '{"ccccc" : [123]}');"""
- // sql """insert into ${table_name} values (10, '{"ccccc" : [123456789]}');"""
- // sql """insert into ${table_name} values (11, '{"b" : 1111111111111111}');"""
- // sql """insert into ${table_name} values (12, '{"b" : 1.222222}');"""
- // sql """insert into ${table_name} values (13, '{"bb" : 1}');"""
- // sql """insert into ${table_name} values (14, '{"bb" : 214748364711}');"""
- // sql """insert into ${table_name} values (15, '{"A" : 1}');"""
- // qt_sql """select v from type_conflict_resolution order by k;"""
- // verify table_name
-
- // // 3. simple variant sub column select
- // table_name = "simple_select_variant"
- // create_table table_name
- // sql """insert into ${table_name} values (1, '{"A" : 123}');"""
- // sql """insert into ${table_name} values (2, '{"A" : 1}');"""
- // sql """insert into ${table_name} values (4, '{"A" : 123456}');"""
- // sql """insert into ${table_name} values (8, '{"A" : 123456789101112}');"""
- // qt_sql_2 "select v:A from ${table_name} order by cast(v:A as int)"
- // sql """insert into ${table_name} values (12, '{"AA" : [123456]}');"""
- // sql """insert into ${table_name} values (14, '{"AA" : [123456789101112]}');"""
- // // qt_sql_3 "select v:AA from ${table_name} where size(v:AA) > 0 order by k"
- // qt_sql_4 "select v:A, v:AA, v from ${table_name} order by k"
- // qt_sql_5 "select v:A, v:AA, v, v from ${table_name} where cast(v:A as bigint) > 123 order by k"
-
- // sql """insert into ${table_name} values (16, '{"a" : 123.0, "A" : 191191, "c": 123}');"""
- // sql """insert into ${table_name} values (18, '{"a" : "123", "c" : 123456}');"""
- // sql """insert into ${table_name} values (20, '{"a" : 1.10111, "A" : 1800, "c" : [12345]}');"""
- // // sql """insert into ${table_name} values (12, '{"a" : [123]}, "c": "123456"');"""
- // sql """insert into ${table_name} values (22, '{"a" : 1.1111, "A" : 17211, "c" : 111111}');"""
- // sql "sync"
- // qt_sql_6 "select v:a, v:A from ${table_name} order by cast(v:A as bigint), k"
- // qt_sql_7 "select k, v:A from ${table_name} where cast(v:A as bigint) >= 1 order by cast(v:A as bigint), k"
-
- // // TODO: if not cast, then v:a could return "123" or 123 which is none determinately
- // qt_sql_8 "select cast(v:a as string), v:A from ${table_name} where cast(v:a as json) is null order by k"
- // // qt_sql_9 "select cast(v:a as string), v:A from ${table_name} where cast(v:A as json) is null order by k"
-
- // // !!! Not found cast function String to Float64
- // // qt_sql_10 "select v:a, v:A from ${table_name} where cast(v:a as double) > 0 order by k"
- // qt_sql_11 "select v:A from ${table_name} where cast(v:A as bigint) > 1 order by k"
-
- // // ----%%----
- // qt_sql_12 "select v:A, v from ${table_name} where cast(v:A as bigint) > 1 order by k"
- // // ----%%----
- // qt_sql_13 "select v:a, v:A from simple_select_variant where 1=1 and cast(v:a as json) is null and cast(v:A as bigint) >= 1 order by k;"
- // qt_sql_14 """select v:a, v:A, v from simple_select_variant where cast(v:A as bigint) > 0 and cast(v:A as bigint) = 123456 limit 1;"""
-
- // // !!! Not found cast function String to Float64
- // // qt_sql_15 "select v:a, v:A from ${table_name} where 1=1 and cast(v:a as double) > 0 and v:A is not null order by k"
- // // qt_sql_16 "select v:a, v:A, v:c from ${table_name} where 1=1 and cast(v:a as double) > 0 and v:A is not null order by k"
-
- // // TODO: if not cast, then v:a could return "123" or 123 which is none determinately
- // qt_sql_17 "select cast(v:a as json), v:A, v, v:AA from simple_select_variant where cast(v:A as bigint) is null order by k;"
-
- // sql """insert into simple_select_variant values (12, '{"oamama": 1.1}')"""
- // qt_sql_18 "select v:a, v:A, v, v:oamama from simple_select_variant where cast(v:oamama as double) is null order by k;"
- // qt_sql_19 """select v:a, v:A, v, v:oamama from simple_select_variant where cast(v:oamama as double) is not null order by k"""
- // qt_sql_20 """select v:A from simple_select_variant where cast(v:A as bigint) > 0 and cast(v:A as bigint) = 123456 limit 1;"""
-
- // // !!! Not found cast function String to Float64
- // // qt_sql_21 """select v:A, v:a, v from simple_select_variant where cast(v:A as bigint) > 0 and cast(v:a as double) > 1 order by cast(v:A as bigint);"""
-
- // sql "truncate table simple_select_variant"
- // sql """insert into simple_select_variant values (11, '{"x": [123456]}');"""
- // sql """insert into simple_select_variant values (12, '{"x": [123456789101112]}');"""
- // sql """insert into simple_select_variant values (12, '{"xxx" : 123, "yyy" : 456}');"""
- // qt_sql_21_1 """select * from simple_select_variant where cast(v:x as json) is null"""
- // qt_sql_21_2 """select cast(v:x as json) from simple_select_variant where cast(v:x as json) is not null order by k;"""
-
- // // 4. multi variant in single table
- // table_name = "multi_variant"
- // sql "DROP TABLE IF EXISTS ${table_name}"
- // sql """
- // CREATE TABLE IF NOT EXISTS ${table_name} (
- // k bigint,
- // v1 variant,
- // v2 variant,
- // v3 variant
- //
- // )
- // DUPLICATE KEY(`k`)
- // DISTRIBUTED BY RANDOM BUCKETS 5
- // properties("replication_num" = "1", "disable_auto_compaction" = "false");
- // """
- // sql """insert into ${table_name} values (1, '{"A" : 123}', '{"B" : 123}', '{"C" : 456}');"""
- // sql """insert into ${table_name} values (2, '{"C" : "123"}', '{"D" : [123]}', '{"E" : 789}');"""
- // sql """insert into ${table_name} values (3, '{"C" : "123"}', '{"C" : [123]}', '{"E" : "789"}');"""
- // sql "sync"
- // verify table_name
- // qt_sql_22 "select v1:A from multi_variant order by k;"
- // qt_sql_23 "select v2:D from multi_variant order by k;"
- // qt_sql_24 "select v2:C from multi_variant order by k;"
-
- // // 5. multi tablets concurrent load
- // table_name = "t_json_parallel"
- // create_table table_name
- // sql """INSERT INTO t_json_parallel SELECT *, '{"k1":1, "k2": "some", "k3" : [1234], "k4" : 1.10000, "k5" : [[123]]}' FROM numbers("number" = "50000");"""
- // qt_sql_25 """ SELECT sum(cast(v:k1 as int)), sum(cast(v:k4 as double)), sum(cast(json_extract(v:k5, "\$.[0].[0]") as int)) from t_json_parallel; """
- // //50000 61700000 55000.00000000374 6150000
- // // 7. gh data
- // table_name = "ghdata"
- // create_table table_name
- // load_json_data.call(table_name, """${getS3Url() + '/load/ghdata_sample.json'}""")
- // qt_sql_26 "select count() from ${table_name}"
-
- // // 8. json empty string
- // // table_name = "empty_string"
- // // create_table table_name
- // // sql """INSERT INTO empty_string VALUES (1, ''), (2, '{"k1": 1, "k2": "v1"}'), (3, '{}'), (4, '{"k1": 2}');"""
- // // sql """INSERT INTO empty_string VALUES (3, null), (4, '{"k1": 1, "k2": "v1"}'), (3, '{}'), (4, '{"k1": 2}');"""
- // // qt_sql_27 "SELECT * FROM ${table_name} ORDER BY k;"
-
- // // // 9. btc data
- // // table_name = "btcdata"
- // // create_table table_name
- // // load_json_data.call(table_name, """${getS3Url() + '/load/btc_transactions.json'}""")
- // // qt_sql_28 "select count() from ${table_name}"
-
- // // 10. alter add variant
- // table_name = "alter_variant"
- // create_table table_name
- // sql """INSERT INTO ${table_name} VALUES (1, ''), (1, '{"k1": 1, "k2": "v1"}'), (1, '{}'), (1, '{"k1": 2}');"""
- // sql "alter table ${table_name} add column v2 variant default null"
- // sql """INSERT INTO ${table_name} VALUES (1, '{"kyyyy" : "123"}', '{"kxkxkxkx" : [123]}'), (1, '{"kxxxx" : 123}', '{"xxxxyyyy": 123}');"""
- // qt_sql_29_1 """select * from alter_variant where length(cast(v2 as string)) > 2 order by k, cast(v as string), cast(v2 as string);"""
- // verify table_name
-
- // // 11. boolean values
- // table_name = "boolean_values"
- // create_table table_name
- // sql """INSERT INTO ${table_name} VALUES (1, ''), (2, '{"k1": true, "k2": false}'), (3, '{}'), (4, '{"k1": false}');"""
- // verify table_name
+ // sql """INSERT INTO empty_string VALUES (1, ''), (2, '{"k1": 1, "k2": "v1"}'), (3, '{}'), (4, '{"k1": 2}');"""
+ // sql """INSERT INTO empty_string VALUES (3, null), (4, '{"k1": 1, "k2": "v1"}'), (3, '{}'), (4, '{"k1": 2}');"""
+ // qt_sql_27 "SELECT * FROM ${table_name} ORDER BY k;"
- // // 12. jsonb values
- // table_name = "jsonb_values"
- // create_table table_name
- // sql """insert into ${table_name} values (1, '{"a" : ["123", 123, [123]]}')"""
- // sql """insert into ${table_name} values (2, '{"a" : ["123"]}')"""
- // sql """insert into ${table_name} values (3, '{"a" : "123"}')"""
- // sql """insert into ${table_name} values (4, '{"a" : 123456}')"""
- // sql """insert into ${table_name} values (5, '{"a" : [123, "123", 1.11111]}')"""
- // sql """insert into ${table_name} values (6, '{"a" : [123, 1.11, "123"]}')"""
- // sql """insert into ${table_name} values(7, '{"a" : [123, {"xx" : 1}], "b" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
- // // TODO data bellow is invalid at present
- // // sql """insert into ${table_name} values (8, '{"a" : [123, 111........]}')"""
- // sql """insert into ${table_name} values (9, '{"a" : [123, {"a" : 1}]}')"""
- // sql """insert into ${table_name} values (10, '{"a" : [{"a" : 1}, 123]}')"""
- // qt_sql_29 "select v:a from ${table_name} order by k"
- // // b? 7.111 [123,{"xx":1}] {"b":{"c":456,"e":7.111}} 456
- // qt_sql_30 "select v:b.e, v:a, v:b, v:b.c from jsonb_values where cast(v:b.e as double) > 1;"
-
- // // 13. sparse columns
- // table_name = "sparse_columns"
+ // // 9. btc data
+ // table_name = "btcdata"
  // create_table table_name
- // sql """insert into sparse_columns select 0, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str
- // union all select 0, '{"a": 1123}' as json_str union all select 0, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") limit 4096 ;"""
- // qt_sql_30 """ select v from sparse_columns where v is not null and json_extract(v, "\$") != "{}" order by cast(v as string) limit 10"""
- // sql "truncate table sparse_columns"
- // sql """insert into sparse_columns select 0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}' as json_str
- // union all select 0, '{"a" : 1234, "xxxx" : "kaana", "ddd" : {"aaa" : 123, "mxmxm" : [456, "789"]}}' as json_str from numbers("number" = "4096") limit 4096 ;"""
- // qt_sql_31 """ select v from sparse_columns where v is not null and json_extract(v, "\$") != "{}" order by cast(v as string) limit 10"""
- // sql "truncate table sparse_columns"
-
- // // 12. streamload remote file
- // table_name = "logdata"
- // create_table.call(table_name, "4")
- // sql "set enable_two_phase_read_opt = false;"
- // // no sparse columns
- // set_be_config.call("ratio_of_defaults_as_sparse_column", "1")
- // load_json_data.call(table_name, """${getS3Url() + '/load/logdata.json'}""")
- // qt_sql_32 """ select v->"\$.json.parseFailed" from logdata where v->"\$.json.parseFailed" != 'null' order by k limit 10;"""
- // qt_sql_32_1 """select v:json.parseFailed from logdata where cast(v:json.parseFailed as string) is not null and k = 162;"""
- // sql "truncate table ${table_name}"
-
- // // 0.95 default ratio
- // set_be_config.call("ratio_of_defaults_as_sparse_column", "0.95")
- // load_json_data.call(table_name, """${getS3Url() + '/load/logdata.json'}""")
- // qt_sql_33 """ select v->"\$.json.parseFailed" from logdata where v->"\$.json.parseFailed" != 'null' order by k limit 10;"""
- // qt_sql_33_1 """select v:json.parseFailed from logdata where cast(v:json.parseFailed as string) is not null and k = 162;"""
- // sql "truncate table ${table_name}"
-
- // // always sparse column
- // set_be_config.call("ratio_of_defaults_as_sparse_column", "0")
- // load_json_data.call(table_name, """${getS3Url() + '/load/logdata.json'}""")
- // qt_sql_34 """ select v->"\$.json.parseFailed" from logdata where v->"\$.json.parseFailed" != 'null' order by k limit 10;"""
- // sql "truncate table ${table_name}"
- // qt_sql_35 """select v->"\$.json.parseFailed" from logdata where k = 162 and v->"\$.json.parseFailed" != 'null';"""
- // qt_sql_35_1 """select v:json.parseFailed from logdata where cast(v:json.parseFailed as string) is not null and k = 162;"""
+ // load_json_data.call(table_name, """${getS3Url() + '/load/btc_transactions.json'}""")
+ // qt_sql_28 "select count() from ${table_name}"
+
+ // 10. alter add variant
+ table_name = "alter_variant"
+ create_table table_name
+ sql """INSERT INTO ${table_name} VALUES (1, ''), (1, '{"k1": 1, "k2": "v1"}'), (1, '{}'), (1, '{"k1": 2}');"""
+ sql "alter table ${table_name} add column v2 variant default null"
+ sql """INSERT INTO ${table_name} VALUES (1, '{"kyyyy" : "123"}', '{"kxkxkxkx" : [123]}'), (1, '{"kxxxx" : 123}', '{"xxxxyyyy": 123}');"""
+ qt_sql_29_1 """select * from alter_variant where length(cast(v2 as string)) > 2 order by k, cast(v as string), cast(v2 as string);"""
+ verify table_name
+
+ // 11. boolean values
+ table_name = "boolean_values"
+ create_table table_name
+ sql """INSERT INTO ${table_name} VALUES (1, ''), (2, '{"k1": true, "k2": false}'), (3, '{}'), (4, '{"k1": false}');"""
+ verify table_name
+
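The alter_variant case above is the schema-change path: ALTER TABLE can append a variant column to an existing table, and since it is added with DEFAULT NULL, rows loaded before the change should simply read NULL for it. A minimal sketch with illustrative names:

    CREATE TABLE alter_demo (k BIGINT, v VARIANT)
    DUPLICATE KEY(`k`)
    DISTRIBUTED BY RANDOM BUCKETS 1
    PROPERTIES ("replication_num" = "1");

    INSERT INTO alter_demo VALUES (1, '{"k1" : 1}');

    -- same statement shape the suite exercises
    ALTER TABLE alter_demo ADD COLUMN v2 VARIANT DEFAULT NULL;

    INSERT INTO alter_demo VALUES (2, '{"k1" : 2}', '{"x" : [1]}');
    -- row 1 predates v2, so v2 comes back NULL for it
    SELECT k, v2 FROM alter_demo ORDER BY k;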
+ // 12. jsonb values
+ table_name = "jsonb_values"
+ create_table table_name
+ sql """insert into ${table_name} values (1, '{"a" : ["123", 123, [123]]}')"""
+ sql """insert into ${table_name} values (2, '{"a" : ["123"]}')"""
+ sql """insert into ${table_name} values (3, '{"a" : "123"}')"""
+ sql """insert into ${table_name} values (4, '{"a" : 123456}')"""
+ sql """insert into ${table_name} values (5, '{"a" : [123, "123", 1.11111]}')"""
+ sql """insert into ${table_name} values (6, '{"a" : [123, 1.11, "123"]}')"""
+ sql """insert into ${table_name} values(7, '{"a" : [123, {"xx" : 1}], "b" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
+ // TODO data bellow is invalid at present
+ // sql """insert into ${table_name} values (8, '{"a" : [123, 111........]}')"""
+ sql """insert into ${table_name} values (9, '{"a" : [123, {"a" : 1}]}')"""
+ sql """insert into ${table_name} values (10, '{"a" : [{"a" : 1}, 123]}')"""
+ qt_sql_29 "select v:a from ${table_name} order by k"
+ // b? 7.111 [123,{"xx":1}] {"b":{"c":456,"e":7.111}} 456
+ qt_sql_30 "select v:b.e, v:a, v:b, v:b.c from jsonb_values where cast(v:b.e as double) > 1;"
+
+ // 13. sparse columns
+ table_name = "sparse_columns"
+ create_table table_name
+ sql """insert into sparse_columns select 0, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str
+ union all select 0, '{"a": 1123}' as json_str union all select 0, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") limit 4096 ;"""
+ qt_sql_30 """ select v from sparse_columns where v is not null and json_extract(v, "\$") != "{}" order by cast(v as string) limit 10"""
+ sql "truncate table sparse_columns"
+ sql """insert into sparse_columns select 0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}' as json_str
+ union all select 0, '{"a" : 1234, "xxxx" : "kaana", "ddd" : {"aaa" : 123, "mxmxm" : [456, "789"]}}' as json_str from numbers("number" = "4096") limit 4096 ;"""
+ qt_sql_31 """ select v from sparse_columns where v is not null and json_extract(v, "\$") != "{}" order by cast(v as string) limit 10"""
+ sql "truncate table sparse_columns"
+
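qt_sql_30 above shows that path access is not limited to top-level keys: v:b.e descends into the nested object, and casting it makes the value usable in a predicate. The same idea in isolation, with an illustrative table:

    CREATE TABLE nested_demo (k BIGINT, v VARIANT)
    DUPLICATE KEY(`k`)
    DISTRIBUTED BY RANDOM BUCKETS 1
    PROPERTIES ("replication_num" = "1");

    INSERT INTO nested_demo VALUES (1, '{"b" : {"c" : 456, "e" : 7.111}}');

    -- dotted paths reach nested fields; cast to double to filter on them
    SELECT v:b.e, v:b.c FROM nested_demo WHERE CAST(v:b.e AS DOUBLE) > 1;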
+ // 12. streamload remote file
+ table_name = "logdata"
+ create_table.call(table_name, "4")
+ sql "set enable_two_phase_read_opt = false;"
+ // no sparse columns
+ set_be_config.call("ratio_of_defaults_as_sparse_column", "1")
+ load_json_data.call(table_name, """${getS3Url() + '/load/logdata.json'}""")
+ qt_sql_32 """ select v->"\$.json.parseFailed" from logdata where v->"\$.json.parseFailed" != 'null' order by k limit 10;"""
+ qt_sql_32_1 """select v:json.parseFailed from logdata where cast(v:json.parseFailed as string) is not null and k = 162;"""
+ sql "truncate table ${table_name}"
+
+ // 0.95 default ratio
+ set_be_config.call("ratio_of_defaults_as_sparse_column", "0.95")
+ load_json_data.call(table_name, """${getS3Url() + '/load/logdata.json'}""")
+ qt_sql_33 """ select v->"\$.json.parseFailed" from logdata where v->"\$.json.parseFailed" != 'null' order by k limit 10;"""
+ qt_sql_33_1 """select v:json.parseFailed from logdata where cast(v:json.parseFailed as string) is not null and k = 162;"""
+ sql "truncate table ${table_name}"
+
+ // always sparse column
+ set_be_config.call("ratio_of_defaults_as_sparse_column", "0")
+ load_json_data.call(table_name, """${getS3Url() + '/load/logdata.json'}""")
+ qt_sql_34 """ select v->"\$.json.parseFailed" from logdata where v->"\$.json.parseFailed" != 'null' order by k limit 10;"""
+ sql "truncate table ${table_name}"
+ qt_sql_35 """select v->"\$.json.parseFailed" from logdata where k = 162 and v->"\$.json.parseFailed" != 'null';"""
+ qt_sql_35_1 """select v:json.parseFailed from logdata where cast(v:json.parseFailed as string) is not null and k = 162;"""
  // load gharchive

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org