This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit e6fbccd3ed879bc9f628c65763cb5e8168913813 Author: lihangyu <15605149...@163.com> AuthorDate: Wed Jan 31 10:51:42 2024 +0800 [Feature](Variant) support row store for variant type (#30052) --- be/src/exprs/json_functions.cpp | 2 + be/src/olap/rowset/segment_creator.cpp | 15 ++++- be/src/olap/rowset/segment_v2/segment_iterator.cpp | 1 + be/src/vec/columns/column_object.h | 8 +++ be/src/vec/common/schema_util.cpp | 43 +++++++++++++-- be/src/vec/common/schema_util.h | 10 +++- .../data_types/serde/data_type_object_serde.cpp | 38 +++++++++++++ .../vec/data_types/serde/data_type_object_serde.h | 10 +--- .../data/variant_p0/variant_with_rowstore.out | 25 +++++++++ .../suites/variant_p0/variant_with_rowstore.groovy | 64 ++++++++++++++++++++++ 10 files changed, 199 insertions(+), 17 deletions(-) diff --git a/be/src/exprs/json_functions.cpp b/be/src/exprs/json_functions.cpp index 724c78922e8..ff432c4655a 100644 --- a/be/src/exprs/json_functions.cpp +++ b/be/src/exprs/json_functions.cpp @@ -330,6 +330,8 @@ void JsonFunctions::merge_objects(rapidjson::Value& dst_object, rapidjson::Value if (!src_object.IsObject()) { return; } + VLOG_DEBUG << "merge from src: " << print_json_value(src_object) + << ", to: " << print_json_value(dst_object); for (auto src_it = src_object.MemberBegin(); src_it != src_object.MemberEnd(); ++src_it) { auto dst_it = dst_object.FindMember(src_it->name); if (dst_it != dst_object.MemberEnd()) { diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index 19969b6b146..0fa0151b4b3 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -38,9 +38,11 @@ #include "vec/columns/column.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_object.h" +#include "vec/columns/column_string.h" #include "vec/common/assert_cast.h" #include "vec/common/schema_util.h" // variant column #include "vec/core/block.h" +#include "vec/core/columns_with_type_and_name.h" 
namespace doris { using namespace ErrorCode; @@ -111,8 +113,10 @@ Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block, return Status::OK(); } - RETURN_IF_ERROR( - vectorized::schema_util::parse_and_encode_variant_columns(block, variant_column_pos)); + vectorized::schema_util::ParseContext ctx; + ctx.record_raw_json_column = _context->original_tablet_schema->store_row_column(); + RETURN_IF_ERROR(vectorized::schema_util::parse_and_encode_variant_columns( + block, variant_column_pos, ctx)); // Dynamic Block consists of two parts, dynamic part of columns and static part of columns // static extracted @@ -173,6 +177,13 @@ Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block, static_cast<vectorized::ColumnObject*>(obj.get())->add_sub_column( {}, root->data.get_finalized_column_ptr()->assume_mutable(), root->data.get_least_common_type()); + + // // set for rowstore + if (_context->original_tablet_schema->store_row_column()) { + static_cast<vectorized::ColumnObject*>(obj.get())->set_rowstore_column( + object_column.get_rowstore_column()); + } + vectorized::ColumnPtr result = obj->get_ptr(); if (is_nullable) { const auto& null_map = assert_cast<const vectorized::ColumnNullable&>(*column_ref) diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index e5725e6fe6e..3c7de0adb7b 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -2366,6 +2366,7 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) { } } #endif + VLOG_DEBUG << "dump block " << block->dump_data(0, block->rows()); // reverse block row order if (_opts.read_orderby_key_reverse) { diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h index 86f3bd19ceb..7f328992a25 100644 --- a/be/src/vec/columns/column_object.h +++ b/be/src/vec/columns/column_object.h @@ -222,6 +222,10 @@ private: // 
this structure and fill with Subcolumns sub items mutable std::shared_ptr<rapidjson::Document> doc_structure; + // column with raw json strings + // used for quickly row store encoding + ColumnPtr rowstore_column; + public: static constexpr auto COLUMN_NAME_DUMMY = "_dummy"; @@ -241,6 +245,10 @@ public: return subcolumns.get_mutable_root()->data.get_finalized_column_ptr()->assume_mutable(); } + void set_rowstore_column(ColumnPtr col) { rowstore_column = col; } + + ColumnPtr get_rowstore_column() const { return rowstore_column; } + bool serialize_one_row_to_string(int row, std::string* output) const; bool serialize_one_row_to_string(int row, BufferWritable& output) const; diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp index dc6f65584f2..b943f2e637b 100644 --- a/be/src/vec/common/schema_util.cpp +++ b/be/src/vec/common/schema_util.cpp @@ -47,6 +47,7 @@ #include "runtime/client_cache.h" #include "runtime/exec_env.h" #include "udf/udf.h" +#include "util/defer_op.h" #include "vec/columns/column.h" #include "vec/columns/column_array.h" #include "vec/columns/column_nullable.h" @@ -393,10 +394,11 @@ Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas, return Status::OK(); } -Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& variant_pos) { +Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& variant_pos, + const ParseContext& ctx) { try { // Parse each variant column from raw string column - RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, variant_pos)); + RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, variant_pos, ctx)); vectorized::schema_util::finalize_variant_columns(block, variant_pos, false /*not ingore sparse*/); vectorized::schema_util::encode_variant_sparse_subcolumns(block, variant_pos); @@ -408,14 +410,44 @@ Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& va return Status::OK(); 
} -Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos) { +Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos, + const ParseContext& ctx) { for (int i = 0; i < variant_pos.size(); ++i) { - const auto& column_ref = block.get_by_position(variant_pos[i]).column; + auto column_ref = block.get_by_position(variant_pos[i]).column; bool is_nullable = column_ref->is_nullable(); const auto& column = remove_nullable(column_ref); const auto& var = assert_cast<const ColumnObject&>(*column.get()); var.assume_mutable_ref().finalize(); + + MutableColumnPtr variant_column; + bool record_raw_string_with_serialization = false; + // set + auto __defer = Defer([&]() { + if (!ctx.record_raw_json_column) { + return; + } + auto* var = static_cast<vectorized::ColumnObject*>(variant_column.get()); + if (record_raw_string_with_serialization) { + // encode to raw json column + auto raw_column = vectorized::ColumnString::create(); + for (size_t i = 0; i < var->rows(); ++i) { + std::string raw_str; + var->serialize_one_row_to_string(i, &raw_str); + raw_column->insert_data(raw_str.c_str(), raw_str.size()); + } + var->set_rowstore_column(raw_column->get_ptr()); + } else { + // use original input json column + auto original_var_root = vectorized::check_and_get_column<vectorized::ColumnObject>( + remove_nullable(column_ref).get()) + ->get_root(); + var->set_rowstore_column(original_var_root); + } + }); + if (!var.is_scalar_variant()) { + variant_column = var.assume_mutable(); + record_raw_string_with_serialization = true; // already parsed continue; } @@ -440,9 +472,10 @@ Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos) : var.get_root(); } - MutableColumnPtr variant_column = ColumnObject::create(true); + variant_column = ColumnObject::create(true); parse_json_to_variant(*variant_column.get(), assert_cast<const ColumnString&>(*raw_json_column)); + // Wrap variant with nullmap if it is nullable ColumnPtr result = 
variant_column->get_ptr(); if (is_nullable) { diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h index 43972e0788c..c626b875c85 100644 --- a/be/src/vec/common/schema_util.h +++ b/be/src/vec/common/schema_util.h @@ -82,12 +82,18 @@ TabletColumn get_column_by_type(const vectorized::DataTypePtr& data_type, const TabletColumn get_least_type_column(const TabletColumn& original, const DataTypePtr& new_type, const ExtraInfo& ext_info, bool* changed); +struct ParseContext { + // record an extract json column, used for encoding row store + bool record_raw_json_column = false; +}; // thread steps to parse and encode variant columns into flatterned columns // 1. parse variant from raw json string // 2. finalize variant column to each subcolumn least commn types, default ignore sparse sub columns // 2. encode sparse sub columns -Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& variant_pos); -Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos); +Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& variant_pos, + const ParseContext& ctx); +Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos, + const ParseContext& ctx); void finalize_variant_columns(Block& block, const std::vector<int>& variant_pos, bool ignore_sparse = true); void encode_variant_sparse_subcolumns(Block& block, const std::vector<int>& variant_pos); diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp b/be/src/vec/data_types/serde/data_type_object_serde.cpp index 5b227951e42..6d4f82e845b 100644 --- a/be/src/vec/data_types/serde/data_type_object_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp @@ -20,9 +20,17 @@ #include <rapidjson/stringbuffer.h> #include "common/status.h" +#include "vec/columns/column.h" #include "vec/columns/column_object.h" #include "vec/common/assert_cast.h" #include "vec/common/schema_util.h" +#include "vec/core/field.h" 
+ +#ifdef __AVX2__ +#include "util/jsonb_parser_simd.h" +#else +#include "util/jsonb_parser.h" +#endif namespace doris { @@ -57,6 +65,36 @@ Status DataTypeObjectSerDe::write_column_to_mysql(const IColumn& column, return Status::OK(); } +void DataTypeObjectSerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result, + Arena* mem_pool, int32_t col_id, + int row_num) const { + const auto& variant = assert_cast<const ColumnObject&>(column); + if (!variant.is_finalized()) { + const_cast<ColumnObject&>(variant).finalize(); + } + result.writeKey(col_id); + JsonbParser json_parser; + CHECK(variant.get_rowstore_column() != nullptr); + // use original document + const auto& data_ref = variant.get_rowstore_column()->get_data_at(row_num); + // encode as jsonb + bool succ = json_parser.parse(data_ref.data, data_ref.size); + // maybe more graceful, it is ok to check here since data could be parsed + CHECK(succ); + result.writeStartBinary(); + result.writeBinary(json_parser.getWriter().getOutput()->getBuffer(), + json_parser.getWriter().getOutput()->getSize()); + result.writeEndBinary(); +} + +void DataTypeObjectSerDe::read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const { + auto& variant = assert_cast<ColumnObject&>(column); + Field field; + auto blob = static_cast<const JsonbBlobVal*>(arg); + field.assign_jsonb(blob->getBlob(), blob->getBlobLen()); + variant.insert(field); +} + } // namespace vectorized } // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_object_serde.h b/be/src/vec/data_types/serde/data_type_object_serde.h index 485ece2dbfe..b25f1dbb1b5 100644 --- a/be/src/vec/data_types/serde/data_type_object_serde.h +++ b/be/src/vec/data_types/serde/data_type_object_serde.h @@ -67,15 +67,9 @@ public: return Status::NotSupported("read_column_from_pb with type " + column.get_name()); } void write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result, Arena* mem_pool, - int32_t col_id, int 
row_num) const override { - throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, - "write_one_cell_to_jsonb with type " + column.get_name()); - } + int32_t col_id, int row_num) const override; - void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const override { - throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, - "read_one_cell_from_jsonb with type " + column.get_name()); - } + void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const override; void write_column_to_arrow(const IColumn& column, const NullMap* null_map, arrow::ArrayBuilder* array_builder, int start, diff --git a/regression-test/data/variant_p0/variant_with_rowstore.out b/regression-test/data/variant_p0/variant_with_rowstore.out new file mode 100644 index 00000000000..d7d759baad3 --- /dev/null +++ b/regression-test/data/variant_p0/variant_with_rowstore.out @@ -0,0 +1,25 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +-3 {"a":1,"b":1.5,"c":[1,2,3]} +-2 {"a":11245,"b":[123,{"xx":1}],"c":{"c":456,"d":"null","e":7.111}} +-1 {"a":1123} +0 {"a":1234,"xxxx":"kaana"} +1 {"a":1234,"xxxx":"kaana"} +2 {"a":1234,"xxxx":"kaana"} +3 {"a":1234,"xxxx":"kaana"} +4 {"a":1234,"xxxx":"kaana"} +5 {"a":1234,"xxxx":"kaana"} +6 {"a":1234,"xxxx":"kaana"} + +-- !sql -- +-3 {"a":1,"b":1.5,"c":[1,2,3]} {"a":1,"b":1.5,"c":[1,2,3]} +-2 {"a":11245,"b":[123,{"xx":1}],"c":{"c":456,"d":"null","e":7.111}} {"a":11245,"b":[123,{"xx":1}],"c":{"c":456,"d":"null","e":7.111}} +-1 {"a":1123} {"a":1123} +0 {"a":1234,"xxxx":"kaana"} {"a":1234,"xxxx":"kaana"} +1 {"a":1234,"xxxx":"kaana"} {"a":1234,"xxxx":"kaana"} +2 {"a":1234,"xxxx":"kaana"} {"a":1234,"xxxx":"kaana"} +3 {"a":1234,"xxxx":"kaana"} {"a":1234,"xxxx":"kaana"} +4 {"a":1234,"xxxx":"kaana"} {"a":1234,"xxxx":"kaana"} +5 {"a":1234,"xxxx":"kaana"} {"a":1234,"xxxx":"kaana"} +6 {"a":1234,"xxxx":"kaana"} {"a":1234,"xxxx":"kaana"} + diff --git 
a/regression-test/suites/variant_p0/variant_with_rowstore.groovy b/regression-test/suites/variant_p0/variant_with_rowstore.groovy new file mode 100644 index 00000000000..9b9b9ebe5b9 --- /dev/null +++ b/regression-test/suites/variant_p0/variant_with_rowstore.groovy @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("regression_test_variant_rowstore", "variant_type"){ + def set_be_config = { key, value -> + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + + def table_name = "var_rs" + sql "DROP TABLE IF EXISTS ${table_name}" + set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95") + + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + k bigint, + v variant + ) + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(k) BUCKETS 1 + properties("replication_num" = "1", "disable_auto_compaction" = "false", "store_row_column" = "true"); + """ + sql "set experimental_enable_nereids_planner = false" + sql """insert into ${table_name} values (-3, '{"a" : 1, "b" : 1.5, "c" : [1, 2, 3]}')""" + sql """insert into ${table_name} select -2, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : "null", "e" : 7.111}}' as json_str + union all select -1, '{"a": 1123}' as json_str union all select *, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") limit 4096 ;""" + qt_sql "select * from ${table_name} order by k limit 10" + + + table_name = "multi_var_rs" + sql "DROP TABLE IF EXISTS ${table_name}" + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + k bigint, + v variant, + v1 variant + ) + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(k) BUCKETS 1 + properties("replication_num" = "1", "disable_auto_compaction" = "false", "store_row_column" = "true"); + """ + sql """insert into ${table_name} select k, cast(v as string), cast(v as string) from var_rs""" + qt_sql "select * from ${table_name} order by k limit 10" +} \ No newline at end of file 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org