This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e6fbccd3ed879bc9f628c65763cb5e8168913813
Author: lihangyu <15605149...@163.com>
AuthorDate: Wed Jan 31 10:51:42 2024 +0800

    [Feature](Variant) support row store for variant type (#30052)
---
 be/src/exprs/json_functions.cpp                    |  2 +
 be/src/olap/rowset/segment_creator.cpp             | 15 ++++-
 be/src/olap/rowset/segment_v2/segment_iterator.cpp |  1 +
 be/src/vec/columns/column_object.h                 |  8 +++
 be/src/vec/common/schema_util.cpp                  | 43 +++++++++++++--
 be/src/vec/common/schema_util.h                    | 10 +++-
 .../data_types/serde/data_type_object_serde.cpp    | 38 +++++++++++++
 .../vec/data_types/serde/data_type_object_serde.h  | 10 +---
 .../data/variant_p0/variant_with_rowstore.out      | 25 +++++++++
 .../suites/variant_p0/variant_with_rowstore.groovy | 64 ++++++++++++++++++++++
 10 files changed, 199 insertions(+), 17 deletions(-)

diff --git a/be/src/exprs/json_functions.cpp b/be/src/exprs/json_functions.cpp
index 724c78922e8..ff432c4655a 100644
--- a/be/src/exprs/json_functions.cpp
+++ b/be/src/exprs/json_functions.cpp
@@ -330,6 +330,8 @@ void JsonFunctions::merge_objects(rapidjson::Value& 
dst_object, rapidjson::Value
     if (!src_object.IsObject()) {
         return;
     }
+    VLOG_DEBUG << "merge from src: " << print_json_value(src_object)
+               << ", to: " << print_json_value(dst_object);
     for (auto src_it = src_object.MemberBegin(); src_it != 
src_object.MemberEnd(); ++src_it) {
         auto dst_it = dst_object.FindMember(src_it->name);
         if (dst_it != dst_object.MemberEnd()) {
diff --git a/be/src/olap/rowset/segment_creator.cpp 
b/be/src/olap/rowset/segment_creator.cpp
index 19969b6b146..0fa0151b4b3 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -38,9 +38,11 @@
 #include "vec/columns/column.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_object.h"
+#include "vec/columns/column_string.h"
 #include "vec/common/assert_cast.h"
 #include "vec/common/schema_util.h" // variant column
 #include "vec/core/block.h"
+#include "vec/core/columns_with_type_and_name.h"
 
 namespace doris {
 using namespace ErrorCode;
@@ -111,8 +113,10 @@ Status 
SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
         return Status::OK();
     }
 
-    RETURN_IF_ERROR(
-            vectorized::schema_util::parse_and_encode_variant_columns(block, 
variant_column_pos));
+    vectorized::schema_util::ParseContext ctx;
+    ctx.record_raw_json_column = 
_context->original_tablet_schema->store_row_column();
+    RETURN_IF_ERROR(vectorized::schema_util::parse_and_encode_variant_columns(
+            block, variant_column_pos, ctx));
 
     // Dynamic Block consists of two parts, dynamic part of columns and static 
part of columns
     //     static     extracted
@@ -173,6 +177,13 @@ Status 
SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
         static_cast<vectorized::ColumnObject*>(obj.get())->add_sub_column(
                 {}, root->data.get_finalized_column_ptr()->assume_mutable(),
                 root->data.get_least_common_type());
+
+        // set for rowstore
+        if (_context->original_tablet_schema->store_row_column()) {
+            
static_cast<vectorized::ColumnObject*>(obj.get())->set_rowstore_column(
+                    object_column.get_rowstore_column());
+        }
+
         vectorized::ColumnPtr result = obj->get_ptr();
         if (is_nullable) {
             const auto& null_map = assert_cast<const 
vectorized::ColumnNullable&>(*column_ref)
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index e5725e6fe6e..3c7de0adb7b 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -2366,6 +2366,7 @@ Status 
SegmentIterator::_next_batch_internal(vectorized::Block* block) {
         }
     }
 #endif
+    VLOG_DEBUG << "dump block " << block->dump_data(0, block->rows());
 
     // reverse block row order
     if (_opts.read_orderby_key_reverse) {
diff --git a/be/src/vec/columns/column_object.h 
b/be/src/vec/columns/column_object.h
index 86f3bd19ceb..7f328992a25 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -222,6 +222,10 @@ private:
     // this structure and fill with Subcolumns sub items
     mutable std::shared_ptr<rapidjson::Document> doc_structure;
 
+    // column with raw json strings
+    // used for quick row store encoding
+    ColumnPtr rowstore_column;
+
 public:
     static constexpr auto COLUMN_NAME_DUMMY = "_dummy";
 
@@ -241,6 +245,10 @@ public:
         return 
subcolumns.get_mutable_root()->data.get_finalized_column_ptr()->assume_mutable();
     }
 
+    void set_rowstore_column(ColumnPtr col) { rowstore_column = col; }
+
+    ColumnPtr get_rowstore_column() const { return rowstore_column; }
+
     bool serialize_one_row_to_string(int row, std::string* output) const;
 
     bool serialize_one_row_to_string(int row, BufferWritable& output) const;
diff --git a/be/src/vec/common/schema_util.cpp 
b/be/src/vec/common/schema_util.cpp
index dc6f65584f2..b943f2e637b 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -47,6 +47,7 @@
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
 #include "udf/udf.h"
+#include "util/defer_op.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_array.h"
 #include "vec/columns/column_nullable.h"
@@ -393,10 +394,11 @@ Status get_least_common_schema(const 
std::vector<TabletSchemaSPtr>& schemas,
     return Status::OK();
 }
 
-Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& 
variant_pos) {
+Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& 
variant_pos,
+                                        const ParseContext& ctx) {
     try {
         // Parse each variant column from raw string column
-        RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, 
variant_pos));
+        RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, 
variant_pos, ctx));
         vectorized::schema_util::finalize_variant_columns(block, variant_pos,
                                                           false /*not ingore 
sparse*/);
         vectorized::schema_util::encode_variant_sparse_subcolumns(block, 
variant_pos);
@@ -408,14 +410,44 @@ Status parse_and_encode_variant_columns(Block& block, 
const std::vector<int>& va
     return Status::OK();
 }
 
-Status parse_variant_columns(Block& block, const std::vector<int>& 
variant_pos) {
+Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
+                             const ParseContext& ctx) {
     for (int i = 0; i < variant_pos.size(); ++i) {
-        const auto& column_ref = block.get_by_position(variant_pos[i]).column;
+        auto column_ref = block.get_by_position(variant_pos[i]).column;
         bool is_nullable = column_ref->is_nullable();
         const auto& column = remove_nullable(column_ref);
         const auto& var = assert_cast<const ColumnObject&>(*column.get());
         var.assume_mutable_ref().finalize();
+
+        MutableColumnPtr variant_column;
+        bool record_raw_string_with_serialization = false;
+        // deferred: set the rowstore (raw json) column on the variant column
+        auto __defer = Defer([&]() {
+            if (!ctx.record_raw_json_column) {
+                return;
+            }
+            auto* var = 
static_cast<vectorized::ColumnObject*>(variant_column.get());
+            if (record_raw_string_with_serialization) {
+                // encode to raw json column
+                auto raw_column = vectorized::ColumnString::create();
+                for (size_t i = 0; i < var->rows(); ++i) {
+                    std::string raw_str;
+                    var->serialize_one_row_to_string(i, &raw_str);
+                    raw_column->insert_data(raw_str.c_str(), raw_str.size());
+                }
+                var->set_rowstore_column(raw_column->get_ptr());
+            } else {
+                // use original input json column
+                auto original_var_root = 
vectorized::check_and_get_column<vectorized::ColumnObject>(
+                                                 
remove_nullable(column_ref).get())
+                                                 ->get_root();
+                var->set_rowstore_column(original_var_root);
+            }
+        });
+
         if (!var.is_scalar_variant()) {
+            variant_column = var.assume_mutable();
+            record_raw_string_with_serialization = true;
             // already parsed
             continue;
         }
@@ -440,9 +472,10 @@ Status parse_variant_columns(Block& block, const 
std::vector<int>& variant_pos)
                             : var.get_root();
         }
 
-        MutableColumnPtr variant_column = ColumnObject::create(true);
+        variant_column = ColumnObject::create(true);
         parse_json_to_variant(*variant_column.get(),
                               assert_cast<const 
ColumnString&>(*raw_json_column));
+
         // Wrap variant with nullmap if it is nullable
         ColumnPtr result = variant_column->get_ptr();
         if (is_nullable) {
diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h
index 43972e0788c..c626b875c85 100644
--- a/be/src/vec/common/schema_util.h
+++ b/be/src/vec/common/schema_util.h
@@ -82,12 +82,18 @@ TabletColumn get_column_by_type(const 
vectorized::DataTypePtr& data_type, const
 TabletColumn get_least_type_column(const TabletColumn& original, const 
DataTypePtr& new_type,
                                    const ExtraInfo& ext_info, bool* changed);
 
+struct ParseContext {
+    // record the raw json column, used for encoding row store
+    bool record_raw_json_column = false;
+};
 // thread steps to parse and encode variant columns into flatterned columns
 // 1. parse variant from raw json string
 // 2. finalize variant column to each subcolumn least commn types, default 
ignore sparse sub columns
 // 2. encode sparse sub columns
-Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& 
variant_pos);
-Status parse_variant_columns(Block& block, const std::vector<int>& 
variant_pos);
+Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& 
variant_pos,
+                                        const ParseContext& ctx);
+Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
+                             const ParseContext& ctx);
 void finalize_variant_columns(Block& block, const std::vector<int>& 
variant_pos,
                               bool ignore_sparse = true);
 void encode_variant_sparse_subcolumns(Block& block, const std::vector<int>& 
variant_pos);
diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp 
b/be/src/vec/data_types/serde/data_type_object_serde.cpp
index 5b227951e42..6d4f82e845b 100644
--- a/be/src/vec/data_types/serde/data_type_object_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp
@@ -20,9 +20,17 @@
 #include <rapidjson/stringbuffer.h>
 
 #include "common/status.h"
+#include "vec/columns/column.h"
 #include "vec/columns/column_object.h"
 #include "vec/common/assert_cast.h"
 #include "vec/common/schema_util.h"
+#include "vec/core/field.h"
+
+#ifdef __AVX2__
+#include "util/jsonb_parser_simd.h"
+#else
+#include "util/jsonb_parser.h"
+#endif
 
 namespace doris {
 
@@ -57,6 +65,36 @@ Status DataTypeObjectSerDe::write_column_to_mysql(const 
IColumn& column,
     return Status::OK();
 }
 
+void DataTypeObjectSerDe::write_one_cell_to_jsonb(const IColumn& column, 
JsonbWriter& result,
+                                                  Arena* mem_pool, int32_t 
col_id,
+                                                  int row_num) const {
+    const auto& variant = assert_cast<const ColumnObject&>(column);
+    if (!variant.is_finalized()) {
+        const_cast<ColumnObject&>(variant).finalize();
+    }
+    result.writeKey(col_id);
+    JsonbParser json_parser;
+    CHECK(variant.get_rowstore_column() != nullptr);
+    // use original document
+    const auto& data_ref = variant.get_rowstore_column()->get_data_at(row_num);
+    // encode as jsonb
+    bool succ = json_parser.parse(data_ref.data, data_ref.size);
+    // could be handled more gracefully; CHECK is ok here since the data is expected to be parseable
+    CHECK(succ);
+    result.writeStartBinary();
+    result.writeBinary(json_parser.getWriter().getOutput()->getBuffer(),
+                       json_parser.getWriter().getOutput()->getSize());
+    result.writeEndBinary();
+}
+
+void DataTypeObjectSerDe::read_one_cell_from_jsonb(IColumn& column, const 
JsonbValue* arg) const {
+    auto& variant = assert_cast<ColumnObject&>(column);
+    Field field;
+    auto blob = static_cast<const JsonbBlobVal*>(arg);
+    field.assign_jsonb(blob->getBlob(), blob->getBlobLen());
+    variant.insert(field);
+}
+
 } // namespace vectorized
 
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/data_types/serde/data_type_object_serde.h 
b/be/src/vec/data_types/serde/data_type_object_serde.h
index 485ece2dbfe..b25f1dbb1b5 100644
--- a/be/src/vec/data_types/serde/data_type_object_serde.h
+++ b/be/src/vec/data_types/serde/data_type_object_serde.h
@@ -67,15 +67,9 @@ public:
         return Status::NotSupported("read_column_from_pb with type " + 
column.get_name());
     }
     void write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result, 
