This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 2013dcd0e9 [refactor](load) cleanup segment flush logic in beta rowset writer (#21635) 2013dcd0e9 is described below commit 2013dcd0e96fb79493884dc933e9f2e6f941986d Author: Kaijie Chen <c...@apache.org> AuthorDate: Tue Jul 18 18:17:57 2023 +0800 [refactor](load) cleanup segment flush logic in beta rowset writer (#21635) --- be/src/olap/memtable_flush_executor.cpp | 7 +- be/src/olap/rowset/beta_rowset_writer.cpp | 222 +++++++++++++----------------- be/src/olap/rowset/beta_rowset_writer.h | 47 ++++--- be/src/olap/rowset/rowset_writer.h | 18 +-- be/src/olap/rowset/segcompaction.cpp | 2 +- be/src/olap/tablet.cpp | 3 +- 6 files changed, 132 insertions(+), 167 deletions(-) diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index cf478393af..70cd74e214 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -99,8 +99,11 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in int64_t duration_ns; SCOPED_RAW_TIMER(&duration_ns); std::unique_ptr<vectorized::Block> block = memtable->to_block(); - SKIP_MEMORY_CHECK(RETURN_IF_ERROR(_rowset_writer->unfold_variant_column_and_flush_block( - block.get(), segment_id, memtable->flush_mem_tracker(), flush_size))); + { + SCOPED_CONSUME_MEM_TRACKER(memtable->flush_mem_tracker()); + SKIP_MEMORY_CHECK(RETURN_IF_ERROR( + _rowset_writer->flush_memtable(block.get(), segment_id, flush_size))); + } _memtable_stat += memtable->stat(); DorisMetrics::instance()->memtable_flush_total->increment(1); DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000); diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index d99c8e9ee4..4aa7815423 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -89,10 +89,10 @@ BetaRowsetWriter::~BetaRowsetWriter() { if (!fs) { return; } - auto max_segment_id = std::max(_num_segment.load(), _next_segment_id.load()); - for (int i = 0; i < max_segment_id; ++i) { - std::string seg_path = BetaRowset::segment_file_path( - _context.rowset_dir, _context.rowset_id, _segment_start_id + i); + DCHECK_LE(_segment_start_id + _num_segment, _next_segment_id); + for (int i = _segment_start_id; i < _next_segment_id; ++i) { + std::string seg_path = + BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, i); // Even if an error is encountered, these files that have not been cleaned up // will be cleaned up by the GC background. So here we only print the error // message when we encounter an error. @@ -134,11 +134,9 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return Status::OK(); } if (UNLIKELY(_segment_writer == nullptr)) { - FlushContext ctx; - ctx.block = block; - RETURN_IF_ERROR(_create_segment_writer(&_segment_writer, &ctx)); + RETURN_IF_ERROR(_create_segment_writer(_segment_writer, allocate_segment_id())); } - return _add_block(block, &_segment_writer); + return _add_block(block, _segment_writer); } Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) { @@ -446,46 +444,38 @@ Status BetaRowsetWriter::_segcompaction_ramaining_if_necessary() { return status; } -Status BetaRowsetWriter::_do_add_block(const vectorized::Block* block, - std::unique_ptr<segment_v2::SegmentWriter>* segment_writer, - size_t row_offset, size_t input_row_num) { - auto s = (*segment_writer)->append_block(block, row_offset, input_row_num); +Status BetaRowsetWriter::_add_rows(const vectorized::Block* block, + std::unique_ptr<segment_v2::SegmentWriter>& segment_writer, + size_t row_offset, size_t input_row_num) { + auto s = segment_writer->append_block(block, row_offset, input_row_num); if (UNLIKELY(!s.ok())) { return Status::Error<WRITER_DATA_WRITE_ERROR>("failed to append block: {}", s.to_string()); } + _raw_num_rows_written += input_row_num; return Status::OK(); } Status BetaRowsetWriter::_add_block(const vectorized::Block* block, - std::unique_ptr<segment_v2::SegmentWriter>* segment_writer, - const FlushContext* flush_ctx) { + std::unique_ptr<segment_v2::SegmentWriter>& segment_writer) { size_t block_size_in_bytes = block->bytes(); size_t block_row_num = block->rows(); size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / block_row_num); size_t row_offset = 0; - if (flush_ctx != nullptr && flush_ctx->segment_id.has_value()) { - // the entire block (memtable) should be flushed into single segment - RETURN_IF_ERROR(_do_add_block(block, segment_writer, 0, block_row_num)); - _raw_num_rows_written += block_row_num; - return Status::OK(); - } - do { - auto max_row_add = (*segment_writer)->max_row_to_add(row_avg_size_in_bytes); + auto max_row_add = segment_writer->max_row_to_add(row_avg_size_in_bytes); if (UNLIKELY(max_row_add < 1)) { // no space for another single row, need flush now RETURN_IF_ERROR(_flush_segment_writer(segment_writer)); - RETURN_IF_ERROR(_create_segment_writer(segment_writer, flush_ctx)); - max_row_add = (*segment_writer)->max_row_to_add(row_avg_size_in_bytes); + RETURN_IF_ERROR(_create_segment_writer(segment_writer, allocate_segment_id())); + max_row_add = segment_writer->max_row_to_add(row_avg_size_in_bytes); DCHECK(max_row_add > 0); } size_t input_row_num = std::min(block_row_num - row_offset, size_t(max_row_add)); - RETURN_IF_ERROR(_do_add_block(block, segment_writer, row_offset, input_row_num)); + RETURN_IF_ERROR(_add_rows(block, segment_writer, row_offset, input_row_num)); row_offset += input_row_num; } while (row_offset < block_row_num); - _raw_num_rows_written += block_row_num; return Status::OK(); } @@ -515,52 +505,45 @@ Status BetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr row Status BetaRowsetWriter::flush() { if (_segment_writer != nullptr) { - RETURN_IF_ERROR(_flush_segment_writer(&_segment_writer)); + RETURN_IF_ERROR(_flush_segment_writer(_segment_writer)); } return Status::OK(); } -Status BetaRowsetWriter::unfold_variant_column_and_flush_block( - vectorized::Block* block, int32_t segment_id, - const std::shared_ptr<MemTracker>& flush_mem_tracker, int64_t* flush_size) { - SCOPED_CONSUME_MEM_TRACKER(flush_mem_tracker); - +Status BetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t segment_id, + int64_t* flush_size) { if (block->rows() == 0) { return Status::OK(); } - FlushContext ctx; - ctx.block = block; + TabletSchemaSPtr flush_schema; if (_context.tablet_schema->is_dynamic_schema()) { // Unfold variant column - RETURN_IF_ERROR(_unfold_variant_column(*block, ctx.flush_schema)); + RETURN_IF_ERROR(_unfold_variant_column(*block, flush_schema)); + } + { + SCOPED_RAW_TIMER(&_segment_writer_ns); + RETURN_IF_ERROR(_flush_single_block(block, segment_id, flush_size, flush_schema)); } - ctx.segment_id = std::optional<int32_t> {segment_id}; - SCOPED_RAW_TIMER(&_segment_writer_ns); - RETURN_IF_ERROR(flush_single_block(block, flush_size, &ctx)); RETURN_IF_ERROR(_generate_delete_bitmap(segment_id)); RETURN_IF_ERROR(_segcompaction_if_necessary()); return Status::OK(); } -Status BetaRowsetWriter::flush_single_block(const vectorized::Block* block, int64* flush_size, - const FlushContext* ctx) { +Status BetaRowsetWriter::flush_single_block(const vectorized::Block* block) { if (block->rows() == 0) { return Status::OK(); } + return _flush_single_block(block, allocate_segment_id()); +} +Status BetaRowsetWriter::_flush_single_block(const vectorized::Block* block, int32_t segment_id, + int64_t* flush_size, TabletSchemaSPtr flush_schema) { std::unique_ptr<segment_v2::SegmentWriter> writer; - RETURN_IF_ERROR(_create_segment_writer(&writer, ctx)); - segment_v2::SegmentWriter* raw_writer = writer.get(); - int32_t segment_id = writer->get_segment_id(); - RETURN_IF_ERROR(_add_block(block, &writer, ctx)); - // if segment_id is present in flush context, - // the entire memtable should be flushed into a single segment - if (ctx != nullptr && ctx->segment_id.has_value()) { - DCHECK_EQ(writer->get_segment_id(), segment_id); - DCHECK_EQ(writer.get(), raw_writer); - } - RETURN_IF_ERROR(_flush_segment_writer(&writer, flush_size)); + bool no_compression = block->bytes() <= config::segment_compression_threshold_kb * 1024; + RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema)); + RETURN_IF_ERROR(_add_rows(block, writer, 0, block->rows())); + RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size)); return Status::OK(); } @@ -600,7 +583,7 @@ RowsetSharedPtr BetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_r RowsetSharedPtr BetaRowsetWriter::build() { // make sure all segments are flushed - DCHECK_EQ(_num_segment, _next_segment_id); + DCHECK_EQ(_segment_start_id + _num_segment, _next_segment_id); // TODO(lingbin): move to more better place, or in a CreateBlockBatch? for (auto& file_writer : _file_writers) { Status status = file_writer->close(); @@ -762,12 +745,12 @@ RowsetSharedPtr BetaRowsetWriter::_build_tmp() { return rowset; } -Status BetaRowsetWriter::_create_file_writer(std::string path, io::FileWriterPtr* file_writer) { +Status BetaRowsetWriter::_create_file_writer(std::string path, io::FileWriterPtr& file_writer) { auto fs = _rowset_meta->fs(); if (!fs) { return Status::Error<INIT_FAILED>("get fs failed"); } - Status st = fs->create_file(path, file_writer); + Status st = fs->create_file(path, &file_writer); if (!st.ok()) { LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st; return st; @@ -777,85 +760,71 @@ Status BetaRowsetWriter::_create_file_writer(std::string path, io::FileWriterPtr return Status::OK(); } -Status BetaRowsetWriter::create_file_writer(uint32_t segment_id, io::FileWriterPtr* file_writer) { +Status BetaRowsetWriter::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer) { std::string path; path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, segment_id); return _create_file_writer(path, file_writer); } -Status BetaRowsetWriter::_create_file_writer(uint32_t begin, uint32_t end, - io::FileWriterPtr* file_writer) { - std::string path; - path = BetaRowset::local_segment_path_segcompacted(_context.rowset_dir, _context.rowset_id, - begin, end); - return _create_file_writer(path, file_writer); +Status BetaRowsetWriter::_create_segment_writer_for_segcompaction( + std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end) { + DCHECK(begin >= 0 && end >= 0); + std::string path = BetaRowset::local_segment_path_segcompacted(_context.rowset_dir, + _context.rowset_id, begin, end); + io::FileWriterPtr file_writer; + RETURN_IF_ERROR(_create_file_writer(path, file_writer)); + + segment_v2::SegmentWriterOptions writer_options; + writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; + writer_options.rowset_ctx = &_context; + writer_options.write_type = _context.write_type; + writer_options.write_type = DataWriteType::TYPE_COMPACTION; + + writer->reset(new segment_v2::SegmentWriter(file_writer.get(), _num_segcompacted, + _context.tablet_schema, _context.tablet, + _context.data_dir, _context.max_rows_per_segment, + writer_options, _context.mow_context)); + if (_segcompaction_worker.get_file_writer() != nullptr) { + _segcompaction_worker.get_file_writer()->close(); + } + _segcompaction_worker.get_file_writer().reset(file_writer.release()); + + return Status::OK(); } -Status BetaRowsetWriter::_do_create_segment_writer( - std::unique_ptr<segment_v2::SegmentWriter>* writer, bool is_segcompaction, int64_t begin, - int64_t end, const FlushContext* flush_ctx) { - Status st; - std::string path; - int32_t segment_id = 0; +Status BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer, + int32_t segment_id, bool no_compression, + TabletSchemaSPtr flush_schema) { + RETURN_IF_ERROR(_check_segment_number_limit()); io::FileWriterPtr file_writer; - if (is_segcompaction) { - DCHECK(begin >= 0 && end >= 0); - st = _create_file_writer(begin, end, &file_writer); - } else { - int32_t segid_offset = (flush_ctx != nullptr && flush_ctx->segment_id.has_value()) - ? flush_ctx->segment_id.value() - : allocate_segment_id(); - segment_id = segid_offset + _segment_start_id; - st = create_file_writer(segment_id, &file_writer); - } - if (!st.ok()) { - return st; - } + RETURN_IF_ERROR(create_file_writer(segment_id, file_writer)); segment_v2::SegmentWriterOptions writer_options; writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; writer_options.rowset_ctx = &_context; writer_options.write_type = _context.write_type; - if (is_segcompaction) { - writer_options.write_type = DataWriteType::TYPE_COMPACTION; + if (no_compression) { + writer_options.compression_type = NO_COMPRESSION; } - if (is_segcompaction) { - writer->reset(new segment_v2::SegmentWriter( - file_writer.get(), _num_segcompacted, _context.tablet_schema, _context.tablet, - _context.data_dir, _context.max_rows_per_segment, writer_options, - _context.mow_context)); - if (_segcompaction_worker.get_file_writer() != nullptr) { - _segcompaction_worker.get_file_writer()->close(); - } - _segcompaction_worker.get_file_writer().reset(file_writer.release()); - } else { - const auto& tablet_schema = flush_ctx && flush_ctx->flush_schema ? flush_ctx->flush_schema - : _context.tablet_schema; - if (flush_ctx && flush_ctx->block && - flush_ctx->block->bytes() <= config::segment_compression_threshold_kb * 1024) { - writer_options.compression_type = NO_COMPRESSION; - } - writer->reset(new segment_v2::SegmentWriter( - file_writer.get(), segment_id, tablet_schema, _context.tablet, _context.data_dir, - _context.max_rows_per_segment, writer_options, _context.mow_context)); - { - std::lock_guard<SpinLock> l(_lock); - _file_writers.push_back(std::move(file_writer)); - } - auto s = (*writer)->init(); - if (!s.ok()) { - LOG(WARNING) << "failed to init segment writer: " << s.to_string(); - writer->reset(nullptr); - return s; - } + const auto& tablet_schema = flush_schema ? flush_schema : _context.tablet_schema; + writer.reset(new segment_v2::SegmentWriter( + file_writer.get(), segment_id, tablet_schema, _context.tablet, _context.data_dir, + _context.max_rows_per_segment, writer_options, _context.mow_context)); + { + std::lock_guard<SpinLock> l(_lock); + _file_writers.push_back(std::move(file_writer)); + } + auto s = writer->init(); + if (!s.ok()) { + LOG(WARNING) << "failed to init segment writer: " << s.to_string(); + writer.reset(); + return s; } - return Status::OK(); } -Status BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer, - const FlushContext* flush_ctx) { +Status BetaRowsetWriter::_check_segment_number_limit() { size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted; if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) { return Status::Error<TOO_MANY_SEGMENTS>( @@ -864,43 +833,42 @@ Status BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::Segm _context.tablet_id, _context.rowset_id.to_string(), config::max_segment_num_per_rowset, _num_segment, _segcompacted_point, _num_segcompacted); - } else { - return _do_create_segment_writer(writer, false, -1, -1, flush_ctx); } + return Status::OK(); } -Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer, +Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer, int64_t* flush_size) { - uint32_t segid = (*writer)->get_segment_id(); - uint32_t row_num = (*writer)->num_rows_written(); + uint32_t segid = writer->get_segment_id(); + uint32_t row_num = writer->num_rows_written(); - if ((*writer)->num_rows_written() == 0) { + if (writer->num_rows_written() == 0) { return Status::OK(); } uint64_t segment_size; uint64_t index_size; - Status s = (*writer)->finalize(&segment_size, &index_size); + Status s = writer->finalize(&segment_size, &index_size); if (!s.ok()) { return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string()); } VLOG_DEBUG << "tablet_id:" << _context.tablet_id - << " flushing filename: " << (*writer)->get_data_dir()->path() + << " flushing filename: " << writer->get_data_dir()->path() << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment; KeyBoundsPB key_bounds; - Slice min_key = (*writer)->min_encoded_key(); - Slice max_key = (*writer)->max_encoded_key(); + Slice min_key = writer->min_encoded_key(); + Slice max_key = writer->max_encoded_key(); DCHECK_LE(min_key.compare(max_key), 0); key_bounds.set_min_key(min_key.to_string()); key_bounds.set_max_key(max_key.to_string()); SegmentStatistics segstat; segstat.row_num = row_num; - segstat.data_size = segment_size + (*writer)->get_inverted_index_file_size(); - segstat.index_size = index_size + (*writer)->get_inverted_index_file_size(); + segstat.data_size = segment_size + writer->get_inverted_index_file_size(); + segstat.index_size = index_size + writer->get_inverted_index_file_size(); segstat.key_bounds = key_bounds; - writer->reset(); + writer.reset(); if (flush_size) { *flush_size = segment_size + index_size; } diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index e88efdc000..bb3e55d764 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -84,20 +84,18 @@ public: Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) override; - Status create_file_writer(uint32_t segment_id, io::FileWriterPtr* writer); + Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer); void add_segment(uint32_t segid, SegmentStatistics& segstat); Status flush() override; - Status unfold_variant_column_and_flush_block( - vectorized::Block* block, int32_t segment_id, - const std::shared_ptr<MemTracker>& flush_mem_tracker, int64_t* flush_size) override; + Status flush_memtable(vectorized::Block* block, int32_t segment_id, + int64_t* flush_size) override; // Return the file size flushed to disk in "flush_size" // This method is thread-safe. - Status flush_single_block(const vectorized::Block* block, int64_t* flush_size, - const FlushContext* ctx = nullptr) override; + Status flush_single_block(const vectorized::Block* block) override; RowsetSharedPtr build() override; @@ -129,31 +127,38 @@ public: Status wait_flying_segcompaction() override; - void set_segment_start_id(int32_t start_id) override { _segment_start_id = start_id; } + void set_segment_start_id(int32_t start_id) override { + _segment_start_id = start_id; + _next_segment_id = start_id; + } int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; } int64_t segment_writer_ns() override { return _segment_writer_ns; } private: - Status _do_add_block(const vectorized::Block* block, - std::unique_ptr<segment_v2::SegmentWriter>* segment_writer, - size_t row_offset, size_t input_row_num); + Status _add_rows(const vectorized::Block* block, + std::unique_ptr<segment_v2::SegmentWriter>& segment_writer, size_t row_offset, + size_t input_row_num); Status _add_block(const vectorized::Block* block, - std::unique_ptr<segment_v2::SegmentWriter>* writer, - const FlushContext* flush_ctx = nullptr); - - Status _create_file_writer(std::string path, io::FileWriterPtr* file_writer); - Status _create_file_writer(uint32_t begin, uint32_t end, io::FileWriterPtr* writer); - Status _do_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer, - bool is_segcompaction, int64_t begin, int64_t end, - const FlushContext* ctx = nullptr); - Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer, - const FlushContext* ctx = nullptr); - Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer, + std::unique_ptr<segment_v2::SegmentWriter>& writer); + + Status _create_file_writer(std::string path, io::FileWriterPtr& file_writer); + Status _check_segment_number_limit(); + Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer, + int32_t segment_id, bool no_compression = false, + TabletSchemaSPtr flush_schema = nullptr); + Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer, int64_t* flush_size = nullptr); + Status _flush_single_block(const vectorized::Block* block, int32_t segment_id, + int64_t* flush_size = nullptr, + TabletSchemaSPtr flush_schema = nullptr); Status _generate_delete_bitmap(int32_t segment_id); void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta); + + // segment compaction + Status _create_segment_writer_for_segcompaction( + std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end); Status _segcompaction_if_necessary(); Status _segcompaction_ramaining_if_necessary(); Status _load_noncompacted_segments(std::vector<segment_v2::SegmentSharedPtr>* segments, diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index 45db4afe34..9a6ae89dd6 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -35,14 +35,6 @@ namespace doris { class MemTable; -// Context for single memtable flush -struct FlushContext { - ENABLE_FACTORY_CREATOR(FlushContext); - TabletSchemaSPtr flush_schema = nullptr; - const vectorized::Block* block = nullptr; - std::optional<int32_t> segment_id = std::nullopt; -}; - class RowsetWriter { public: RowsetWriter() = default; @@ -78,15 +70,13 @@ public: "RowsetWriter not support final_flush"); } - virtual Status unfold_variant_column_and_flush_block( - vectorized::Block* block, int32_t segment_id, - const std::shared_ptr<MemTracker>& flush_mem_tracker, int64_t* flush_size) { + virtual Status flush_memtable(vectorized::Block* block, int32_t segment_id, + int64_t* flush_size) { return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>( - "RowsetWriter not support unfold_variant_column_and_flush_block"); + "RowsetWriter not support flush_memtable"); } - virtual Status flush_single_block(const vectorized::Block* block, int64_t* flush_size, - const FlushContext* ctx = nullptr) { + virtual Status flush_single_block(const vectorized::Block* block) { return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>( "RowsetWriter not support flush_single_block"); } diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp index 234651a278..cac1e78fa6 100644 --- a/be/src/olap/rowset/segcompaction.cpp +++ b/be/src/olap/rowset/segcompaction.cpp @@ -187,7 +187,7 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat Status SegcompactionWorker::_create_segment_writer_for_segcompaction( std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t begin, uint64_t end) { - return _writer->_do_create_segment_writer(writer, true, begin, end); + return _writer->_create_segment_writer_for_segcompaction(writer, begin, end); } Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr segments) { diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 9ad6096bfe..e53661dce8 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2977,8 +2977,7 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, RETURN_IF_ERROR(generate_new_block_for_partial_update( rowset_schema, read_plan_ori, read_plan_update, rsid_to_rowset, &block)); sort_block(block, ordered_block); - int64_t size; - RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block, &size)); + RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block)); } LOG(INFO) << "calc segment delete bitmap, tablet: " << tablet_id() << " rowset: " << rowset_id << " seg_id: " << seg->id() << " dummy_version: " << end_version + 1 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org