This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit daa171ee3a7df19135f364d0b532c07833af65c4
Author: airborne12 <airborn...@gmail.com>
AuthorDate: Fri Mar 1 21:29:00 2024 +0800

    [Update](cloud) add inverted index tmp dir support (#31484)
---
 be/src/common/config.cpp                           |  2 ++
 be/src/common/config.h                             |  3 ++
 .../inverted_index_compound_directory.cpp          |  3 +-
 .../rowset/segment_v2/inverted_index_writer.cpp    | 15 +++++++--
 .../olap/rowset/segment_v2/inverted_index_writer.h | 36 ++++++++++++++++++++++
 be/src/runtime/exec_env.h                          |  5 +++
 be/src/runtime/exec_env_init.cpp                   |  2 ++
 7 files changed, 63 insertions(+), 3 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f7f4cc99a4b..f534e70126d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -907,6 +907,8 @@ DEFINE_Validator(file_cache_type, [](std::string_view config) -> bool {
     return config.empty() || config == "file_block_cache";
 });
 
+DEFINE_String(tmp_file_dir, "tmp");
+
 DEFINE_Int32(s3_transfer_executor_pool_size, "2");
 
 DEFINE_Bool(enable_time_lut, "true");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 6fcbecd8f71..70713447ca1 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1256,6 +1256,9 @@ DECLARE_mBool(check_segment_when_build_rowset_meta);
 // max s3 client retry times
 DECLARE_mInt32(max_s3_client_retry);
 
+// write as inverted index tmp directory
+DECLARE_String(tmp_file_dir);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
index 4ed3258d74d..1a0a3e9ab1d 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
@@ -399,7 +399,8 @@ void DorisCompoundDirectory::FSIndexInput::readInternal(uint8_t* b, const int32_
 
 void DorisCompoundDirectory::FSIndexOutput::init(const io::FileSystemSPtr& fileSystem,
                                                  const char* path) {
-    Status status = fileSystem->create_file(path, &_writer);
+    io::FileWriterOptions opts {.create_empty_file = false};
+    Status status = fileSystem->create_file(path, &_writer, &opts);
     DBUG_EXECUTE_IF(
             "DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_"
             "init",
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
index 07bea0c83f3..9cd9897f6f5 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
@@ -32,11 +32,13 @@
 #pragma clang diagnostic push
 #pragma clang diagnostic ignored "-Wshadow-field"
 #endif
+
 #include "CLucene/analysis/standard95/StandardAnalyzer.h"
+
 #ifdef __clang__
 #pragma clang diagnostic pop
 #endif
-#include "common/config.h"
+
 #include "olap/field.h"
 #include "olap/inverted_index_parser.h"
 #include "olap/key_coder.h"
@@ -50,6 +52,7 @@
 #include "olap/tablet_schema.h"
 #include "olap/types.h"
 #include "runtime/collection_value.h"
+#include "runtime/exec_env.h"
 #include "util/debug_points.h"
 #include "util/faststring.h"
 #include "util/slice.h"
@@ -209,8 +212,14 @@ public:
             return Status::InternalError("init_fulltext_index directory already exists");
         }
 
+        auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
+        _lfs = io::global_local_filesystem();
+        auto lfs_index_path = InvertedIndexDescriptor::get_temporary_index_path(
+                tmp_file_dir / _segment_file_name, _index_meta->index_id(),
+                _index_meta->get_index_suffix());
         dir = std::unique_ptr<DorisCompoundDirectory>(DorisCompoundDirectoryFactory::getDirectory(
-                _fs, index_path.c_str(), use_compound_file_writer, can_use_ram_dir));
+                _lfs, lfs_index_path.c_str(), use_compound_file_writer, can_use_ram_dir, nullptr,
+                _fs, index_path.c_str()));
         return Status::OK();
     }
 
@@ -451,6 +460,7 @@ public:
         }
         return Status::OK();
     }