Arena* mem_pool,
-                                 int32_t col_id, int row_num) const override {
-        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
-                               "write_one_cell_to_jsonb with type " + 
column.get_name());
-    }
+                                 int32_t col_id, int row_num) const override;
 
-    void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override {
-        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
-                               "read_one_cell_from_jsonb with type " + 
column.get_name());
-    }
+    void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) 
const override;
 
     void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
                                arrow::ArrayBuilder* array_builder, int start,
diff --git a/regression-test/data/variant_p0/variant_with_rowstore.out 
b/regression-test/data/variant_p0/variant_with_rowstore.out
new file mode 100644
index 00000000000..d7d759baad3
--- /dev/null
+++ b/regression-test/data/variant_p0/variant_with_rowstore.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+-3     {"a":1,"b":1.5,"c":[1,2,3]}
+-2     {"a":11245,"b":[123,{"xx":1}],"c":{"c":456,"d":"null","e":7.111}}
+-1     {"a":1123}
+0      {"a":1234,"xxxx":"kaana"}
+1      {"a":1234,"xxxx":"kaana"}
+2      {"a":1234,"xxxx":"kaana"}
+3      {"a":1234,"xxxx":"kaana"}
+4      {"a":1234,"xxxx":"kaana"}
+5      {"a":1234,"xxxx":"kaana"}
+6      {"a":1234,"xxxx":"kaana"}
+
+-- !sql --
+-3     {"a":1,"b":1.5,"c":[1,2,3]}     {"a":1,"b":1.5,"c":[1,2,3]}
+-2     {"a":11245,"b":[123,{"xx":1}],"c":{"c":456,"d":"null","e":7.111}}       
{"a":11245,"b":[123,{"xx":1}],"c":{"c":456,"d":"null","e":7.111}}
+-1     {"a":1123}      {"a":1123}
+0      {"a":1234,"xxxx":"kaana"}       {"a":1234,"xxxx":"kaana"}
+1      {"a":1234,"xxxx":"kaana"}       {"a":1234,"xxxx":"kaana"}
+2      {"a":1234,"xxxx":"kaana"}       {"a":1234,"xxxx":"kaana"}
+3      {"a":1234,"xxxx":"kaana"}       {"a":1234,"xxxx":"kaana"}
+4      {"a":1234,"xxxx":"kaana"}       {"a":1234,"xxxx":"kaana"}
+5      {"a":1234,"xxxx":"kaana"}       {"a":1234,"xxxx":"kaana"}
+6      {"a":1234,"xxxx":"kaana"}       {"a":1234,"xxxx":"kaana"}
+
diff --git a/regression-test/suites/variant_p0/variant_with_rowstore.groovy 
b/regression-test/suites/variant_p0/variant_with_rowstore.groovy
new file mode 100644
index 00000000000..9b9b9ebe5b9
--- /dev/null
+++ b/regression-test/suites/variant_p0/variant_with_rowstore.groovy
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("regression_test_variant_rowstore", "variant_type"){
+    def set_be_config = { key, value ->
+        String backend_id;
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, 
backendId_to_backendHttpPort);
+
+        backend_id = backendId_to_backendIP.keySet()[0]
+        def (code, out, err) = 
update_be_config(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), key, value)
+        logger.info("update config: code=" + code + ", out=" + out + ", err=" 
+ err)
+    }
+ 
+    def table_name = "var_rs"
+    sql "DROP TABLE IF EXISTS ${table_name}"
+    set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95")
+
+    sql """
+            CREATE TABLE IF NOT EXISTS ${table_name} (
+                k bigint,
+                v variant
+            )
+            DUPLICATE KEY(`k`)
+            DISTRIBUTED BY HASH(k) BUCKETS 1
+            properties("replication_num" = "1", "disable_auto_compaction" = 
"false", "store_row_column" = "true");
+        """
+    sql "set experimental_enable_nereids_planner = false"
+    sql """insert into ${table_name} values (-3, '{"a" : 1, "b" : 1.5, "c" : 
[1, 2, 3]}')"""
+    sql """insert into  ${table_name} select -2, '{"a": 11245, "b" : [123, 
{"xx" : 1}], "c" : {"c" : 456, "d" : "null", "e" : 7.111}}'  as json_str
+            union  all select -1, '{"a": 1123}' as json_str union all select 
*, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") 
limit 4096 ;"""
+    qt_sql "select * from ${table_name} order by k limit 10"
+
+
+    table_name = "multi_var_rs"
+    sql "DROP TABLE IF EXISTS ${table_name}"
+    sql """
+            CREATE TABLE IF NOT EXISTS ${table_name} (
+                k bigint,
+                v variant,
+                v1 variant
+            )
+            DUPLICATE KEY(`k`)
+            DISTRIBUTED BY HASH(k) BUCKETS 1
+            properties("replication_num" = "1", "disable_auto_compaction" = 
"false", "store_row_column" = "true");
+    """
+    sql """insert into ${table_name} select k, cast(v as string), cast(v as 
string) from var_rs"""
+    qt_sql "select * from ${table_name} order by k limit 10"
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to