xiaokang commented on code in PR #23489: URL: https://github.com/apache/doris/pull/23489#discussion_r1308451000
########## be/src/olap/rowset/segment_v2/column_writer.cpp: ########## @@ -905,49 +905,34 @@ size_t ArrayColumnWriter::get_inverted_index_size() { return 0; } -// Now we can only write data one by one. +// batch append data for array Status ArrayColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { - size_t remaining = num_rows; - const auto* col_cursor = reinterpret_cast<const CollectionValue*>(*ptr); - while (remaining > 0) { - // TODO llj: bulk write - size_t num_written = 1; - ordinal_t next_item_ordinal = _item_writer->get_next_rowid(); - RETURN_IF_ERROR(_offset_writer->append_data_in_current_page( - reinterpret_cast<uint8_t*>(&next_item_ordinal), &num_written)); - if (num_written < - 1) { // page is full, write first item offset and update current length page's start ordinal - RETURN_IF_ERROR(_offset_writer->finish_current_page()); - } else { - // write child item. - if (_item_writer->is_nullable()) { - auto* item_data_ptr = const_cast<CollectionValue*>(col_cursor)->mutable_data(); - for (size_t i = 0; i < col_cursor->length(); ++i) { - RETURN_IF_ERROR(_item_writer->append(col_cursor->is_null_at(i), item_data_ptr)); - item_data_ptr = (uint8_t*)item_data_ptr + _item_writer->get_field()->size(); - } - } else { - const void* data = col_cursor->data(); - RETURN_IF_ERROR(_item_writer->append_data(reinterpret_cast<const uint8_t**>(&data), - col_cursor->length())); - } - if (_opts.inverted_index) { - auto writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get()); - if (writer != nullptr) { - //NOTE: use array field name as index field, but item_writer size should be used when moving item_data_ptr - RETURN_IF_ERROR(_inverted_index_builder->add_array_values( - _item_writer->get_field()->size(), col_cursor, 1)); - } + // data_ptr contains + // [size, offset_ptr, item_data_ptr, item_nullmap_ptr] + auto data_ptr = reinterpret_cast<const uint64_t*>(*ptr); + // total number length + size_t element_cnt = size_t((unsigned long)(*data_ptr)); + auto 
offset_data = *(data_ptr + 1); + const uint8_t* offsets_ptr = (const uint8_t*)offset_data; + + if (element_cnt > 0) { + auto data = *(data_ptr + 2); + auto nested_null_map = *(data_ptr + 3); + RETURN_IF_ERROR(_item_writer->append(reinterpret_cast<const uint8_t*>(nested_null_map), + reinterpret_cast<const void*>(data), element_cnt)); + if (_opts.inverted_index) { + auto writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get()); Review Comment: it's not always ScalarColumnWriter now. ########## be/src/olap/types.cpp: ########## @@ -216,7 +212,7 @@ TypeInfoPtr get_type_info(const segment_v2::ColumnMetaPB* column_meta_pb) { MapTypeInfo* map_type_info = new MapTypeInfo(std::move(key_type_info), std::move(value_type_info)); - return create_static_type_info_ptr(map_type_info); + return create_dynamic_type_info_ptr(map_type_info); Review Comment: So, was there a memory leak before the change from static to dynamic? ########## be/src/olap/rowset/segment_v2/column_reader.cpp: ########## @@ -838,6 +838,7 @@ Status OffsetFileColumnIterator::_calculate_offsets( auto& offsets_data = column_offsets.get_data(); ordinal_t first_column_offset = offsets_data[start - 1]; // -1 is valid ordinal_t first_storage_offset = offsets_data[start]; + DCHECK(next_storage_offset >= first_storage_offset); Review Comment: should be > ########## be/src/olap/rowset/segment_v2/inverted_index_writer.cpp: ########## @@ -282,6 +282,60 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { return Status::OK(); } + Status add_array_values(size_t field_size, const void* value_ptr, const uint8_t* null_map, + const uint8_t* offsets_ptr, size_t count) override { + if (count == 0) { + // no values to add inverted index + return Status::OK(); + } + auto offsets = reinterpret_cast<const uint64_t*>(offsets_ptr); + if constexpr (field_is_slice_type(field_type)) { + if (_field == nullptr || _index_writer == nullptr) { + LOG(ERROR) << "field or index writer is null in inverted index writer."; + return 
Status::InternalError( + "field or index writer is null in inverted index writer"); + } + for (int i = 0; i < count; ++i) { + // offsets[i+1] is now row element count + std::vector<std::string> strings; + // [0, 3, 6] + // [10,20,30] [20,30,40], [30,40,50] + auto start_off = offsets[i]; + auto end_off = offsets[i + 1]; + for (auto j = start_off; j < end_off; ++j) { + if (null_map[j] == 1) { + continue; + } + auto* v = (Slice*)((const uint8_t*)value_ptr + j * field_size); + strings.emplace_back(std::string(v->get_data(), v->get_size())); + } + + auto value = join(strings, " "); + new_fulltext_field(value.c_str(), value.length()); + _rid++; + _index_writer->addDocument(_doc.get()); + } + } else if constexpr (field_is_numeric_type(field_type)) { + for (int i = 0; i < count; ++i) { + auto start_off = offsets[i]; + auto end_off = offsets[i + 1]; + for (size_t j = start_off; j < end_off; ++j) { + if (null_map[j] == 1) { + continue; + } + const CppType* p = &reinterpret_cast<const CppType*>(value_ptr)[j]; + std::string new_value; + size_t value_length = sizeof(CppType); + + _value_key_coder->full_encode_ascending(p, &new_value); + _bkd_writer->add((const uint8_t*)new_value.c_str(), value_length, _rid); + } + _row_ids_seen_for_bkd++; + _rid++; + } + } + return Status::OK(); + } Status add_array_values(size_t field_size, const CollectionValue* values, Review Comment: Is this function still necessary? ########## be/src/olap/rowset/segment_v2/column_writer.h: ########## @@ -345,6 +345,7 @@ class ArrayColumnWriter final : public ColumnWriter, public FlushPageCallback { Status write_data() override; Status write_ordinal_index() override; Status append_nulls(size_t num_rows) override; + Status append_nullable(const uint8_t* null_map, const uint8_t** ptr, size_t num_rows) override; Review Comment: encapsulate uint8** ptr to a struct, eg. 
ArrayConvertResult ########## be/src/olap/rowset/segment_v2/column_writer.cpp: ########## @@ -905,49 +905,34 @@ size_t ArrayColumnWriter::get_inverted_index_size() { return 0; } -// Now we can only write data one by one. +// batch append data for array Status ArrayColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { - size_t remaining = num_rows; - const auto* col_cursor = reinterpret_cast<const CollectionValue*>(*ptr); - while (remaining > 0) { - // TODO llj: bulk write - size_t num_written = 1; - ordinal_t next_item_ordinal = _item_writer->get_next_rowid(); - RETURN_IF_ERROR(_offset_writer->append_data_in_current_page( - reinterpret_cast<uint8_t*>(&next_item_ordinal), &num_written)); - if (num_written < - 1) { // page is full, write first item offset and update current length page's start ordinal - RETURN_IF_ERROR(_offset_writer->finish_current_page()); - } else { - // write child item. - if (_item_writer->is_nullable()) { - auto* item_data_ptr = const_cast<CollectionValue*>(col_cursor)->mutable_data(); - for (size_t i = 0; i < col_cursor->length(); ++i) { - RETURN_IF_ERROR(_item_writer->append(col_cursor->is_null_at(i), item_data_ptr)); - item_data_ptr = (uint8_t*)item_data_ptr + _item_writer->get_field()->size(); - } - } else { - const void* data = col_cursor->data(); - RETURN_IF_ERROR(_item_writer->append_data(reinterpret_cast<const uint8_t**>(&data), - col_cursor->length())); - } - if (_opts.inverted_index) { - auto writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get()); - if (writer != nullptr) { - //NOTE: use array field name as index field, but item_writer size should be used when moving item_data_ptr - RETURN_IF_ERROR(_inverted_index_builder->add_array_values( - _item_writer->get_field()->size(), col_cursor, 1)); - } + // data_ptr contains + // [size, offset_ptr, item_data_ptr, item_nullmap_ptr] + auto data_ptr = reinterpret_cast<const uint64_t*>(*ptr); + // total number length + size_t element_cnt = size_t((unsigned 
long)(*data_ptr)); + auto offset_data = *(data_ptr + 1); + const uint8_t* offsets_ptr = (const uint8_t*)offset_data; + + if (element_cnt > 0) { + auto data = *(data_ptr + 2); + auto nested_null_map = *(data_ptr + 3); + RETURN_IF_ERROR(_item_writer->append(reinterpret_cast<const uint8_t*>(nested_null_map), + reinterpret_cast<const void*>(data), element_cnt)); + if (_opts.inverted_index) { + auto writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get()); + // now only support nested type is scala + if (writer != nullptr) { + //NOTE: use array field name as index field, but item_writer size should be used when moving item_data_ptr + _inverted_index_builder->add_array_values( Review Comment: check return status ########## be/src/olap/task/index_builder.cpp: ########## @@ -298,6 +298,33 @@ Status IndexBuilder::_add_nullable(const std::string& column_name, } return step; }; + if (field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) { Review Comment: there is some old code for field->type() == FieldType::OLAP_FIELD_TYPE_ARRAY below. 
########## be/src/vec/olap/olap_data_convertor.h: ########## @@ -429,18 +429,30 @@ class OlapBlockDataConvertor { std::vector<const void*> _results; }; - class OlapColumnDataConvertorArray - : public OlapColumnDataConvertorPaddedPODArray<CollectionValue> { + class OlapColumnDataConvertorArray : public OlapColumnDataConvertorBase { public: OlapColumnDataConvertorArray(OlapColumnDataConvertorBaseUPtr item_convertor) - : _item_convertor(std::move(item_convertor)) {} + : _item_convertor(std::move(item_convertor)) { + _base_offset = 0; + _results.resize(4); // size + offset + item_data + item_nullmap + } + const void* get_data() const override { return _results.data(); }; + const void* get_data_at(size_t offset) const override { + LOG(FATAL) << "now not support get_data_at for OlapColumnDataConvertorArray"; + }; Status convert_to_olap() override; private: - Status convert_to_olap(const UInt8* null_map, const ColumnArray* column_array, + // Status convert_to_olap(const UInt8* null_map, const ColumnArray* column_array, + // const DataTypeArray* data_type_array); + Status convert_to_olap(const ColumnArray* column_array, Review Comment: why delete null_map? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org