This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b2861975ec [FIX](array/map)fix array map batch append data with right next_array_item_rowid (#23779) b2861975ec is described below commit b2861975ecf0ec53daaf2e16675f27feb0e551ae Author: amory <wangqian...@selectdb.com> AuthorDate: Wed Sep 6 14:47:37 2023 +0800 [FIX](array/map)fix array map batch append data with right next_array_item_rowid (#23779) --- be/src/olap/rowset/segment_v2/column_writer.cpp | 63 ++++++++++++++++++------- be/src/olap/rowset/segment_v2/column_writer.h | 38 +++++++++++---- be/src/vec/olap/olap_data_convertor.cpp | 6 ++- 3 files changed, 78 insertions(+), 29 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index ec2baa10f0..7a446f1123 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -221,7 +221,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn* length_column.set_index_length(-1); // no short key index std::unique_ptr<Field> bigint_field(FieldFactory::create(length_column)); auto* length_writer = - new ScalarColumnWriter(length_options, std::move(bigint_field), file_writer); + new OffsetColumnWriter(length_options, std::move(bigint_field), file_writer); // if nullable, create null writer ScalarColumnWriter* null_writer = nullptr; @@ -314,7 +314,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn* length_column.set_index_length(-1); // no short key index std::unique_ptr<Field> bigint_field(FieldFactory::create(length_column)); auto* length_writer = - new ScalarColumnWriter(length_options, std::move(bigint_field), file_writer); + new OffsetColumnWriter(length_options, std::move(bigint_field), file_writer); // create null writer if (opts.meta->is_nullable()) { @@ -731,6 +731,48 @@ Status ScalarColumnWriter::finish_current_page() { //////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// +// offset column writer +//////////////////////////////////////////////////////////////////////////////// + +OffsetColumnWriter::OffsetColumnWriter(const ColumnWriterOptions& opts, + std::unique_ptr<Field> field, io::FileWriter* file_writer) + : ScalarColumnWriter(opts, std::move(field), file_writer) { + // now we only explain data in offset column as uint64 + DCHECK(get_field()->type() == FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT); +} + +OffsetColumnWriter::~OffsetColumnWriter() = default; + +Status OffsetColumnWriter::init() { + RETURN_IF_ERROR(ScalarColumnWriter::init()); + register_flush_page_callback(this); + _next_offset = 0; + return Status::OK(); +} + +Status OffsetColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { + size_t remaining = num_rows; + while (remaining > 0) { + size_t num_written = remaining; + RETURN_IF_ERROR(append_data_in_current_page(ptr, &num_written)); + // _next_offset after append_data_in_current_page is the offset of next data, which will used in finish_current_page() to set next_array_item_ordinal + _next_offset = *(const uint64_t*)(*ptr); + remaining -= num_written; + + if (_page_builder->is_page_full()) { + // get next data for next array_item_rowid + RETURN_IF_ERROR(finish_current_page()); + } + } + return Status::OK(); +} + +Status OffsetColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) { + footer->set_next_array_item_ordinal(_next_offset); + return Status::OK(); +} + StructColumnWriter::StructColumnWriter( const ColumnWriterOptions& opts, std::unique_ptr<Field> field, ScalarColumnWriter* null_writer, @@ -853,7 +895,7 @@ Status StructColumnWriter::finish_current_page() { //////////////////////////////////////////////////////////////////////////////// ArrayColumnWriter::ArrayColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field, - ScalarColumnWriter* offset_writer, + OffsetColumnWriter* offset_writer, ScalarColumnWriter* null_writer, std::unique_ptr<ColumnWriter> item_writer) : ColumnWriter(std::move(field), opts.meta->is_nullable()), @@ -871,7 +913,6 @@ Status ArrayColumnWriter::init() { RETURN_IF_ERROR(_null_writer->init()); } RETURN_IF_ERROR(_item_writer->init()); - _offset_writer->register_flush_page_callback(this); if (_opts.inverted_index) { auto writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get()); if (writer != nullptr) { @@ -885,11 +926,6 @@ Status ArrayColumnWriter::init() { return Status::OK(); } -Status ArrayColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) { - footer->set_next_array_item_ordinal(_item_writer->get_next_rowid()); - return Status::OK(); -} - Status ArrayColumnWriter::write_inverted_index() { if (_opts.inverted_index) { return _inverted_index_builder->finish(); @@ -1008,7 +1044,7 @@ Status ArrayColumnWriter::finish_current_page() { /// ============================= MapColumnWriter =====================//// MapColumnWriter::MapColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field, - ScalarColumnWriter* null_writer, ScalarColumnWriter* offset_writer, + ScalarColumnWriter* null_writer, OffsetColumnWriter* offset_writer, std::vector<std::unique_ptr<ColumnWriter>>& kv_writers) : ColumnWriter(std::move(field), opts.meta->is_nullable()), _opts(opts) { CHECK_EQ(kv_writers.size(), 2); @@ -1028,7 +1064,6 @@ Status MapColumnWriter::init() { } // here register_flush_page_callback to call this.put_extra_info_in_page() // when finish cur data page - _offsets_writer->register_flush_page_callback(this); for (auto& sub_writer : _kv_writers) { RETURN_IF_ERROR(sub_writer->init()); } @@ -1138,12 +1173,6 @@ Status MapColumnWriter::finish_current_page() { return Status::NotSupported("map writer has no data, can not finish_current_page"); } -// write this value for column reader to read according offsets -Status MapColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) { - footer->set_next_array_item_ordinal(_kv_writers[0]->get_next_rowid()); - return Status::OK(); -} - Status MapColumnWriter::write_inverted_index() { if (_opts.inverted_index) { return _inverted_index_builder->finish(); diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h index b5aabd4e3a..1bc0afb972 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.h +++ b/be/src/olap/rowset/segment_v2/column_writer.h @@ -171,7 +171,7 @@ public: // Because some columns would be stored in a file, we should wait // until all columns has been finished, and then data can be written // to file -class ScalarColumnWriter final : public ColumnWriter { +class ScalarColumnWriter : public ColumnWriter { public: ScalarColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field, io::FileWriter* file_writer); @@ -208,6 +208,7 @@ public: Status append_data_in_current_page(const uint8_t* ptr, size_t* num_written); friend class ArrayColumnWriter; + friend class OffsetColumnWriter; private: std::unique_ptr<PageBuilder> _page_builder; @@ -276,6 +277,26 @@ private: FlushPageCallback* _new_page_callback = nullptr; }; +// offsetColumnWriter is used column which has offset column, like array, map. +// column type is only uint64 and should response for whole column value [start, end], end will set +// in footer.next_array_item_ordinal which in finish_cur_page() callback put_extra_info_in_page() +class OffsetColumnWriter final : public ScalarColumnWriter, FlushPageCallback { +public: + OffsetColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field, + io::FileWriter* file_writer); + + ~OffsetColumnWriter() override; + + Status init() override; + + Status append_data(const uint8_t** ptr, size_t num_rows) override; + +private: + Status put_extra_info_in_page(DataPageFooterPB* footer) override; + + uint64_t _next_offset; +}; + class StructColumnWriter final : public ColumnWriter { public: explicit StructColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field, @@ -328,10 +349,10 @@ private: ColumnWriterOptions _opts; }; -class ArrayColumnWriter final : public ColumnWriter, public FlushPageCallback { +class ArrayColumnWriter final : public ColumnWriter { public: explicit ArrayColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field, - ScalarColumnWriter* offset_writer, ScalarColumnWriter* null_writer, + OffsetColumnWriter* offset_writer, ScalarColumnWriter* null_writer, std::unique_ptr<ColumnWriter> item_writer); ~ArrayColumnWriter() override = default; @@ -373,22 +394,21 @@ public: ordinal_t get_next_rowid() const override { return _offset_writer->get_next_rowid(); } private: - Status put_extra_info_in_page(DataPageFooterPB* header) override; Status write_null_column(size_t num_rows, bool is_null); // 写入num_rows个null标记 bool has_empty_items() const { return _item_writer->get_next_rowid() == 0; } private: - std::unique_ptr<ScalarColumnWriter> _offset_writer; + std::unique_ptr<OffsetColumnWriter> _offset_writer; std::unique_ptr<ScalarColumnWriter> _null_writer; std::unique_ptr<ColumnWriter> _item_writer; std::unique_ptr<InvertedIndexColumnWriter> _inverted_index_builder; ColumnWriterOptions _opts; }; -class MapColumnWriter final : public ColumnWriter, public FlushPageCallback { +class MapColumnWriter final : public ColumnWriter { public: explicit MapColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field, - ScalarColumnWriter* null_writer, ScalarColumnWriter* offsets_writer, + ScalarColumnWriter* null_writer, OffsetColumnWriter* offsets_writer, std::vector<std::unique_ptr<ColumnWriter>>& _kv_writers); ~MapColumnWriter() override = default; @@ -432,12 +452,10 @@ public: ordinal_t get_next_rowid() const override { return _offsets_writer->get_next_rowid(); } private: - Status put_extra_info_in_page(DataPageFooterPB* header) override; - std::vector<std::unique_ptr<ColumnWriter>> _kv_writers; // we need null writer to make sure a row is null or not std::unique_ptr<ScalarColumnWriter> _null_writer; - std::unique_ptr<ScalarColumnWriter> _offsets_writer; + std::unique_ptr<OffsetColumnWriter> _offsets_writer; std::unique_ptr<InvertedIndexColumnWriter> _inverted_index_builder; ColumnWriterOptions _opts; }; diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp index 181f1cd477..e7b59033c4 100644 --- a/be/src/vec/olap/olap_data_convertor.cpp +++ b/be/src/vec/olap/olap_data_convertor.cpp @@ -943,6 +943,7 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorArray::convert_to_olap( auto elem_size = end_offset - start_offset; _offsets.clear(); + // we need all offsets, so reserve num_rows + 1 to make sure last offset can be got in offset column, instead of according to nested item column _offsets.reserve(_num_rows + 1); for (int i = 0; i <= _num_rows; ++i) { _offsets.push_back(column_array->offset_at(i + _row_pos) - start_offset + _base_offset); @@ -1011,8 +1012,9 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorMap::convert_to_olap( auto elem_size = end_offset - start_offset; _offsets.clear(); - _offsets.reserve(_num_rows); - for (int i = 0; i < _num_rows; ++i) { + // we need all offsets, so reserve num_rows + 1 to make sure last offset can be got in offset column, instead of according to nested item column + _offsets.reserve(_num_rows + 1); + for (int i = 0; i <= _num_rows; ++i) { _offsets.push_back(column_map->offset_at(i + _row_pos) - start_offset + _base_offset); } _base_offset += elem_size; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org