This is an automated email from the ASF dual-hosted git repository. airborne pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new cc2d146517f [Fix](segment compaction) fix error using of inverted index file writer in segment compaction #43114 (#43616) cc2d146517f is described below commit cc2d146517fffd3a58cc0981e98f711fe5a13161 Author: airborne12 <airborn...@gmail.com> AuthorDate: Mon Nov 11 20:29:40 2024 +0800 [Fix](segment compaction) fix error using of inverted index file writer in segment compaction #43114 (#43616) cherry pick from #43114 --- be/src/olap/rowset/beta_rowset_writer.cpp | 7 +++- be/src/olap/rowset/beta_rowset_writer.h | 4 +- be/src/olap/rowset/segcompaction.cpp | 2 +- be/src/olap/rowset/segcompaction.h | 5 +++ be/test/olap/segcompaction_test.cpp | 64 +++++++++++++++++++++++++++++++ 5 files changed, 78 insertions(+), 4 deletions(-) diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index a19989af1e7..634e8b64429 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -948,7 +948,7 @@ Status BaseBetaRowsetWriter::create_inverted_index_file_writer( return Status::OK(); } -Status BetaRowsetWriter::_create_segment_writer_for_segcompaction( +Status BetaRowsetWriter::create_segment_writer_for_segcompaction( std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end) { DCHECK(begin >= 0 && end >= 0); std::string path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, @@ -988,6 +988,11 @@ Status BetaRowsetWriter::_create_segment_writer_for_segcompaction( RETURN_IF_ERROR(_segcompaction_worker->get_file_writer()->close()); } _segcompaction_worker->get_file_writer().reset(file_writer.release()); + if (auto& idx_file_writer = _segcompaction_worker->get_inverted_index_file_writer(); + idx_file_writer != nullptr) { + RETURN_IF_ERROR(idx_file_writer->close()); + } + _segcompaction_worker->get_inverted_index_file_writer().reset(index_file_writer.release()); return Status::OK(); } diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 6063f714177..ca2685f5956 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -280,6 +280,8 @@ public: Status flush_segment_writer_for_segcompaction( std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size, KeyBoundsPB& key_bounds); + Status create_segment_writer_for_segcompaction( + std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end); bool is_segcompacted() const { return _num_segcompacted > 0; } @@ -290,8 +292,6 @@ private: Status _check_segment_number_limit(size_t segnum) override; int64_t _num_seg() const override; Status _wait_flying_segcompaction(); - Status _create_segment_writer_for_segcompaction( - std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end); Status _segcompaction_if_necessary(); Status _segcompaction_rename_last_segments(); Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, int32_t segment_id); diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp index fc8baf952c1..92b903d3a90 100644 --- a/be/src/olap/rowset/segcompaction.cpp +++ b/be/src/olap/rowset/segcompaction.cpp @@ -219,7 +219,7 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat Status SegcompactionWorker::_create_segment_writer_for_segcompaction( std::unique_ptr<segment_v2::SegmentWriter>* writer, uint32_t begin, uint32_t end) { - return _writer->_create_segment_writer_for_segcompaction(writer, begin, end); + return _writer->create_segment_writer_for_segcompaction(writer, begin, end); } Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr segments) { diff --git a/be/src/olap/rowset/segcompaction.h b/be/src/olap/rowset/segcompaction.h index 67dd6889aad..f0f8aa6b257 100644 --- a/be/src/olap/rowset/segcompaction.h +++ b/be/src/olap/rowset/segcompaction.h @@ -25,6 +25,7 @@ #include "olap/merger.h" #include "olap/simple_rowid_conversion.h" #include "olap/tablet.h" +#include "segment_v2/inverted_index_file_writer.h" #include "segment_v2/segment.h" namespace doris { @@ -61,6 +62,9 @@ public: DeleteBitmapPtr get_converted_delete_bitmap() { return _converted_delete_bitmap; } io::FileWriterPtr& get_file_writer() { return _file_writer; } + InvertedIndexFileWriterPtr& get_inverted_index_file_writer() { + return _inverted_index_file_writer; + } // set the cancel flag, tasks already started will not be cancelled. bool cancel(); @@ -86,6 +90,7 @@ private: // Currently cloud storage engine doesn't need segcompaction BetaRowsetWriter* _writer = nullptr; io::FileWriterPtr _file_writer; + InvertedIndexFileWriterPtr _inverted_index_file_writer = nullptr; // for unique key mow table std::unique_ptr<SimpleRowIdConversion> _rowid_conversion; diff --git a/be/test/olap/segcompaction_test.cpp b/be/test/olap/segcompaction_test.cpp index ba0d23acb02..32d724d246b 100644 --- a/be/test/olap/segcompaction_test.cpp +++ b/be/test/olap/segcompaction_test.cpp @@ -34,6 +34,7 @@ #include "olap/rowset/rowset_reader_context.h" #include "olap/rowset/rowset_writer.h" #include "olap/rowset/rowset_writer_context.h" +#include "olap/rowset/segment_v2/segment_writer.h" #include "olap/storage_engine.h" #include "olap/tablet_meta.h" #include "olap/tablet_schema.h" @@ -178,6 +179,24 @@ protected: tablet_schema->init_from_pb(tablet_schema_pb); } + void construct_column(ColumnPB* column_pb, TabletIndexPB* tablet_index, int64_t index_id, + const std::string& index_name, int32_t col_unique_id, + const std::string& column_type, const std::string& column_name, + bool parser = false) { + column_pb->set_unique_id(col_unique_id); + column_pb->set_name(column_name); + column_pb->set_type(column_type); + column_pb->set_is_key(false); + column_pb->set_is_nullable(true); + tablet_index->set_index_id(index_id); + tablet_index->set_index_name(index_name); + tablet_index->set_index_type(IndexType::INVERTED); + tablet_index->add_col_unique_id(col_unique_id); + if (parser) { + auto* properties = tablet_index->mutable_properties(); + (*properties)[INVERTED_INDEX_PARSER_KEY] = INVERTED_INDEX_PARSER_UNICODE; + } + } // use different id to avoid conflict void create_rowset_writer_context(int64_t id, TabletSchemaSPtr tablet_schema, RowsetWriterContext* rowset_writer_context) { @@ -830,6 +849,51 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) { } } +TEST_F(SegCompactionTest, CreateSegCompactionWriter) { + config::enable_segcompaction = true; + Status s; + TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>(); + TabletSchemaPB schema_pb; + schema_pb.set_keys_type(KeysType::DUP_KEYS); + schema_pb.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V2); + + construct_column(schema_pb.add_column(), schema_pb.add_index(), 10000, "key_index", 0, "INT", + "key"); + construct_column(schema_pb.add_column(), schema_pb.add_index(), 10001, "v1_index", 1, "STRING", + "v1"); + construct_column(schema_pb.add_column(), schema_pb.add_index(), 10002, "v2_index", 2, "STRING", + "v2", true); + construct_column(schema_pb.add_column(), schema_pb.add_index(), 10003, "v3_index", 3, "INT", + "v3"); + + tablet_schema.reset(new TabletSchema); + tablet_schema->init_from_pb(schema_pb); + RowsetSharedPtr rowset; + config::segcompaction_candidate_max_rows = 6000; // set threshold above + // rows_per_segment + config::segcompaction_batch_size = 3; + std::vector<uint32_t> segment_num_rows; + { + RowsetWriterContext writer_context; + create_rowset_writer_context(10052, tablet_schema, &writer_context); + + auto res = RowsetFactory::create_rowset_writer(*l_engine, writer_context, false); + EXPECT_TRUE(res.has_value()) << res.error(); + auto rowset_writer = std::move(res).value(); + EXPECT_EQ(Status::OK(), s); + auto beta_rowset_writer = dynamic_cast<BetaRowsetWriter*>(rowset_writer.get()); + EXPECT_TRUE(beta_rowset_writer != nullptr); + std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; + auto status = beta_rowset_writer->create_segment_writer_for_segcompaction(&writer, 0, 1); + EXPECT_TRUE(beta_rowset_writer != nullptr); + EXPECT_TRUE(status == Status::OK()); + int64_t inverted_index_file_size = 0; + status = writer->close_inverted_index(&inverted_index_file_size); + EXPECT_TRUE(status == Status::OK()); + std::cout << inverted_index_file_size << std::endl; + } +} + TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) { config::enable_segcompaction = true; Status s; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org