This is an automated email from the ASF dual-hosted git repository.

airborne pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new cc2d146517f [Fix](segment compaction) fix error using of inverted index file writer in segment compaction #43114 (#43616)
cc2d146517f is described below

commit cc2d146517fffd3a58cc0981e98f711fe5a13161
Author: airborne12 <airborn...@gmail.com>
AuthorDate: Mon Nov 11 20:29:40 2024 +0800

    [Fix](segment compaction) fix error using of inverted index file writer in segment compaction #43114 (#43616)
    
    cherry pick from #43114
---
 be/src/olap/rowset/beta_rowset_writer.cpp |  7 +++-
 be/src/olap/rowset/beta_rowset_writer.h   |  4 +-
 be/src/olap/rowset/segcompaction.cpp      |  2 +-
 be/src/olap/rowset/segcompaction.h        |  5 +++
 be/test/olap/segcompaction_test.cpp       | 64 +++++++++++++++++++++++++++++++
 5 files changed, 78 insertions(+), 4 deletions(-)

diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index a19989af1e7..634e8b64429 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -948,7 +948,7 @@ Status 
BaseBetaRowsetWriter::create_inverted_index_file_writer(
     return Status::OK();
 }
 
-Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(
+Status BetaRowsetWriter::create_segment_writer_for_segcompaction(
         std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, 
int64_t end) {
     DCHECK(begin >= 0 && end >= 0);
     std::string path = 
BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
@@ -988,6 +988,11 @@ Status 
BetaRowsetWriter::_create_segment_writer_for_segcompaction(
         RETURN_IF_ERROR(_segcompaction_worker->get_file_writer()->close());
     }
     _segcompaction_worker->get_file_writer().reset(file_writer.release());
+    if (auto& idx_file_writer = 
_segcompaction_worker->get_inverted_index_file_writer();
+        idx_file_writer != nullptr) {
+        RETURN_IF_ERROR(idx_file_writer->close());
+    }
+    
_segcompaction_worker->get_inverted_index_file_writer().reset(index_file_writer.release());
     return Status::OK();
 }
 
diff --git a/be/src/olap/rowset/beta_rowset_writer.h 
b/be/src/olap/rowset/beta_rowset_writer.h
index 6063f714177..ca2685f5956 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -280,6 +280,8 @@ public:
     Status flush_segment_writer_for_segcompaction(
             std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t 
index_size,
             KeyBoundsPB& key_bounds);
+    Status create_segment_writer_for_segcompaction(
+            std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, 
int64_t end);
 
     bool is_segcompacted() const { return _num_segcompacted > 0; }
 
@@ -290,8 +292,6 @@ private:
     Status _check_segment_number_limit(size_t segnum) override;
     int64_t _num_seg() const override;
     Status _wait_flying_segcompaction();
-    Status _create_segment_writer_for_segcompaction(
-            std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, 
int64_t end);
     Status _segcompaction_if_necessary();
     Status _segcompaction_rename_last_segments();
     Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, 
int32_t segment_id);
diff --git a/be/src/olap/rowset/segcompaction.cpp 
b/be/src/olap/rowset/segcompaction.cpp
index fc8baf952c1..92b903d3a90 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -219,7 +219,7 @@ Status 
SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat
 
 Status SegcompactionWorker::_create_segment_writer_for_segcompaction(
         std::unique_ptr<segment_v2::SegmentWriter>* writer, uint32_t begin, 
uint32_t end) {
-    return _writer->_create_segment_writer_for_segcompaction(writer, begin, 
end);
+    return _writer->create_segment_writer_for_segcompaction(writer, begin, 
end);
 }
 
 Status 
SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr 
segments) {
diff --git a/be/src/olap/rowset/segcompaction.h 
b/be/src/olap/rowset/segcompaction.h
index 67dd6889aad..f0f8aa6b257 100644
--- a/be/src/olap/rowset/segcompaction.h
+++ b/be/src/olap/rowset/segcompaction.h
@@ -25,6 +25,7 @@
 #include "olap/merger.h"
 #include "olap/simple_rowid_conversion.h"
 #include "olap/tablet.h"
+#include "segment_v2/inverted_index_file_writer.h"
 #include "segment_v2/segment.h"
 
 namespace doris {
@@ -61,6 +62,9 @@ public:
     DeleteBitmapPtr get_converted_delete_bitmap() { return 
_converted_delete_bitmap; }
 
     io::FileWriterPtr& get_file_writer() { return _file_writer; }
+    InvertedIndexFileWriterPtr& get_inverted_index_file_writer() {
+        return _inverted_index_file_writer;
+    }
 
     // set the cancel flag, tasks already started will not be cancelled.
     bool cancel();
@@ -86,6 +90,7 @@ private:
     // Currently cloud storage engine doesn't need segcompaction
     BetaRowsetWriter* _writer = nullptr;
     io::FileWriterPtr _file_writer;
+    InvertedIndexFileWriterPtr _inverted_index_file_writer = nullptr;
 
     // for unique key mow table
     std::unique_ptr<SimpleRowIdConversion> _rowid_conversion;
diff --git a/be/test/olap/segcompaction_test.cpp 
b/be/test/olap/segcompaction_test.cpp
index ba0d23acb02..32d724d246b 100644
--- a/be/test/olap/segcompaction_test.cpp
+++ b/be/test/olap/segcompaction_test.cpp
@@ -34,6 +34,7 @@
 #include "olap/rowset/rowset_reader_context.h"
 #include "olap/rowset/rowset_writer.h"
 #include "olap/rowset/rowset_writer_context.h"
+#include "olap/rowset/segment_v2/segment_writer.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_meta.h"
 #include "olap/tablet_schema.h"
@@ -178,6 +179,24 @@ protected:
         tablet_schema->init_from_pb(tablet_schema_pb);
     }
 
+    void construct_column(ColumnPB* column_pb, TabletIndexPB* tablet_index, 
int64_t index_id,
+                          const std::string& index_name, int32_t col_unique_id,
+                          const std::string& column_type, const std::string& 
column_name,
+                          bool parser = false) {
+        column_pb->set_unique_id(col_unique_id);
+        column_pb->set_name(column_name);
+        column_pb->set_type(column_type);
+        column_pb->set_is_key(false);
+        column_pb->set_is_nullable(true);
+        tablet_index->set_index_id(index_id);
+        tablet_index->set_index_name(index_name);
+        tablet_index->set_index_type(IndexType::INVERTED);
+        tablet_index->add_col_unique_id(col_unique_id);
+        if (parser) {
+            auto* properties = tablet_index->mutable_properties();
+            (*properties)[INVERTED_INDEX_PARSER_KEY] = 
INVERTED_INDEX_PARSER_UNICODE;
+        }
+    }
     // use different id to avoid conflict
     void create_rowset_writer_context(int64_t id, TabletSchemaSPtr 
tablet_schema,
                                       RowsetWriterContext* 
rowset_writer_context) {
@@ -830,6 +849,51 @@ TEST_F(SegCompactionTest, 
SegCompactionThenReadUniqueTableSmall) {
     }
 }
 
+TEST_F(SegCompactionTest, CreateSegCompactionWriter) {
+    config::enable_segcompaction = true;
+    Status s;
+    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+    TabletSchemaPB schema_pb;
+    schema_pb.set_keys_type(KeysType::DUP_KEYS);
+    
schema_pb.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V2);
+
+    construct_column(schema_pb.add_column(), schema_pb.add_index(), 10000, 
"key_index", 0, "INT",
+                     "key");
+    construct_column(schema_pb.add_column(), schema_pb.add_index(), 10001, 
"v1_index", 1, "STRING",
+                     "v1");
+    construct_column(schema_pb.add_column(), schema_pb.add_index(), 10002, 
"v2_index", 2, "STRING",
+                     "v2", true);
+    construct_column(schema_pb.add_column(), schema_pb.add_index(), 10003, 
"v3_index", 3, "INT",
+                     "v3");
+
+    tablet_schema.reset(new TabletSchema);
+    tablet_schema->init_from_pb(schema_pb);
+    RowsetSharedPtr rowset;
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above
+    // rows_per_segment
+    config::segcompaction_batch_size = 3;
+    std::vector<uint32_t> segment_num_rows;
+    {
+        RowsetWriterContext writer_context;
+        create_rowset_writer_context(10052, tablet_schema, &writer_context);
+
+        auto res = RowsetFactory::create_rowset_writer(*l_engine, 
writer_context, false);
+        EXPECT_TRUE(res.has_value()) << res.error();
+        auto rowset_writer = std::move(res).value();
+        EXPECT_EQ(Status::OK(), s);
+        auto beta_rowset_writer = 
dynamic_cast<BetaRowsetWriter*>(rowset_writer.get());
+        EXPECT_TRUE(beta_rowset_writer != nullptr);
+        std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
+        auto status = 
beta_rowset_writer->create_segment_writer_for_segcompaction(&writer, 0, 1);
+        EXPECT_TRUE(beta_rowset_writer != nullptr);
+        EXPECT_TRUE(status == Status::OK());
+        int64_t inverted_index_file_size = 0;
+        status = writer->close_inverted_index(&inverted_index_file_size);
+        EXPECT_TRUE(status == Status::OK());
+        std::cout << inverted_index_file_size << std::endl;
+    }
+}
+
 TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) {
     config::enable_segcompaction = true;
     Status s;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to