This is an automated email from the ASF dual-hosted git repository. jianliangqi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 397b0771e20 [opt] (inverted index) add inverted index file size for open file (#37482) 397b0771e20 is described below commit 397b0771e2022904d725dc79f245130cd68950a7 Author: Sun Chenyang <csun5...@gmail.com> AuthorDate: Fri Aug 16 19:40:44 2024 +0800 [opt] (inverted index) add inverted index file size for open file (#37482) --- be/src/cloud/cloud_rowset_writer.cpp | 8 + be/src/cloud/pb_convert.cpp | 8 + be/src/olap/compaction.cpp | 5 +- be/src/olap/rowset/beta_rowset.cpp | 6 +- be/src/olap/rowset/beta_rowset_writer.cpp | 14 +- be/src/olap/rowset/beta_rowset_writer.h | 60 +++++++ be/src/olap/rowset/beta_rowset_writer_v2.cpp | 2 +- be/src/olap/rowset/beta_rowset_writer_v2.h | 2 + be/src/olap/rowset/rowset_meta.cpp | 30 ++++ be/src/olap/rowset/rowset_meta.h | 10 ++ be/src/olap/rowset/segment_creator.cpp | 20 ++- be/src/olap/rowset/segment_creator.h | 8 +- .../segment_v2/inverted_index_compaction.cpp | 1 - .../segment_v2/inverted_index_compound_reader.cpp | 117 ++++++------ .../segment_v2/inverted_index_compound_reader.h | 29 ++- .../segment_v2/inverted_index_file_reader.cpp | 93 ++++++---- .../rowset/segment_v2/inverted_index_file_reader.h | 11 +- .../segment_v2/inverted_index_file_writer.cpp | 21 ++- .../rowset/segment_v2/inverted_index_file_writer.h | 16 +- .../segment_v2/inverted_index_fs_directory.cpp | 17 +- .../segment_v2/inverted_index_fs_directory.h | 2 +- be/src/olap/rowset/segment_v2/segment.cpp | 15 +- be/src/olap/rowset/segment_v2/segment.h | 7 +- be/src/olap/rowset/segment_v2/segment_writer.cpp | 16 +- be/src/olap/rowset/segment_v2/segment_writer.h | 7 +- .../rowset/segment_v2/vertical_segment_writer.cpp | 19 +- .../rowset/segment_v2/vertical_segment_writer.h | 9 +- be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 6 +- be/src/olap/task/index_builder.cpp | 4 +- .../segment_v2/inverted_index_array_test.cpp | 9 +- gensrc/proto/olap_common.proto | 12 +- gensrc/proto/olap_file.proto 
| 6 + .../test_compound_reader_fault_injection.out | 4 + .../test_inverted_index_file_size.out | 49 +++++ .../test_inverted_index_v2_file_size.out | 85 +++++++++ .../test_compound_reader_fault_injection.groovy | 62 +++++++ .../test_inverted_index_file_size.groovy | 145 +++++++++++++++ .../test_inverted_index_v2_file_size.groovy | 200 +++++++++++++++++++++ 38 files changed, 946 insertions(+), 189 deletions(-) diff --git a/be/src/cloud/cloud_rowset_writer.cpp b/be/src/cloud/cloud_rowset_writer.cpp index ad5c57fd21e..7753bf7b65b 100644 --- a/be/src/cloud/cloud_rowset_writer.cpp +++ b/be/src/cloud/cloud_rowset_writer.cpp @@ -115,6 +115,14 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) { _rowset_meta->add_segments_file_size(seg_file_size.value()); } + if (auto idx_files_info = _idx_files_info.get_inverted_files_info(_segment_start_id); + !idx_files_info.has_value()) [[unlikely]] { + LOG(ERROR) << "expected inverted index files info, but none presents: " + << idx_files_info.error(); + } else { + _rowset_meta->add_inverted_index_files_info(idx_files_info.value()); + } + RETURN_NOT_OK_STATUS_WITH_WARN(RowsetFactory::create_rowset(rowset_schema, _context.tablet_path, _rowset_meta, &rowset), "rowset init failed when build new rowset"); diff --git a/be/src/cloud/pb_convert.cpp b/be/src/cloud/pb_convert.cpp index 24bdadead33..d5342186541 100644 --- a/be/src/cloud/pb_convert.cpp +++ b/be/src/cloud/pb_convert.cpp @@ -82,6 +82,8 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in) } out->set_enable_segments_file_size(in.enable_segments_file_size()); out->set_has_variant_type_in_schema(in.has_has_variant_type_in_schema()); + out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info()); + out->mutable_inverted_index_file_info()->CopyFrom(in.inverted_index_file_info()); } void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) { @@ -132,6 +134,8 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, 
RowsetMetaPB&& in) { } out->set_enable_segments_file_size(in.enable_segments_file_size()); out->set_has_variant_type_in_schema(in.has_variant_type_in_schema()); + out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info()); + out->mutable_inverted_index_file_info()->Swap(in.mutable_inverted_index_file_info()); } RowsetMetaPB cloud_rowset_meta_to_doris(const RowsetMetaCloudPB& in) { @@ -190,6 +194,8 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in) out->set_schema_version(in.schema_version()); } out->set_enable_segments_file_size(in.enable_segments_file_size()); + out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info()); + out->mutable_inverted_index_file_info()->CopyFrom(in.inverted_index_file_info()); } void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) { @@ -237,6 +243,8 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) { out->set_schema_version(in.schema_version()); } out->set_enable_segments_file_size(in.enable_segments_file_size()); + out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info()); + out->mutable_inverted_index_file_info()->Swap(in.mutable_inverted_index_file_info()); } TabletSchemaCloudPB doris_tablet_schema_to_cloud(const TabletSchemaPB& in) { diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 9ed27bad382..9109c59e8c2 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -763,15 +763,17 @@ Status Compaction::do_inverted_index_compaction() { } } + std::vector<InvertedIndexFileInfo> all_inverted_index_file_info(dest_segment_num); uint64_t inverted_index_file_size = 0; for (int seg_id = 0; seg_id < dest_segment_num; ++seg_id) { auto inverted_index_file_writer = inverted_index_file_writers[seg_id].get(); if (Status st = inverted_index_file_writer->close(); !st.ok()) { status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(st.msg()); } else { - 
inverted_index_file_size += inverted_index_file_writer->get_index_file_size(); + inverted_index_file_size += inverted_index_file_writer->get_index_file_total_size(); inverted_index_file_size -= compacted_idx_file_size[seg_id]; } + all_inverted_index_file_info[seg_id] = inverted_index_file_writer->get_index_file_info(); } // check index compaction status. If status is not ok, we should return error and end this compaction round. if (!status.ok()) { @@ -786,6 +788,7 @@ Status Compaction::do_inverted_index_compaction() { _output_rowset->rowset_meta()->set_index_disk_size(_output_rowset->index_disk_size() + inverted_index_file_size); + _output_rowset->rowset_meta()->update_inverted_index_files_info(all_inverted_index_file_info); COUNTER_UPDATE(_output_rowset_data_size_counter, _output_rowset->data_disk_size()); LOG(INFO) << "succeed to do index compaction" diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index 832ca314088..b269051e43f 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -180,7 +180,7 @@ Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* se }; auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _schema, reader_options, - segment); + segment, _rowset_meta->inverted_index_file_info(seg_id)); if (!s.ok()) { LOG(WARNING) << "failed to open segment. " << seg_path << " under rowset " << rowset_id() << " : " << s.to_string(); @@ -538,8 +538,10 @@ Status BetaRowset::check_current_rowset_segment() { .cache_base_path {}, .file_size = _rowset_meta->segment_file_size(seg_id), }; + auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _schema, - reader_options, &segment); + reader_options, &segment, + _rowset_meta->inverted_index_file_info(seg_id)); if (!s.ok()) { LOG(WARNING) << "segment can not be opened. 
file=" << seg_path; return s; diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index ec1bba7621b..45f260bdfa1 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -195,7 +195,7 @@ BaseBetaRowsetWriter::BaseBetaRowsetWriter() _num_rows_written(0), _total_data_size(0), _total_index_size(0), - _segment_creator(_context, _seg_files) {} + _segment_creator(_context, _seg_files, _idx_files_info) {} BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine) : _engine(engine), _segcompaction_worker(std::make_shared<SegcompactionWorker>(this)) {} @@ -737,6 +737,14 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) { : _context.tablet_schema; _rowset_meta->set_tablet_schema(rowset_schema); + if (auto idx_files_info = _idx_files_info.get_inverted_files_info(_segment_start_id); + !idx_files_info.has_value()) [[unlikely]] { + LOG(ERROR) << "expected inverted index files info, but none presents: " + << idx_files_info.error(); + } else { + _rowset_meta->add_inverted_index_files_info(idx_files_info.value()); + } + RETURN_NOT_OK_STATUS_WITH_WARN(RowsetFactory::create_rowset(rowset_schema, _context.tablet_path, _rowset_meta, &rowset), "rowset init failed when build new rowset"); @@ -989,8 +997,8 @@ Status BetaRowsetWriter::flush_segment_writer_for_segcompaction( SegmentStatistics segstat; segstat.row_num = row_num; - segstat.data_size = segment_size + (*writer)->get_inverted_index_file_size(); - segstat.index_size = index_size + (*writer)->get_inverted_index_file_size(); + segstat.data_size = segment_size + (*writer)->get_inverted_index_total_size(); + segstat.index_size = index_size + (*writer)->get_inverted_index_total_size(); segstat.key_bounds = key_bounds; { std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index e51fccdc291..a7ec8fe87e9 100644 --- 
a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -18,6 +18,7 @@ #pragma once #include <fmt/format.h> +#include <gen_cpp/olap_common.pb.h> #include <gen_cpp/olap_file.pb.h> #include <algorithm> @@ -83,6 +84,60 @@ private: bool _closed {false}; }; +// Collect the size of the inverted index files +class InvertedIndexFilesInfo { +public: + // Get inverted index file info in segment id order. + // Return the info of inverted index files from seg_id_offset to the last one. + Result<std::vector<InvertedIndexFileInfo>> get_inverted_files_info(int seg_id_offset) { + std::lock_guard lock(_lock); + + Status st; + std::vector<InvertedIndexFileInfo> inverted_files_info(_inverted_index_files_info.size()); + bool succ = std::all_of( + _inverted_index_files_info.begin(), _inverted_index_files_info.end(), + [&](auto&& it) { + auto&& [seg_id, info] = it; + + int idx = seg_id - seg_id_offset; + if (idx >= inverted_files_info.size()) [[unlikely]] { + auto err_msg = fmt::format( + "invalid seg_id={} num_inverted_files_info={} seg_id_offset={}", + seg_id, inverted_files_info.size(), seg_id_offset); + DCHECK(false) << err_msg; + st = Status::InternalError(err_msg); + return false; + } + + auto& finfo = inverted_files_info[idx]; + if (finfo.has_index_size() || finfo.index_info_size() > 0) [[unlikely]] { + // File size should not been set + auto err_msg = fmt::format("duplicate seg_id={}", seg_id); + DCHECK(false) << err_msg; + st = Status::InternalError(err_msg); + return false; + } + finfo = info; + return true; + }); + + if (succ) { + return inverted_files_info; + } + + return ResultError(st); + } + + void add_file_info(int seg_id, InvertedIndexFileInfo file_info) { + std::lock_guard lock(_lock); + _inverted_index_files_info.emplace(seg_id, file_info); + } + +private: + std::unordered_map<int /* seg_id */, InvertedIndexFileInfo> _inverted_index_files_info; + mutable SpinLock _lock; +}; + class BaseBetaRowsetWriter : public RowsetWriter { public: 
BaseBetaRowsetWriter(); @@ -160,6 +215,8 @@ public: return _seg_files.get_file_writers(); } + InvertedIndexFilesInfo& get_inverted_index_files_info() { return _idx_files_info; } + private: void update_rowset_schema(TabletSchemaSPtr flush_schema); // build a tmp rowset for load segment to calc delete_bitmap @@ -212,6 +269,9 @@ protected: int64_t _delete_bitmap_ns = 0; int64_t _segment_writer_ns = 0; + + // map<segment_id, inverted_index_file_info> + InvertedIndexFilesInfo _idx_files_info; }; class SegcompactionWorker; diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp b/be/src/olap/rowset/beta_rowset_writer_v2.cpp index 95adf3d6e50..0d0ad435b9e 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp +++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp @@ -58,7 +58,7 @@ namespace doris { using namespace ErrorCode; BetaRowsetWriterV2::BetaRowsetWriterV2(const std::vector<std::shared_ptr<LoadStreamStub>>& streams) - : _segment_creator(_context, _seg_files), _streams(streams) {} + : _segment_creator(_context, _seg_files, _idx_files_info), _streams(streams) {} BetaRowsetWriterV2::~BetaRowsetWriterV2() = default; diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h b/be/src/olap/rowset/beta_rowset_writer_v2.h index d2267a3dbd1..174b70a072b 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.h +++ b/be/src/olap/rowset/beta_rowset_writer_v2.h @@ -157,6 +157,8 @@ private: SegmentCreator _segment_creator; + InvertedIndexFilesInfo _idx_files_info; + fmt::memory_buffer vlog_buffer; std::vector<std::shared_ptr<LoadStreamStub>> _streams; diff --git a/be/src/olap/rowset/rowset_meta.cpp b/be/src/olap/rowset/rowset_meta.cpp index b969db7a2a2..2bc5a6cef85 100644 --- a/be/src/olap/rowset/rowset_meta.cpp +++ b/be/src/olap/rowset/rowset_meta.cpp @@ -233,6 +233,13 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) { _rowset_meta_pb.add_segments_file_size(fsize); } } + if (_rowset_meta_pb.enable_inverted_index_file_info() && + 
other._rowset_meta_pb.enable_inverted_index_file_info()) { + for (auto finfo : other.inverted_index_file_info()) { + InvertedIndexFileInfo* new_file_info = _rowset_meta_pb.add_inverted_index_file_info(); + *new_file_info = finfo; + } + } // In partial update the rowset schema maybe updated when table contains variant type, so we need the newest schema to be updated // Otherwise the schema is stale and lead to wrong data read if (tablet_schema()->num_variant_columns() > 0) { @@ -249,6 +256,29 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) { } } +InvertedIndexFileInfo RowsetMeta::inverted_index_file_info(int seg_id) { + return _rowset_meta_pb.enable_inverted_index_file_info() + ? (_rowset_meta_pb.inverted_index_file_info_size() > seg_id + ? _rowset_meta_pb.inverted_index_file_info(seg_id) + : InvertedIndexFileInfo()) + : InvertedIndexFileInfo(); +} + +void RowsetMeta::add_inverted_index_files_info( + const std::vector<InvertedIndexFileInfo>& idx_file_info) { + _rowset_meta_pb.set_enable_inverted_index_file_info(true); + for (auto finfo : idx_file_info) { + auto* new_file_info = _rowset_meta_pb.add_inverted_index_file_info(); + *new_file_info = finfo; + } +} + +void RowsetMeta::update_inverted_index_files_info( + const std::vector<InvertedIndexFileInfo>& idx_file_info) { + _rowset_meta_pb.clear_inverted_index_file_info(); + add_inverted_index_files_info(idx_file_info); +} + bool operator==(const RowsetMeta& a, const RowsetMeta& b) { if (a._rowset_id != b._rowset_id) return false; if (a._is_removed_from_rowset_meta != b._is_removed_from_rowset_meta) return false; diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index c5a573d760c..4f25c676f6b 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -357,6 +357,16 @@ public: // Used for partial update, when publish, partial update may add a new rowset and we should update rowset meta void merge_rowset_meta(const RowsetMeta& other); + 
InvertedIndexFileInfo inverted_index_file_info(int seg_id); + + const auto& inverted_index_file_info() const { + return _rowset_meta_pb.inverted_index_file_info(); + } + + void add_inverted_index_files_info(const std::vector<InvertedIndexFileInfo>& idx_file_info); + + void update_inverted_index_files_info(const std::vector<InvertedIndexFileInfo>& idx_file_info); + // Because the member field '_handle' is a raw pointer, use member func 'init' to replace copy ctor RowsetMeta(const RowsetMeta&) = delete; RowsetMeta operator=(const RowsetMeta&) = delete; diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index 5e5dfc3733f..d969f5b3904 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -51,8 +51,9 @@ namespace doris { using namespace ErrorCode; -SegmentFlusher::SegmentFlusher(RowsetWriterContext& context, SegmentFileCollection& seg_files) - : _context(context), _seg_files(seg_files) {} +SegmentFlusher::SegmentFlusher(RowsetWriterContext& context, SegmentFileCollection& seg_files, + InvertedIndexFilesInfo& idx_files_info) + : _context(context), _seg_files(seg_files), _idx_files_info(idx_files_info) {} SegmentFlusher::~SegmentFlusher() = default; @@ -243,10 +244,11 @@ Status SegmentFlusher::_flush_segment_writer( uint32_t segment_id = writer->segment_id(); SegmentStatistics segstat; segstat.row_num = row_num; - segstat.data_size = segment_size + writer->inverted_index_file_size(); - segstat.index_size = index_size + writer->inverted_index_file_size(); + segstat.data_size = segment_size + writer->get_inverted_index_total_size(); + segstat.index_size = index_size + writer->get_inverted_index_total_size(); segstat.key_bounds = key_bounds; + _idx_files_info.add_file_info(segment_id, writer->get_inverted_index_file_info()); writer.reset(); RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat, flush_schema)); @@ -288,10 +290,11 @@ Status 
SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::Segment uint32_t segment_id = writer->get_segment_id(); SegmentStatistics segstat; segstat.row_num = row_num; - segstat.data_size = segment_size + writer->get_inverted_index_file_size(); - segstat.index_size = index_size + writer->get_inverted_index_file_size(); + segstat.data_size = segment_size + writer->get_inverted_index_total_size(); + segstat.index_size = index_size + writer->get_inverted_index_total_size(); segstat.key_bounds = key_bounds; + _idx_files_info.add_file_info(segment_id, writer->get_inverted_index_file_info()); writer.reset(); RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat, flush_schema)); @@ -325,8 +328,9 @@ int64_t SegmentFlusher::Writer::max_row_to_add(size_t row_avg_size_in_bytes) { return _writer->max_row_to_add(row_avg_size_in_bytes); } -SegmentCreator::SegmentCreator(RowsetWriterContext& context, SegmentFileCollection& seg_files) - : _segment_flusher(context, seg_files) {} +SegmentCreator::SegmentCreator(RowsetWriterContext& context, SegmentFileCollection& seg_files, + InvertedIndexFilesInfo& idx_files_info) + : _segment_flusher(context, seg_files, idx_files_info) {} Status SegmentCreator::add_block(const vectorized::Block* block) { if (block->rows() == 0) { diff --git a/be/src/olap/rowset/segment_creator.h b/be/src/olap/rowset/segment_creator.h index 97a8f177ad9..961e161853c 100644 --- a/be/src/olap/rowset/segment_creator.h +++ b/be/src/olap/rowset/segment_creator.h @@ -46,6 +46,7 @@ class VerticalSegmentWriter; struct SegmentStatistics; class BetaRowsetWriter; class SegmentFileCollection; +class InvertedIndexFilesInfo; class FileWriterCreator { public: @@ -93,7 +94,8 @@ private: class SegmentFlusher { public: - SegmentFlusher(RowsetWriterContext& context, SegmentFileCollection& seg_files); + SegmentFlusher(RowsetWriterContext& context, SegmentFileCollection& seg_files, + InvertedIndexFilesInfo& idx_files_info); ~SegmentFlusher(); @@ -158,6 +160,7 @@ 
private: private: RowsetWriterContext& _context; SegmentFileCollection& _seg_files; + InvertedIndexFilesInfo& _idx_files_info; // written rows by add_block/add_row std::atomic<int64_t> _num_rows_written = 0; @@ -169,7 +172,8 @@ private: class SegmentCreator { public: - SegmentCreator(RowsetWriterContext& context, SegmentFileCollection& seg_files); + SegmentCreator(RowsetWriterContext& context, SegmentFileCollection& seg_files, + InvertedIndexFilesInfo& idx_files_info); ~SegmentCreator() = default; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp index 33fcd10ef36..e47189f9137 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp @@ -40,7 +40,6 @@ Status compact_column(int64_t index_id, std::vector<lucene::store::Directory*>& "debug point: index compaction error"); } }) - lucene::store::Directory* dir = DorisFSDirectoryFactory::getDirectory(io::global_local_filesystem(), tmp_path.data()); lucene::analysis::SimpleAnalyzer<char> analyzer; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp index 67c3ac5253f..7613df112ed 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp @@ -32,6 +32,7 @@ #include "CLucene/SharedHeader.h" #include "olap/rowset/segment_v2/inverted_index_fs_directory.h" #include "olap/tablet_schema.h" +#include "util/debug_points.h" namespace doris { namespace io { @@ -65,7 +66,7 @@ protected: public: CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t fileOffset, const int64_t length, - const int32_t readBufferSize = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE); + const int32_t read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE); CSIndexInput(const CSIndexInput& clone); 
~CSIndexInput() override; void close() override; @@ -77,8 +78,8 @@ public: }; CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t fileOffset, - const int64_t length, const int32_t _readBufferSize) - : BufferedIndexInput(_readBufferSize) { + const int64_t length, const int32_t read_buffer_size) + : BufferedIndexInput(read_buffer_size) { this->base = base; this->fileOffset = fileOffset; this->_length = length; @@ -110,25 +111,13 @@ CSIndexInput::CSIndexInput(const CSIndexInput& clone) : BufferedIndexInput(clone void CSIndexInput::close() {} -DorisCompoundReader::DorisCompoundReader(lucene::store::Directory* d, const char* name, - int32_t read_buffer_size, bool open_idx_file_cache) - : readBufferSize(read_buffer_size), - dir(d), - ram_dir(new lucene::store::RAMDirectory()), - file_name(name), - entries(_CLNEW EntriesType(true, true)) { - bool success = false; +DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, int32_t read_buffer_size) + : _ram_dir(new lucene::store::RAMDirectory()), + _stream(stream), + _entries(_CLNEW EntriesType(true, true)), + _read_buffer_size(read_buffer_size) { try { - if (dir->fileLength(name) == 0) { - LOG(WARNING) << "CompoundReader open failed, index file " << name << " is empty."; - _CLTHROWA(CL_ERR_IO, - fmt::format("CompoundReader open failed, index file {} is empty", name) - .c_str()); - } - stream = dir->openInput(name, readBufferSize); - stream->setIdxFileCache(open_idx_file_cache); - - int32_t count = stream->readVInt(); + int32_t count = _stream->readVInt(); ReaderFileEntry* entry = nullptr; TCHAR tid[CL_MAX_PATH]; uint8_t buffer[BUFFER_LENGTH]; @@ -139,37 +128,50 @@ DorisCompoundReader::DorisCompoundReader(lucene::store::Directory* d, const char entry->file_name = aid; entry->offset = stream->readLong(); entry->length = stream->readLong(); - entries->put(aid, entry); + DBUG_EXECUTE_IF("construct_DorisCompoundReader_failed", { + CLuceneError err; + err.set(CL_ERR_IO, 
"construct_DorisCompoundReader_failed"); + throw err; + }) + _entries->put(aid, entry); // read header file data if (entry->offset < 0) { copyFile(entry->file_name.c_str(), entry->length, buffer, BUFFER_LENGTH); } } - - success = true; - } - _CLFINALLY(if (!success && (stream != nullptr)) { + } catch (...) { try { - stream->close(); - _CLDELETE(stream) + if (_stream != nullptr) { + _stream->close(); + _CLDELETE(_stream) + } + if (_entries != nullptr) { + _entries->clear(); + _CLDELETE(_entries); + } + if (_ram_dir) { + _ram_dir->close(); + _CLDELETE(_ram_dir) + } } catch (CLuceneError& err) { if (err.number() != CL_ERR_IO) { throw err; } } - }) + throw; + } } void DorisCompoundReader::copyFile(const char* file, int64_t file_length, uint8_t* buffer, int64_t buffer_length) { - std::unique_ptr<lucene::store::IndexOutput> output(ram_dir->createOutput(file)); + std::unique_ptr<lucene::store::IndexOutput> output(_ram_dir->createOutput(file)); int64_t start_ptr = output->getFilePointer(); int64_t remainder = file_length; int64_t chunk = buffer_length; while (remainder > 0) { int64_t len = std::min(std::min(chunk, file_length), remainder); - stream->readBytes(buffer, len); + _stream->readBytes(buffer, len); output->writeBytes(buffer, len); remainder -= len; } @@ -178,7 +180,7 @@ void DorisCompoundReader::copyFile(const char* file, int64_t file_length, uint8_ swprintf(buf, CL_MAX_PATH + 100, _T("Non-zero remainder length after copying") _T(": %d (id: %s, length: %d, buffer size: %d)"), - (int)remainder, file_name.c_str(), (int)file_length, (int)chunk); + (int)remainder, file, (int)file_length, (int)chunk); _CLTHROWT(CL_ERR_IO, buf); } @@ -203,7 +205,7 @@ DorisCompoundReader::~DorisCompoundReader() { LOG(ERROR) << "DorisCompoundReader finalize error:" << err.what(); } } - _CLDELETE(entries) + _CLDELETE(_entries) } const char* DorisCompoundReader::getClassName() { @@ -214,26 +216,22 @@ const char* DorisCompoundReader::getObjectName() const { } bool 
DorisCompoundReader::list(std::vector<std::string>* names) const { - for (EntriesType::const_iterator i = entries->begin(); i != entries->end(); i++) { + for (EntriesType::const_iterator i = _entries->begin(); i != _entries->end(); i++) { names->push_back(i->first); } return true; } bool DorisCompoundReader::fileExists(const char* name) const { - return entries->exists((char*)name); -} - -lucene::store::Directory* DorisCompoundReader::getDirectory() { - return dir; + return _entries->exists((char*)name); } int64_t DorisCompoundReader::fileModified(const char* name) const { - return dir->fileModified(name); + return 0; } int64_t DorisCompoundReader::fileLength(const char* name) const { - ReaderFileEntry* e = entries->get((char*)name); + ReaderFileEntry* e = _entries->get((char*)name); if (e == nullptr) { char buf[CL_MAX_PATH + 30]; strcpy(buf, "File "); @@ -257,12 +255,12 @@ bool DorisCompoundReader::openInput(const char* name, bool DorisCompoundReader::openInput(const char* name, lucene::store::IndexInput*& ret, CLuceneError& error, int32_t bufferSize) { - if (stream == nullptr) { + if (_stream == nullptr) { error.set(CL_ERR_IO, "Stream closed"); return false; } - const ReaderFileEntry* entry = entries->get((char*)name); + const ReaderFileEntry* entry = _entries->get((char*)name); if (entry == nullptr) { char buf[CL_MAX_PATH + 26]; snprintf(buf, CL_MAX_PATH + 26, "No sub-file with id %s found", name); @@ -271,34 +269,30 @@ bool DorisCompoundReader::openInput(const char* name, lucene::store::IndexInput* } // If file is in RAM, just return. 
- if (ram_dir && ram_dir->fileExists(name)) { - return ram_dir->openInput(name, ret, error, bufferSize); + if (_ram_dir && _ram_dir->fileExists(name)) { + return _ram_dir->openInput(name, ret, error, bufferSize); } if (bufferSize < 1) { - bufferSize = readBufferSize; + bufferSize = _read_buffer_size; } - ret = _CLNEW CSIndexInput(stream, entry->offset, entry->length, bufferSize); + ret = _CLNEW CSIndexInput(_stream, entry->offset, entry->length, bufferSize); return true; } void DorisCompoundReader::close() { std::lock_guard<std::mutex> wlock(_this_lock); - if (stream != nullptr) { - stream->close(); - _CLDELETE(stream) - } - if (entries != nullptr) { - entries->clear(); + if (_stream != nullptr) { + _stream->close(); + _CLDELETE(_stream) } - if (ram_dir) { - ram_dir->close(); - _CLDELETE(ram_dir) + if (_entries != nullptr) { + _entries->clear(); } - if (dir) { - dir->close(); - _CLDECDELETE(dir) + if (_ram_dir) { + _ram_dir->close(); + _CLDELETE(_ram_dir) } _closed = true; } @@ -324,12 +318,11 @@ lucene::store::IndexOutput* DorisCompoundReader::createOutput(const char* /*name } std::string DorisCompoundReader::toString() const { - return std::string("DorisCompoundReader@") + this->directory + std::string("; file_name: ") + - std::string(file_name); + return std::string("DorisCompoundReader@"); } CL_NS(store)::IndexInput* DorisCompoundReader::getDorisIndexInput() { - return stream; + return _stream; } } // namespace segment_v2 diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h index 1ca2d6ad371..a30c39f8a2f 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h @@ -65,16 +65,12 @@ using EntriesType = lucene::util::Deletor::Object<ReaderFileEntry>>; class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory { private: - int32_t readBufferSize; - // base info - 
lucene::store::Directory* dir = nullptr; - lucene::store::RAMDirectory* ram_dir = nullptr; - std::string directory; - std::string file_name; - CL_NS(store)::IndexInput* stream = nullptr; - EntriesType* entries = nullptr; + lucene::store::RAMDirectory* _ram_dir = nullptr; + CL_NS(store)::IndexInput* _stream = nullptr; + EntriesType* _entries = nullptr; std::mutex _this_lock; bool _closed = false; + int32_t _read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE; protected: /** Removes an existing file in the directory-> */ @@ -83,10 +79,10 @@ protected: public: explicit DorisCompoundReader( CL_NS(store)::IndexInput* stream, EntriesType* entries_clone, - int32_t _readBufferSize = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE) - : readBufferSize(_readBufferSize), - stream(stream), - entries(_CLNEW EntriesType(true, true)) { + int32_t read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE) + : _stream(stream), + _entries(_CLNEW EntriesType(true, true)), + _read_buffer_size(read_buffer_size) { for (auto& e : *entries_clone) { auto* origin_entry = e.second; auto* entry = _CLNEW ReaderFileEntry(); @@ -94,17 +90,15 @@ public: entry->file_name = origin_entry->file_name; entry->offset = origin_entry->offset; entry->length = origin_entry->length; - entries->put(aid, entry); + _entries->put(aid, entry); } }; - DorisCompoundReader(lucene::store::Directory* dir, const char* name, - int32_t _readBufferSize = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE, - bool open_idx_file_cache = false); + DorisCompoundReader(CL_NS(store)::IndexInput* stream, + int32_t read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE); ~DorisCompoundReader() override; void copyFile(const char* file, int64_t file_length, uint8_t* buffer, int64_t buffer_length); bool list(std::vector<std::string>* names) const override; bool fileExists(const char* name) const override; - lucene::store::Directory* getDirectory(); int64_t fileModified(const char* name) const override; int64_t 
fileLength(const char* name) const override; bool openInput(const char* name, lucene::store::IndexInput*& ret, CLuceneError& err, @@ -116,7 +110,6 @@ public: lucene::store::IndexOutput* createOutput(const char* name) override; void close() override; std::string toString() const override; - std::string getFileName() { return file_name; } std::string getPath() const; static const char* getClassName(); const char* getObjectName() const override; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp index 09a6a62aaa6..e0c75922c98 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp @@ -47,34 +47,39 @@ Status InvertedIndexFileReader::_init_from_v2(int32_t read_buffer_size) { std::unique_lock<std::shared_mutex> lock(_mutex); // Lock for writing try { - int64_t file_size = 0; - Status st = _fs->file_size(index_file_full_path, &file_size); - DBUG_EXECUTE_IF("inverted file read error: index file not found", { - st = Status::Error<doris::ErrorCode::NOT_FOUND>("index file not found"); - }) - if (st.code() == ErrorCode::NOT_FOUND) { - return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( - "inverted index file {} is not found", index_file_full_path); - } else if (!st.ok()) { - return st; - } - if (file_size == 0) { - LOG(WARNING) << "inverted index file " << index_file_full_path << " is empty."; - return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( - "inverted index file {} is empty", index_file_full_path); - } - CLuceneError err; CL_NS(store)::IndexInput* index_input = nullptr; - auto ok = DorisFSDirectory::FSIndexInput::open(_fs, index_file_full_path.c_str(), - index_input, err, read_buffer_size); + + // 1. get file size from meta + int64_t file_size = -1; + if (_idx_file_info.has_index_size()) { + file_size = _idx_file_info.index_size(); + } + file_size = file_size == 0 ? 
-1 : file_size; + + DBUG_EXECUTE_IF("file_size_not_in_rowset_meta ", { + if (file_size == -1) { + return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( + "CLuceneError occur file size = -1, file is {}", index_file_full_path); + } + }) + + // 2. open file + auto ok = DorisFSDirectory::FSIndexInput::open( + _fs, index_file_full_path.c_str(), index_input, err, read_buffer_size, file_size); if (!ok) { + if (err.number() == CL_ERR_FileNotFound) { + return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( + "inverted index file {} is not found.", index_file_full_path); + } return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( "CLuceneError occur when open idx file {}, error msg: {}", index_file_full_path, err.what()); } index_input->setIdxFileCache(_open_idx_file_cache); _stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input); + + // 3. read file int32_t version = _stream->readInt(); // Read version number if (version == InvertedIndexStorageFormatPB::V2) { DCHECK(version == _storage_format); @@ -153,23 +158,49 @@ Result<std::unique_ptr<DorisCompoundReader>> InvertedIndexFileReader::_open( std::unique_ptr<DorisCompoundReader> compound_reader; if (_storage_format == InvertedIndexStorageFormatPB::V1) { - DorisFSDirectory* dir = nullptr; auto index_file_path = InvertedIndexDescriptor::get_index_file_path_v1( _index_path_prefix, index_id, index_suffix); try { - std::filesystem::path path(index_file_path); - dir = DorisFSDirectoryFactory::getDirectory(_fs, path.parent_path().c_str()); - compound_reader = std::make_unique<DorisCompoundReader>( - dir, path.filename().c_str(), _read_buffer_size, _open_idx_file_cache); - } catch (CLuceneError& err) { - if (dir != nullptr) { - dir->close(); - _CLDELETE(dir) + CLuceneError err; + CL_NS(store)::IndexInput* index_input = nullptr; + + // 1. 
get file size from meta + int64_t file_size = -1; + if (_idx_file_info.index_info_size() > 0) { + for (const auto& idx_info : _idx_file_info.index_info()) { + if (index_id == idx_info.index_id() && + index_suffix == idx_info.index_suffix()) { + file_size = idx_info.index_file_size(); + break; + } + } } - if (err.number() == CL_ERR_FileNotFound) { - return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( - "inverted index path: {} not exist.", index_file_path)); + file_size = file_size == 0 ? -1 : file_size; + DBUG_EXECUTE_IF("file_size_not_in_rowset_meta ", { + if (file_size == -1) { + return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( + "CLuceneError occur file size = -1, file is {}", index_file_path)); + } + }) + + // 2. open file + auto ok = DorisFSDirectory::FSIndexInput::open( + _fs, index_file_path.c_str(), index_input, err, _read_buffer_size, file_size); + if (!ok) { + // now index_input = nullptr + if (err.number() == CL_ERR_FileNotFound) { + return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( + "inverted index file {} is not found.", index_file_path)); + } + return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( + "CLuceneError occur when open idx file {}, error msg: {}", index_file_path, + err.what())); } + + // 3. 
read file in DorisCompoundReader + index_input->setIdxFileCache(_open_idx_file_cache); + compound_reader = std::make_unique<DorisCompoundReader>(index_input, _read_buffer_size); + } catch (CLuceneError& err) { return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( "CLuceneError occur when open idx file {}, error msg: {}", index_file_path, err.what())); diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h index 1414f493e4b..8bc28b1882f 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h @@ -51,10 +51,12 @@ public: std::map<std::pair<int64_t, std::string>, std::unique_ptr<EntriesType>>; InvertedIndexFileReader(io::FileSystemSPtr fs, std::string index_path_prefix, - InvertedIndexStorageFormatPB storage_format) + InvertedIndexStorageFormatPB storage_format, + InvertedIndexFileInfo idx_file_info = InvertedIndexFileInfo()) : _fs(std::move(fs)), _index_path_prefix(std::move(index_path_prefix)), - _storage_format(storage_format) {} + _storage_format(storage_format), + _idx_file_info(idx_file_info) {} Status init(int32_t read_buffer_size = config::inverted_index_read_buffer_size, bool open_idx_file_cache = false); @@ -65,6 +67,8 @@ public: Status index_file_exist(const TabletIndex* index_meta, bool* res) const; Status has_null(const TabletIndex* index_meta, bool* res) const; Result<InvertedIndexDirectoryMap> get_all_directories(); + // open file v2, init _stream + int64_t get_inverted_file_size() const { return _stream == nullptr ? 
0 : _stream->length(); } private: Status _init_from_v2(int32_t read_buffer_size); @@ -72,7 +76,7 @@ private: const std::string& index_suffix) const; IndicesEntriesMap _indices_entries; - std::unique_ptr<CL_NS(store)::IndexInput> _stream; + std::unique_ptr<CL_NS(store)::IndexInput> _stream = nullptr; const io::FileSystemSPtr _fs; std::string _index_path_prefix; int32_t _read_buffer_size = -1; @@ -80,6 +84,7 @@ private: InvertedIndexStorageFormatPB _storage_format; mutable std::shared_mutex _mutex; // Use mutable for const read operations bool _inited = false; + InvertedIndexFileInfo _idx_file_info; }; } // namespace segment_v2 diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp index 6eb54878924..d11b9fa54d0 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp @@ -84,8 +84,8 @@ Status InvertedIndexFileWriter::delete_index(const TabletIndex* index_meta) { return Status::OK(); } -size_t InvertedIndexFileWriter::headerLength() { - size_t header_size = 0; +int64_t InvertedIndexFileWriter::headerLength() { + int64_t header_size = 0; header_size += sizeof(int32_t) * 2; // Account for the size of the version number and number of indices @@ -122,7 +122,7 @@ Status InvertedIndexFileWriter::close() { }) if (_storage_format == InvertedIndexStorageFormatPB::V1) { try { - _file_size = write_v1(); + _total_file_size = write_v1(); for (const auto& entry : _indices_dirs) { const auto& dir = entry.second; // delete index path, which contains separated inverted index files @@ -137,7 +137,7 @@ Status InvertedIndexFileWriter::close() { } } else { try { - _file_size = write_v2(); + _total_file_size = write_v2(); for (const auto& entry : _indices_dirs) { const auto& dir = entry.second; // delete index path, which contains separated inverted index files @@ -220,8 +220,8 @@ void InvertedIndexFileWriter::copyFile(const 
char* fileName, lucene::store::Dire input->close(); } -size_t InvertedIndexFileWriter::write_v1() { - size_t total_size = 0; +int64_t InvertedIndexFileWriter::write_v1() { + int64_t total_size = 0; for (const auto& entry : _indices_dirs) { const int64_t index_id = entry.first.first; const auto& index_suffix = entry.first.second; @@ -330,6 +330,12 @@ size_t InvertedIndexFileWriter::write_v1() { output->close(); //LOG(INFO) << (idx_path / idx_name).c_str() << " size:" << compound_file_size; total_size += compound_file_size; + InvertedIndexFileInfo_IndexInfo index_info; + index_info.set_index_id(index_id); + index_info.set_index_suffix(index_suffix); + index_info.set_index_file_size(compound_file_size); + auto* new_index_info = _file_info.add_index_info(); + *new_index_info = index_info; } catch (CLuceneError& err) { LOG(ERROR) << "CLuceneError occur when close idx file " << InvertedIndexDescriptor::get_index_file_path_v1(_index_path_prefix, @@ -342,7 +348,7 @@ size_t InvertedIndexFileWriter::write_v1() { return total_size; } -size_t InvertedIndexFileWriter::write_v2() { +int64_t InvertedIndexFileWriter::write_v2() { // Create the output stream to write the compound file int64_t current_offset = headerLength(); @@ -434,6 +440,7 @@ size_t InvertedIndexFileWriter::write_v2() { _CLDECDELETE(out_dir) auto compound_file_size = compound_file_output->getFilePointer(); compound_file_output->close(); + _file_info.set_index_size(compound_file_size); return compound_file_size; } } // namespace doris::segment_v2 \ No newline at end of file diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h index 024c1dec986..2aceb671d80 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h @@ -19,6 +19,7 @@ #include <CLucene.h> // IWYU pragma: keep #include <CLucene/store/IndexInput.h> +#include <gen_cpp/olap_common.pb.h> #include 
<gen_cpp/olap_file.pb.h> #include <string> @@ -60,11 +61,12 @@ public: Status delete_index(const TabletIndex* index_meta); Status initialize(InvertedIndexDirectoryMap& indices_dirs); ~InvertedIndexFileWriter() = default; - size_t write_v2(); - size_t write_v1(); + int64_t write_v2(); + int64_t write_v1(); Status close(); - size_t headerLength(); - size_t get_index_file_size() const { return _file_size; } + int64_t headerLength(); + InvertedIndexFileInfo get_index_file_info() const { return _file_info; } + int64_t get_index_file_total_size() const { return _total_file_size; } const io::FileSystemSPtr& get_fs() const { return _fs; } void sort_files(std::vector<FileInfo>& file_infos); void copyFile(const char* fileName, lucene::store::Directory* dir, @@ -80,10 +82,14 @@ private: std::string _rowset_id; int64_t _seg_id; InvertedIndexStorageFormatPB _storage_format; - size_t _file_size = 0; + // v1: all file size + // v2: file size + int64_t _total_file_size = 0; // write to disk or stream io::FileWriterPtr _idx_v2_writer; io::FileWriterOptions _opts; + + InvertedIndexFileInfo _file_info; }; } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp index 27e03b43da2..f752c530020 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp @@ -118,7 +118,7 @@ public: bool DorisFSDirectory::FSIndexInput::open(const io::FileSystemSPtr& fs, const char* path, IndexInput*& ret, CLuceneError& error, - int32_t buffer_size) { + int32_t buffer_size, int64_t file_size) { CND_PRECONDITION(path != nullptr, "path is NULL"); if (buffer_size == -1) { @@ -130,21 +130,26 @@ bool DorisFSDirectory::FSIndexInput::open(const io::FileSystemSPtr& fs, const ch reader_options.cache_type = config::enable_file_cache ? 
io::FileCachePolicy::FILE_BLOCK_CACHE : io::FileCachePolicy::NO_CACHE; reader_options.is_doris_table = true; + reader_options.file_size = file_size; Status st = fs->open_file(path, &h->_reader, &reader_options); DBUG_EXECUTE_IF("inverted file read error: index file not found", { st = Status::Error<doris::ErrorCode::NOT_FOUND>("index file not found"); }) if (st.code() == ErrorCode::NOT_FOUND) { - error.set(CL_ERR_FileNotFound, "File does not exist"); + error.set(CL_ERR_FileNotFound, fmt::format("File does not exist, file is {}", path).data()); } else if (st.code() == ErrorCode::IO_ERROR) { - error.set(CL_ERR_IO, "File open io error"); + error.set(CL_ERR_IO, fmt::format("File open io error, file is {}", path).data()); } else if (st.code() == ErrorCode::PERMISSION_DENIED) { - error.set(CL_ERR_IO, "File Access denied"); - } else { - error.set(CL_ERR_IO, "Could not open file"); + error.set(CL_ERR_IO, fmt::format("File Access denied, file is {}", path).data()); + } else if (!st.ok()) { + error.set(CL_ERR_IO, fmt::format("Could not open file, file is {}", path).data()); } //Check if a valid handle was retrieved if (st.ok() && h->_reader) { + if (h->_reader->size() == 0) { + // may be an empty file + LOG(INFO) << "Opened inverted index file is empty, file is " << path; + } //Store the file length h->_length = h->_reader->size(); h->_fpos = 0; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h index 357ac65c678..59ae6db1a96 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h @@ -189,7 +189,7 @@ protected: public: static bool open(const io::FileSystemSPtr& fs, const char* path, IndexInput*& ret, - CLuceneError& error, int32_t bufferSize = -1); + CLuceneError& error, int32_t bufferSize = -1, int64_t file_size = -1); ~FSIndexInput() override; IndexInput* clone() const override; diff --git 
a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 0208ed635e1..54c77c8afc4 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -90,11 +90,12 @@ std::string file_cache_key_str(const std::string& seg_path) { Status Segment::open(io::FileSystemSPtr fs, const std::string& path, uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema, - const io::FileReaderOptions& reader_options, - std::shared_ptr<Segment>* output) { + const io::FileReaderOptions& reader_options, std::shared_ptr<Segment>* output, + InvertedIndexFileInfo idx_file_info) { io::FileReaderSPtr file_reader; RETURN_IF_ERROR(fs->open_file(path, &file_reader, &reader_options)); - std::shared_ptr<Segment> segment(new Segment(segment_id, rowset_id, std::move(tablet_schema))); + std::shared_ptr<Segment> segment( + new Segment(segment_id, rowset_id, std::move(tablet_schema), idx_file_info)); segment->_fs = fs; segment->_file_reader = std::move(file_reader); auto st = segment->_open(); @@ -136,11 +137,13 @@ Status Segment::open(io::FileSystemSPtr fs, const std::string& path, uint32_t se return Status::OK(); } -Segment::Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema) +Segment::Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema, + InvertedIndexFileInfo idx_file_info) : _segment_id(segment_id), _meta_mem_usage(0), _rowset_id(rowset_id), - _tablet_schema(std::move(tablet_schema)) { + _tablet_schema(std::move(tablet_schema)), + _idx_file_info(idx_file_info) { g_total_segment_num << 1; } @@ -184,7 +187,7 @@ Status Segment::_open_inverted_index() { _fs, std::string {InvertedIndexDescriptor::get_index_file_path_prefix( _file_reader->path().native())}, - _tablet_schema->get_inverted_index_storage_format()); + _tablet_schema->get_inverted_index_storage_format(), _idx_file_info); return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/segment.h 
b/be/src/olap/rowset/segment_v2/segment.h index 2baeadcaf07..dd61e7eb831 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -82,7 +82,7 @@ public: static Status open(io::FileSystemSPtr fs, const std::string& path, uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema, const io::FileReaderOptions& reader_options, - std::shared_ptr<Segment>* output); + std::shared_ptr<Segment>* output, InvertedIndexFileInfo idx_file_info = {}); static io::UInt128Wrapper file_cache_key(std::string_view rowset_id, uint32_t seg_id); io::UInt128Wrapper file_cache_key() const { @@ -195,7 +195,8 @@ public: private: DISALLOW_COPY_AND_ASSIGN(Segment); - Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema); + Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema, + InvertedIndexFileInfo idx_file_info = InvertedIndexFileInfo()); // open segment file and read the minimum amount of necessary information (footer) Status _open(); Status _parse_footer(SegmentFooterPB* footer); @@ -271,6 +272,8 @@ private: // inverted index file reader std::shared_ptr<InvertedIndexFileReader> _inverted_index_file_reader; DorisCallOnce<Status> _inverted_index_file_reader_open; + + InvertedIndexFileInfo _idx_file_info; }; } // namespace segment_v2 diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index f20af3df80a..42e625746f3 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -1093,13 +1093,6 @@ uint64_t SegmentWriter::estimate_segment_size() { return size; } -size_t SegmentWriter::try_get_inverted_index_file_size() { - if (_inverted_index_file_writer != nullptr) { - return _inverted_index_file_writer->get_index_file_size(); - } - return 0; -} - Status SegmentWriter::finalize_columns_data() { if (_has_key) { _row_count = _num_rows_written; @@ -1166,8 +1159,8 @@ 
Status SegmentWriter::finalize_footer(uint64_t* segment_file_size) { } if (_inverted_index_file_writer != nullptr) { RETURN_IF_ERROR(_inverted_index_file_writer->close()); + _inverted_index_file_info = _inverted_index_file_writer->get_index_file_info(); } - _inverted_index_file_size = try_get_inverted_index_file_size(); return Status::OK(); } @@ -1400,5 +1393,12 @@ Status SegmentWriter::_generate_short_key_index( return Status::OK(); } +int64_t SegmentWriter::get_inverted_index_total_size() { + if (_inverted_index_file_writer != nullptr) { + return _inverted_index_file_writer->get_index_file_total_size(); + } + return 0; +} + } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 41c3d5da3a7..32723e72fb0 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -102,9 +102,10 @@ public: int64_t max_row_to_add(size_t row_avg_size_in_bytes); uint64_t estimate_segment_size(); - size_t try_get_inverted_index_file_size(); - size_t get_inverted_index_file_size() const { return _inverted_index_file_size; } + InvertedIndexFileInfo get_inverted_index_file_info() const { return _inverted_index_file_info; } + int64_t get_inverted_index_total_size(); + uint32_t num_rows_written() const { return _num_rows_written; } // for partial update @@ -197,7 +198,7 @@ private: SegmentFooterPB _footer; size_t _num_key_columns; size_t _num_short_key_columns; - size_t _inverted_index_file_size; + InvertedIndexFileInfo _inverted_index_file_info; std::unique_ptr<ShortKeyIndexBuilder> _short_key_index_builder; std::unique_ptr<PrimaryKeyIndexBuilder> _primary_key_index_builder; std::vector<std::unique_ptr<ColumnWriter>> _column_writers; diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 3e23b1fda52..ecb248b7fe9 100644 --- 
a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -1010,13 +1010,6 @@ uint64_t VerticalSegmentWriter::_estimated_remaining_size() { return size; } -size_t VerticalSegmentWriter::_calculate_inverted_index_file_size() { - if (_inverted_index_file_writer != nullptr) { - return _inverted_index_file_writer->get_index_file_size(); - } - return 0; -} - Status VerticalSegmentWriter::finalize_columns_index(uint64_t* index_size) { uint64_t index_start = _file_writer->bytes_appended(); RETURN_IF_ERROR(_write_ordinal_index()); @@ -1035,7 +1028,10 @@ Status VerticalSegmentWriter::finalize_columns_index(uint64_t* index_size) { RETURN_IF_ERROR(_write_short_key_index()); *index_size = _file_writer->bytes_appended() - index_start; } - _inverted_index_file_size = _calculate_inverted_index_file_size(); + + if (_inverted_index_file_writer != nullptr) { + _inverted_index_file_info = _inverted_index_file_writer->get_index_file_info(); + } // reset all column writers and data_conveter clear(); @@ -1199,5 +1195,12 @@ void VerticalSegmentWriter::_set_max_key(const Slice& key) { _max_key.append(key.get_data(), key.get_size()); } +int64_t VerticalSegmentWriter::get_inverted_index_total_size() { + if (_inverted_index_file_writer != nullptr) { + return _inverted_index_file_writer->get_index_file_total_size(); + } + return 0; +} + } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h index c52deea40a0..831747712b0 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h @@ -99,7 +99,9 @@ public: [[nodiscard]] std::string data_dir_path() const { return _data_dir == nullptr ? 
"" : _data_dir->path(); } - [[nodiscard]] size_t inverted_index_file_size() const { return _inverted_index_file_size; } + [[nodiscard]] InvertedIndexFileInfo get_inverted_index_file_info() const { + return _inverted_index_file_info; + } [[nodiscard]] uint32_t num_rows_written() const { return _num_rows_written; } // for partial update @@ -120,13 +122,14 @@ public: TabletSchemaSPtr flush_schema() const { return _flush_schema; }; + int64_t get_inverted_index_total_size(); + void clear(); private: void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const TabletColumn& column); Status _create_column_writer(uint32_t cid, const TabletColumn& column, const TabletSchemaSPtr& schema); - size_t _calculate_inverted_index_file_size(); uint64_t _estimated_remaining_size(); Status _write_ordinal_index(); Status _write_zone_map(); @@ -171,7 +174,7 @@ private: SegmentFooterPB _footer; size_t _num_key_columns; size_t _num_short_key_columns; - size_t _inverted_index_file_size; + InvertedIndexFileInfo _inverted_index_file_info; std::unique_ptr<ShortKeyIndexBuilder> _short_key_index_builder; std::unique_ptr<PrimaryKeyIndexBuilder> _primary_key_index_builder; std::vector<std::unique_ptr<ColumnWriter>> _column_writers; diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp index ee687d18edc..ced0fb880c4 100644 --- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp +++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp @@ -205,8 +205,10 @@ Status VerticalBetaRowsetWriter<T>::final_flush() { LOG(WARNING) << "Fail to finalize segment footer, " << st; return st; } - this->_total_data_size += segment_size + segment_writer->get_inverted_index_file_size(); - this->_total_index_size += segment_writer->get_inverted_index_file_size(); + this->_total_data_size += segment_size + segment_writer->get_inverted_index_total_size(); + this->_total_index_size += segment_writer->get_inverted_index_total_size(); + 
this->_idx_files_info.add_file_info(segment_writer->get_segment_id(), + segment_writer->get_inverted_index_file_info()); segment_writer.reset(); } return Status::OK(); diff --git a/be/src/olap/task/index_builder.cpp b/be/src/olap/task/index_builder.cpp index e4a3332ad17..38a52d1d211 100644 --- a/be/src/olap/task/index_builder.cpp +++ b/be/src/olap/task/index_builder.cpp @@ -310,7 +310,7 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta LOG(ERROR) << "close inverted_index_writer error:" << st; return st; } - inverted_index_size += inverted_index_writer->get_index_file_size(); + inverted_index_size += inverted_index_writer->get_index_file_total_size(); } _inverted_index_file_writers.clear(); output_rowset_meta->set_data_disk_size(output_rowset_meta->data_disk_size() + @@ -465,7 +465,7 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta LOG(ERROR) << "close inverted_index_writer error:" << st; return st; } - inverted_index_size += inverted_index_file_writer->get_index_file_size(); + inverted_index_size += inverted_index_file_writer->get_index_file_total_size(); } _inverted_index_builders.clear(); _inverted_index_file_writers.clear(); diff --git a/be/test/olap/rowset/segment_v2/inverted_index_array_test.cpp b/be/test/olap/rowset/segment_v2/inverted_index_array_test.cpp index 7ebf89300da..0482ae7e1b5 100644 --- a/be/test/olap/rowset/segment_v2/inverted_index_array_test.cpp +++ b/be/test/olap/rowset/segment_v2/inverted_index_array_test.cpp @@ -59,9 +59,12 @@ public: const std::string kTestDir = "./ut_dir/inverted_index_array_test"; void check_terms_stats(string dir_str, string file_str) { - auto fs = io::global_local_filesystem(); - std::unique_ptr<DorisCompoundReader> reader = std::make_unique<DorisCompoundReader>( - DorisFSDirectoryFactory::getDirectory(fs, dir_str.c_str()), file_str.c_str(), 4096); + CLuceneError err; + CL_NS(store)::IndexInput* index_input = nullptr; + 
DorisFSDirectory::FSIndexInput::open(io::global_local_filesystem(), file_str.c_str(), + index_input, err, 4096); + std::unique_ptr<DorisCompoundReader> reader = + std::make_unique<DorisCompoundReader>(index_input, 4096); std::cout << "Term statistics for " << file_str << std::endl; std::cout << "==================================" << std::endl; lucene::store::Directory* dir = reader.get(); diff --git a/gensrc/proto/olap_common.proto b/gensrc/proto/olap_common.proto index e60aa7603fc..a305f2ce460 100644 --- a/gensrc/proto/olap_common.proto +++ b/gensrc/proto/olap_common.proto @@ -62,4 +62,14 @@ message PTopNCounter { enum FileType { SEGMENT_FILE = 1; INVERTED_INDEX_FILE = 2; -} \ No newline at end of file +} + +message InvertedIndexFileInfo { + message IndexInfo { + required int64 index_id = 1; + required int64 index_file_size = 2 [default = -1]; + optional string index_suffix = 3; + } + repeated IndexInfo index_info = 1; // for inverted index v1 + optional int64 index_size = 2; // for inverted index v2 +} diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 69ac88d4a72..9f5dad5886e 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -131,6 +131,9 @@ message RowsetMetaPB { // the segments_file_size maybe is empty or error optional bool enable_segments_file_size = 1004; optional bool has_variant_type_in_schema = 1005; + + optional bool enable_inverted_index_file_info = 1006; + repeated InvertedIndexFileInfo inverted_index_file_info = 1007; } message SchemaDictKeyList { @@ -214,6 +217,9 @@ message RowsetMetaCloudPB { optional bool has_variant_type_in_schema = 104; // dict key lists for compress schema info optional SchemaDictKeyList schema_dict_key_list = 105; + + optional bool enable_inverted_index_file_info = 106; + repeated InvertedIndexFileInfo inverted_index_file_info = 107; } message SegmentStatisticsPB { diff --git a/regression-test/data/inverted_index_p0/test_compound_reader_fault_injection.out 
b/regression-test/data/inverted_index_p0/test_compound_reader_fault_injection.out new file mode 100644 index 00000000000..cc8db5dd8b6 --- /dev/null +++ b/regression-test/data/inverted_index_p0/test_compound_reader_fault_injection.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +5 + diff --git a/regression-test/data/inverted_index_p0/test_inverted_index_file_size.out b/regression-test/data/inverted_index_p0/test_inverted_index_file_size.out new file mode 100644 index 00000000000..37d0b96d8a1 --- /dev/null +++ b/regression-test/data/inverted_index_p0/test_inverted_index_file_size.out @@ -0,0 +1,49 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +3860 + +-- !sql -- +125 + +-- !sql -- +3860 + +-- !sql -- +125 + +-- !sql -- +3860 + +-- !sql -- +125 + +-- !sql -- +3860 + +-- !sql -- +125 + +-- !sql -- +3860 + +-- !sql -- +125 + +-- !sql -- +3860 + +-- !sql -- +125 + +-- !sql -- +3860 + +-- !sql -- +125 + +-- !sql -- +3860 + +-- !sql -- +125 + diff --git a/regression-test/data/inverted_index_p0/test_inverted_index_v2_file_size.out b/regression-test/data/inverted_index_p0/test_inverted_index_v2_file_size.out new file mode 100644 index 00000000000..bdb27e98fa9 --- /dev/null +++ b/regression-test/data/inverted_index_p0/test_inverted_index_v2_file_size.out @@ -0,0 +1,85 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +1 andy andy love apple 100 +1 bason bason hate pear 99 +2 andy andy love apple 100 +2 bason bason hate pear 99 +3 andy andy love apple 100 +3 bason bason hate pear 99 + +-- !sql -- +1 andy andy love apple 100 +2 andy andy love apple 100 +3 andy andy love apple 100 + +-- !sql -- +1 bason bason hate pear 99 +2 bason bason hate pear 99 +3 bason bason hate pear 99 + +-- !sql -- +1 bason bason hate pear 99 +2 bason bason hate pear 99 +3 bason bason hate pear 99 + +-- !sql -- +1 andy andy love apple 100 +1 bason bason hate pear 99 +2 andy andy love apple 100 +2 bason bason hate pear 99 +3 andy andy love apple 100 +3 bason bason hate pear 99 + +-- !sql -- +1 andy andy love apple 100 +2 andy andy love apple 100 +3 andy andy love apple 100 + +-- !sql -- +1 bason bason hate pear 99 +2 bason bason hate pear 99 +3 bason bason hate pear 99 + +-- !sql -- +1 bason bason hate pear 99 +2 bason bason hate pear 99 +3 bason bason hate pear 99 + +-- !sql -- +1 andy andy love apple 100 +1 andy andy love apple 100 +1 bason bason hate pear 99 +1 bason bason hate pear 99 +2 andy andy love apple 100 +2 andy andy love apple 100 +2 bason bason hate pear 99 +2 bason bason hate pear 99 +3 andy andy love apple 100 +3 andy andy love apple 100 +3 bason bason hate pear 99 +3 bason bason hate pear 99 + +-- !sql -- +1 andy andy love apple 100 +1 andy andy love apple 100 +2 andy andy love apple 100 +2 andy andy love apple 100 +3 andy andy love apple 100 +3 andy andy love apple 100 + +-- !sql -- +1 bason bason hate pear 99 +1 bason bason hate pear 99 +2 bason bason hate pear 99 +2 bason bason hate pear 99 +3 bason bason hate pear 99 +3 bason bason hate pear 99 + +-- !sql -- +1 bason bason hate pear 99 +1 bason bason hate pear 99 +2 bason bason hate pear 99 +2 bason bason hate pear 99 +3 bason bason hate pear 99 +3 bason bason hate pear 99 + diff --git a/regression-test/suites/inverted_index_p0/test_compound_reader_fault_injection.groovy 
b/regression-test/suites/inverted_index_p0/test_compound_reader_fault_injection.groovy new file mode 100644 index 00000000000..8cf49742fe3 --- /dev/null +++ b/regression-test/suites/inverted_index_p0/test_compound_reader_fault_injection.groovy @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +suite("test_compound_reader_fault_injection", "nonConcurrent") { + // define a sql table + def testTable = "httplogs" + + sql "DROP TABLE IF EXISTS ${testTable}" + sql """ + CREATE TABLE ${testTable} ( + `@timestamp` int(11) NULL COMMENT "", + `clientip` string NULL COMMENT "", + `request` string NULL COMMENT "", + `status` string NULL COMMENT "", + `size` string NULL COMMENT "", + INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '' + ) ENGINE=OLAP + DUPLICATE KEY(`@timestamp`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "inverted_index_storage_format" = "V1" + ); + """ + + sql """ INSERT INTO ${testTable} VALUES (893964617, '40.135.0.0', 'GET /images/hm_bg.jpg HTTP/1.0', 200, 24736); """ + sql """ INSERT INTO ${testTable} VALUES (893964653, '232.0.0.0', 'GET /images/hm_bg.jpg HTTP/1.0', 200, 3781); """ + sql """ INSERT INTO ${testTable} VALUES (893964672, '26.1.0.0', 'GET /images/hm_bg.jpg HTTP/1.0', 304, 0); """ + sql """ INSERT INTO ${testTable} VALUES (893964672, '26.1.0.0', 'GET /images/hm_bg.jpg HTTP/1.0', 304, 0); """ + sql """ INSERT INTO ${testTable} VALUES (893964653, '232.0.0.0', 'GET /images/hm_bg.jpg HTTP/1.0', 200, 3781); """ + + sql 'sync' + + try { + GetDebugPoint().enableDebugPointForAllBEs("construct_DorisCompoundReader_failed") + try { + sql """ select count() from ${testTable} where (request match 'HTTP'); """ + } catch (Exception e) { + log.info(e.getMessage()) + assertTrue(e.getMessage().contains("construct_DorisCompoundReader_failed")) + } + } finally { + GetDebugPoint().disableDebugPointForAllBEs("construct_DorisCompoundReader_failed") + qt_sql """ select count() from ${testTable} where (request match 'HTTP'); """ + } +} diff --git a/regression-test/suites/inverted_index_p0/test_inverted_index_file_size.groovy 
b/regression-test/suites/inverted_index_p0/test_inverted_index_file_size.groovy new file mode 100644 index 00000000000..a2748cb93c9 --- /dev/null +++ b/regression-test/suites/inverted_index_p0/test_inverted_index_file_size.groovy @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +/* Exercises both V1 and V2 inverted-index storage formats while the file_size_not_in_rowset_meta debug point is enabled, with inverted_index_compaction_enable toggled on and off; match / match_phrase counts are verified before and after full compaction via qt_sql. */ suite("test_inverted_index_file_size", "nonConcurrent"){ + def tableName = "test_inverted_index_file_size" + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + /* push one BE config key=value to every backend via the HTTP update_be_config endpoint */ def set_be_config = { key, value -> + + for (String backend_id: backendId_to_backendIP.keySet()) { + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + } + + /* stream-load documents-1000.json into ${tableName}; the check callback asserts the load succeeded and actually loaded rows/bytes */ def load_data = { + // load the json data + streamLoad { + table "${tableName}" + + set 'read_json_by_line', 'true' + set 'format', 'json' + file 'documents-1000.json' // import json file + time 10000 // limit inflight 10s + + // if declared a check callback, the default check condition will ignore.
+ // So you must check all condition + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + /* trigger a full compaction on every tablet of ${tableName}, then poll each tablet's compaction status once per second until run_status reports it is no longer running */ def run_compaction_and_wait = { + //TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + + // trigger compactions for all tablets in ${tableName} + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + if (compactJson.status.toLowerCase() == "fail") { + logger.info("Compaction was done automatically!") + } else { + assertEquals("success", compactJson.status.toLowerCase()) + } + } + + // wait for all compactions done + for (def tablet in tablets) { + boolean running = true + do { + Thread.sleep(1000) + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + } + + /* recreate ${tableName} with four inverted indexes in the given storage format ("V1" or "V2"), load the fixture five times, and verify the match / match_phrase counts both before and after a full compaction */ def test_table = { format -> + sql
"DROP TABLE IF EXISTS ${tableName}" + sql """ + CREATE TABLE ${tableName} ( + `@timestamp` int(11) NULL COMMENT "", + `clientip` varchar(20) NULL COMMENT "", + `request` text NULL COMMENT "", + `status` varchar(11) NULL COMMENT "", + `size` int(11) NULL COMMENT "", + INDEX clientip_idx (`clientip`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '', + INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '', + INDEX status_idx (`status`) USING INVERTED COMMENT '', + INDEX size_idx (`size`) USING INVERTED COMMENT '' + ) ENGINE=OLAP + DUPLICATE KEY(`@timestamp`) + COMMENT "OLAP" + DISTRIBUTED BY RANDOM BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true", + "inverted_index_storage_format" = "${format}" + ); + """ + + load_data.call() + load_data.call() + load_data.call() + load_data.call() + load_data.call() + + qt_sql """ select count() from ${tableName} where clientip match '17.0.0.0' and request match 'GET' and status match '200' and size > 200 """ + qt_sql """ select count() from ${tableName} where clientip match_phrase '17.0.0.0' and request match_phrase 'GET' and status match '200' and size > 200 """ + run_compaction_and_wait.call() + qt_sql """ select count() from ${tableName} where clientip match '17.0.0.0' and request match 'GET' and status match '200' and size > 200 """ + qt_sql """ select count() from ${tableName} where clientip match_phrase '17.0.0.0' and request match_phrase 'GET' and status match '200' and size > 200 """ + + } + + /* run all four combinations (V1/V2 x compaction on/off); the debug point presumably simulates rowset metas that lack inverted-index file sizes — TODO confirm against the BE-side debug point */ try { + GetDebugPoint().enableDebugPointForAllBEs("file_size_not_in_rowset_meta") + set_be_config.call("inverted_index_compaction_enable", "true") + test_table.call("V1") + test_table.call("V2") + set_be_config.call("inverted_index_compaction_enable", "false") + test_table.call("V1") + test_table.call("V2") + } finally { + 
GetDebugPoint().disableDebugPointForAllBEs("file_size_not_in_rowset_meta") + set_be_config.call("inverted_index_compaction_enable", "true") + } + +} \ No newline at end of file diff --git a/regression-test/suites/inverted_index_p0/test_inverted_index_v2_file_size.groovy b/regression-test/suites/inverted_index_p0/test_inverted_index_v2_file_size.groovy new file mode 100644 index 00000000000..1484b7284a3 --- /dev/null +++ b/regression-test/suites/inverted_index_p0/test_inverted_index_v2_file_size.groovy @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +import org.codehaus.groovy.runtime.IOGroovyMethods + +/* NOTE(review): suite name "test_index_index_V2_file_size" does not match the file name test_inverted_index_v2_file_size.groovy — presumably a typo; confirm before renaming, since .out expectation files are keyed by suite name. Verifies rowset counts and query results around full compaction while the match.invert_index_not_support_execute_match debug point is enabled, restoring inverted_index_compaction_enable to its original value afterwards. */ suite("test_index_index_V2_file_size", "nonConcurrent") { + def isCloudMode = isCloudMode() + def tableName = "test_index_index_V2_file_size" + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + /* push one BE config key=value to every backend */ def set_be_config = { key, value -> + for (String backend_id: backendId_to_backendIP.keySet()) { + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + } + + /* per tablet: retry full compaction up to 10 times, 2s apart, until the BE reports success; NOTE(review): the trailing "fail" branch logs "Compaction was done automatically!" — this mirrors other suites but reads copy-pasted; confirm the intended message for the retries-exhausted case */ def trigger_full_compaction_on_tablets = { tablets -> + for (def tablet : tablets) { + String tablet_id = tablet.TabletId + String backend_id = tablet.BackendId + int times = 1 + + String compactionStatus; + do{ + def (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + ++times + sleep(2000) + compactionStatus = parseJson(out.trim()).status.toLowerCase(); + } while (compactionStatus!="success" && times<=10) + + + if (compactionStatus == "fail") { + logger.info("Compaction was done automatically!") + } + } + } + + /* poll each tablet's compaction status once per second until run_status reports it is no longer running */ def wait_full_compaction_done = { tablets -> + for (def tablet in tablets) { + boolean running = true + do { + Thread.sleep(1000) + String tablet_id = tablet.TabletId + String backend_id = tablet.BackendId + def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while
(running) + } + } + + /* sum the rowset counts of all tablets by fetching each tablet's CompactionStatus endpoint */ def get_rowset_count = { tablets -> + int rowsetCount = 0 + for (def tablet in tablets) { + def (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + rowsetCount +=((List<String>) tabletJson.rowsets).size() + } + return rowsetCount + } + + /* read the current inverted_index_compaction_enable from the first BE so the finally block can restore it */ boolean invertedIndexCompactionEnable = false + try { + String backend_id; + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + + for (Object ele in (List) configList) { + assert ele instanceof List<String> + if (((List<String>) ele)[0] == "inverted_index_compaction_enable") { + invertedIndexCompactionEnable = Boolean.parseBoolean(((List<String>) ele)[2]) + logger.info("inverted_index_compaction_enable: ${((List<String>) ele)[2]}") + } + } + set_be_config.call("inverted_index_compaction_enable", "true") + + sql """ DROP TABLE IF EXISTS ${tableName}; """ + sql """ + CREATE TABLE ${tableName} ( + `id` int(11) NULL, + `name` varchar(255) NULL, + `hobbies` text NULL, + `score` int(11) NULL, + index index_name (name) using inverted, + index index_hobbies (hobbies) using inverted properties("parser"="english"), + index index_score (score) using inverted + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( "replication_num" = "1", "disable_auto_compaction" = "true"); + """ + + 
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + + sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 100); """ + sql """ INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 99); """ + sql """ INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple", 100); """ + sql """ INSERT INTO ${tableName} VALUES (2, "bason", "bason hate pear", 99); """ + sql """ INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple", 100); """ + sql """ INSERT INTO ${tableName} VALUES (3, "bason", "bason hate pear", 99); """ + + /* presumably forces match predicates off the index-execution fast path so results come from the regular evaluation path — TODO confirm the BE-side semantics of this debug point */ GetDebugPoint().enableDebugPointForAllBEs("match.invert_index_not_support_execute_match") + + qt_sql """ select * from ${tableName} order by id, name, hobbies, score """ + qt_sql """ select * from ${tableName} where name match "andy" order by id, name, hobbies, score """ + qt_sql """ select * from ${tableName} where hobbies match "pear" order by id, name, hobbies, score """ + qt_sql """ select * from ${tableName} where score < 100 order by id, name, hobbies, score """ + + // trigger full compactions for all tablets in ${tableName} + trigger_full_compaction_on_tablets.call(tablets) + + // wait for full compaction done + wait_full_compaction_done.call(tablets) + + def dedup_tablets = deduplicate_tablets(tablets) + + // In the p0 testing environment, there are no expected operations such as scaling down BE (backend) services + // if tablets or dedup_tablets is empty, exception is thrown, and case fail + int replicaNum = Math.floor(tablets.size() / dedup_tablets.size()) + if (replicaNum != 1 && replicaNum != 3) + { + assert(false); + } + + // after full compaction, there is only 1 rowset.
+ def count = get_rowset_count.call(tablets); + /* presumably cloud mode retains one extra rowset per replica after compaction, hence (1 + 1) — TODO confirm against cloud compaction behavior */ if (isCloudMode) { + assert (count == (1 + 1) * replicaNum) + } else { + assert (count == 1 * replicaNum) + } + + qt_sql """ select * from ${tableName} order by id, name, hobbies, score """ + qt_sql """ select * from ${tableName} where name match "andy" order by id, name, hobbies, score """ + qt_sql """ select * from ${tableName} where hobbies match "pear" order by id, name, hobbies, score """ + qt_sql """ select * from ${tableName} where score < 100 order by id, name, hobbies, score """ + + // insert more data and trigger full compaction again + sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 100); """ + sql """ INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 99); """ + sql """ INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple", 100); """ + sql """ INSERT INTO ${tableName} VALUES (2, "bason", "bason hate pear", 99); """ + sql """ INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple", 100); """ + sql """ INSERT INTO ${tableName} VALUES (3, "bason", "bason hate pear", 99); """ + + set_be_config.call("inverted_index_compaction_enable", "false") + // trigger full compactions for all tablets in ${tableName} + trigger_full_compaction_on_tablets.call(tablets) + + // wait for full compaction done + wait_full_compaction_done.call(tablets) + + // after full compaction, there is only 1 rowset.
+ count = get_rowset_count.call(tablets); + if (isCloudMode) { + assert (count == (1 + 1) * replicaNum) + } else { + assert (count == 1 * replicaNum) + } + + qt_sql """ select * from ${tableName} order by id, name, hobbies, score """ + qt_sql """ select * from ${tableName} where name match "andy" order by id, name, hobbies, score """ + qt_sql """ select * from ${tableName} where hobbies match "pear" order by id, name, hobbies, score """ + qt_sql """ select * from ${tableName} where score < 100 order by id, name, hobbies, score """ + } finally { + GetDebugPoint().disableDebugPointForAllBEs("match.invert_index_not_support_execute_match") + set_be_config.call("inverted_index_compaction_enable", invertedIndexCompactionEnable.toString()) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org