This is an automated email from the ASF dual-hosted git repository. jianliangqi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b0519d29efa [fix](inverted index) Writing to the inverted index also writes to the file cache. (#39076) b0519d29efa is described below commit b0519d29efa43801ae5dd5dfa1e3849632bf731a Author: zzzxl <33418555+zzzxl1...@users.noreply.github.com> AuthorDate: Tue Aug 13 16:38:55 2024 +0800 [fix](inverted index) Writing to the inverted index also writes to the file cache. (#39076) 1. When write_file_cache is true, writing to the inverted index also writes to the file cache. --- be/src/olap/compaction.cpp | 3 + be/src/olap/rowset/beta_rowset_writer.cpp | 8 +- be/src/olap/rowset/rowset_writer_context.h | 10 ++ .../segment_v2/inverted_index_file_writer.cpp | 3 + .../rowset/segment_v2/inverted_index_file_writer.h | 3 + .../segment_v2/inverted_index_fs_directory.cpp | 18 +++- .../segment_v2/inverted_index_fs_directory.h | 9 +- be/src/olap/rowset/segment_v2/segment_writer.cpp | 2 + .../rowset/segment_v2/vertical_segment_writer.cpp | 2 + be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 9 +- .../test_index_writer_file_cache.groovy | 116 +++++++++++++++++++++ 11 files changed, 162 insertions(+), 21 deletions(-) diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 8c109eec1c1..9ed27bad382 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -686,6 +686,9 @@ Status Compaction::do_inverted_index_compaction() { << st; return st; } + for (const auto& writer : inverted_index_file_writers) { + writer->set_file_writer_opts(ctx.get_file_writer_options()); + } } // use tmp file dir to store index files diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index f3a0ade24f3..ec1bba7621b 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -846,13 +846,7 @@ Status BaseBetaRowsetWriter::_build_tmp(RowsetSharedPtr& rowset_ptr) { Status BaseBetaRowsetWriter::_create_file_writer(const std::string& path, io::FileWriterPtr& file_writer) { - io::FileWriterOptions opts { - .write_file_cache = _context.write_file_cache, - .is_cold_data = _context.is_hot_data, - .file_cache_expiration = - _context.file_cache_ttl_sec > 0 && _context.newest_write_timestamp > 0 - ? _context.newest_write_timestamp + _context.file_cache_ttl_sec - : 0}; + io::FileWriterOptions opts = _context.get_file_writer_options(); Status st = _context.fs()->create_file(path, &file_writer, &opts); if (!st.ok()) { LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st; diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h index 0130916bfb4..e13f7efe6e9 100644 --- a/be/src/olap/rowset/rowset_writer_context.h +++ b/be/src/olap/rowset/rowset_writer_context.h @@ -140,6 +140,16 @@ struct RowsetWriterContext { return *storage_resource->fs; } } + + io::FileWriterOptions get_file_writer_options() const { + io::FileWriterOptions opts { + .write_file_cache = write_file_cache, + .is_cold_data = is_hot_data, + .file_cache_expiration = file_cache_ttl_sec > 0 && newest_write_timestamp > 0 + ? newest_write_timestamp + file_cache_ttl_sec + : 0}; + return opts; + } }; } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp index f2ac0e92265..6eb54878924 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp @@ -283,6 +283,7 @@ size_t InvertedIndexFileWriter::write_v1() { ram_dir.close(); auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, idx_path.c_str()); + out_dir->set_file_writer_opts(_opts); auto* out = out_dir->createOutput(idx_name.c_str()); if (out == nullptr) { @@ -348,6 +349,8 @@ size_t InvertedIndexFileWriter::write_v2() { io::Path index_path {InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)}; auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, index_path.parent_path().c_str()); + out_dir->set_file_writer_opts(_opts); + std::unique_ptr<lucene::store::IndexOutput> compound_file_output; // idx v2 writer != nullptr means memtable on sink node now if (_idx_v2_writer != nullptr) { diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h index b9f9b983e44..024c1dec986 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h @@ -71,6 +71,8 @@ public: lucene::store::IndexOutput* output, uint8_t* buffer, int64_t bufferLength); InvertedIndexStorageFormatPB get_storage_format() const { return _storage_format; } + void set_file_writer_opts(const io::FileWriterOptions& opts) { _opts = opts; } + private: InvertedIndexDirectoryMap _indices_dirs; const io::FileSystemSPtr _fs; @@ -81,6 +83,7 @@ private: size_t _file_size = 0; // write to disk or stream io::FileWriterPtr _idx_v2_writer; + io::FileWriterOptions _opts; }; } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp index 0443bf345ba..27e03b43da2 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp @@ -84,9 +84,6 @@ namespace doris::segment_v2 { const char* const DorisFSDirectory::WRITE_LOCK_FILE = "write.lock"; class DorisFSDirectory::FSIndexOutput : public lucene::store::BufferedIndexOutput { -private: - io::FileWriterPtr _writer; - protected: void flushBuffer(const uint8_t* b, const int32_t size) override; @@ -96,6 +93,12 @@ public: ~FSIndexOutput() override; void close() override; int64_t length() const override; + + void set_file_writer_opts(const io::FileWriterOptions& opts) { _opts = opts; } + +private: + io::FileWriterPtr _writer; + io::FileWriterOptions _opts; }; class DorisFSDirectory::FSIndexOutputV2 : public lucene::store::BufferedIndexOutput { @@ -242,7 +245,13 @@ void DorisFSDirectory::FSIndexInput::readInternal(uint8_t* b, const int32_t len) } void DorisFSDirectory::FSIndexOutput::init(const io::FileSystemSPtr& fs, const char* path) { - Status status = fs->create_file(path, &_writer); + DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput::init.file_cache", { + if (fs->type() == io::FileSystemType::S3 && _opts.write_file_cache == false) { + _CLTHROWA(CL_ERR_IO, "Inverted index failed to enter file cache"); + } + }); + + Status status = fs->create_file(path, &_writer, &_opts); DBUG_EXECUTE_IF( "DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_" "init", @@ -579,6 +588,7 @@ lucene::store::IndexOutput* DorisFSDirectory::createOutput(const char* name) { assert(!exists); } auto* ret = _CLNEW FSIndexOutput(); + ret->set_file_writer_opts(_opts); try { ret->init(_fs, fl); } catch (CLuceneError& err) { diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h index b3e0352d7ad..357ac65c678 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h @@ -29,6 +29,7 @@ #include "CLucene/SharedHeader.h" #include "io/fs/file_reader_writer_fwd.h" #include "io/fs/file_system.h" +#include "io/fs/file_writer.h" #include "io/io_common.h" class CLuceneError; @@ -46,8 +47,6 @@ class CLUCENE_EXPORT DorisFSDirectory : public lucene::store::Directory { public: static const char* const WRITE_LOCK_FILE; static const int64_t MAX_HEADER_DATA_SIZE = 1024 * 128; // 128k -private: - int filemode; protected: mutable std::mutex _this_lock; @@ -91,6 +90,12 @@ public: virtual void init(const io::FileSystemSPtr& fs, const char* path, lucene::store::LockFactory* lock_factory = nullptr); + + void set_file_writer_opts(const io::FileWriterOptions& opts) { _opts = opts; } + +private: + int32_t filemode; + io::FileWriterOptions _opts; }; class CLUCENE_EXPORT DorisRAMFSDirectory : public DorisFSDirectory { diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 36b200fe8e3..f20af3df80a 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -139,6 +139,8 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id, _opts.rowset_ctx->rowset_id.to_string(), segment_id, _tablet_schema->get_inverted_index_storage_format(), std::move(inverted_file_writer)); + _inverted_index_file_writer->set_file_writer_opts( + _opts.rowset_ctx->get_file_writer_options()); } } diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index aa9376a8d78..3e23b1fda52 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -120,6 +120,8 @@ VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32 _opts.rowset_ctx->rowset_id.to_string(), segment_id, _tablet_schema->get_inverted_index_storage_format(), std::move(inverted_file_writer)); + _inverted_index_file_writer->set_file_writer_opts( + _opts.rowset_ctx->get_file_writer_options()); } } diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp index 1db74843697..ee687d18edc 100644 --- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp +++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp @@ -165,14 +165,7 @@ Status VerticalBetaRowsetWriter<T>::_create_segment_writer( int seg_id = this->_num_segment.fetch_add(1, std::memory_order_relaxed); io::FileWriterPtr file_writer; - io::FileWriterOptions opts { - .write_file_cache = this->_context.write_file_cache, - .is_cold_data = this->_context.is_hot_data, - .file_cache_expiration = this->_context.file_cache_ttl_sec > 0 && - this->_context.newest_write_timestamp > 0 - ? this->_context.newest_write_timestamp + - this->_context.file_cache_ttl_sec - : 0}; + io::FileWriterOptions opts = this->_context.get_file_writer_options(); auto path = context.segment_path(seg_id); auto& fs = context.fs_ref(); diff --git a/regression-test/suites/fault_injection_p0/test_index_writer_file_cache.groovy b/regression-test/suites/fault_injection_p0/test_index_writer_file_cache.groovy new file mode 100644 index 00000000000..b26794e3671 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_index_writer_file_cache.groovy @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +suite("test_index_writer_file_cache_fault_injection", "nonConcurrent") { + if (!isCloudMode()) { + return; + } + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def testTable1 = "test_index_writer_file_cache_fault_injection_1" + def testTable2 = "test_index_writer_file_cache_fault_injection_2" + + sql "DROP TABLE IF EXISTS ${testTable1}" + sql """ + CREATE TABLE ${testTable1} ( + `@timestamp` int(11) NULL COMMENT "", + `clientip` string NULL COMMENT "", + `request` string NULL COMMENT "", + `status` int(11) NULL COMMENT "", + `size` int(11) NULL COMMENT "", + INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '', + INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true") COMMENT '', + INDEX status_idx (`status`) USING INVERTED COMMENT '', + INDEX size_idx (`size`) USING INVERTED COMMENT '' + ) ENGINE=OLAP + DUPLICATE KEY(`@timestamp`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true" + ); + """ + + sql "DROP TABLE IF EXISTS ${testTable2}" + sql """ + CREATE TABLE ${testTable2} ( + `@timestamp` int(11) NULL COMMENT "", + `clientip` string NULL COMMENT "", + `request` string NULL COMMENT "", + `status` int(11) NULL COMMENT "", + `size` int(11) NULL COMMENT "", + INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '', + INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true") COMMENT '', + INDEX status_idx (`status`) USING INVERTED COMMENT '', + INDEX size_idx (`size`) USING INVERTED COMMENT '' + ) ENGINE=OLAP + DUPLICATE KEY(`@timestamp`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true" + ); + """ + + def insert_and_compaction = { tableName -> + sql """ INSERT INTO ${tableName} VALUES (893964617, '40.135.0.0', 'GET /images/hm_bg.jpg HTTP/1.0', 200, 24736); """ + sql """ INSERT INTO ${tableName} VALUES (893964653, '232.0.0.0', 'GET /images/hm_bg.jpg HTTP/1.0', 200, 3781); """ + sql """ INSERT INTO ${tableName} VALUES (893964672, '26.1.0.0', 'GET /images/hm_bg.jpg HTTP/1.0', 304, 0); """ + + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + String backend_id = tablet.BackendId + def (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + assertEquals("success", compactJson.status.toLowerCase()) + } + + for (def tablet in tablets) { + boolean running = true + do { + Thread.sleep(1000) + String tablet_id = tablet.TabletId + String backend_id = tablet.BackendId + def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput::init.file_cache") + + insert_and_compaction.call(testTable1); + insert_and_compaction.call(testTable2); + } finally { + GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput::init.file_cache") + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org