+
     Status add_array_values(size_t field_size, const CollectionValue* values,
                             size_t count) override {
         if constexpr (field_is_slice_type(field_type)) {
@@ -620,6 +630,7 @@ private:
     std::string _segment_file_name;
     std::string _directory;
     io::FileSystemSPtr _fs;
+    io::FileSystemSPtr _lfs;
     const KeyCoder* _value_key_coder;
     const TabletIndex* _index_meta;
     InvertedIndexParserType _parser_type;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.h b/be/src/olap/rowset/segment_v2/inverted_index_writer.h
index 44cc41789b5..fee81f8235a 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_writer.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.h
@@ -21,15 +21,22 @@
 #include <stddef.h>
 #include <stdint.h>
 
+#include <atomic>
 #include <memory>
 #include <string>
 
+#include "common/config.h"
 #include "common/status.h"
+#include "gutil/strings/split.h"
 #include "io/fs/file_system.h"
+#include "io/fs/local_file_system.h"
+#include "olap/options.h"
 
 namespace doris {
 class CollectionValue;
+
 class Field;
+
 class TabletIndex;
 
 namespace segment_v2 {
@@ -66,5 +73,34 @@ private:
     DISALLOW_COPY_AND_ASSIGN(InvertedIndexColumnWriter);
 };
 
+class TmpFileDirs {
+public:
+    TmpFileDirs(const std::vector<doris::StorePath>& store_paths) {
+        for (const auto& store_path : store_paths) {
+            _tmp_file_dirs.emplace_back(store_path.path + "/" + config::tmp_file_dir);
+        }
+    };
+
+    Status init() {
+        for (auto& tmp_file_dir : _tmp_file_dirs) {
+            bool exists = true;
+            RETURN_IF_ERROR(io::global_local_filesystem()->exists(tmp_file_dir, &exists));
+            if (!exists) {
+                RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_file_dir));
+            }
+        }
+        return Status::OK();
+    };
+
+    io::Path get_tmp_file_dir() {
+        size_t cur_index = _next_index.fetch_add(1);
+        return _tmp_file_dirs[cur_index % _tmp_file_dirs.size()];
+    };
+
+private:
+    std::vector<io::Path> _tmp_file_dirs;
+    std::atomic_size_t _next_index {0}; // use for round-robin
+};
+
 } // namespace segment_v2
 } // namespace doris
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 89d871be293..1fe5d14372a 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -33,6 +33,7 @@
 #include "olap/memtable_memory_limiter.h"
 #include "olap/olap_define.h"
 #include "olap/options.h"
+#include "olap/rowset/segment_v2/inverted_index_writer.h"
 #include "olap/tablet_fwd.h"
 #include "pipeline/pipeline_tracing.h"
 #include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove this include header
@@ -58,6 +59,7 @@ class FileCacheFactory;
 namespace segment_v2 {
 class InvertedIndexSearcherCache;
 class InvertedIndexQueryCache;
+class TmpFileDirs;
 } // namespace segment_v2
 
 class WorkloadSchedPolicyMgr;
@@ -277,6 +279,8 @@ public:
         return _pipeline_tracer_ctx.get();
     }
 
+    segment_v2::TmpFileDirs* get_tmp_file_dirs() { return _tmp_file_dirs.get(); }
+
 private:
     ExecEnv();
 
@@ -390,6 +394,7 @@ private:
     RuntimeQueryStatiticsMgr* _runtime_query_statistics_mgr = nullptr;
 
     std::unique_ptr<pipeline::PipelineTracerContext> _pipeline_tracer_ctx;
+    std::unique_ptr<segment_v2::TmpFileDirs> _tmp_file_dirs;
 };
 
 template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 2f6ac61d646..de3b9a2f32f 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -149,6 +149,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
     }
     init_doris_metrics(store_paths);
     _store_paths = store_paths;
+    _tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(_store_paths);
+    RETURN_IF_ERROR(_tmp_file_dirs->init());
     _user_function_cache = new UserFunctionCache();
     static_cast<void>(_user_function_cache->init(doris::config::user_function_dir));
     _external_scan_context_mgr = new ExternalScanContextMgr(this);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to