This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b2861975ec [FIX](array/map)fix array map batch append data with right next_array_item_rowid (#23779)
b2861975ec is described below

commit b2861975ecf0ec53daaf2e16675f27feb0e551ae
Author: amory <wangqian...@selectdb.com>
AuthorDate: Wed Sep 6 14:47:37 2023 +0800

    [FIX](array/map)fix array map batch append data with right next_array_item_rowid (#23779)
---
 be/src/olap/rowset/segment_v2/column_writer.cpp | 63 ++++++++++++++++++-------
 be/src/olap/rowset/segment_v2/column_writer.h   | 38 +++++++++++----
 be/src/vec/olap/olap_data_convertor.cpp         |  6 ++-
 3 files changed, 78 insertions(+), 29 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp
index ec2baa10f0..7a446f1123 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -221,7 +221,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn*
             length_column.set_index_length(-1); // no short key index
             std::unique_ptr<Field> 
bigint_field(FieldFactory::create(length_column));
             auto* length_writer =
-                    new ScalarColumnWriter(length_options, 
std::move(bigint_field), file_writer);
+                    new OffsetColumnWriter(length_options, 
std::move(bigint_field), file_writer);
 
             // if nullable, create null writer
             ScalarColumnWriter* null_writer = nullptr;
@@ -314,7 +314,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn*
             length_column.set_index_length(-1); // no short key index
             std::unique_ptr<Field> 
bigint_field(FieldFactory::create(length_column));
             auto* length_writer =
-                    new ScalarColumnWriter(length_options, 
std::move(bigint_field), file_writer);
+                    new OffsetColumnWriter(length_options, 
std::move(bigint_field), file_writer);
 
             // create null writer
             if (opts.meta->is_nullable()) {
@@ -731,6 +731,48 @@ Status ScalarColumnWriter::finish_current_page() {
 
 
////////////////////////////////////////////////////////////////////////////////
 
+////////////////////////////////////////////////////////////////////////////////
+// offset column writer
+////////////////////////////////////////////////////////////////////////////////
+
+OffsetColumnWriter::OffsetColumnWriter(const ColumnWriterOptions& opts,
+                                       std::unique_ptr<Field> field, 
io::FileWriter* file_writer)
+        : ScalarColumnWriter(opts, std::move(field), file_writer) {
+    // now we only explain data in offset column as uint64
+    DCHECK(get_field()->type() == FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT);
+}
+
+OffsetColumnWriter::~OffsetColumnWriter() = default;
+
+Status OffsetColumnWriter::init() {
+    RETURN_IF_ERROR(ScalarColumnWriter::init());
+    register_flush_page_callback(this);
+    _next_offset = 0;
+    return Status::OK();
+}
+
+Status OffsetColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
+    size_t remaining = num_rows;
+    while (remaining > 0) {
+        size_t num_written = remaining;
+        RETURN_IF_ERROR(append_data_in_current_page(ptr, &num_written));
+        // _next_offset after append_data_in_current_page is the offset of 
next data, which will used in finish_current_page() to set 
next_array_item_ordinal
+        _next_offset = *(const uint64_t*)(*ptr);
+        remaining -= num_written;
+
+        if (_page_builder->is_page_full()) {
+            // get next data for next array_item_rowid
+            RETURN_IF_ERROR(finish_current_page());
+        }
+    }
+    return Status::OK();
+}
+
+Status OffsetColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) {
+    footer->set_next_array_item_ordinal(_next_offset);
+    return Status::OK();
+}
+
 StructColumnWriter::StructColumnWriter(
         const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
         ScalarColumnWriter* null_writer,
@@ -853,7 +895,7 @@ Status StructColumnWriter::finish_current_page() {
 
////////////////////////////////////////////////////////////////////////////////
 
 ArrayColumnWriter::ArrayColumnWriter(const ColumnWriterOptions& opts, 
std::unique_ptr<Field> field,
-                                     ScalarColumnWriter* offset_writer,
+                                     OffsetColumnWriter* offset_writer,
                                      ScalarColumnWriter* null_writer,
                                      std::unique_ptr<ColumnWriter> item_writer)
         : ColumnWriter(std::move(field), opts.meta->is_nullable()),
@@ -871,7 +913,6 @@ Status ArrayColumnWriter::init() {
         RETURN_IF_ERROR(_null_writer->init());
     }
     RETURN_IF_ERROR(_item_writer->init());
-    _offset_writer->register_flush_page_callback(this);
     if (_opts.inverted_index) {
         auto writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
         if (writer != nullptr) {
@@ -885,11 +926,6 @@ Status ArrayColumnWriter::init() {
     return Status::OK();
 }
 
-Status ArrayColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) {
-    footer->set_next_array_item_ordinal(_item_writer->get_next_rowid());
-    return Status::OK();
-}
-
 Status ArrayColumnWriter::write_inverted_index() {
     if (_opts.inverted_index) {
         return _inverted_index_builder->finish();
@@ -1008,7 +1044,7 @@ Status ArrayColumnWriter::finish_current_page() {
 
 /// ============================= MapColumnWriter =====================////
 MapColumnWriter::MapColumnWriter(const ColumnWriterOptions& opts, 
std::unique_ptr<Field> field,
-                                 ScalarColumnWriter* null_writer, 
ScalarColumnWriter* offset_writer,
+                                 ScalarColumnWriter* null_writer, 
OffsetColumnWriter* offset_writer,
                                  std::vector<std::unique_ptr<ColumnWriter>>& 
kv_writers)
         : ColumnWriter(std::move(field), opts.meta->is_nullable()), 
_opts(opts) {
     CHECK_EQ(kv_writers.size(), 2);
@@ -1028,7 +1064,6 @@ Status MapColumnWriter::init() {
     }
     // here register_flush_page_callback to call this.put_extra_info_in_page()
     // when finish cur data page
-    _offsets_writer->register_flush_page_callback(this);
     for (auto& sub_writer : _kv_writers) {
         RETURN_IF_ERROR(sub_writer->init());
     }
@@ -1138,12 +1173,6 @@ Status MapColumnWriter::finish_current_page() {
     return Status::NotSupported("map writer has no data, can not 
finish_current_page");
 }
 
-// write this value for column reader to read according offsets
-Status MapColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) {
-    footer->set_next_array_item_ordinal(_kv_writers[0]->get_next_rowid());
-    return Status::OK();
-}
-
 Status MapColumnWriter::write_inverted_index() {
     if (_opts.inverted_index) {
         return _inverted_index_builder->finish();
diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h
index b5aabd4e3a..1bc0afb972 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.h
+++ b/be/src/olap/rowset/segment_v2/column_writer.h
@@ -171,7 +171,7 @@ public:
 // Because some columns would be stored in a file, we should wait
 // until all columns has been finished, and then data can be written
 // to file
-class ScalarColumnWriter final : public ColumnWriter {
+class ScalarColumnWriter : public ColumnWriter {
 public:
     ScalarColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> 
field,
                        io::FileWriter* file_writer);
@@ -208,6 +208,7 @@ public:
 
     Status append_data_in_current_page(const uint8_t* ptr, size_t* 
num_written);
     friend class ArrayColumnWriter;
+    friend class OffsetColumnWriter;
 
 private:
     std::unique_ptr<PageBuilder> _page_builder;
@@ -276,6 +277,26 @@ private:
     FlushPageCallback* _new_page_callback = nullptr;
 };
 
+// offsetColumnWriter is used column which has offset column, like array, map.
+//  column type is only uint64 and should response for whole column value 
[start, end], end will set
+//  in footer.next_array_item_ordinal which in finish_cur_page() callback 
put_extra_info_in_page()
+class OffsetColumnWriter final : public ScalarColumnWriter, FlushPageCallback {
+public:
+    OffsetColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> 
field,
+                       io::FileWriter* file_writer);
+
+    ~OffsetColumnWriter() override;
+
+    Status init() override;
+
+    Status append_data(const uint8_t** ptr, size_t num_rows) override;
+
+private:
+    Status put_extra_info_in_page(DataPageFooterPB* footer) override;
+
+    uint64_t _next_offset;
+};
+
 class StructColumnWriter final : public ColumnWriter {
 public:
     explicit StructColumnWriter(const ColumnWriterOptions& opts, 
std::unique_ptr<Field> field,
@@ -328,10 +349,10 @@ private:
     ColumnWriterOptions _opts;
 };
 
-class ArrayColumnWriter final : public ColumnWriter, public FlushPageCallback {
+class ArrayColumnWriter final : public ColumnWriter {
 public:
     explicit ArrayColumnWriter(const ColumnWriterOptions& opts, 
std::unique_ptr<Field> field,
-                               ScalarColumnWriter* offset_writer, 
ScalarColumnWriter* null_writer,
+                               OffsetColumnWriter* offset_writer, 
ScalarColumnWriter* null_writer,
                                std::unique_ptr<ColumnWriter> item_writer);
     ~ArrayColumnWriter() override = default;
 
@@ -373,22 +394,21 @@ public:
     ordinal_t get_next_rowid() const override { return 
_offset_writer->get_next_rowid(); }
 
 private:
-    Status put_extra_info_in_page(DataPageFooterPB* header) override;
     Status write_null_column(size_t num_rows, bool is_null); // 
写入num_rows个null标记
     bool has_empty_items() const { return _item_writer->get_next_rowid() == 0; 
}
 
 private:
-    std::unique_ptr<ScalarColumnWriter> _offset_writer;
+    std::unique_ptr<OffsetColumnWriter> _offset_writer;
     std::unique_ptr<ScalarColumnWriter> _null_writer;
     std::unique_ptr<ColumnWriter> _item_writer;
     std::unique_ptr<InvertedIndexColumnWriter> _inverted_index_builder;
     ColumnWriterOptions _opts;
 };
 
-class MapColumnWriter final : public ColumnWriter, public FlushPageCallback {
+class MapColumnWriter final : public ColumnWriter {
 public:
     explicit MapColumnWriter(const ColumnWriterOptions& opts, 
std::unique_ptr<Field> field,
-                             ScalarColumnWriter* null_writer, 
ScalarColumnWriter* offsets_writer,
+                             ScalarColumnWriter* null_writer, 
OffsetColumnWriter* offsets_writer,
                              std::vector<std::unique_ptr<ColumnWriter>>& 
_kv_writers);
 
     ~MapColumnWriter() override = default;
@@ -432,12 +452,10 @@ public:
     ordinal_t get_next_rowid() const override { return 
_offsets_writer->get_next_rowid(); }
 
 private:
-    Status put_extra_info_in_page(DataPageFooterPB* header) override;
-
     std::vector<std::unique_ptr<ColumnWriter>> _kv_writers;
     // we need null writer to make sure a row is null or not
     std::unique_ptr<ScalarColumnWriter> _null_writer;
-    std::unique_ptr<ScalarColumnWriter> _offsets_writer;
+    std::unique_ptr<OffsetColumnWriter> _offsets_writer;
     std::unique_ptr<InvertedIndexColumnWriter> _inverted_index_builder;
     ColumnWriterOptions _opts;
 };
diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp
index 181f1cd477..e7b59033c4 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -943,6 +943,7 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorArray::convert_to_olap(
     auto elem_size = end_offset - start_offset;
 
     _offsets.clear();
+    // we need all offsets, so reserve num_rows + 1 to make sure last offset 
can be got in offset column, instead of according to nested item column
     _offsets.reserve(_num_rows + 1);
     for (int i = 0; i <= _num_rows; ++i) {
         _offsets.push_back(column_array->offset_at(i + _row_pos) - 
start_offset + _base_offset);
@@ -1011,8 +1012,9 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorMap::convert_to_olap(
     auto elem_size = end_offset - start_offset;
 
     _offsets.clear();
-    _offsets.reserve(_num_rows);
-    for (int i = 0; i < _num_rows; ++i) {
+    // we need all offsets, so reserve num_rows + 1 to make sure last offset 
can be got in offset column, instead of according to nested item column
+    _offsets.reserve(_num_rows + 1);
+    for (int i = 0; i <= _num_rows; ++i) {
         _offsets.push_back(column_map->offset_at(i + _row_pos) - start_offset 
+ _base_offset);
     }
     _base_offset += elem_size;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to