This is an automated email from the ASF dual-hosted git repository. eldenmoon pushed a commit to branch variant-sparse in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/variant-sparse by this push: new 553db1844da [fix] (variant) add serialize and deserialize (#45487) 553db1844da is described below commit 553db1844da808caf4de3609a44b7b9d6bd87fc5 Author: Sun Chenyang <suncheny...@selectdb.com> AuthorDate: Mon Dec 16 21:37:36 2024 +0800 [fix] (variant) add serialize and deserialize (#45487) --- .../rowset/segment_v2/hierarchical_data_reader.cpp | 2 +- .../rowset/segment_v2/hierarchical_data_reader.h | 4 +- be/src/vec/columns/column_object.cpp | 367 +++++++++++++-------- be/src/vec/columns/column_object.h | 38 +-- .../vec/data_types/serde/data_type_array_serde.cpp | 18 + .../vec/data_types/serde/data_type_array_serde.h | 3 + .../vec/data_types/serde/data_type_jsonb_serde.cpp | 12 + .../vec/data_types/serde/data_type_jsonb_serde.h | 3 + .../data_types/serde/data_type_nullable_serde.cpp | 15 + .../data_types/serde/data_type_nullable_serde.h | 3 + .../data_types/serde/data_type_number_serde.cpp | 10 + .../vec/data_types/serde/data_type_number_serde.h | 3 + be/src/vec/data_types/serde/data_type_serde.h | 2 +- .../vec/data_types/serde/data_type_string_serde.h | 12 + 14 files changed, 331 insertions(+), 161 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp index db6bac6b8b4..c85e4b429ad 100644 --- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp +++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp @@ -206,7 +206,7 @@ Status ExtractReader::extract_to(vectorized::MutableColumnPtr& dst, size_t nrows ""}, expected_type, &cast_column)); variant.get_root()->insert_range_from(*cast_column, 0, nrows); - variant.set_num_rows(variant.get_root()->size()); + // variant.set_num_rows(variant.get_root()->size()); } if (dst->is_nullable()) { // fill nullmap diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h index 
f85038713ca..6c8ced89cd2 100644 --- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h +++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h @@ -240,10 +240,10 @@ private: src_null_map.clear(); assert_cast<ColumnObject&>( assert_cast<ColumnNullable&>(*_root_reader->column).get_nested_column()) - .clear_subcolumns_data(); + .clear_column_data(); } else { ColumnObject& root_column = assert_cast<ColumnObject&>(*_root_reader->column); - root_column.clear_subcolumns_data(); + root_column.clear_column_data(); } } else { if (dst->is_nullable()) { diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp index c1a50f6064b..9d6e260724b 100644 --- a/be/src/vec/columns/column_object.cpp +++ b/be/src/vec/columns/column_object.cpp @@ -616,18 +616,20 @@ bool ColumnObject::Subcolumn::is_finalized() const { } template <typename Func> -MutableColumnPtr ColumnObject::apply_for_subcolumns(Func&& func) const { +MutableColumnPtr ColumnObject::apply_for_columns(Func&& func) const { if (!is_finalized()) { auto finalized = clone_finalized(); auto& finalized_object = assert_cast<ColumnObject&>(*finalized); - return finalized_object.apply_for_subcolumns(std::forward<Func>(func)); + return finalized_object.apply_for_columns(std::forward<Func>(func)); } auto res = ColumnObject::create(is_nullable, false); for (const auto& subcolumn : subcolumns) { - auto new_subcolumn = func(subcolumn->data.get_finalized_column()); + auto new_subcolumn = func(subcolumn->data.get_finalized_column_ptr()); res->add_sub_column(subcolumn->path, new_subcolumn->assume_mutable(), subcolumn->data.get_least_common_type()); } + auto sparse_column = func(serialized_sparse_column); + res->serialized_sparse_column = sparse_column->assume_mutable(); check_consistency(); return res; } @@ -642,6 +644,7 @@ void ColumnObject::resize(size_t n) { for (auto& subcolumn : subcolumns) { subcolumn->data.pop_back(num_rows - n); } + serialized_sparse_column->pop_back(num_rows - n); } 
num_rows = n; } @@ -809,8 +812,13 @@ ColumnObject::ColumnObject(Subcolumns&& subcolumns_, bool is_nullable_) check_consistency(); } +ColumnObject::ColumnObject(size_t num_rows) : is_nullable(true) { + insert_many_defaults(num_rows); + check_consistency(); +} + void ColumnObject::check_consistency() const { - if (subcolumns.empty()) { + if (subcolumns.empty() && serialized_sparse_column->empty()) { return; } for (const auto& leaf : subcolumns) { @@ -820,6 +828,11 @@ void ColumnObject::check_consistency() const { leaf->path.get_path(), num_rows, leaf->data.size()); } } + if (num_rows != serialized_sparse_column->size()) { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "unmatched sparse column:, expeted rows: {}, but meet: {}", num_rows, + serialized_sparse_column->size()); + } } size_t ColumnObject::size() const { @@ -835,13 +848,11 @@ MutableColumnPtr ColumnObject::clone_resized(size_t new_size) const { } // If subcolumns are empty, then res will be empty but new_size > 0 if (subcolumns.empty()) { - // Add an emtpy column with new_size rows - auto res = ColumnObject::create(true, false); - res->set_num_rows(new_size); + auto res = ColumnObject::create(new_size); return res; } - auto res = apply_for_subcolumns( - [&](const auto& subcolumn) { return subcolumn.clone_resized(new_size); }); + auto res = apply_for_columns( + [&](const ColumnPtr column) { return column->clone_resized(new_size); }); return res; } @@ -850,6 +861,7 @@ size_t ColumnObject::byte_size() const { for (const auto& entry : subcolumns) { res += entry->data.byteSize(); } + res += serialized_sparse_column->byte_size(); return res; } @@ -858,6 +870,7 @@ size_t ColumnObject::allocated_bytes() const { for (const auto& entry : subcolumns) { res += entry->data.allocatedBytes(); } + res += serialized_sparse_column->allocated_bytes(); return res; } @@ -940,6 +953,7 @@ void ColumnObject::insert_default() { for (auto& entry : subcolumns) { entry->data.insert_default(); } + 
serialized_sparse_column->insert_default(); ++num_rows; } @@ -1000,16 +1014,18 @@ void ColumnObject::Subcolumn::serialize_to_sparse_column(ColumnString* key, std: // remove default row -= num_of_defaults_in_prefix; - is_null = false; for (size_t i = 0; i < data.size(); ++i) { const auto& part = data[i]; if (row < part->size()) { - // insert key - key->insert_data(path.data(), path.size()); - // insert value - const auto& part_type = data_types[i]; - const auto& serde = part_type->get_serde(); - serde->write_one_cell_to_binary(*part, value, row); + if (assert_cast<const ColumnNullable&>(*part).is_null_at(row)) { + is_null = true; + } else { + is_null = false; + // insert key + key->insert_data(path.data(), path.size()); + // insert value + data_types[i]->get_serde()->write_one_cell_to_binary(*part, value, row); + } return; } @@ -1020,6 +1036,110 @@ void ColumnObject::Subcolumn::serialize_to_sparse_column(ColumnString* key, std: "Index ({}) for serialize to sparse column is out of range", row); } +const char* parse_binary_from_sparse_column(TypeIndex type, const char* data, Field& res, + FieldInfo& info_res) { + const char* end = data; + switch (type) { + case TypeIndex::String: { + const size_t size = *reinterpret_cast<const size_t*>(data); + data += sizeof(size_t); + res = Field(String(data, size)); + end = data + size; + break; + } + case TypeIndex::Int8: { + res = *reinterpret_cast<const Int8*>(data); + end = data + sizeof(Int8); + break; + } + case TypeIndex::Int16: { + res = *reinterpret_cast<const Int16*>(data); + end = data + sizeof(Int16); + break; + } + case TypeIndex::Int32: { + res = *reinterpret_cast<const Int32*>(data); + end = data + sizeof(Int32); + break; + } + case TypeIndex::Int64: { + res = *reinterpret_cast<const Int64*>(data); + end = data + sizeof(Int64); + break; + } + case TypeIndex::Float32: { + res = *reinterpret_cast<const Float32*>(data); + end = data + sizeof(Float32); + break; + } + case TypeIndex::Float64: { + res = 
*reinterpret_cast<const Float64*>(data); + end = data + sizeof(Float64); + break; + } + case TypeIndex::JSONB: { + size_t size = *reinterpret_cast<const size_t*>(data); + data += sizeof(size_t); + res = JsonbField(data, size); + end = data + size; + break; + } + case TypeIndex::Array: { + const size_t size = *reinterpret_cast<const size_t*>(data); + data += sizeof(size_t); + res = Array(size); + vectorized::Array& array = res.get<Array>(); + info_res.num_dimensions++; + for (size_t i = 0; i < size; ++i) { + const uint8_t is_null = *reinterpret_cast<const uint8_t*>(data++); + if (is_null) { + array.emplace_back(Null()); + continue; + } + Field nested_field; + const TypeIndex nested_type = + assert_cast<const TypeIndex>(*reinterpret_cast<const uint8_t*>(data++)); + data = parse_binary_from_sparse_column(nested_type, data, nested_field, info_res); + array.emplace_back(std::move(nested_field)); + } + end = data; + break; + } + default: + throw doris::Exception(ErrorCode::OUT_OF_BOUND, + "Type ({}) for deserialize_from_sparse_column is invalid", type); + } + return end; +} + +std::pair<Field, FieldInfo> ColumnObject::deserialize_from_sparse_column(const ColumnString* value, + size_t row) const { + const auto& data_ref = value->get_data_at(row); + const char* data = data_ref.data; + DCHECK(data_ref.size > 0); + + FieldInfo info_res = { + .scalar_type_id = TypeIndex::Nothing, + .have_nulls = false, + .need_convert = false, + .num_dimensions = 1, + }; + // 0 is null + const uint8_t is_null = *reinterpret_cast<const uint8_t*>(data++); + if (is_null) { + DCHECK(data_ref.size == 1); + return {Null(), info_res}; + } + + DCHECK(data_ref.size > 1); + const TypeIndex type = assert_cast<const TypeIndex>(*reinterpret_cast<const uint8_t*>(data++)); + info_res.scalar_type_id = type; + Field res; + const char* end = parse_binary_from_sparse_column(type, data, res, info_res); + DCHECK_EQ(end - data_ref.data, data_ref.size); + return {std::move(res), std::move(info_res)}; +} + Field 
ColumnObject::operator[](size_t n) const { Field object; get(n, object); @@ -1043,6 +1163,18 @@ void ColumnObject::get(size_t n, Field& res) const { object.try_emplace(entry->path.get_path(), field); } } + + const auto& [path, value] = get_sparse_data_paths_and_values(); + auto& sparse_column_offsets = serialized_sparse_column_offsets(); + size_t offset = sparse_column_offsets[n - 1]; + size_t end = sparse_column_offsets[n]; + // Iterator over [path, binary value] + for (size_t i = offset; i != end; ++i) { + const StringRef path_data = path->get_data_at(i); + const auto& data = deserialize_from_sparse_column(value, i); + object.try_emplace(std::string(path_data.data, path_data.size), data.first); + } + if (object.empty()) { res = Null(); } @@ -1159,7 +1291,7 @@ void ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column( const auto& src_serialized_sparse_column_offsets = src.serialized_sparse_column_offsets(); if (src_serialized_sparse_column_offsets[start - 1] == src_serialized_sparse_column_offsets[start + length - 1]) { - size_t current_size = size(); + size_t current_size = num_rows; /// If no src subcolumns should be inserted into sparse column, insert defaults. if (sorted_src_subcolumn_for_sparse_column.empty()) { @@ -1228,7 +1360,8 @@ void ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column( const PathInData column_path(src_sparse_path); if (auto* subcolumn = get_subcolumn(column_path); subcolumn != nullptr) { // Deserialize binary value into subcolumn from src serialized sparse column data. - subcolumn->deserialize_from_sparse_column(src_sparse_column_values, i); + const auto& data = src.deserialize_from_sparse_column(src_sparse_column_values, i); + subcolumn->insert(data.first, data.second); } else { // Before inserting this path into sparse column check if we need to // insert suibcolumns from sorted_src_subcolumn_for_sparse_column before. 
@@ -1284,16 +1417,6 @@ void ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column( return; } -ColumnPtr ColumnObject::replicate(const Offsets& offsets) const { - if (subcolumns.empty()) { - // Add an emtpy column with offsets.back rows - auto res = ColumnObject::create(true, false); - res->set_num_rows(offsets.back()); - } - return apply_for_subcolumns( - [&](const auto& subcolumn) { return subcolumn.replicate(offsets); }); -} - ColumnPtr ColumnObject::permute(const Permutation& perm, size_t limit) const { if (subcolumns.empty()) { if (limit == 0) { @@ -1306,19 +1429,17 @@ ColumnPtr ColumnObject::permute(const Permutation& perm, size_t limit) const { throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Size of permutation is less than required."); } - // Add an emtpy column with limit rows - auto res = ColumnObject::create(true, false); - res->set_num_rows(limit); + auto res = ColumnObject::create(limit); return res; } - return apply_for_subcolumns( - [&](const auto& subcolumn) { return subcolumn.permute(perm, limit); }); + return apply_for_columns([&](const ColumnPtr column) { return column->permute(perm, limit); }); } void ColumnObject::pop_back(size_t length) { for (auto& entry : subcolumns) { entry->data.pop_back(length); } + serialized_sparse_column->pop_back(length); num_rows -= length; } @@ -1441,15 +1562,6 @@ bool ColumnObject::add_sub_column(const PathInData& key, size_t new_size) { return true; } -PathsInData ColumnObject::getKeys() const { - PathsInData keys; - keys.reserve(subcolumns.size()); - for (const auto& entry : subcolumns) { - keys.emplace_back(entry->path); - } - return keys; -} - bool ColumnObject::is_finalized() const { return std::all_of(subcolumns.begin(), subcolumns.end(), [](const auto& entry) { return entry->data.is_finalized(); }); @@ -1704,8 +1816,6 @@ Status ColumnObject::serialize_sparse_columns( std::map<std::string_view, Subcolumn>&& remaing_subcolumns) { CHECK(is_finalized()); - serialized_sparse_column = 
ColumnMap::create(ColumnString::create(), ColumnString::create(), - ColumnArray::ColumnOffsets::create()); if (remaing_subcolumns.empty()) { serialized_sparse_column->insert_many_defaults(num_rows); return Status::OK(); @@ -1784,39 +1894,6 @@ Status ColumnObject::finalize(FinalizeMode mode) { new_subcolumns.get_mutable_root()->data.finalize(mode); } - // pick sparse columns - std::set<std::string_view> selected_path; - std::vector<std::string_view> remaining_path; - if (subcolumns.size() > MAX_SUBCOLUMNS) { - // pick subcolumns sort by size of none null values - std::unordered_map<std::string_view, size_t> none_null_value_sizes; - // 1. get the none null value sizes - for (auto&& entry : subcolumns) { - if (entry->data.is_root) { - continue; - } - size_t size = entry->data.get_non_null_value_size(); - none_null_value_sizes[entry->path.get_path()] = size; - } - // 2. sort by the size - std::vector<std::pair<std::string_view, size_t>> sorted_by_size( - none_null_value_sizes.begin(), none_null_value_sizes.end()); - std::sort(sorted_by_size.begin(), sorted_by_size.end(), - [](const auto& a, const auto& b) { return a.second > b.second; }); - - // 3. pick MAX_SUBCOLUMNS selected subcolumns - for (size_t i = 0; i < std::min(MAX_SUBCOLUMNS, sorted_by_size.size()); ++i) { - selected_path.insert(sorted_by_size[i].first); - } - - // 4. 
put remaining subcolumns to remaining_subcolumns - for (const auto& entry : sorted_by_size) { - if (selected_path.find(entry.first) == selected_path.end()) { - remaining_path.emplace_back(entry.first); - } - } - } - // finalize all subcolumns for (auto&& entry : subcolumns) { const auto& least_common_type = entry->data.get_least_common_type(); @@ -1840,24 +1917,57 @@ Status ColumnObject::finalize(FinalizeMode mode) { } } - // add selected subcolumns to new_subcolumns - for (auto&& entry : subcolumns) { - if (selected_path.find(entry->path.get_path()) != selected_path.end()) { - new_subcolumns.add(entry->path, entry->data); + // merge and encode sparse column + if (mode == FinalizeMode::WRITE_MODE) { + // pick sparse columns + std::set<std::string_view> selected_path; + std::vector<std::string_view> remaining_path; + if (subcolumns.size() > MAX_SUBCOLUMNS) { + // pick subcolumns sort by size of none null values + std::unordered_map<std::string_view, size_t> none_null_value_sizes; + // 1. get the none null value sizes + for (auto&& entry : subcolumns) { + if (entry->data.is_root) { + continue; + } + size_t size = entry->data.get_non_null_value_size(); + none_null_value_sizes[entry->path.get_path()] = size; + } + // 2. sort by the size + std::vector<std::pair<std::string_view, size_t>> sorted_by_size( + none_null_value_sizes.begin(), none_null_value_sizes.end()); + std::sort(sorted_by_size.begin(), sorted_by_size.end(), + [](const auto& a, const auto& b) { return a.second > b.second; }); + + // 3. pick MAX_SUBCOLUMNS selected subcolumns + for (size_t i = 0; i < std::min(MAX_SUBCOLUMNS, sorted_by_size.size()); ++i) { + selected_path.insert(sorted_by_size[i].first); + } + + // 4. 
put remaining subcolumns to remaining_subcolumns + for (const auto& entry : sorted_by_size) { + if (selected_path.find(entry.first) == selected_path.end()) { + remaining_path.emplace_back(entry.first); + } + } + } + // add selected subcolumns to new_subcolumns + for (auto&& entry : subcolumns) { + if (selected_path.find(entry->path.get_path()) != selected_path.end()) { + new_subcolumns.add(entry->path, entry->data); + } } - } - std::map<std::string_view, Subcolumn> remaing_subcolumns; - // merge remaining subcolumns to sparse_column - for (auto&& entry : subcolumns) { - if (selected_path.find(entry->path.get_path()) != selected_path.end()) { - remaing_subcolumns.emplace(entry->path.get_path(), entry->data); + std::map<std::string_view, Subcolumn> remaing_subcolumns; + // merge remaining subcolumns to sparse_column + for (auto&& entry : subcolumns) { + if (selected_path.find(entry->path.get_path()) != selected_path.end()) { + remaing_subcolumns.emplace(entry->path.get_path(), entry->data); + } } + RETURN_IF_ERROR(serialize_sparse_columns(std::move(remaing_subcolumns))); } - // merge and encode sparse column - RETURN_IF_ERROR(serialize_sparse_columns(std::move(remaing_subcolumns))); - std::swap(subcolumns, new_subcolumns); doc_structure = nullptr; _prev_positions.clear(); @@ -1894,6 +2004,7 @@ ColumnPtr get_base_column_of_array(const ColumnPtr& column) { return column; } +// ---- ColumnPtr ColumnObject::filter(const Filter& filter, ssize_t count) const { if (!is_finalized()) { auto finalized = clone_finalized(); @@ -1901,9 +2012,7 @@ ColumnPtr ColumnObject::filter(const Filter& filter, ssize_t count) const { return finalized_object.filter(filter, count); } if (subcolumns.empty()) { - // Add an emtpy column with filtered rows - auto res = ColumnObject::create(true, false); - res->set_num_rows(count_bytes_in_filter(filter)); + auto res = ColumnObject::create(count_bytes_in_filter(filter)); return res; } auto new_column = ColumnObject::create(true, false); @@ -1912,35 
+2021,17 @@ ColumnPtr ColumnObject::filter(const Filter& filter, ssize_t count) const { new_column->add_sub_column(entry->path, subcolumn->assume_mutable(), entry->data.get_least_common_type()); } + // filter return new_column; } -Status ColumnObject::filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) { - if (!is_finalized()) { - finalize(); - } - if (subcolumns.empty()) { - assert_cast<ColumnObject*>(col_ptr)->insert_many_defaults(sel_size); - return Status::OK(); - } - auto* res = assert_cast<ColumnObject*>(col_ptr); - for (const auto& subcolumn : subcolumns) { - auto new_subcolumn = subcolumn->data.get_least_common_type()->create_column(); - RETURN_IF_ERROR(subcolumn->data.get_finalized_column().filter_by_selector( - sel, sel_size, new_subcolumn.get())); - res->add_sub_column(subcolumn->path, new_subcolumn->assume_mutable(), - subcolumn->data.get_least_common_type()); - } - return Status::OK(); -} - size_t ColumnObject::filter(const Filter& filter) { if (!is_finalized()) { finalize(); } size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size()); if (count == 0) { - for_each_subcolumn([](auto& part) { part->clear(); }); + clear(); } else { for_each_subcolumn([&](auto& part) { if (part->size() != count) { @@ -1958,6 +2049,14 @@ size_t ColumnObject::filter(const Filter& filter) { } } }); + const auto result_size = serialized_sparse_column->filter(filter); + if (result_size != count) { + throw Exception(ErrorCode::INTERNAL_ERROR, + "result_size not euqal with filter_size, result_size={}, " + "filter_size={}", + result_size, count); + } + CHECK_EQ(result_size, count); } num_rows = count; #ifndef NDEBUG @@ -1966,7 +2065,7 @@ size_t ColumnObject::filter(const Filter& filter) { return count; } -void ColumnObject::clear_subcolumns_data() { +void ColumnObject::clear_column_data() { for (auto& entry : subcolumns) { for (auto& part : entry->data.data) { DCHECK_EQ(part->use_count(), 1); @@ -1974,12 +2073,14 @@ void 
ColumnObject::clear_subcolumns_data() { } entry->data.num_of_defaults_in_prefix = 0; } + serialized_sparse_column->clear(); num_rows = 0; } void ColumnObject::clear() { Subcolumns empty; std::swap(empty, subcolumns); + serialized_sparse_column->clear(); num_rows = 0; _prev_positions.clear(); } @@ -2063,61 +2164,53 @@ void ColumnObject::insert_indices_from(const IColumn& src, const uint32_t* indic } } -void ColumnObject::for_each_imutable_subcolumn(ImutableColumnCallback callback) const { +template <typename Func> +void ColumnObject::for_each_imutable_column(Func&& callback) const { if (!is_finalized()) { auto finalized = clone_finalized(); auto& finalized_object = assert_cast<ColumnObject&>(*finalized); - finalized_object.for_each_imutable_subcolumn(callback); + finalized_object.for_each_imutable_column(callback); return; } for (const auto& entry : subcolumns) { for (auto& part : entry->data.data) { - callback(*part); + callback(part); } } -} - -bool ColumnObject::is_exclusive() const { - bool is_exclusive = IColumn::is_exclusive(); - for_each_imutable_subcolumn([&](const auto& subcolumn) { - if (!subcolumn.is_exclusive()) { - is_exclusive = false; - } - }); - return is_exclusive; + callback(serialized_sparse_column); } void ColumnObject::update_hash_with_value(size_t n, SipHash& hash) const { - for_each_imutable_subcolumn( - [&](const auto& subcolumn) { return subcolumn.update_hash_with_value(n, hash); }); + for_each_imutable_column( + [&](const ColumnPtr column) { return column->update_hash_with_value(n, hash); }); } void ColumnObject::update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const { - for_each_imutable_subcolumn([&](const auto& subcolumn) { - return subcolumn.update_hashes_with_value(hashes, nullptr); + for_each_imutable_column([&](const ColumnPtr column) { + return column->update_hashes_with_value(hashes, nullptr); }); } void ColumnObject::update_xxHash_with_value(size_t start, size_t end, uint64_t& hash, const 
uint8_t* __restrict null_data) const { - for_each_imutable_subcolumn([&](const auto& subcolumn) { - return subcolumn.update_xxHash_with_value(start, end, hash, nullptr); + for_each_imutable_column([&](const ColumnPtr column) { + return column->update_xxHash_with_value(start, end, hash, nullptr); }); } void ColumnObject::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveType type, uint32_t rows, uint32_t offset, const uint8_t* __restrict null_data) const { - for_each_imutable_subcolumn([&](const auto& subcolumn) { - return subcolumn.update_crcs_with_value(hash, type, rows, offset, nullptr); + for_each_imutable_column([&](const ColumnPtr column) { + return column->update_crcs_with_value(hash, type, rows, offset, nullptr); }); } void ColumnObject::update_crc_with_value(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_data) const { - for_each_imutable_subcolumn([&](const auto& subcolumn) { - return subcolumn.update_crc_with_value(start, end, hash, nullptr); + for_each_imutable_column([&](const ColumnPtr column) { + return column->update_crc_with_value(start, end, hash, nullptr); }); } diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h index 38ed5478f02..72cc783caf8 100644 --- a/be/src/vec/columns/column_object.h +++ b/be/src/vec/columns/column_object.h @@ -98,7 +98,7 @@ public: constexpr static TypeIndex MOST_COMMON_TYPE_ID = TypeIndex::JSONB; // Nullable(Array(Nullable(Object))) const static DataTypePtr NESTED_TYPE; - const size_t MAX_SUBCOLUMNS = 200; + const size_t MAX_SUBCOLUMNS = 3; // Finlize mode for subcolumns, write mode will estimate which subcolumns are sparse columns(too many null values inside column), // merge and encode them into a shared column in root column. Only affects in flush block to segments. // Otherwise read mode should be as default mode. 
@@ -185,9 +185,6 @@ public: void serialize_to_sparse_column(ColumnString* key, std::string_view path, ColumnString* value, size_t row, bool& is_null); - // Deserialize the i-th row of the column from the sparse column. - void deserialize_from_sparse_column(const ColumnString* value, size_t row) {} - friend class ColumnObject; private: @@ -263,7 +260,8 @@ private: // It's filled when the number of subcolumns reaches the limit. // It has type Map(String, String) and stores a map (path, binary serialized subcolumn value) for each row. - WrappedPtr serialized_sparse_column; + WrappedPtr serialized_sparse_column = ColumnMap::create( + ColumnString::create(), ColumnString::create(), ColumnArray::ColumnOffsets::create()); public: static constexpr auto COLUMN_NAME_DUMMY = "_dummy"; @@ -272,6 +270,9 @@ public: explicit ColumnObject(bool is_nullable_, DataTypePtr type, MutableColumnPtr&& column); + // create without root, num_rows = size + explicit ColumnObject(size_t size); + ColumnObject(Subcolumns&& subcolumns_, bool is_nullable_); ~ColumnObject() override = default; @@ -316,8 +317,6 @@ public: // Only single scalar root column bool is_scalar_variant() const; - bool is_exclusive() const override; - ColumnPtr get_root() const { return subcolumns.get_root()->data.get_finalized_column_ptr(); } bool has_subcolumn(const PathInData& key) const; @@ -365,8 +364,6 @@ public: return serialized_sparse_column->convert_to_full_column_if_const(); } - PathsInData getKeys() const; - // use sparse_subcolumns_schema to record sparse column's path info and type Status finalize(FinalizeMode mode); @@ -385,7 +382,7 @@ public: void resize(size_t n) override; - void clear_subcolumns_data(); + void clear_column_data(); std::string get_name() const override { if (is_scalar_variant()) { @@ -416,8 +413,6 @@ public: void insert_default() override; - ColumnPtr replicate(const Offsets& offsets) const override; - void pop_back(size_t length) override; Field operator[](size_t n) const override; @@ 
-428,8 +423,6 @@ public: ColumnPtr filter(const Filter&, ssize_t) const override; - Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) override; - size_t filter(const Filter&) override; ColumnPtr permute(const Permutation&, size_t) const override; @@ -437,7 +430,7 @@ public: bool is_variable_length() const override { return true; } template <typename Func> - MutableColumnPtr apply_for_subcolumns(Func&& func) const; + MutableColumnPtr apply_for_columns(Func&& func) const; // Extract path from root column and output to dst Status extract_root(const PathInData& path, MutableColumnPtr& dst) const; @@ -462,6 +455,10 @@ public: void update_crc_with_value(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_data) const override; + ColumnPtr replicate(const Offsets& offsets) const override { + throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "replicate" + get_name()); + } + Int64 get_int(size_t /*n*/) const override { throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "get_int" + get_name()); } @@ -529,10 +526,6 @@ public: "deserialize_vec_with_null_map" + get_name()); } - Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) const { - throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "filter_by_selector" + get_name()); - } - bool structure_equals(const IColumn&) const override { throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "structure_equals" + get_name()); } @@ -579,6 +572,10 @@ public: return {&key, &value}; } + // Deserialize the i-th row of the column from the sparse column. + std::pair<Field, FieldInfo> deserialize_from_sparse_column(const ColumnString* value, + size_t row) const; + private: // May throw execption void try_insert(const Field& field); @@ -586,7 +583,8 @@ private: /// It's used to get shared sized of Nested to insert correct default values. 
const Subcolumns::Node* get_leaf_of_the_same_nested(const Subcolumns::NodePtr& entry) const; - void for_each_imutable_subcolumn(ImutableColumnCallback callback) const; + template <typename Func> + void for_each_imutable_column(Func&& callback) const; // return null if not found const Subcolumn* get_subcolumn_with_cache(const PathInData& key, size_t index_hint) const; diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp b/be/src/vec/data_types/serde/data_type_array_serde.cpp index a56eb00dbdd..1b812781805 100644 --- a/be/src/vec/data_types/serde/data_type_array_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_array_serde.cpp @@ -434,5 +434,23 @@ Status DataTypeArraySerDe::read_column_from_pb(IColumn& column, const PValues& a } return Status::OK(); } + +void DataTypeArraySerDe::write_one_cell_to_binary(const IColumn& src_column, + ColumnString* dst_column, int64_t row_num) const { + const uint8_t type = static_cast<uint8_t>(TypeIndex::Array); + dst_column->insert_data(reinterpret_cast<const char*>(&type), sizeof(uint8_t)); + + const auto& array_col = assert_cast<const ColumnArray&>(src_column); + const IColumn& nested_column = array_col.get_data(); + const auto& offsets = array_col.get_offsets(); + size_t start = offsets[row_num - 1]; + size_t end = offsets[row_num]; + size_t size = end - start; + dst_column->insert_data(reinterpret_cast<const char*>(&size), sizeof(size_t)); + for (size_t offset = start; offset != end; ++offset) { + nested_serde->write_one_cell_to_binary(nested_column, dst_column, offset); + } +} + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_array_serde.h b/be/src/vec/data_types/serde/data_type_array_serde.h index 5b15f48f502..aaf1a425512 100644 --- a/be/src/vec/data_types/serde/data_type_array_serde.h +++ b/be/src/vec/data_types/serde/data_type_array_serde.h @@ -101,6 +101,9 @@ public: nested_serde->set_return_object_as_string(value); } + void write_one_cell_to_binary(const 
IColumn& src_column, ColumnString* dst_column, + int64_t row_num) const override; + private: template <bool is_binary_format> Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result, diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp index 10218e4164d..f56bccc298c 100644 --- a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp @@ -277,5 +277,17 @@ Status DataTypeJsonbSerDe::read_column_from_pb(IColumn& column, const PValues& a } return Status::OK(); } + +void DataTypeJsonbSerDe::write_one_cell_to_binary(const IColumn& src_column, + ColumnString* dst_column, int64_t row_num) const { + const uint8_t type = static_cast<uint8_t>(TypeIndex::JSONB); + const auto& col = assert_cast<const ColumnString&>(src_column); + const auto& data_ref = col.get_data_at(row_num); + const size_t size = data_ref.size; + + dst_column->insert_data(reinterpret_cast<const char*>(&type), sizeof(uint8_t)); + dst_column->insert_data(reinterpret_cast<const char*>(&size), sizeof(size_t)); + dst_column->insert_data(data_ref.data, size); +} } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.h b/be/src/vec/data_types/serde/data_type_jsonb_serde.h index 5080b1ba46e..d6d29cce556 100644 --- a/be/src/vec/data_types/serde/data_type_jsonb_serde.h +++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.h @@ -71,6 +71,9 @@ public: int64_t end) const override; Status read_column_from_pb(IColumn& column, const PValues& arg) const override; + void write_one_cell_to_binary(const IColumn& src_column, ColumnString* dst_column, + int64_t row_num) const override; + private: template <bool is_binary_format> Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result, diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp 
b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp index f21f160fb0a..d45b39c6d63 100644 --- a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp @@ -393,5 +393,20 @@ Status DataTypeNullableSerDe::read_one_cell_from_json(IColumn& column, return Status::OK(); } +void DataTypeNullableSerDe::write_one_cell_to_binary(const IColumn& src_column, + ColumnString* dst_column, + int64_t row_num) const { + auto& col = assert_cast<const ColumnNullable&>(src_column); + // One-byte null flag first; insert_data must get the flag's address, not its value. + const uint8_t is_null = col.is_null_at(row_num) ? 1 : 0; + dst_column->insert_data(reinterpret_cast<const char*>(&is_null), sizeof(uint8_t)); + if (is_null != 0) [[unlikely]] { + return; // null cell: nothing follows the flag + } + // Non-null cell: delegate the value itself to the nested serde. + auto& nested_col = col.get_nested_column(); + nested_serde->write_one_cell_to_binary(nested_col, dst_column, row_num); +} + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.h b/be/src/vec/data_types/serde/data_type_nullable_serde.h index 6051c7f722d..51cbf54eaed 100644 --- a/be/src/vec/data_types/serde/data_type_nullable_serde.h +++ b/be/src/vec/data_types/serde/data_type_nullable_serde.h @@ -99,6 +99,9 @@ public: int64_t row_num) const override; Status read_one_cell_from_json(IColumn& column, const rapidjson::Value& result) const override; + void write_one_cell_to_binary(const IColumn& src_column, ColumnString* dst_column, + int64_t row_num) const override; + private: template <bool is_binary_format> Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result, diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp b/be/src/vec/data_types/serde/data_type_number_serde.cpp index 9416fc9a8b3..5ba7fdf293a 100644 --- a/be/src/vec/data_types/serde/data_type_number_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp @@ -393,6 +393,16 @@ Status
DataTypeNumberSerDe<T>::write_column_to_orc(const std::string& timezone, return Status::OK(); } +template <typename T> +void DataTypeNumberSerDe<T>::write_one_cell_to_binary(const IColumn& src_column, + ColumnString* dst_column, + int64_t row_num) const { + const uint8_t type = static_cast<uint8_t>(TypeId<T>::value); + dst_column->insert_data(reinterpret_cast<const char*>(&type), sizeof(uint8_t)); + const auto& data_ref = assert_cast<const ColumnType&>(src_column).get_data_at(row_num); + dst_column->insert_data(data_ref.data, data_ref.size); +} + /// Explicit template instantiations - to avoid code bloat in headers. template class DataTypeNumberSerDe<UInt8>; template class DataTypeNumberSerDe<UInt16>; diff --git a/be/src/vec/data_types/serde/data_type_number_serde.h b/be/src/vec/data_types/serde/data_type_number_serde.h index 203cd9dbf46..c9073f5e868 100644 --- a/be/src/vec/data_types/serde/data_type_number_serde.h +++ b/be/src/vec/data_types/serde/data_type_number_serde.h @@ -107,6 +107,9 @@ public: int64_t row_num) const override; Status read_one_cell_from_json(IColumn& column, const rapidjson::Value& result) const override; + void write_one_cell_to_binary(const IColumn& src_column, ColumnString* dst_column, + int64_t row_num) const override; + private: template <bool is_binary_format> Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result, diff --git a/be/src/vec/data_types/serde/data_type_serde.h b/be/src/vec/data_types/serde/data_type_serde.h index 5b0e8fab65e..4634afa1449 100644 --- a/be/src/vec/data_types/serde/data_type_serde.h +++ b/be/src/vec/data_types/serde/data_type_serde.h @@ -338,7 +338,7 @@ public: virtual Status read_one_cell_from_json(IColumn& column, const rapidjson::Value& result) const; virtual void write_one_cell_to_binary(const IColumn& src_column, ColumnString* dst_column, - int64_t row_num) { + int64_t row_num) const { throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, 
"write_one_cell_to_binary"); } diff --git a/be/src/vec/data_types/serde/data_type_string_serde.h b/be/src/vec/data_types/serde/data_type_string_serde.h index 69a8cc26171..98cf89ada1e 100644 --- a/be/src/vec/data_types/serde/data_type_string_serde.h +++ b/be/src/vec/data_types/serde/data_type_string_serde.h @@ -366,6 +366,18 @@ public: return Status::OK(); } + void write_one_cell_to_binary(const IColumn& src_column, ColumnString* dst_column, + int64_t row_num) const override { + const uint8_t type = static_cast<uint8_t>(TypeIndex::String); + const auto& col = assert_cast<const ColumnType&>(src_column); + const auto& data_ref = col.get_data_at(row_num); + const size_t size = data_ref.size; + + dst_column->insert_data(reinterpret_cast<const char*>(&type), sizeof(uint8_t)); + dst_column->insert_data(reinterpret_cast<const char*>(&size), sizeof(size_t)); + dst_column->insert_data(data_ref.data, size); + } + private: template <bool is_binary_format> Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org