This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit daa171ee3a7df19135f364d0b532c07833af65c4 Author: airborne12 <airborn...@gmail.com> AuthorDate: Fri Mar 1 21:29:00 2024 +0800 [Update](cloud) add inverted index tmp dir support (#31484) --- be/src/common/config.cpp | 2 ++ be/src/common/config.h | 3 ++ .../inverted_index_compound_directory.cpp | 3 +- .../rowset/segment_v2/inverted_index_writer.cpp | 15 +++++++-- .../olap/rowset/segment_v2/inverted_index_writer.h | 36 ++++++++++++++++++++++ be/src/runtime/exec_env.h | 5 +++ be/src/runtime/exec_env_init.cpp | 2 ++ 7 files changed, 63 insertions(+), 3 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index f7f4cc99a4b..f534e70126d 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -907,6 +907,8 @@ DEFINE_Validator(file_cache_type, [](std::string_view config) -> bool { return config.empty() || config == "file_block_cache"; }); +DEFINE_String(tmp_file_dir, "tmp"); + DEFINE_Int32(s3_transfer_executor_pool_size, "2"); DEFINE_Bool(enable_time_lut, "true"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 6fcbecd8f71..70713447ca1 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1256,6 +1256,9 @@ DECLARE_mBool(check_segment_when_build_rowset_meta); // max s3 client retry times DECLARE_mInt32(max_s3_client_retry); +// write as inverted index tmp directory +DECLARE_String(tmp_file_dir); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp index 4ed3258d74d..1a0a3e9ab1d 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp @@ -399,7 +399,8 @@ void DorisCompoundDirectory::FSIndexInput::readInternal(uint8_t* b, const int32_ void DorisCompoundDirectory::FSIndexOutput::init(const io::FileSystemSPtr& fileSystem, const char* path) { - 
Status status = fileSystem->create_file(path, &_writer); + io::FileWriterOptions opts {.create_empty_file = false}; + Status status = fileSystem->create_file(path, &_writer, &opts); DBUG_EXECUTE_IF( "DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_" "init", diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp index 07bea0c83f3..9cd9897f6f5 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp @@ -32,11 +32,13 @@ #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wshadow-field" #endif + #include "CLucene/analysis/standard95/StandardAnalyzer.h" + #ifdef __clang__ #pragma clang diagnostic pop #endif -#include "common/config.h" + #include "olap/field.h" #include "olap/inverted_index_parser.h" #include "olap/key_coder.h" @@ -50,6 +52,7 @@ #include "olap/tablet_schema.h" #include "olap/types.h" #include "runtime/collection_value.h" +#include "runtime/exec_env.h" #include "util/debug_points.h" #include "util/faststring.h" #include "util/slice.h" @@ -209,8 +212,14 @@ public: return Status::InternalError("init_fulltext_index directory already exists"); } + auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir(); + _lfs = io::global_local_filesystem(); + auto lfs_index_path = InvertedIndexDescriptor::get_temporary_index_path( + tmp_file_dir / _segment_file_name, _index_meta->index_id(), + _index_meta->get_index_suffix()); dir = std::unique_ptr<DorisCompoundDirectory>(DorisCompoundDirectoryFactory::getDirectory( - _fs, index_path.c_str(), use_compound_file_writer, can_use_ram_dir)); + _lfs, lfs_index_path.c_str(), use_compound_file_writer, can_use_ram_dir, nullptr, + _fs, index_path.c_str())); return Status::OK(); } @@ -451,6 +460,7 @@ public: } return Status::OK(); } + Status add_array_values(size_t field_size, const CollectionValue* values, size_t count) 
override { if constexpr (field_is_slice_type(field_type)) { @@ -620,6 +630,7 @@ private: std::string _segment_file_name; std::string _directory; io::FileSystemSPtr _fs; + io::FileSystemSPtr _lfs; const KeyCoder* _value_key_coder; const TabletIndex* _index_meta; InvertedIndexParserType _parser_type; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.h b/be/src/olap/rowset/segment_v2/inverted_index_writer.h index 44cc41789b5..fee81f8235a 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_writer.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.h @@ -21,15 +21,22 @@ #include <stddef.h> #include <stdint.h> +#include <atomic> #include <memory> #include <string> +#include "common/config.h" #include "common/status.h" +#include "gutil/strings/split.h" #include "io/fs/file_system.h" +#include "io/fs/local_file_system.h" +#include "olap/options.h" namespace doris { class CollectionValue; + class Field; + class TabletIndex; namespace segment_v2 { @@ -66,5 +73,34 @@ private: DISALLOW_COPY_AND_ASSIGN(InvertedIndexColumnWriter); }; +class TmpFileDirs { +public: + TmpFileDirs(const std::vector<doris::StorePath>& store_paths) { + for (const auto& store_path : store_paths) { + _tmp_file_dirs.emplace_back(store_path.path + "/" + config::tmp_file_dir); + } + }; + + Status init() { + for (auto& tmp_file_dir : _tmp_file_dirs) { + bool exists = true; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(tmp_file_dir, &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_file_dir)); + } + } + return Status::OK(); + }; + + io::Path get_tmp_file_dir() { + size_t cur_index = _next_index.fetch_add(1); + return _tmp_file_dirs[cur_index % _tmp_file_dirs.size()]; + }; + +private: + std::vector<io::Path> _tmp_file_dirs; + std::atomic_size_t _next_index {0}; // use for round-robin +}; + } // namespace segment_v2 } // namespace doris diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 
89d871be293..1fe5d14372a 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -33,6 +33,7 @@ #include "olap/memtable_memory_limiter.h" #include "olap/olap_define.h" #include "olap/options.h" +#include "olap/rowset/segment_v2/inverted_index_writer.h" #include "olap/tablet_fwd.h" #include "pipeline/pipeline_tracing.h" #include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove this include header @@ -58,6 +59,7 @@ class FileCacheFactory; namespace segment_v2 { class InvertedIndexSearcherCache; class InvertedIndexQueryCache; +class TmpFileDirs; } // namespace segment_v2 class WorkloadSchedPolicyMgr; @@ -277,6 +279,8 @@ public: return _pipeline_tracer_ctx.get(); } + segment_v2::TmpFileDirs* get_tmp_file_dirs() { return _tmp_file_dirs.get(); } + private: ExecEnv(); @@ -390,6 +394,7 @@ private: RuntimeQueryStatiticsMgr* _runtime_query_statistics_mgr = nullptr; std::unique_ptr<pipeline::PipelineTracerContext> _pipeline_tracer_ctx; + std::unique_ptr<segment_v2::TmpFileDirs> _tmp_file_dirs; }; template <> diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 2f6ac61d646..de3b9a2f32f 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -149,6 +149,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths, } init_doris_metrics(store_paths); _store_paths = store_paths; + _tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(_store_paths); + RETURN_IF_ERROR(_tmp_file_dirs->init()); _user_function_cache = new UserFunctionCache(); static_cast<void>(_user_function_cache->init(doris::config::user_function_dir)); _external_scan_context_mgr = new ExternalScanContextMgr(this); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org