This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c7ae2a7d22 [Refactor & Bugfix](static variables) move some static 
vairables to exec_env (#24029)
c7ae2a7d22 is described below

commit c7ae2a7d220429a39c52ded077c0d5b484223f38
Author: zhiqqqq <seuhezhiqi...@163.com>
AuthorDate: Wed Sep 13 09:27:03 2023 +0800

    [Refactor & Bugfix](static variables) move some static vairables to 
exec_env (#24029)
---
 be/src/common/daemon.cpp                           |   7 +-
 be/src/common/daemon.h                             |   2 +-
 be/src/common/resource_tls.cpp                     |  69 ------
 be/src/common/resource_tls.h                       |  30 ---
 be/src/http/action/file_cache_action.cpp           |   4 +-
 be/src/io/cache/block/block_file_cache_factory.cpp |   6 +-
 be/src/io/cache/block/block_file_cache_factory.h   |   2 +-
 .../io/cache/block/cached_remote_file_reader.cpp   |   8 +-
 be/src/io/fs/s3_file_write_bufferpool.h            |   4 +-
 be/src/olap/olap_define.h                          |  14 ++
 be/src/olap/page_cache.cpp                         |  18 +-
 be/src/olap/page_cache.h                           |   9 +-
 .../rowset/segment_v2/inverted_index_cache.cpp     |  13 +-
 .../olap/rowset/segment_v2/inverted_index_cache.h  |  29 ++-
 be/src/olap/schema_cache.cpp                       |  10 +-
 be/src/olap/schema_cache.h                         |  10 +-
 be/src/olap/segment_loader.cpp                     |   8 +-
 be/src/olap/segment_loader.h                       |   9 +-
 be/src/olap/storage_engine.cpp                     |  67 +++---
 be/src/olap/storage_engine.h                       |   9 +-
 be/src/olap/tablet_schema_cache.cpp                |  12 +-
 be/src/olap/tablet_schema_cache.h                  |  22 +-
 be/src/pipeline/task_scheduler.cpp                 |   4 +-
 be/src/pipeline/task_scheduler.h                   |   2 +-
 be/src/runtime/broker_mgr.cpp                      |   2 +-
 be/src/runtime/broker_mgr.h                        |   3 +-
 be/src/runtime/exec_env.cpp                        |   5 +-
 be/src/runtime/exec_env.h                          | 102 +++++++--
 be/src/runtime/exec_env_init.cpp                   | 240 +++++++++++++++++----
 be/src/runtime/external_scan_context_mgr.cpp       |   2 +-
 be/src/runtime/external_scan_context_mgr.h         |   4 +-
 be/src/runtime/fragment_mgr.cpp                    |   4 +-
 be/src/runtime/fragment_mgr.h                      |   2 +
 be/src/runtime/frontend_info.h                     |   1 +
 be/src/runtime/group_commit_mgr.cpp                |   9 +-
 be/src/runtime/group_commit_mgr.h                  |   2 +
 be/src/runtime/load_channel_mgr.cpp                |   5 +-
 be/src/runtime/load_channel_mgr.h                  |   2 +
 be/src/runtime/load_path_mgr.cpp                   |   2 +-
 be/src/runtime/load_path_mgr.h                     |   3 +-
 be/src/runtime/memory/cache_manager.h              |  12 +-
 be/src/runtime/result_buffer_mgr.cpp               |   2 +-
 be/src/runtime/result_buffer_mgr.h                 |   5 +-
 be/src/runtime/routine_load/data_consumer_pool.h   |   4 +-
 .../routine_load/routine_load_task_executor.cpp    |   9 +-
 .../routine_load/routine_load_task_executor.h      |   2 +
 be/src/runtime/task_group/task_group_manager.cpp   |   3 -
 be/src/runtime/task_group/task_group_manager.h     |   4 +-
 be/src/runtime/user_function_cache.cpp             |   4 +-
 be/src/service/brpc_service.cpp                    |  12 +-
 be/src/service/doris_main.cpp                      | 112 +++-------
 be/src/service/http_service.cpp                    |   4 +
 be/src/service/http_service.h                      |   2 +
 be/src/service/point_query_executor.cpp            |  22 +-
 be/src/service/point_query_executor.h              |  12 +-
 be/src/vec/exec/scan/scanner_scheduler.cpp         |  18 +-
 be/src/vec/exec/scan/scanner_scheduler.h           |   4 +-
 be/test/common/resource_tls_test.cpp               |  49 -----
 be/test/olap/delete_bitmap_calculator_test.cpp     |   4 +-
 be/test/olap/delete_handler_test.cpp               |   7 +-
 be/test/olap/delta_writer_test.cpp                 |   5 +-
 .../olap/engine_storage_migration_task_test.cpp    |   7 +-
 be/test/olap/memtable_flush_executor_test.cpp      |   6 +-
 be/test/olap/memtable_memory_limiter_test.cpp      |  12 +-
 be/test/olap/ordered_data_compaction_test.cpp      |   5 +-
 be/test/olap/rowid_conversion_test.cpp             |   4 +-
 be/test/olap/rowset/beta_rowset_test.cpp           |   9 +-
 be/test/olap/rowset/rowset_meta_manager_test.cpp   |   3 +
 be/test/olap/tablet_cooldown_test.cpp              |   6 +-
 be/test/olap/tablet_mgr_test.cpp                   |   3 +
 be/test/olap/tablet_test.cpp                       |   4 +-
 be/test/olap/txn_manager_test.cpp                  |   1 +
 be/test/runtime/external_scan_context_mgr_test.cpp |   8 +-
 be/test/runtime/load_stream_test.cpp               |   8 +-
 .../runtime/routine_load_task_executor_test.cpp    |   5 +-
 be/test/testutil/run_all_tests.cpp                 |  15 +-
 be/test/vec/olap/vertical_compaction_test.cpp      |   4 +-
 77 files changed, 636 insertions(+), 516 deletions(-)

diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index c17bf1367f..43a3445382 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -63,10 +63,6 @@
 
 namespace doris {
 
-Daemon::~Daemon() {
-    stop();
-}
-
 void Daemon::tcmalloc_gc_thread() {
     // TODO All cache GC wish to be supported
 #if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && 
!defined(THREAD_SANITIZER) && \
@@ -389,7 +385,9 @@ void Daemon::start() {
 }
 
 void Daemon::stop() {
+    LOG(INFO) << "Doris daemon is stopping.";
     if (_stop_background_threads_latch.count() == 0) {
+        LOG(INFO) << "Doris daemon stop returned since no bg threads latch.";
         return;
     }
     _stop_background_threads_latch.count_down();
@@ -398,6 +396,7 @@ void Daemon::stop() {
             t->join();
         }
     }
+    LOG(INFO) << "Doris daemon stopped after background threads are joined.";
 }
 
 } // namespace doris
diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h
index c552f55f03..139584ba93 100644
--- a/be/src/common/daemon.h
+++ b/be/src/common/daemon.h
@@ -28,7 +28,7 @@ namespace doris {
 class Daemon {
 public:
     Daemon() : _stop_background_threads_latch(1) {}
-    ~Daemon();
+    ~Daemon() = default;
 
     // Start background threads
     void start();
diff --git a/be/src/common/resource_tls.cpp b/be/src/common/resource_tls.cpp
deleted file mode 100644
index 9b5ddc6815..0000000000
--- a/be/src/common/resource_tls.cpp
+++ /dev/null
@@ -1,69 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "common/resource_tls.h"
-
-#include <gen_cpp/Types_types.h>
-#include <pthread.h>
-
-#include <ostream>
-
-#include "common/logging.h"
-
-namespace doris {
-
-static pthread_key_t s_resource_key;
-static bool s_is_init = false;
-
-static void resource_destructor(void* value) {
-    TResourceInfo* info = (TResourceInfo*)value;
-    if (info != nullptr) {
-        delete info;
-    }
-}
-
-void ResourceTls::init() {
-    int ret = pthread_key_create(&s_resource_key, resource_destructor);
-    if (ret != 0) {
-        LOG(ERROR) << "create pthread key for resource failed.";
-        return;
-    }
-    s_is_init = true;
-}
-
-TResourceInfo* ResourceTls::get_resource_tls() {
-    if (!s_is_init) {
-        return nullptr;
-    }
-    return (TResourceInfo*)pthread_getspecific(s_resource_key);
-}
-
-int ResourceTls::set_resource_tls(TResourceInfo* info) {
-    if (!s_is_init) {
-        return -1;
-    }
-    TResourceInfo* old_info = 
(TResourceInfo*)pthread_getspecific(s_resource_key);
-
-    int ret = pthread_setspecific(s_resource_key, info);
-    if (ret == 0) {
-        // OK, now we delete old one
-        delete old_info;
-    }
-    return ret;
-}
-
-} // namespace doris
diff --git a/be/src/common/resource_tls.h b/be/src/common/resource_tls.h
deleted file mode 100644
index deed16496d..0000000000
--- a/be/src/common/resource_tls.h
+++ /dev/null
@@ -1,30 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-namespace doris {
-
-class TResourceInfo;
-class ResourceTls {
-public:
-    static void init();
-    static TResourceInfo* get_resource_tls();
-    static int set_resource_tls(TResourceInfo*);
-};
-
-} // namespace doris
diff --git a/be/src/http/action/file_cache_action.cpp 
b/be/src/http/action/file_cache_action.cpp
index db432d5598..c55e639eea 100644
--- a/be/src/http/action/file_cache_action.cpp
+++ b/be/src/http/action/file_cache_action.cpp
@@ -42,9 +42,9 @@ Status FileCacheAction::_handle_header(HttpRequest* req, 
std::string* json_metri
     if (operation == "release") {
         size_t released = 0;
         if (req->param("base_path") != "") {
-            released = 
io::FileCacheFactory::instance().try_release(req->param("base_path"));
+            released = 
io::FileCacheFactory::instance()->try_release(req->param("base_path"));
         } else {
-            released = io::FileCacheFactory::instance().try_release();
+            released = io::FileCacheFactory::instance()->try_release();
         }
         EasyJson json;
         json["released_elements"] = released;
diff --git a/be/src/io/cache/block/block_file_cache_factory.cpp 
b/be/src/io/cache/block/block_file_cache_factory.cpp
index 8741ced5c0..6bbbba8023 100644
--- a/be/src/io/cache/block/block_file_cache_factory.cpp
+++ b/be/src/io/cache/block/block_file_cache_factory.cpp
@@ -31,15 +31,15 @@
 #include "io/cache/block/block_file_cache_settings.h"
 #include "io/cache/block/block_lru_file_cache.h"
 #include "io/fs/local_file_system.h"
+#include "runtime/exec_env.h"
 
 namespace doris {
 class TUniqueId;
 
 namespace io {
 
-FileCacheFactory& FileCacheFactory::instance() {
-    static FileCacheFactory ret;
-    return ret;
+FileCacheFactory* FileCacheFactory::instance() {
+    return ExecEnv::GetInstance()->file_cache_factory();
 }
 
 size_t FileCacheFactory::try_release() {
diff --git a/be/src/io/cache/block/block_file_cache_factory.h 
b/be/src/io/cache/block/block_file_cache_factory.h
index c8ed2893f1..0b6b6504c9 100644
--- a/be/src/io/cache/block/block_file_cache_factory.h
+++ b/be/src/io/cache/block/block_file_cache_factory.h
@@ -37,7 +37,7 @@ namespace io {
  */
 class FileCacheFactory {
 public:
-    static FileCacheFactory& instance();
+    static FileCacheFactory* instance();
 
     void create_file_cache(const std::string& cache_base_path,
                            const FileCacheSettings& file_cache_settings, 
Status* status);
diff --git a/be/src/io/cache/block/cached_remote_file_reader.cpp 
b/be/src/io/cache/block/cached_remote_file_reader.cpp
index f1662418cf..86ecb14a75 100644
--- a/be/src/io/cache/block/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/block/cached_remote_file_reader.cpp
@@ -47,21 +47,21 @@ 
CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader
     _is_doris_table = opts.is_doris_table;
     if (_is_doris_table) {
         _cache_key = IFileCache::hash(path().filename().native());
-        _cache = FileCacheFactory::instance().get_by_path(_cache_key);
+        _cache = FileCacheFactory::instance()->get_by_path(_cache_key);
     } else {
         // Use path and modification time to build cache key
         std::string unique_path = fmt::format("{}:{}", path().native(), 
opts.mtime);
         _cache_key = IFileCache::hash(unique_path);
         if (!opts.cache_base_path.empty()) {
             // from query session variable: file_cache_base_path
-            _cache = 
FileCacheFactory::instance().get_by_path(opts.cache_base_path);
+            _cache = 
FileCacheFactory::instance()->get_by_path(opts.cache_base_path);
             if (_cache == nullptr) {
                 LOG(WARNING) << "Can't get cache from base path: " << 
opts.cache_base_path
                              << ", using random instead.";
-                _cache = FileCacheFactory::instance().get_by_path(_cache_key);
+                _cache = FileCacheFactory::instance()->get_by_path(_cache_key);
             }
         }
-        _cache = FileCacheFactory::instance().get_by_path(path().native());
+        _cache = FileCacheFactory::instance()->get_by_path(path().native());
     }
 }
 
diff --git a/be/src/io/fs/s3_file_write_bufferpool.h 
b/be/src/io/fs/s3_file_write_bufferpool.h
index ad5f698f98..7e8bf01e19 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.h
+++ b/be/src/io/fs/s3_file_write_bufferpool.h
@@ -28,6 +28,7 @@
 #include "common/config.h"
 #include "common/status.h"
 #include "io/fs/s3_common.h"
+#include "runtime/exec_env.h"
 #include "util/slice.h"
 
 namespace doris {
@@ -126,8 +127,7 @@ public:
               doris::ThreadPool* thread_pool);
 
     static S3FileBufferPool* GetInstance() {
-        static S3FileBufferPool _pool;
-        return &_pool;
+        return ExecEnv::GetInstance()->get_s3_file_buffer_pool();
     }
 
     void reclaim(Slice buf) {
diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h
index e0e1d919a5..901e0403f0 100644
--- a/be/src/olap/olap_define.h
+++ b/be/src/olap/olap_define.h
@@ -179,6 +179,20 @@ const std::string REMOTE_TABLET_GC_PREFIX = "tgc_";
         }                      \
     } while (0)
 
+#define SAFE_STOP(ptr)        \
+    do {                      \
+        if (nullptr != ptr) { \
+            ptr->stop();      \
+        }                     \
+    } while (0)
+
+#define SAFE_SHUTDOWN(ptr)    \
+    do {                      \
+        if (nullptr != ptr) { \
+            ptr->shutdown();  \
+        }                     \
+    } while (0)
+
 #ifndef BUILD_VERSION
 #define BUILD_VERSION "Unknown"
 #endif
diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp
index 57049bdc6d..1779c293f8 100644
--- a/be/src/olap/page_cache.cpp
+++ b/be/src/olap/page_cache.cpp
@@ -21,16 +21,16 @@
 
 #include <ostream>
 
-namespace doris {
-
-StoragePageCache* StoragePageCache::_s_instance = nullptr;
+#include "runtime/exec_env.h"
 
-void StoragePageCache::create_global_cache(size_t capacity, int32_t 
index_cache_percentage,
-                                           int64_t pk_index_cache_capacity, 
uint32_t num_shards) {
-    DCHECK(_s_instance == nullptr);
-    static StoragePageCache instance(capacity, index_cache_percentage, 
pk_index_cache_capacity,
-                                     num_shards);
-    _s_instance = &instance;
+namespace doris {
+StoragePageCache* StoragePageCache::create_global_cache(size_t capacity,
+                                                        int32_t 
index_cache_percentage,
+                                                        int64_t 
pk_index_cache_capacity,
+                                                        uint32_t num_shards) {
+    StoragePageCache* res = new StoragePageCache(capacity, 
index_cache_percentage,
+                                                 pk_index_cache_capacity, 
num_shards);
+    return res;
 }
 
 StoragePageCache::StoragePageCache(size_t capacity, int32_t 
index_cache_percentage,
diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h
index e1e48f2856..c5c4e99099 100644
--- a/be/src/olap/page_cache.h
+++ b/be/src/olap/page_cache.h
@@ -127,13 +127,13 @@ public:
     static constexpr uint32_t kDefaultNumShards = 16;
 
     // Create global instance of this class
-    static void create_global_cache(size_t capacity, int32_t 
index_cache_percentage,
-                                    int64_t pk_index_cache_capacity,
-                                    uint32_t num_shards = kDefaultNumShards);
+    static StoragePageCache* create_global_cache(size_t capacity, int32_t 
index_cache_percentage,
+                                                 int64_t 
pk_index_cache_capacity,
+                                                 uint32_t num_shards = 
kDefaultNumShards);
 
     // Return global instance.
     // Client should call create_global_cache before.
-    static StoragePageCache* instance() { return _s_instance; }
+    static StoragePageCache* instance() { return 
ExecEnv::GetInstance()->get_storage_page_cache(); }
 
     StoragePageCache(size_t capacity, int32_t index_cache_percentage,
                      int64_t pk_index_cache_capacity, uint32_t num_shards);
@@ -165,7 +165,6 @@ public:
 
 private:
     StoragePageCache();
-    static StoragePageCache* _s_instance;
 
     int32_t _index_cache_percentage = 0;
     std::unique_ptr<DataPageCache> _data_page_cache = nullptr;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
index 055365cf31..f399f752c3 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
@@ -31,6 +31,7 @@
 #include "olap/olap_common.h"
 #include "olap/rowset/segment_v2/inverted_index_compound_directory.h"
 #include "olap/rowset/segment_v2/inverted_index_compound_reader.h"
+#include "runtime/exec_env.h"
 #include "runtime/thread_context.h"
 #include "util/defer_op.h"
 #include "util/runtime_profile.h"
@@ -38,8 +39,6 @@
 namespace doris {
 namespace segment_v2 {
 
-InvertedIndexSearcherCache* InvertedIndexSearcherCache::_s_instance = nullptr;
-
 IndexSearcherPtr InvertedIndexSearcherCache::build_index_searcher(const 
io::FileSystemSPtr& fs,
                                                                   const 
std::string& index_dir,
                                                                   const 
std::string& file_name) {
@@ -55,10 +54,10 @@ IndexSearcherPtr 
InvertedIndexSearcherCache::build_index_searcher(const io::File
     return index_searcher;
 }
 
-void InvertedIndexSearcherCache::create_global_instance(size_t capacity, 
uint32_t num_shards) {
-    DCHECK(_s_instance == nullptr);
-    static InvertedIndexSearcherCache instance(capacity, num_shards);
-    _s_instance = &instance;
+InvertedIndexSearcherCache* InvertedIndexSearcherCache::create_global_instance(
+        size_t capacity, uint32_t num_shards) {
+    InvertedIndexSearcherCache* res = new InvertedIndexSearcherCache(capacity, 
num_shards);
+    return res;
 }
 
 InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, 
uint32_t num_shards)
@@ -211,8 +210,6 @@ Cache::Handle* InvertedIndexSearcherCache::_insert(const 
InvertedIndexSearcherCa
     return lru_handle;
 }
 
-InvertedIndexQueryCache* InvertedIndexQueryCache::_s_instance = nullptr;
-
 bool InvertedIndexQueryCache::lookup(const CacheKey& key, 
InvertedIndexQueryCacheHandle* handle) {
     if (key.encode().empty()) {
         return false;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.h 
b/be/src/olap/rowset/segment_v2/inverted_index_cache.h
index c67e17ddda..19a9eb7734 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_cache.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.h
@@ -37,6 +37,7 @@
 #include "io/fs/path.h"
 #include "olap/lru_cache.h"
 #include "olap/rowset/segment_v2/inverted_index_query_type.h"
+#include "runtime/exec_env.h"
 #include "runtime/memory/lru_cache_policy.h"
 #include "runtime/memory/mem_tracker.h"
 #include "util/slice.h"
@@ -72,7 +73,8 @@ public:
 
     // Create global instance of this class.
     // "capacity" is the capacity of lru cache.
-    static void create_global_instance(size_t capacity, uint32_t num_shards = 
16);
+    static InvertedIndexSearcherCache* create_global_instance(size_t capacity,
+                                                              uint32_t 
num_shards = 16);
 
     void reset() {
         _cache.reset();
@@ -80,15 +82,11 @@ public:
         // Reset or clear the state of the object.
     }
 
-    static void reset_global_instance() {
-        if (_s_instance != nullptr) {
-            _s_instance->reset();
-        }
-    }
-
     // Return global instance.
     // Client should call create_global_cache before.
-    static InvertedIndexSearcherCache* instance() { return _s_instance; }
+    static InvertedIndexSearcherCache* instance() {
+        return ExecEnv::GetInstance()->get_inverted_index_searcher_cache();
+    }
 
     static IndexSearcherPtr build_index_searcher(const io::FileSystemSPtr& fs,
                                                  const std::string& index_dir,
@@ -123,7 +121,6 @@ private:
     Cache::Handle* _insert(const InvertedIndexSearcherCache::CacheKey& key, 
CacheValue* value);
 
 private:
-    static InvertedIndexSearcherCache* _s_instance;
     std::unique_ptr<MemTracker> _mem_tracker = nullptr;
 };
 
@@ -223,15 +220,16 @@ public:
     };
 
     // Create global instance of this class
-    static void create_global_cache(size_t capacity, uint32_t num_shards = 16) 
{
-        DCHECK(_s_instance == nullptr);
-        static InvertedIndexQueryCache instance(capacity, num_shards);
-        _s_instance = &instance;
+    static InvertedIndexQueryCache* create_global_cache(size_t capacity, 
uint32_t num_shards = 16) {
+        InvertedIndexQueryCache* res = new InvertedIndexQueryCache(capacity, 
num_shards);
+        return res;
     }
 
     // Return global instance.
     // Client should call create_global_cache before.
-    static InvertedIndexQueryCache* instance() { return _s_instance; }
+    static InvertedIndexQueryCache* instance() {
+        return ExecEnv::GetInstance()->get_inverted_index_query_cache();
+    }
 
     InvertedIndexQueryCache() = delete;
 
@@ -246,9 +244,6 @@ public:
                 InvertedIndexQueryCacheHandle* handle);
 
     int64_t mem_consumption();
-
-private:
-    static InvertedIndexQueryCache* _s_instance;
 };
 
 class InvertedIndexQueryCacheHandle {
diff --git a/be/src/olap/schema_cache.cpp b/be/src/olap/schema_cache.cpp
index 39d1a60a4c..7bf6b592c6 100644
--- a/be/src/olap/schema_cache.cpp
+++ b/be/src/olap/schema_cache.cpp
@@ -35,7 +35,9 @@
 
 namespace doris {
 
-SchemaCache* SchemaCache::_s_instance = nullptr;
+SchemaCache* SchemaCache::instance() {
+    return ExecEnv::GetInstance()->schema_cache();
+}
 
 // format: tabletId-unique_id1-uniqueid2...-version-type
 std::string SchemaCache::get_schema_key(int32_t tablet_id, const 
TabletSchemaSPtr& schema,
@@ -69,10 +71,4 @@ std::string SchemaCache::get_schema_key(int32_t tablet_id, 
const std::vector<TCo
     return key;
 }
 
-void SchemaCache::create_global_instance(size_t capacity) {
-    DCHECK(_s_instance == nullptr);
-    static SchemaCache instance(capacity);
-    _s_instance = &instance;
-}
-
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/schema_cache.h b/be/src/olap/schema_cache.h
index 5d94c92837..dbce5336d8 100644
--- a/be/src/olap/schema_cache.h
+++ b/be/src/olap/schema_cache.h
@@ -48,7 +48,7 @@ class SchemaCache : public LRUCachePolicy {
 public:
     enum class Type { TABLET_SCHEMA = 0, SCHEMA = 1 };
 
-    static SchemaCache* instance() { return _s_instance; }
+    static SchemaCache* instance();
 
     static void create_global_instance(size_t capacity);
 
@@ -62,7 +62,7 @@ public:
     // Get a shared cached schema from cache, schema_key is a subset of column 
unique ids
     template <typename SchemaType>
     SchemaType get_schema(const std::string& schema_key) {
-        if (!_s_instance || schema_key.empty()) {
+        if (!instance() || schema_key.empty()) {
             return {};
         }
         auto lru_handle = _cache->lookup(schema_key);
@@ -84,7 +84,7 @@ public:
     // Insert a shared Schema into cache, schema_key is full column unique ids
     template <typename SchemaType>
     void insert_schema(const std::string& key, SchemaType schema) {
-        if (!_s_instance || key.empty()) {
+        if (!instance() || key.empty()) {
             return;
         }
         CacheValue* value = new CacheValue;
@@ -115,12 +115,12 @@ public:
         SchemaSPtr schema = nullptr;
     };
 
-private:
     SchemaCache(size_t capacity)
             : LRUCachePolicy(CachePolicy::CacheType::SCHEMA_CACHE, capacity, 
LRUCacheType::NUMBER,
                              config::schema_cache_sweep_time_sec) {}
+
+private:
     static constexpr char SCHEMA_DELIMITER = '-';
-    static SchemaCache* _s_instance;
 };
 
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp
index 617b5e6fff..8d759cce8e 100644
--- a/be/src/olap/segment_loader.cpp
+++ b/be/src/olap/segment_loader.cpp
@@ -24,12 +24,8 @@
 
 namespace doris {
 
-SegmentLoader* SegmentLoader::_s_instance = nullptr;
-
-void SegmentLoader::create_global_instance(size_t capacity) {
-    DCHECK(_s_instance == nullptr);
-    static SegmentLoader instance(capacity);
-    _s_instance = &instance;
+SegmentLoader* SegmentLoader::instance() {
+    return ExecEnv::GetInstance()->segment_loader();
 }
 
 bool SegmentCache::lookup(const SegmentCache::CacheKey& key, 
SegmentCacheHandle* handle) {
diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h
index d5849e6097..d6b1d07940 100644
--- a/be/src/olap/segment_loader.h
+++ b/be/src/olap/segment_loader.h
@@ -95,6 +95,8 @@ public:
 
 class SegmentLoader {
 public:
+    static SegmentLoader* instance();
+
     // Create global instance of this class.
     // "capacity" is the capacity of lru cache.
     // TODO: Currently we use the number of rowset as the cache capacity.
@@ -102,11 +104,6 @@ public:
     // This is because currently we cannot accurately estimate the memory 
occupied by a segment.
     // After the estimation of segment memory usage is provided later, it is 
recommended
     // to use Memory as the capacity limit of the cache.
-    static void create_global_instance(size_t capacity);
-
-    // Return global instance.
-    // Client should call create_global_cache before.
-    static SegmentLoader* instance() { return _s_instance; }
 
     SegmentLoader(size_t capacity) { _segment_cache = 
std::make_unique<SegmentCache>(capacity); }
 
@@ -121,8 +118,6 @@ public:
 
 private:
     SegmentLoader();
-
-    static SegmentLoader* _s_instance;
     std::unique_ptr<SegmentCache> _segment_cache = nullptr;
 };
 
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index b425076fc0..8bace65617 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -93,8 +93,6 @@ using namespace ErrorCode;
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(unused_rowsets_count, MetricUnit::ROWSETS);
 
-StorageEngine* StorageEngine::_s_instance = nullptr;
-
 static Status _validate_options(const EngineOptions& options) {
     if (options.store_paths.empty()) {
         return Status::InternalError("store paths is empty");
@@ -102,14 +100,11 @@ static Status _validate_options(const EngineOptions& 
options) {
     return Status::OK();
 }
 
-Status StorageEngine::open(const EngineOptions& options,
-                           std::unique_ptr<StorageEngine>* engine_ptr) {
-    RETURN_IF_ERROR(_validate_options(options));
-    LOG(INFO) << "starting backend using uid:" << 
options.backend_uid.to_string();
-    std::unique_ptr<StorageEngine> engine(new StorageEngine(options));
-    RETURN_NOT_OK_STATUS_WITH_WARN(engine->_open(), "open engine failed");
+Status StorageEngine::open() {
+    RETURN_IF_ERROR(_validate_options(_options));
+    LOG(INFO) << "starting backend using uid:" << 
_options.backend_uid.to_string();
+    RETURN_NOT_OK_STATUS_WITH_WARN(_open(), "open engine failed");
     LOG(INFO) << "success to init storage engine.";
-    *engine_ptr = std::move(engine);
     return Status::OK();
 }
 
@@ -130,7 +125,6 @@ StorageEngine::StorageEngine(const EngineOptions& options)
           _default_rowset_type(BETA_ROWSET),
           _heartbeat_flags(nullptr),
           _stream_load_recorder(nullptr) {
-    _s_instance = this;
     REGISTER_HOOK_METRIC(unused_rowsets_count, [this]() {
         // std::lock_guard<std::mutex> lock(_gc_mutex);
         return _unused_rowsets.size();
@@ -139,30 +133,7 @@ StorageEngine::StorageEngine(const EngineOptions& options)
 
 StorageEngine::~StorageEngine() {
     stop();
-
-    DEREGISTER_HOOK_METRIC(unused_rowsets_count);
-
-    if (_base_compaction_thread_pool) {
-        _base_compaction_thread_pool->shutdown();
-    }
-    if (_cumu_compaction_thread_pool) {
-        _cumu_compaction_thread_pool->shutdown();
-    }
-    if (_single_replica_compaction_thread_pool) {
-        _single_replica_compaction_thread_pool->shutdown();
-    }
-
-    if (_seg_compaction_thread_pool) {
-        _seg_compaction_thread_pool->shutdown();
-    }
-    if (_tablet_meta_checkpoint_thread_pool) {
-        _tablet_meta_checkpoint_thread_pool->shutdown();
-    }
-    if (_cold_data_compaction_thread_pool) {
-        _cold_data_compaction_thread_pool->shutdown();
-    }
     _clear();
-    _s_instance = nullptr;
 }
 
 Status StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) {
@@ -543,7 +514,10 @@ void StorageEngine::_exit_if_too_many_disks_are_failed() {
 }
 
 void StorageEngine::stop() {
-    if (_stopped) return;
+    if (_stopped) {
+        LOG(WARNING) << "Storage engine is stopped twice.";
+        return;
+    }
     // trigger the waiting threads
     notify_listeners();
 
@@ -582,7 +556,32 @@ void StorageEngine::stop() {
     THREADS_JOIN(_path_gc_threads);
     THREADS_JOIN(_path_scan_threads);
 #undef THREADS_JOIN
+
+    if (_base_compaction_thread_pool) {
+        _base_compaction_thread_pool->shutdown();
+    }
+    if (_cumu_compaction_thread_pool) {
+        _cumu_compaction_thread_pool->shutdown();
+    }
+    if (_single_replica_compaction_thread_pool) {
+        _single_replica_compaction_thread_pool->shutdown();
+    }
+
+    if (_seg_compaction_thread_pool) {
+        _seg_compaction_thread_pool->shutdown();
+    }
+    if (_tablet_meta_checkpoint_thread_pool) {
+        _tablet_meta_checkpoint_thread_pool->shutdown();
+    }
+    if (_cold_data_compaction_thread_pool) {
+        _cold_data_compaction_thread_pool->shutdown();
+    }
+
+    _memtable_flush_executor.reset(nullptr);
+    _calc_delete_bitmap_executor.reset(nullptr);
+
     _stopped = true;
+    LOG(INFO) << "Storage engine is stopped.";
 }
 
 void StorageEngine::_clear() {
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 02e6ec5f7b..b5cb99ec21 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -47,6 +47,7 @@
 #include "olap/rowset/segment_v2/segment.h"
 #include "olap/tablet.h"
 #include "olap/task/index_builder.h"
+#include "runtime/exec_env.h"
 #include "runtime/heartbeat_flags.h"
 #include "util/countdown_latch.h"
 
@@ -81,9 +82,9 @@ public:
     StorageEngine(const EngineOptions& options);
     ~StorageEngine();
 
-    static Status open(const EngineOptions& options, 
std::unique_ptr<StorageEngine>* engine_ptr);
+    [[nodiscard]] Status open();
 
-    static StorageEngine* instance() { return _s_instance; }
+    static StorageEngine* instance() { return 
ExecEnv::GetInstance()->get_storage_engine(); }
 
     Status create_tablet(const TCreateTabletReq& request, RuntimeProfile* 
profile);
 
@@ -178,6 +179,7 @@ public:
     // option: update disk usage after sweep
     Status start_trash_sweep(double* usage, bool ignore_guard = false);
 
+    // Must call stop() before storage_engine is deconstructed
     void stop();
 
     void get_tablet_rowset_versions(const PGetTabletVersionsRequest* request,
@@ -373,8 +375,7 @@ private:
     int32_t _effective_cluster_id;
     bool _is_all_cluster_id_exist;
 
-    static StorageEngine* _s_instance;
-    bool _stopped;
+    std::atomic_bool _stopped {false};
 
     std::mutex _gc_mutex;
     // map<rowset_id(str), RowsetSharedPtr>, if we use RowsetId as the key, we 
need custom hash func
diff --git a/be/src/olap/tablet_schema_cache.cpp 
b/be/src/olap/tablet_schema_cache.cpp
index 2e4b221492..90880665ad 100644
--- a/be/src/olap/tablet_schema_cache.cpp
+++ b/be/src/olap/tablet_schema_cache.cpp
@@ -19,12 +19,7 @@
 
 namespace doris {
 
-TabletSchemaCache::~TabletSchemaCache() {
-    stop_and_join();
-}
-
 TabletSchemaSPtr TabletSchemaCache::insert(const std::string& key) {
-    DCHECK(_s_instance != nullptr);
     std::lock_guard guard(_mtx);
     auto iter = _cache.find(key);
     if (iter == _cache.end()) {
@@ -41,11 +36,18 @@ TabletSchemaSPtr TabletSchemaCache::insert(const 
std::string& key) {
     return iter->second;
 }
 
+void TabletSchemaCache::start() {
+    std::thread t(&TabletSchemaCache::_recycle, this);
+    t.detach();
+    LOG(INFO) << "TabletSchemaCache started";
+}
+
 void TabletSchemaCache::stop() {
     _should_stop = true;
     while (!_is_stopped) {
         std::this_thread::sleep_for(std::chrono::seconds(1));
     }
+    LOG(INFO) << "TabletSchemaCache stopped";
 }
 
 /**
diff --git a/be/src/olap/tablet_schema_cache.h 
b/be/src/olap/tablet_schema_cache.h
index 6c692aaf46..93798983c8 100644
--- a/be/src/olap/tablet_schema_cache.h
+++ b/be/src/olap/tablet_schema_cache.h
@@ -24,31 +24,28 @@
 #include <unordered_map>
 
 #include "olap/tablet_schema.h"
+#include "runtime/exec_env.h"
 #include "util/doris_metrics.h"
 
 namespace doris {
 
 class TabletSchemaCache {
 public:
-    ~TabletSchemaCache();
+    ~TabletSchemaCache() = default;
 
-    static void create_global_schema_cache() {
-        DCHECK(_s_instance == nullptr);
-        static TabletSchemaCache instance;
-        _s_instance = &instance;
-        std::thread t(&TabletSchemaCache::_recycle, _s_instance);
-        t.detach();
+    static TabletSchemaCache* create_global_schema_cache() {
+        TabletSchemaCache* res = new TabletSchemaCache();
+        return res;
     }
 
-    static TabletSchemaCache* instance() { return _s_instance; }
-
-    static void stop_and_join() {
-        DCHECK(_s_instance != nullptr);
-        _s_instance->stop();
+    static TabletSchemaCache* instance() {
+        return ExecEnv::GetInstance()->get_tablet_schema_cache();
     }
 
     TabletSchemaSPtr insert(const std::string& key);
 
+    void start();
+
     void stop();
 
 private:
@@ -58,7 +55,6 @@ private:
     void _recycle();
 
 private:
-    static inline TabletSchemaCache* _s_instance = nullptr;
     std::mutex _mtx;
     std::unordered_map<std::string, TabletSchemaSPtr> _cache;
     std::atomic_bool _should_stop = {false};
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index 0ce4115c0c..c4278c3807 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -189,7 +189,7 @@ void 
BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
 }
 
 TaskScheduler::~TaskScheduler() {
-    shutdown();
+    stop();
 }
 
 Status TaskScheduler::start() {
@@ -340,7 +340,7 @@ void TaskScheduler::_try_close_task(PipelineTask* task, 
PipelineTaskState state)
     task->fragment_context()->close_a_pipeline();
 }
 
-void TaskScheduler::shutdown() {
+void TaskScheduler::stop() {
     if (!this->_shutdown.load()) {
         this->_shutdown.store(true);
         _blocked_task_scheduler->shutdown();
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index ac9389c088..13b9e734d6 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -89,7 +89,7 @@ public:
 
     Status start();
 
-    void shutdown();
+    void stop();
 
     TaskQueue* task_queue() const { return _task_queue.get(); }
 
diff --git a/be/src/runtime/broker_mgr.cpp b/be/src/runtime/broker_mgr.cpp
index 6d17f32ffb..613b916d06 100644
--- a/be/src/runtime/broker_mgr.cpp
+++ b/be/src/runtime/broker_mgr.cpp
@@ -53,7 +53,7 @@ BrokerMgr::BrokerMgr(ExecEnv* exec_env) : 
_exec_env(exec_env), _stop_background_
     });
 }
 
-BrokerMgr::~BrokerMgr() {
+void BrokerMgr::stop() {
     DEREGISTER_HOOK_METRIC(broker_count);
     _stop_background_threads_latch.count_down();
     if (_ping_thread) {
diff --git a/be/src/runtime/broker_mgr.h b/be/src/runtime/broker_mgr.h
index fd7a67e43c..d9238aed2c 100644
--- a/be/src/runtime/broker_mgr.h
+++ b/be/src/runtime/broker_mgr.h
@@ -35,8 +35,9 @@ class Thread;
 class BrokerMgr {
 public:
     BrokerMgr(ExecEnv* exec_env);
-    ~BrokerMgr();
+    ~BrokerMgr() = default;
     void init();
+    void stop();
     const std::string& get_client_id(const TNetworkAddress& address);
 
 private:
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index c30fb20db8..27986f5de3 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -23,6 +23,7 @@
 #include <utility>
 
 #include "common/config.h"
+#include "olap/olap_define.h"
 #include "runtime/fragment_mgr.h"
 #include "runtime/frontend_info.h"
 #include "time.h"
@@ -31,10 +32,8 @@
 
 namespace doris {
 
-ExecEnv::ExecEnv() = default;
-
 ExecEnv::~ExecEnv() {
-    _destroy();
+    destroy();
 }
 
 const std::string& ExecEnv::token() const {
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index f6c61d26f3..fd54d44166 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -17,7 +17,7 @@
 
 #pragma once
 
-#include <gen_cpp/HeartbeatService_types.h>
+#include <common/multi_version.h>
 #include <stddef.h>
 
 #include <algorithm>
@@ -32,12 +32,13 @@
 
 #include "common/status.h"
 #include "olap/memtable_memory_limiter.h"
+#include "olap/olap_define.h"
 #include "olap/options.h"
+#include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove 
this include header
 #include "util/threadpool.h"
 #include "vec/common/hash_table/phmap_fwd_decl.h"
 
 namespace doris {
-struct FrontendInfo;
 namespace vectorized {
 class VDataStreamMgr;
 class ScannerScheduler;
@@ -49,6 +50,15 @@ class TaskScheduler;
 namespace taskgroup {
 class TaskGroupManager;
 }
+namespace io {
+class S3FileBufferPool;
+class FileCacheFactory;
+} // namespace io
+namespace segment_v2 {
+class InvertedIndexSearcherCache;
+class InvertedIndexQueryCache;
+} // namespace segment_v2
+
 class BfdParser;
 class BrokerMgr;
 template <class T>
@@ -80,6 +90,14 @@ class HeartbeatFlags;
 class FrontendServiceClient;
 class FileMetaCache;
 class GroupCommitMgr;
+class TabletSchemaCache;
+class UserFunctionCache;
+class SchemaCache;
+class StoragePageCache;
+class SegmentLoader;
+class LookupConnectionCache;
+class RowCache;
+class CacheManager;
 
 inline bool k_doris_exit = false;
 
@@ -89,9 +107,15 @@ inline bool k_doris_exit = false;
 // once to properly initialise service state.
 class ExecEnv {
 public:
+    // Empty destructor because the compiler-generated one requires full
+    // declarations for classes in scoped_ptrs.
+    ~ExecEnv();
+
     // Initial exec environment. must call this to init all
-    static Status init(ExecEnv* env, const std::vector<StorePath>& 
store_paths);
-    static void destroy(ExecEnv* exec_env);
+    [[nodiscard]] static Status init(ExecEnv* env, const 
std::vector<StorePath>& store_paths);
+
+    // Stop all threads and delete resources.
+    void destroy();
 
     /// Returns the first created exec env instance. In a normal doris, this is
     /// the only instance. In test setups with multiple ExecEnv's per process,
@@ -101,10 +125,6 @@ public:
         return &s_exec_env;
     }
 
-    // Empty destructor because the compiler-generated one requires full
-    // declarations for classes in scoped_ptrs.
-    ~ExecEnv();
-
     static bool ready() { return _s_ready.load(std::memory_order_acquire); }
     const std::string& token() const;
     ExternalScanContextMgr* external_scan_context_mgr() { return 
_external_scan_context_mgr; }
@@ -152,12 +172,15 @@ public:
     void init_download_cache_buf();
     void init_download_cache_required_components();
     Status init_pipeline_task_scheduler();
+    void init_file_cache_factory();
     char* get_download_cache_buf(ThreadPoolToken* token) {
         if (_download_cache_buf_map.find(token) == 
_download_cache_buf_map.end()) {
             return nullptr;
         }
         return _download_cache_buf_map[token].get();
     }
+    io::FileCacheFactory* file_cache_factory() { return _file_cache_factory; }
+    UserFunctionCache* user_function_cache() { return _user_function_cache; }
     FragmentMgr* fragment_mgr() { return _fragment_mgr; }
     ResultCache* result_cache() { return _result_cache; }
     TMasterInfo* master_info() { return _master_info; }
@@ -185,14 +208,11 @@ public:
     FileMetaCache* file_meta_cache() { return _file_meta_cache; }
     MemTableMemoryLimiter* memtable_memory_limiter() { return 
_memtable_memory_limiter.get(); }
 #ifdef BE_TEST
+    void set_ready() { this->_s_ready = true; }
+    void set_not_ready() { this->_s_ready = false; }
     void set_memtable_memory_limiter(MemTableMemoryLimiter* limiter) {
         _memtable_memory_limiter.reset(limiter);
     }
-#endif
-    vectorized::ZoneList& global_zone_cache() { return *_global_zone_cache; }
-    std::shared_mutex& zone_cache_rw_lock() { return _zone_cache_rw_lock; }
-
-    // only for unit test
     void set_master_info(TMasterInfo* master_info) { this->_master_info = 
master_info; }
     void set_new_load_stream_mgr(std::shared_ptr<NewLoadStreamMgr> 
new_load_stream_mgr) {
         this->_new_load_stream_mgr = new_load_stream_mgr;
@@ -201,16 +221,45 @@ public:
         this->_stream_load_executor = stream_load_executor;
     }
 
+    void set_storage_engine(StorageEngine* se) { this->_storage_engine = se; }
+    void set_cache_manager(CacheManager* cm) { this->_cache_manager = cm; }
+    void set_tablet_schema_cache(TabletSchemaCache* c) { 
this->_tablet_schema_cache = c; }
+    void set_storage_page_cache(StoragePageCache* c) { 
this->_storage_page_cache = c; }
+    void set_segment_loader(SegmentLoader* sl) { this->_segment_loader = sl; }
+    void set_routine_load_task_executor(RoutineLoadTaskExecutor* r) {
+        this->_routine_load_task_executor = r;
+    }
+
+#endif
+    vectorized::ZoneList& global_zone_cache() { return *_global_zone_cache; }
+    std::shared_mutex& zone_cache_rw_lock() { return _zone_cache_rw_lock; }
+
     void wait_for_all_tasks_done();
 
     void update_frontends(const std::vector<TFrontendInfo>& new_infos);
     std::map<TNetworkAddress, FrontendInfo> get_frontends();
     std::map<TNetworkAddress, FrontendInfo> get_running_frontends();
 
+    TabletSchemaCache* get_tablet_schema_cache() { return 
_tablet_schema_cache; }
+    StorageEngine* get_storage_engine() { return _storage_engine; }
+    io::S3FileBufferPool* get_s3_file_buffer_pool() { return _s3_buffer_pool; }
+    SchemaCache* schema_cache() { return _schema_cache; }
+    StoragePageCache* get_storage_page_cache() { return _storage_page_cache; }
+    SegmentLoader* segment_loader() { return _segment_loader; }
+    LookupConnectionCache* get_lookup_connection_cache() { return 
_lookup_connection_cache; }
+    RowCache* get_row_cache() { return _row_cache; }
+    CacheManager* get_cache_manager() { return _cache_manager; }
+    segment_v2::InvertedIndexSearcherCache* 
get_inverted_index_searcher_cache() {
+        return _inverted_index_searcher_cache;
+    }
+    segment_v2::InvertedIndexQueryCache* get_inverted_index_query_cache() {
+        return _inverted_index_query_cache;
+    }
+
 private:
-    ExecEnv();
+    ExecEnv() = default;
 
-    Status _init(const std::vector<StorePath>& store_paths);
+    [[nodiscard]] Status _init(const std::vector<StorePath>& store_paths);
     void _destroy();
 
     Status _init_mem_env();
@@ -221,6 +270,8 @@ private:
     inline static std::atomic_bool _s_ready {false};
     std::vector<StorePath> _store_paths;
 
+    io::FileCacheFactory* _file_cache_factory = nullptr;
+    UserFunctionCache* _user_function_cache = nullptr;
     // Leave protected so that subclasses can override
     ExternalScanContextMgr* _external_scan_context_mgr = nullptr;
     doris::vectorized::VDataStreamMgr* _vstream_mgr = nullptr;
@@ -268,6 +319,7 @@ private:
     BfdParser* _bfd_parser = nullptr;
     BrokerMgr* _broker_mgr = nullptr;
     LoadChannelMgr* _load_channel_mgr = nullptr;
+    // TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control 
its life cycle.
     std::shared_ptr<NewLoadStreamMgr> _new_load_stream_mgr;
     BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr;
     BrpcClientCache<PFunctionService_Stub>* _function_client_cache = nullptr;
@@ -289,6 +341,22 @@ private:
     std::mutex _frontends_lock;
     std::map<TNetworkAddress, FrontendInfo> _frontends;
     GroupCommitMgr* _group_commit_mgr = nullptr;
+
+    // Maybe we should use unique_ptr, but it need complete type, which means 
we need
+    // to include many headers, and for some cpp file that do not need class 
like TabletSchemaCache,
+    // these redundancy header could introduce potential bug, at least, more 
header means slow compile.
+    // So we choose to use raw pointer, please remember to delete these 
pointer in deconstructor.
+    TabletSchemaCache* _tablet_schema_cache = nullptr;
+    io::S3FileBufferPool* _s3_buffer_pool = nullptr;
+    StorageEngine* _storage_engine = nullptr;
+    SchemaCache* _schema_cache = nullptr;
+    StoragePageCache* _storage_page_cache = nullptr;
+    SegmentLoader* _segment_loader = nullptr;
+    LookupConnectionCache* _lookup_connection_cache = nullptr;
+    RowCache* _row_cache = nullptr;
+    CacheManager* _cache_manager = nullptr;
+    segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = 
nullptr;
+    segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr;
 };
 
 template <>
@@ -305,4 +373,8 @@ ExecEnv::get_client_cache<TPaloBrokerServiceClient>() {
     return _broker_client_cache;
 }
 
+inline segment_v2::InvertedIndexQueryCache* GetInvertedIndexQueryCache() {
+    return ExecEnv::GetInstance()->get_inverted_index_query_cache();
+}
+
 } // namespace doris
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 0bc375aac4..bcdbf49801 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -16,6 +16,7 @@
 // under the License.
 
 // IWYU pragma: no_include <bthread/errno.h>
+#include <common/multi_version.h>
 #include <errno.h> // IWYU pragma: keep
 #include <gen_cpp/HeartbeatService_types.h>
 #include <gen_cpp/Metrics_types.h>
@@ -36,7 +37,9 @@
 #include "common/config.h"
 #include "common/logging.h"
 #include "common/status.h"
+#include "io/cache/block/block_file_cache_factory.h"
 #include "io/fs/file_meta_cache.h"
+#include "io/fs/s3_file_write_bufferpool.h"
 #include "olap/memtable_memory_limiter.h"
 #include "olap/olap_define.h"
 #include "olap/options.h"
@@ -44,6 +47,7 @@
 #include "olap/rowset/segment_v2/inverted_index_cache.h"
 #include "olap/schema_cache.h"
 #include "olap/segment_loader.h"
+#include "olap/storage_engine.h"
 #include "pipeline/task_queue.h"
 #include "pipeline/task_scheduler.h"
 #include "runtime/block_spill_manager.h"
@@ -69,7 +73,9 @@
 #include "runtime/stream_load/stream_load_executor.h"
 #include "runtime/task_group/task_group_manager.h"
 #include "runtime/thread_context.h"
+#include "runtime/user_function_cache.h"
 #include "service/backend_options.h"
+#include "service/backend_service.h"
 #include "service/point_query_executor.h"
 #include "util/bfd_parser.h"
 #include "util/bit_util.h"
@@ -82,6 +88,7 @@
 #include "util/parse_util.h"
 #include "util/pretty_printer.h"
 #include "util/threadpool.h"
+#include "util/thrift_rpc_helper.h"
 #include "util/timezone_utils.h"
 #include "vec/exec/scan/scanner_scheduler.h"
 #include "vec/runtime/vdata_stream_mgr.h"
@@ -135,7 +142,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
     }
     init_doris_metrics(store_paths);
     _store_paths = store_paths;
-
+    _user_function_cache = new UserFunctionCache();
+    _user_function_cache->init(doris::config::user_function_dir);
     _external_scan_context_mgr = new ExternalScanContextMgr(this);
     _vstream_mgr = new doris::vectorized::VDataStreamMgr();
     _result_mgr = new ResultBufferMgr();
@@ -146,6 +154,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
 
     TimezoneUtils::load_timezone_names();
 
+    // _global_zone_cache is not owned by ExecEnv ... maybe should refactor.
     _global_zone_cache = std::make_unique<vectorized::ZoneList>();
     TimezoneUtils::load_timezones_to_cache(*_global_zone_cache);
 
@@ -176,7 +185,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
             .set_max_threads(std::numeric_limits<int>::max())
             .set_max_queue_size(config::fragment_pool_queue_size)
             .build(&_join_node_thread_pool);
-
+    init_file_cache_factory();
     RETURN_IF_ERROR(init_pipeline_task_scheduler());
     _task_group_manager = new taskgroup::TaskGroupManager();
     _scanner_scheduler = new doris::vectorized::ScannerScheduler();
@@ -205,12 +214,16 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
     _result_mgr->init();
     Status status = _load_path_mgr->init();
     if (!status.ok()) {
-        LOG(ERROR) << "load path mgr init failed." << status;
-        exit(-1);
+        LOG(ERROR) << "Load path mgr init failed. " << status;
+        return status;
     }
     _broker_mgr->init();
     _small_file_mgr->init();
-    _scanner_scheduler->init(this);
+    status = _scanner_scheduler->init();
+    if (!status.ok()) {
+        LOG(ERROR) << "Scanner scheduler init failed. " << status;
+        return status;
+    }
 
     _init_mem_env();
 
@@ -218,7 +231,33 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
     RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit()));
     _heartbeat_flags = new HeartbeatFlags();
     _register_metrics();
+
+    _tablet_schema_cache = TabletSchemaCache::create_global_schema_cache();
+    _tablet_schema_cache->start();
+
+    // S3 buffer pool
+    _s3_buffer_pool = new io::S3FileBufferPool();
+    _s3_buffer_pool->init(config::s3_write_buffer_whole_size, 
config::s3_write_buffer_size,
+                          this->buffered_reader_prefetch_thread_pool());
+
+    // Storage engine
+    doris::EngineOptions options;
+    options.store_paths = store_paths;
+    options.backend_uid = doris::UniqueId::gen_uid();
+    _storage_engine = new StorageEngine(options);
+    auto st = _storage_engine->open();
+    if (!st.ok()) {
+        LOG(ERROR) << "Lail to open StorageEngine, res=" << st;
+        return st;
+    }
+    _storage_engine->set_heartbeat_flags(this->heartbeat_flags());
+    if (st = _storage_engine->start_bg_threads(); !st.ok()) {
+        LOG(ERROR) << "Failed to starge bg threads of storage engine, res=" << 
st;
+        return st;
+    }
+
     _s_ready = true;
+
     return Status::OK();
 }
 
@@ -244,6 +283,56 @@ Status ExecEnv::init_pipeline_task_scheduler() {
     return Status::OK();
 }
 
+void ExecEnv::init_file_cache_factory() {
+    // Load file cache before starting up daemon threads to make sure 
StorageEngine is read.
+    if (doris::config::enable_file_cache) {
+        _file_cache_factory = new io::FileCacheFactory();
+        io::IFileCache::init();
+        std::unordered_set<std::string> cache_path_set;
+        std::vector<doris::CachePath> cache_paths;
+        Status olap_res =
+                doris::parse_conf_cache_paths(doris::config::file_cache_path, 
cache_paths);
+        if (!olap_res) {
+            LOG(FATAL) << "parse config file cache path failed, path="
+                       << doris::config::file_cache_path;
+            exit(-1);
+        }
+
+        std::unique_ptr<doris::ThreadPool> file_cache_init_pool;
+        doris::ThreadPoolBuilder("FileCacheInitThreadPool")
+                .set_min_threads(cache_paths.size())
+                .set_max_threads(cache_paths.size())
+                .build(&file_cache_init_pool);
+
+        std::list<doris::Status> cache_status;
+        for (auto& cache_path : cache_paths) {
+            if (cache_path_set.find(cache_path.path) != cache_path_set.end()) {
+                LOG(WARNING) << fmt::format("cache path {} is duplicate", 
cache_path.path);
+                continue;
+            }
+
+            olap_res = file_cache_init_pool->submit_func(std::bind(
+                    &io::FileCacheFactory::create_file_cache, 
_file_cache_factory, cache_path.path,
+                    cache_path.init_settings(), 
&(cache_status.emplace_back())));
+
+            if (!olap_res.ok()) {
+                LOG(FATAL) << "failed to init file cache, err: " << olap_res;
+                exit(-1);
+            }
+            cache_path_set.emplace(cache_path.path);
+        }
+
+        file_cache_init_pool->wait();
+        for (const auto& status : cache_status) {
+            if (!status.ok()) {
+                LOG(FATAL) << "failed to init file cache, err: " << status;
+                exit(-1);
+            }
+        }
+    }
+    return;
+}
+
 Status ExecEnv::_init_mem_env() {
     bool is_percent = false;
     std::stringstream ss;
@@ -262,7 +351,7 @@ Status ExecEnv::_init_mem_env() {
     }
 
     // 3. init storage page cache
-    CacheManager::create_global_instance();
+    _cache_manager = CacheManager::create_global_instance();
 
     int64_t storage_cache_limit =
             ParseUtil::parse_mem_spec(config::storage_page_cache_limit, 
MemInfo::mem_limit(),
@@ -286,8 +375,8 @@ Status ExecEnv::_init_mem_env() {
     while (!is_percent && pk_storage_page_cache_limit > MemInfo::mem_limit() / 
2) {
         pk_storage_page_cache_limit = storage_cache_limit / 2;
     }
-    StoragePageCache::create_global_cache(storage_cache_limit, 
index_percentage,
-                                          pk_storage_page_cache_limit, 
num_shards);
+    _storage_page_cache = StoragePageCache::create_global_cache(
+            storage_cache_limit, index_percentage, 
pk_storage_page_cache_limit, num_shards);
     LOG(INFO) << "Storage page cache memory limit: "
               << PrettyPrinter::print(storage_cache_limit, TUnit::BYTES)
               << ", origin config value: " << config::storage_page_cache_limit;
@@ -300,7 +389,7 @@ Status ExecEnv::_init_mem_env() {
         // Reason same as buffer_pool_limit
         row_cache_mem_limit = row_cache_mem_limit / 2;
     }
-    RowCache::create_global_cache(row_cache_mem_limit);
+    _row_cache = RowCache::create_global_cache(row_cache_mem_limit);
     LOG(INFO) << "Row cache memory limit: "
               << PrettyPrinter::print(row_cache_mem_limit, TUnit::BYTES)
               << ", origin config value: " << config::row_cache_mem_limit;
@@ -322,11 +411,12 @@ Status ExecEnv::_init_mem_env() {
     }
     LOG(INFO) << "segment_cache_capacity <= fd_number * 2 / 5, fd_number: " << 
fd_number
               << " segment_cache_capacity: " << segment_cache_capacity;
-    SegmentLoader::create_global_instance(segment_cache_capacity);
+    _segment_loader = new SegmentLoader(segment_cache_capacity);
 
-    SchemaCache::create_global_instance(config::schema_cache_capacity);
+    _schema_cache = new SchemaCache(config::schema_cache_capacity);
 
-    
LookupConnectionCache::create_global_instance(config::lookup_connection_cache_bytes_limit);
+    _lookup_connection_cache = LookupConnectionCache::create_global_instance(
+            config::lookup_connection_cache_bytes_limit);
 
     // use memory limit
     int64_t inverted_index_cache_limit =
@@ -336,7 +426,8 @@ Status ExecEnv::_init_mem_env() {
         // Reason same as buffer_pool_limit
         inverted_index_cache_limit = inverted_index_cache_limit / 2;
     }
-    
InvertedIndexSearcherCache::create_global_instance(inverted_index_cache_limit);
+    _inverted_index_searcher_cache =
+            
InvertedIndexSearcherCache::create_global_instance(inverted_index_cache_limit);
     LOG(INFO) << "Inverted index searcher cache memory limit: "
               << PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES)
               << ", origin config value: " << 
config::inverted_index_searcher_cache_limit;
@@ -349,7 +440,8 @@ Status ExecEnv::_init_mem_env() {
         // Reason same as buffer_pool_limit
         inverted_index_query_cache_limit = inverted_index_query_cache_limit / 
2;
     }
-    
InvertedIndexQueryCache::create_global_cache(inverted_index_query_cache_limit);
+    _inverted_index_query_cache =
+            
InvertedIndexQueryCache::create_global_cache(inverted_index_query_cache_limit);
     LOG(INFO) << "Inverted index query match cache memory limit: "
               << PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES)
               << ", origin config value: " << 
config::inverted_index_query_cache_limit;
@@ -410,58 +502,120 @@ void ExecEnv::_deregister_metrics() {
     DEREGISTER_HOOK_METRIC(download_cache_thread_pool_queue_size);
 }
 
-void ExecEnv::_destroy() {
+// TODO(zhiqiang): Need refactor all thread pool. Each thread pool must have a 
Stop method.
+// We need to stop all threads before releasing resource.
+void ExecEnv::destroy() {
     //Only destroy once after init
     if (!ready()) {
         return;
     }
     // Memory barrier to prevent other threads from accessing destructed 
resources
     _s_ready = false;
+
+    SAFE_STOP(_tablet_schema_cache);
+    SAFE_STOP(_load_channel_mgr);
+    SAFE_STOP(_scanner_scheduler);
+    SAFE_STOP(_broker_mgr);
+    SAFE_STOP(_load_path_mgr);
+    SAFE_STOP(_result_mgr);
+    SAFE_STOP(_group_commit_mgr);
+    // _routine_load_task_executor should be stopped before 
_new_load_stream_mgr.
+    SAFE_STOP(_routine_load_task_executor);
+    SAFE_STOP(_pipeline_task_scheduler);
+    SAFE_STOP(_pipeline_task_group_scheduler);
+    SAFE_STOP(_external_scan_context_mgr);
+    SAFE_STOP(_fragment_mgr);
+    // NewLoadStreamMgr should be destoried before storage_engine & after 
fragment_mgr stopped.
+    _new_load_stream_mgr.reset();
+    _stream_load_executor.reset();
+    SAFE_STOP(_storage_engine);
+    SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool);
+    SAFE_SHUTDOWN(_join_node_thread_pool);
+    SAFE_SHUTDOWN(_send_report_thread_pool);
+    SAFE_SHUTDOWN(_send_batch_thread_pool);
+    SAFE_SHUTDOWN(_serial_download_cache_thread_token);
+    SAFE_SHUTDOWN(_download_cache_thread_pool);
+
+    // Free resource after threads are stopped.
+    // Some threads are still running, like threads created by 
_new_load_stream_mgr ...
+    SAFE_DELETE(_s3_buffer_pool);
+    SAFE_DELETE(_tablet_schema_cache);
     _deregister_metrics();
-    SAFE_DELETE(_internal_client_cache);
-    SAFE_DELETE(_function_client_cache);
     SAFE_DELETE(_load_channel_mgr);
+    _memtable_memory_limiter.reset(nullptr);
+
+    // shared_ptr maybe no need to be reset
+    // _brpc_iobuf_block_memory_tracker.reset();
+    // _page_no_cache_mem_tracker.reset();
+    // _experimental_mem_tracker.reset();
+    // _orphan_mem_tracker.reset();
+
+    SAFE_DELETE(_block_spill_mgr);
+    SAFE_DELETE(_inverted_index_query_cache);
+    SAFE_DELETE(_inverted_index_searcher_cache);
+    SAFE_DELETE(_lookup_connection_cache);
+    SAFE_DELETE(_schema_cache);
+    SAFE_DELETE(_segment_loader);
+    SAFE_DELETE(_row_cache);
+
+    // StorageEngine must be destoried before _page_no_cache_mem_tracker.reset
+    // StorageEngine must be destoried before _cache_manager destory
+    SAFE_DELETE(_storage_engine);
+
+    // _scanner_scheduler must be desotried before _storage_page_cache
+    SAFE_DELETE(_scanner_scheduler);
+    // _storage_page_cache must be destoried before _cache_manager
+    SAFE_DELETE(_storage_page_cache);
+    // cache_manager must be destoried after _inverted_index_query_cache
+    // https://github.com/apache/doris/issues/24082#issuecomment-1712544039
+    SAFE_DELETE(_cache_manager);
+
+    SAFE_DELETE(_small_file_mgr);
     SAFE_DELETE(_broker_mgr);
-    SAFE_DELETE(_bfd_parser);
     SAFE_DELETE(_load_path_mgr);
-    SAFE_DELETE(_pipeline_task_scheduler);
-    SAFE_DELETE(_pipeline_task_group_scheduler);
-    SAFE_DELETE(_task_group_manager);
+    SAFE_DELETE(_result_mgr);
+    SAFE_DELETE(_file_meta_cache);
+    SAFE_DELETE(_group_commit_mgr);
+    SAFE_DELETE(_routine_load_task_executor);
+    // _stream_load_executor
+    SAFE_DELETE(_function_client_cache);
+    SAFE_DELETE(_internal_client_cache);
+
+    SAFE_DELETE(_bfd_parser);
+    SAFE_DELETE(_result_cache);
     SAFE_DELETE(_fragment_mgr);
+    SAFE_DELETE(_task_group_manager);
+    SAFE_DELETE(_pipeline_task_group_scheduler);
+    SAFE_DELETE(_pipeline_task_scheduler);
+    SAFE_DELETE(_file_cache_factory);
+    // TODO(zhiqiang): Maybe we should call shutdown before release thread 
pool?
+    _join_node_thread_pool.reset(nullptr);
+    _send_report_thread_pool.reset(nullptr);
+    _buffered_reader_prefetch_thread_pool.reset(nullptr);
+    _send_batch_thread_pool.reset(nullptr);
+
     SAFE_DELETE(_broker_client_cache);
     SAFE_DELETE(_frontend_client_cache);
     SAFE_DELETE(_backend_client_cache);
-    SAFE_DELETE(_result_mgr);
     SAFE_DELETE(_result_queue_mgr);
-    SAFE_DELETE(_routine_load_task_executor);
+
+    SAFE_DELETE(_vstream_mgr);
     SAFE_DELETE(_external_scan_context_mgr);
+    SAFE_DELETE(_user_function_cache);
+
+    _serial_download_cache_thread_token.reset(nullptr);
+    _download_cache_thread_pool.reset(nullptr);
+
+    // _heartbeat_flags must be destoried after staroge engine
     SAFE_DELETE(_heartbeat_flags);
-    SAFE_DELETE(_scanner_scheduler);
-    SAFE_DELETE(_group_commit_mgr);
-    SAFE_DELETE(_file_meta_cache);
+
     // Master Info is a thrift object, it could be the last one to deconstruct.
     // Master info should be deconstruct later than fragment manager, because 
fragment will
     // access master_info.backend id to access some info. If there is a 
running query and master
     // info is deconstructed then BE process will core at coordinator back 
method in fragment mgr.
     SAFE_DELETE(_master_info);
 
-    _new_load_stream_mgr.reset();
-    _memtable_memory_limiter.reset(nullptr);
-    _send_batch_thread_pool.reset(nullptr);
-    _buffered_reader_prefetch_thread_pool.reset(nullptr);
-    _send_report_thread_pool.reset(nullptr);
-    _join_node_thread_pool.reset(nullptr);
-    _serial_download_cache_thread_token.reset(nullptr);
-    _download_cache_thread_pool.reset(nullptr);
-    _orphan_mem_tracker.reset();
-    _experimental_mem_tracker.reset();
-    _page_no_cache_mem_tracker.reset();
-    _brpc_iobuf_block_memory_tracker.reset();
-    InvertedIndexSearcherCache::reset_global_instance();
-}
-
-void ExecEnv::destroy(ExecEnv* env) {
-    env->_destroy();
+    LOG(INFO) << "Doris exec envorinment is destoried.";
 }
 
 } // namespace doris
diff --git a/be/src/runtime/external_scan_context_mgr.cpp 
b/be/src/runtime/external_scan_context_mgr.cpp
index 2a3dc92521..6c51cfccbe 100644
--- a/be/src/runtime/external_scan_context_mgr.cpp
+++ b/be/src/runtime/external_scan_context_mgr.cpp
@@ -50,7 +50,7 @@ ExternalScanContextMgr::ExternalScanContextMgr(ExecEnv* 
exec_env)
     });
 }
 
-ExternalScanContextMgr::~ExternalScanContextMgr() {
+void ExternalScanContextMgr::stop() {
     DEREGISTER_HOOK_METRIC(active_scan_context_count);
     _stop_background_threads_latch.count_down();
     if (_keep_alive_reaper) {
diff --git a/be/src/runtime/external_scan_context_mgr.h 
b/be/src/runtime/external_scan_context_mgr.h
index 93ae8d360a..4925821f3a 100644
--- a/be/src/runtime/external_scan_context_mgr.h
+++ b/be/src/runtime/external_scan_context_mgr.h
@@ -52,7 +52,9 @@ public:
 class ExternalScanContextMgr {
 public:
     ExternalScanContextMgr(ExecEnv* exec_env);
-    ~ExternalScanContextMgr();
+    ~ExternalScanContextMgr() = default;
+
+    void stop();
 
     Status create_scan_context(std::shared_ptr<ScanContext>* p_context);
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 703019a39b..adbe401356 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -139,7 +139,9 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
     CHECK(s.ok()) << s.to_string();
 }
 
-FragmentMgr::~FragmentMgr() {
+FragmentMgr::~FragmentMgr() {}
+
+void FragmentMgr::stop() {
     DEREGISTER_HOOK_METRIC(plan_fragment_count);
     DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size);
     _stop_background_threads_latch.count_down();
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 0cf5cf2d58..14c63c559b 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -75,6 +75,8 @@ public:
     FragmentMgr(ExecEnv* exec_env);
     ~FragmentMgr() override;
 
+    void stop();
+
     // execute one plan fragment
     Status exec_plan_fragment(const TExecPlanFragmentParams& params);
 
diff --git a/be/src/runtime/frontend_info.h b/be/src/runtime/frontend_info.h
index c16d63096f..a7e4b3f999 100644
--- a/be/src/runtime/frontend_info.h
+++ b/be/src/runtime/frontend_info.h
@@ -14,6 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+#pragma once
 
 #include <gen_cpp/HeartbeatService_types.h>
 
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index 5c9b3c1637..94c0dba30a 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -362,7 +362,14 @@ GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) : 
_exec_env(exec_env) {
             .build(&_insert_into_thread_pool);
 }
 
-GroupCommitMgr::~GroupCommitMgr() {}
+GroupCommitMgr::~GroupCommitMgr() {
+    LOG(INFO) << "GroupCommitMgr is destoried";
+}
+
+void GroupCommitMgr::stop() {
+    _insert_into_thread_pool->shutdown();
+    LOG(INFO) << "GroupCommitMgr is stopped";
+}
 
 Status GroupCommitMgr::group_commit_insert(int64_t table_id, const TPlan& plan,
                                            const TDescriptorTable& tdesc_tbl,
diff --git a/be/src/runtime/group_commit_mgr.h 
b/be/src/runtime/group_commit_mgr.h
index 0d5797ed96..1d124009d1 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -109,6 +109,8 @@ public:
     GroupCommitMgr(ExecEnv* exec_env);
     virtual ~GroupCommitMgr();
 
+    void stop();
+
     // insert into
     Status group_commit_insert(int64_t table_id, const TPlan& plan,
                                const TDescriptorTable& desc_tbl,
diff --git a/be/src/runtime/load_channel_mgr.cpp 
b/be/src/runtime/load_channel_mgr.cpp
index 9987a34d5b..1aaf8772b8 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -73,13 +73,16 @@ LoadChannelMgr::LoadChannelMgr() : 
_stop_background_threads_latch(1) {
 }
 
 LoadChannelMgr::~LoadChannelMgr() {
+    delete _last_success_channel;
+}
+
+void LoadChannelMgr::stop() {
     DEREGISTER_HOOK_METRIC(load_channel_count);
     DEREGISTER_HOOK_METRIC(load_channel_mem_consumption);
     _stop_background_threads_latch.count_down();
     if (_load_channels_clean_thread) {
         _load_channels_clean_thread->join();
     }
-    delete _last_success_channel;
 }
 
 Status LoadChannelMgr::init(int64_t process_mem_limit) {
diff --git a/be/src/runtime/load_channel_mgr.h 
b/be/src/runtime/load_channel_mgr.h
index 3c094a4251..a77c0b0cc2 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -63,6 +63,8 @@ public:
     // cancel all tablet stream for 'load_id' load
     Status cancel(const PTabletWriterCancelRequest& request);
 
+    void stop();
+
 private:
     Status _get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& 
is_eof,
                              const UniqueId& load_id, const 
PTabletWriterAddBlockRequest& request);
diff --git a/be/src/runtime/load_path_mgr.cpp b/be/src/runtime/load_path_mgr.cpp
index c9a5f18183..e241eeafea 100644
--- a/be/src/runtime/load_path_mgr.cpp
+++ b/be/src/runtime/load_path_mgr.cpp
@@ -56,7 +56,7 @@ LoadPathMgr::LoadPathMgr(ExecEnv* exec_env)
           _error_path_next_shard(0),
           _stop_background_threads_latch(1) {}
 
-LoadPathMgr::~LoadPathMgr() {
+void LoadPathMgr::stop() {
     _stop_background_threads_latch.count_down();
     if (_clean_thread) {
         _clean_thread->join();
diff --git a/be/src/runtime/load_path_mgr.h b/be/src/runtime/load_path_mgr.h
index 73703517c6..de443f059b 100644
--- a/be/src/runtime/load_path_mgr.h
+++ b/be/src/runtime/load_path_mgr.h
@@ -39,9 +39,10 @@ class Thread;
 class LoadPathMgr {
 public:
     LoadPathMgr(ExecEnv* env);
-    ~LoadPathMgr();
+    ~LoadPathMgr() = default;
 
     Status init();
+    void stop();
 
     Status allocate_dir(const std::string& db, const std::string& label, 
std::string* prefix);
 
diff --git a/be/src/runtime/memory/cache_manager.h 
b/be/src/runtime/memory/cache_manager.h
index fd7d5875b0..8fdce10d69 100644
--- a/be/src/runtime/memory/cache_manager.h
+++ b/be/src/runtime/memory/cache_manager.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include "runtime/exec_env.h"
 #include "runtime/memory/cache_policy.h"
 #include "util/runtime_profile.h"
 
@@ -25,12 +26,11 @@ namespace doris {
 // Hold the list of all caches, for prune when memory not enough or timing.
 class CacheManager {
 public:
-    static void create_global_instance() {
-        DCHECK(_s_instance == nullptr);
-        static CacheManager instance;
-        _s_instance = &instance;
+    static CacheManager* create_global_instance() {
+        CacheManager* res = new CacheManager();
+        return res;
     }
-    static CacheManager* instance() { return _s_instance; }
+    static CacheManager* instance() { return 
ExecEnv::GetInstance()->get_cache_manager(); }
 
     std::list<CachePolicy*>::iterator register_cache(CachePolicy* cache) {
         std::lock_guard<std::mutex> l(_caches_lock);
@@ -55,8 +55,6 @@ public:
     void clear_once(CachePolicy::CacheType type);
 
 private:
-    static inline CacheManager* _s_instance = nullptr;
-
     std::mutex _caches_lock;
     std::list<CachePolicy*> _caches;
 };
diff --git a/be/src/runtime/result_buffer_mgr.cpp 
b/be/src/runtime/result_buffer_mgr.cpp
index 32eeb702df..a3b99300f2 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -47,7 +47,7 @@ ResultBufferMgr::ResultBufferMgr() : 
_stop_background_threads_latch(1) {
     });
 }
 
-ResultBufferMgr::~ResultBufferMgr() {
+void ResultBufferMgr::stop() {
     DEREGISTER_HOOK_METRIC(result_buffer_block_count);
     _stop_background_threads_latch.count_down();
     if (_clean_thread) {
diff --git a/be/src/runtime/result_buffer_mgr.h 
b/be/src/runtime/result_buffer_mgr.h
index f164a14b93..8c9b621968 100644
--- a/be/src/runtime/result_buffer_mgr.h
+++ b/be/src/runtime/result_buffer_mgr.h
@@ -47,9 +47,12 @@ class Thread;
 class ResultBufferMgr {
 public:
     ResultBufferMgr();
-    ~ResultBufferMgr();
+    ~ResultBufferMgr() = default;
     // init Result Buffer Mgr, start cancel thread
     Status init();
+
+    void stop();
+
     // create one result sender for this query_id
     // the returned sender do not need release
     // sender is not used when call cancel or unregister
diff --git a/be/src/runtime/routine_load/data_consumer_pool.h 
b/be/src/runtime/routine_load/data_consumer_pool.h
index 15baa2be6e..25dbc57fb7 100644
--- a/be/src/runtime/routine_load/data_consumer_pool.h
+++ b/be/src/runtime/routine_load/data_consumer_pool.h
@@ -41,7 +41,9 @@ public:
     DataConsumerPool(int64_t max_pool_size)
             : _max_pool_size(max_pool_size), _stop_background_threads_latch(1) 
{}
 
-    ~DataConsumerPool() {
+    ~DataConsumerPool() = default;
+
+    void stop() {
         _stop_background_threads_latch.count_down();
         if (_clean_idle_consumer_thread) {
             _clean_idle_consumer_thread->join();
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp 
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 72285930b4..e5c48f78d9 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -76,12 +76,15 @@ RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* 
exec_env)
 }
 
 RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() {
+    LOG(INFO) << _task_map.size() << " not executed tasks left, cleanup";
+    _task_map.clear();
+}
+
+void RoutineLoadTaskExecutor::stop() {
     DEREGISTER_HOOK_METRIC(routine_load_task_count);
     _thread_pool.shutdown();
     _thread_pool.join();
-
-    LOG(INFO) << _task_map.size() << " not executed tasks left, cleanup";
-    _task_map.clear();
+    _data_consumer_pool.stop();
 }
 
 // Create a temp StreamLoadContext and set some kafka connection info in it.
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h 
b/be/src/runtime/routine_load/routine_load_task_executor.h
index 90c1a06400..6714ce6902 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -51,6 +51,8 @@ public:
 
     ~RoutineLoadTaskExecutor();
 
+    void stop();
+
     // submit a routine load task
     Status submit_task(const TRoutineLoadTask& task);
 
diff --git a/be/src/runtime/task_group/task_group_manager.cpp 
b/be/src/runtime/task_group/task_group_manager.cpp
index 6ce6d31604..179bf8911a 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -28,9 +28,6 @@
 
 namespace doris::taskgroup {
 
-TaskGroupManager::TaskGroupManager() = default;
-TaskGroupManager::~TaskGroupManager() = default;
-
 TaskGroupPtr TaskGroupManager::get_or_create_task_group(const TaskGroupInfo& 
task_group_info) {
     {
         std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
diff --git a/be/src/runtime/task_group/task_group_manager.h 
b/be/src/runtime/task_group/task_group_manager.h
index 375208dc6e..baa6579b15 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -29,8 +29,8 @@ namespace taskgroup {
 
 class TaskGroupManager {
 public:
-    TaskGroupManager();
-    ~TaskGroupManager();
+    TaskGroupManager() = default;
+    ~TaskGroupManager() = default;
 
     TaskGroupPtr get_or_create_task_group(const TaskGroupInfo& 
task_group_info);
 
diff --git a/be/src/runtime/user_function_cache.cpp 
b/be/src/runtime/user_function_cache.cpp
index 39507ec222..9f07ba5902 100644
--- a/be/src/runtime/user_function_cache.cpp
+++ b/be/src/runtime/user_function_cache.cpp
@@ -39,6 +39,7 @@
 #include "http/http_client.h"
 #include "io/fs/file_system.h"
 #include "io/fs/local_file_system.h"
+#include "runtime/exec_env.h"
 #include "util/dynamic_util.h"
 #include "util/md5.h"
 #include "util/spinlock.h"
@@ -122,8 +123,7 @@ UserFunctionCache::~UserFunctionCache() {
 }
 
 UserFunctionCache* UserFunctionCache::instance() {
-    static UserFunctionCache s_cache;
-    return &s_cache;
+    return ExecEnv::GetInstance()->user_function_cache();
 }
 
 Status UserFunctionCache::init(const std::string& lib_dir) {
diff --git a/be/src/service/brpc_service.cpp b/be/src/service/brpc_service.cpp
index f35c49cbd2..57219f584e 100644
--- a/be/src/service/brpc_service.cpp
+++ b/be/src/service/brpc_service.cpp
@@ -86,8 +86,16 @@ Status BRpcService::start(int port, int num_threads) {
 }
 
 void BRpcService::join() {
-    _server->Stop(1000);
-    _server->Join();
+    int stop_succeed = _server->Stop(1000);
+
+    if (stop_succeed == 0) {
+        _server->Join();
+    } else {
+        LOG(WARNING) << "Failed to stop brpc service, "
+                     << "not calling brpc server join since it will never 
retrun."
+                     << "maybe something bad will happen, let us know if you 
meet something error.";
+    }
+
     _server->ClearServices();
 }
 
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index fd55c5a211..abfa7913e7 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -57,7 +57,6 @@
 #include "common/daemon.h"
 #include "common/logging.h"
 #include "common/phdr_cache.h"
-#include "common/resource_tls.h"
 #include "common/signal_handler.h"
 #include "common/status.h"
 #include "io/cache/block/block_file_cache_factory.h"
@@ -449,49 +448,6 @@ int main(int argc, char** argv) {
     // Or our own sig-handler for SIGINT & SIGTERM will not be chained ...
     // https://www.oracle.com/java/technologies/javase/signals.html
     doris::init_signals();
-
-    // Load file cache before starting up daemon threads to make sure 
StorageEngine is read.
-    if (doris::config::enable_file_cache) {
-        doris::io::IFileCache::init();
-        std::unordered_set<std::string> cache_path_set;
-        std::vector<doris::CachePath> cache_paths;
-        olap_res = 
doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths);
-        if (!olap_res) {
-            LOG(FATAL) << "parse config file cache path failed, path="
-                       << doris::config::file_cache_path;
-            exit(-1);
-        }
-
-        std::unique_ptr<doris::ThreadPool> file_cache_init_pool;
-        doris::ThreadPoolBuilder("FileCacheInitThreadPool")
-                .set_min_threads(cache_paths.size())
-                .set_max_threads(cache_paths.size())
-                .build(&file_cache_init_pool);
-
-        std::list<doris::Status> cache_status;
-        for (auto& cache_path : cache_paths) {
-            if (cache_path_set.find(cache_path.path) != cache_path_set.end()) {
-                LOG(WARNING) << fmt::format("cache path {} is duplicate", 
cache_path.path);
-                continue;
-            }
-
-            RETURN_IF_ERROR(file_cache_init_pool->submit_func(
-                    std::bind(&doris::io::FileCacheFactory::create_file_cache,
-                              &(doris::io::FileCacheFactory::instance()), 
cache_path.path,
-                              cache_path.init_settings(), 
&(cache_status.emplace_back()))));
-
-            cache_path_set.emplace(cache_path.path);
-        }
-
-        file_cache_init_pool->wait();
-        for (const auto& status : cache_status) {
-            if (!status.ok()) {
-                LOG(FATAL) << "failed to init file cache, err: " << status;
-                exit(-1);
-            }
-        }
-    }
-
     // ATTN: MUST init before `ExecEnv`, `StorageEngine` and other daemon 
services
     //
     //       Daemon ───┬──► StorageEngine ──► ExecEnv ──► Disk/Mem/CpuInfo
@@ -501,7 +457,6 @@ int main(int argc, char** argv) {
     doris::CpuInfo::init();
     doris::DiskInfo::init();
     doris::MemInfo::init();
-    
doris::UserFunctionCache::instance()->init(doris::config::user_function_dir);
 
     LOG(INFO) << doris::CpuInfo::debug_string();
     LOG(INFO) << doris::DiskInfo::debug_string();
@@ -511,41 +466,17 @@ int main(int argc, char** argv) {
     // will work only after additional call of this function.
     // rewrites dl_iterate_phdr will cause Jemalloc to fail to run after 
enable profile. see #
     // updatePHDRCache();
-
-    doris::ResourceTls::init();
     if (!doris::BackendOptions::init()) {
         exit(-1);
     }
 
     // init exec env
-    auto exec_env = doris::ExecEnv::GetInstance();
-    doris::ExecEnv::init(exec_env, paths);
-    doris::TabletSchemaCache::create_global_schema_cache();
-
-    // init s3 write buffer pool
-    doris::io::S3FileBufferPool* s3_buffer_pool = 
doris::io::S3FileBufferPool::GetInstance();
-    s3_buffer_pool->init(doris::config::s3_write_buffer_whole_size,
-                         doris::config::s3_write_buffer_size,
-                         exec_env->buffered_reader_prefetch_thread_pool());
-
-    // init and open storage engine
-    doris::EngineOptions options;
-    options.store_paths = paths;
-    options.backend_uid = doris::UniqueId::gen_uid();
-    std::unique_ptr<doris::StorageEngine> engine;
-    auto st = doris::StorageEngine::open(options, &engine);
-    if (!st.ok()) {
-        LOG(FATAL) << "fail to open StorageEngine, res=" << st;
+    auto exec_env(doris::ExecEnv::GetInstance());
+    status = doris::ExecEnv::init(doris::ExecEnv::GetInstance(), paths);
+    if (status != Status::OK()) {
+        LOG(FATAL) << "failed to init doris storage engine, res=" << status;
         exit(-1);
     }
-    engine->set_heartbeat_flags(exec_env->heartbeat_flags());
-
-    // start all background threads of storage engine.
-    // SHOULD be called after exec env is initialized.
-    EXIT_IF_ERROR(engine->start_bg_threads());
-
-    doris::Daemon daemon;
-    daemon.start();
 
     doris::telemetry::init_tracer();
 
@@ -563,8 +494,9 @@ int main(int argc, char** argv) {
     }
 
     // 2. bprc service
-    doris::BRpcService brpc_service(exec_env);
-    status = brpc_service.start(doris::config::brpc_port, 
doris::config::brpc_num_threads);
+    std::unique_ptr<doris::BRpcService> brpc_service =
+            std::make_unique<doris::BRpcService>(exec_env);
+    status = brpc_service->start(doris::config::brpc_port, 
doris::config::brpc_num_threads);
     if (!status.ok()) {
         LOG(ERROR) << "BRPC service did not start correctly, exiting";
         doris::shutdown_logging();
@@ -572,9 +504,9 @@ int main(int argc, char** argv) {
     }
 
     // 3. http service
-    doris::HttpService http_service(exec_env, doris::config::webserver_port,
-                                    doris::config::webserver_num_workers);
-    status = http_service.start();
+    std::unique_ptr<doris::HttpService> http_service = 
std::make_unique<doris::HttpService>(
+            exec_env, doris::config::webserver_port, 
doris::config::webserver_num_workers);
+    status = http_service->start();
     if (!status.ok()) {
         LOG(ERROR) << "Doris Be http service did not start correctly, exiting";
         doris::shutdown_logging();
@@ -605,6 +537,10 @@ int main(int argc, char** argv) {
     std::shared_ptr<doris::flight::FlightSqlServer> flight_server =
             std::move(doris::flight::FlightSqlServer::create()).ValueOrDie();
     status = flight_server->init(doris::config::arrow_flight_port);
+
+    // 6. start daemon thread to do clean or gc jobs
+    doris::Daemon daemon;
+    daemon.start();
     if (!status.ok()) {
         LOG(ERROR) << "Arrow Flight Service did not start correctly, exiting, "
                    << status.to_string();
@@ -618,10 +554,26 @@ int main(int argc, char** argv) {
 #endif
         sleep(3);
     }
-
+    LOG(INFO) << "Doris main exiting.";
     // For graceful shutdown, need to wait for all running queries to stop
     exec_env->wait_for_all_tasks_done();
-
+    daemon.stop();
+    flight_server.reset();
+    LOG(INFO) << "Flight server stopped.";
+    heartbeat_thrift_server->stop();
+    heartbeat_thrift_server.reset(nullptr);
+    LOG(INFO) << "Heartbeat server stopped";
+    // TODO(zhiqiang): http_service
+    http_service->stop();
+    http_service.reset(nullptr);
+    LOG(INFO) << "Http service stopped";
+    be_server->stop();
+    be_server.reset(nullptr);
+    LOG(INFO) << "Be server stopped";
+    brpc_service.reset(nullptr);
+    LOG(INFO) << "Brpc service stopped";
+    exec_env->destroy();
+    LOG(INFO) << "Doris main exited.";
     return 0;
 }
 
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index dcc3082972..fd8c640d20 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -239,8 +239,12 @@ Status HttpService::start() {
 }
 
 void HttpService::stop() {
+    if (stopped) {
+        return;
+    }
     _ev_http_server->stop();
     _pool.clear();
+    stopped = true;
 }
 
 } // namespace doris
diff --git a/be/src/service/http_service.h b/be/src/service/http_service.h
index dea8c4141c..05d36fdd75 100644
--- a/be/src/service/http_service.h
+++ b/be/src/service/http_service.h
@@ -43,6 +43,8 @@ private:
 
     std::unique_ptr<EvHttpServer> _ev_http_server;
     std::unique_ptr<WebPageHandler> _web_page_handler;
+
+    bool stopped = false;
 };
 
 } // namespace doris
diff --git a/be/src/service/point_query_executor.cpp 
b/be/src/service/point_query_executor.cpp
index df762226a2..a1cf85f05a 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -33,6 +33,7 @@
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
 #include "olap/tablet_schema.h"
+#include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
 #include "util/key_util.h"
 #include "util/runtime_profile.h"
@@ -101,15 +102,12 @@ int64_t Reusable::mem_size() const {
     return _mem_size;
 }
 
-LookupConnectionCache* LookupConnectionCache::_s_instance = nullptr;
-void LookupConnectionCache::create_global_instance(size_t capacity) {
-    DCHECK(_s_instance == nullptr);
-    static LookupConnectionCache instance(capacity);
-    _s_instance = &instance;
+LookupConnectionCache* LookupConnectionCache::create_global_instance(size_t 
capacity) {
+    DCHECK(ExecEnv::GetInstance()->get_lookup_connection_cache() == nullptr);
+    LookupConnectionCache* res = new LookupConnectionCache(capacity);
+    return res;
 }
 
-RowCache* RowCache::_s_instance = nullptr;
-
 RowCache::RowCache(int64_t capacity, int num_shards) {
     // Create Row Cache
     _cache = std::unique_ptr<Cache>(
@@ -117,14 +115,14 @@ RowCache::RowCache(int64_t capacity, int num_shards) {
 }
 
 // Create global instance of this class
-void RowCache::create_global_cache(int64_t capacity, uint32_t num_shards) {
-    DCHECK(_s_instance == nullptr);
-    static RowCache instance(capacity, num_shards);
-    _s_instance = &instance;
+RowCache* RowCache::create_global_cache(int64_t capacity, uint32_t num_shards) 
{
+    DCHECK(ExecEnv::GetInstance()->get_row_cache() == nullptr);
+    RowCache* res = new RowCache(capacity, num_shards);
+    return res;
 }
 
 RowCache* RowCache::instance() {
-    return _s_instance;
+    return ExecEnv::GetInstance()->get_row_cache();
 }
 
 bool RowCache::lookup(const RowCacheKey& key, CacheHandle* handle) {
diff --git a/be/src/service/point_query_executor.h 
b/be/src/service/point_query_executor.h
index 180af54fdd..4e51217c3b 100644
--- a/be/src/service/point_query_executor.h
+++ b/be/src/service/point_query_executor.h
@@ -46,6 +46,7 @@
 #include "olap/tablet.h"
 #include "olap/utils.h"
 #include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
 #include "util/mysql_global.h"
 #include "util/runtime_profile.h"
 #include "util/slice.h"
@@ -161,7 +162,7 @@ public:
     };
 
     // Create global instance of this class
-    static void create_global_cache(int64_t capacity, uint32_t num_shards = 
kDefaultNumShards);
+    static RowCache* create_global_cache(int64_t capacity, uint32_t num_shards 
= kDefaultNumShards);
 
     static RowCache* instance();
 
@@ -183,7 +184,6 @@ public:
 private:
     static constexpr uint32_t kDefaultNumShards = 128;
     RowCache(int64_t capacity, int num_shards = kDefaultNumShards);
-    static RowCache* _s_instance;
     std::unique_ptr<Cache> _cache = nullptr;
 };
 
@@ -191,9 +191,11 @@ private:
 // One connection per stmt perf uuid
 class LookupConnectionCache : public LRUCachePolicy {
 public:
-    static LookupConnectionCache* instance() { return _s_instance; }
+    static LookupConnectionCache* instance() {
+        return ExecEnv::GetInstance()->get_lookup_connection_cache();
+    }
 
-    static void create_global_instance(size_t capacity);
+    static LookupConnectionCache* create_global_instance(size_t capacity);
 
 private:
     friend class PointQueryExecutor;
@@ -240,8 +242,6 @@ private:
     struct CacheValue : public LRUCacheValueBase {
         std::shared_ptr<Reusable> item = nullptr;
     };
-
-    static LookupConnectionCache* _s_instance;
 };
 
 struct Metrics {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 87903e480a..439a843c3a 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -61,6 +61,17 @@ ScannerScheduler::~ScannerScheduler() {
         return;
     }
 
+    for (int i = 0; i < QUEUE_NUM; i++) {
+        delete _pending_queues[i];
+    }
+    delete[] _pending_queues;
+}
+
+void ScannerScheduler::stop() {
+    if (!_is_init) {
+        return;
+    }
+
     for (int i = 0; i < QUEUE_NUM; i++) {
         _pending_queues[i]->shutdown();
     }
@@ -80,13 +91,10 @@ ScannerScheduler::~ScannerScheduler() {
     _limited_scan_thread_pool->wait();
     _group_local_scan_thread_pool->wait();
 
-    for (int i = 0; i < QUEUE_NUM; i++) {
-        delete _pending_queues[i];
-    }
-    delete[] _pending_queues;
+    LOG(INFO) << "ScannerScheduler stopped";
 }
 
-Status ScannerScheduler::init(ExecEnv* env) {
+Status ScannerScheduler::init() {
     // 1. scheduling thread pool and scheduling queues
     ThreadPoolBuilder("SchedulingThreadPool")
             .set_min_threads(QUEUE_NUM)
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
index e669fd9b77..366275eb41 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -64,10 +64,12 @@ public:
     ScannerScheduler();
     ~ScannerScheduler();
 
-    Status init(ExecEnv* env);
+    [[nodiscard]] Status init();
 
     [[nodiscard]] Status submit(ScannerContext* ctx);
 
+    void stop();
+
     std::unique_ptr<ThreadPoolToken> 
new_limited_scan_pool_token(ThreadPool::ExecutionMode mode,
                                                                  int 
max_concurrency);
     taskgroup::ScanTaskTaskGroupQueue* local_scan_task_queue() {
diff --git a/be/test/common/resource_tls_test.cpp 
b/be/test/common/resource_tls_test.cpp
deleted file mode 100644
index e09227eb03..0000000000
--- a/be/test/common/resource_tls_test.cpp
+++ /dev/null
@@ -1,49 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "common/resource_tls.h"
-
-#include <gtest/gtest-message.h>
-#include <gtest/gtest-test-part.h>
-
-#include <memory>
-
-#include "gen_cpp/Types_types.h"
-#include "gtest/gtest_pred_impl.h"
-
-namespace doris {
-
-class ResourceTlsTest : public testing::Test {};
-
-TEST_F(ResourceTlsTest, EmptyTest) {
-    EXPECT_TRUE(ResourceTls::get_resource_tls() == nullptr);
-    EXPECT_TRUE(ResourceTls::set_resource_tls((TResourceInfo*)1) != 0);
-}
-
-TEST_F(ResourceTlsTest, NormalTest) {
-    ResourceTls::init();
-    EXPECT_TRUE(ResourceTls::get_resource_tls() == nullptr);
-    TResourceInfo* info = new TResourceInfo();
-    info->user = "testUser";
-    info->group = "testGroup";
-    EXPECT_TRUE(ResourceTls::set_resource_tls(info) == 0);
-    TResourceInfo* getInfo = ResourceTls::get_resource_tls();
-    EXPECT_STREQ("testUser", getInfo->user.c_str());
-    EXPECT_STREQ("testGroup", getInfo->group.c_str());
-}
-
-} // namespace doris
diff --git a/be/test/olap/delete_bitmap_calculator_test.cpp 
b/be/test/olap/delete_bitmap_calculator_test.cpp
index 6e842edc36..53bdb9c0fb 100644
--- a/be/test/olap/delete_bitmap_calculator_test.cpp
+++ b/be/test/olap/delete_bitmap_calculator_test.cpp
@@ -40,6 +40,7 @@
 #include "olap/tablet_meta.h"
 #include "olap/tablet_schema.h"
 #include "olap/tablet_schema_helper.h"
+#include "runtime/exec_env.h"
 
 namespace doris {
 using namespace ErrorCode;
@@ -73,7 +74,7 @@ public:
         
EXPECT_TRUE(io::global_local_filesystem()->delete_and_create_directory(kSegmentDir).ok());
         doris::EngineOptions options;
         k_engine = new StorageEngine(options);
-        StorageEngine::_s_instance = k_engine;
+        ExecEnv::GetInstance()->set_storage_engine(k_engine);
     }
 
     void TearDown() override {
@@ -82,6 +83,7 @@ public:
             k_engine->stop();
             delete k_engine;
             k_engine = nullptr;
+            ExecEnv::GetInstance()->set_storage_engine(nullptr);
         }
     }
 
diff --git a/be/test/olap/delete_handler_test.cpp 
b/be/test/olap/delete_handler_test.cpp
index 605e3e389c..b338320d0f 100644
--- a/be/test/olap/delete_handler_test.cpp
+++ b/be/test/olap/delete_handler_test.cpp
@@ -29,6 +29,7 @@
 
 #include <cstdlib>
 #include <iostream>
+#include <memory>
 #include <string>
 #include <vector>
 
@@ -46,6 +47,7 @@
 #include "olap/storage_engine.h"
 #include "olap/tablet.h"
 #include "olap/tablet_manager.h"
+#include "runtime/exec_env.h"
 #include "util/cpu_info.h"
 
 using namespace std;
@@ -78,8 +80,10 @@ static void set_up() {
 
     doris::EngineOptions options;
     options.store_paths = paths;
-    Status s = doris::StorageEngine::open(options, &k_engine);
+    k_engine = std::make_unique<StorageEngine>(options);
+    Status s = k_engine->open();
     EXPECT_TRUE(s.ok()) << s.to_string();
+    ExecEnv::GetInstance()->set_storage_engine(k_engine.get());
 }
 
 static void tear_down() {
@@ -90,6 +94,7 @@ static void tear_down() {
     EXPECT_TRUE(io::global_local_filesystem()
                         ->delete_directory(string(getenv("DORIS_HOME")) + "/" 
+ UNUSED_PREFIX)
                         .ok());
+    ExecEnv::GetInstance()->set_storage_engine(nullptr);
     k_engine.reset();
 }
 
diff --git a/be/test/olap/delta_writer_test.cpp 
b/be/test/olap/delta_writer_test.cpp
index a8dfa4a760..6fc57fd15d 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -79,17 +79,20 @@ static void set_up() {
 
     doris::EngineOptions options;
     options.store_paths = paths;
-    Status s = doris::StorageEngine::open(options, &k_engine);
+    k_engine = std::make_unique<StorageEngine>(options);
+    Status s = k_engine->open();
     EXPECT_TRUE(s.ok()) << s.to_string();
 
     ExecEnv* exec_env = doris::ExecEnv::GetInstance();
     exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter());
+    exec_env->set_storage_engine(k_engine.get());
     k_engine->start_bg_threads();
 }
 
 static void tear_down() {
     ExecEnv* exec_env = doris::ExecEnv::GetInstance();
     exec_env->set_memtable_memory_limiter(nullptr);
+    exec_env->set_storage_engine(nullptr);
     k_engine.reset();
     EXPECT_EQ(system("rm -rf ./data_test"), 0);
     
io::global_local_filesystem()->delete_directory(std::string(getenv("DORIS_HOME"))
 + "/" +
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp 
b/be/test/olap/engine_storage_migration_task_test.cpp
index 706cb74979..037e2a70c7 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -77,16 +77,19 @@ static void set_up() {
 
     doris::EngineOptions options;
     options.store_paths = paths;
-    Status s = doris::StorageEngine::open(options, &k_engine);
+    k_engine = std::make_unique<StorageEngine>(options);
+    Status s = k_engine->open();
     EXPECT_TRUE(s.ok()) << s.to_string();
     ExecEnv* exec_env = doris::ExecEnv::GetInstance();
-    k_engine->start_bg_threads();
     exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter());
+    exec_env->set_storage_engine(k_engine.get());
+    k_engine->start_bg_threads();
 }
 
 static void tear_down() {
     ExecEnv* exec_env = doris::ExecEnv::GetInstance();
     exec_env->set_memtable_memory_limiter(nullptr);
+    exec_env->set_storage_engine(nullptr);
     k_engine.reset();
     EXPECT_EQ(system("rm -rf ./data_test_1"), 0);
     EXPECT_EQ(system("rm -rf ./data_test_2"), 0);
diff --git a/be/test/olap/memtable_flush_executor_test.cpp 
b/be/test/olap/memtable_flush_executor_test.cpp
index efe95e36a3..283b010b76 100644
--- a/be/test/olap/memtable_flush_executor_test.cpp
+++ b/be/test/olap/memtable_flush_executor_test.cpp
@@ -55,13 +55,15 @@ void set_up() {
 
     doris::EngineOptions options;
     options.store_paths = paths;
-    Status s = doris::StorageEngine::open(options, &k_engine);
+    k_engine = std::make_unique<StorageEngine>(options);
+    Status s = k_engine->open();
     EXPECT_TRUE(s.ok()) << s.to_string();
-
+    ExecEnv::GetInstance()->set_storage_engine(k_engine.get());
     k_flush_executor = k_engine->memtable_flush_executor();
 }
 
 void tear_down() {
+    ExecEnv::GetInstance()->set_storage_engine(nullptr);
     k_engine.reset();
     system("rm -rf ./flush_test");
     EXPECT_TRUE(io::global_local_filesystem()
diff --git a/be/test/olap/memtable_memory_limiter_test.cpp 
b/be/test/olap/memtable_memory_limiter_test.cpp
index 187fc09cb4..22a8421fd9 100644
--- a/be/test/olap/memtable_memory_limiter_test.cpp
+++ b/be/test/olap/memtable_memory_limiter_test.cpp
@@ -85,15 +85,23 @@ protected:
 
         doris::EngineOptions options;
         options.store_paths = paths;
-        Status s = doris::StorageEngine::open(options, &_engine);
+        _engine = std::make_unique<StorageEngine>(options);
+        Status st = _engine->open();
+        EXPECT_TRUE(st.ok()) << st.to_string();
+
         ExecEnv* exec_env = doris::ExecEnv::GetInstance();
-        _engine->start_bg_threads();
+        // ExecEnv's storage_engine will be read by storage_engine's other 
operations.
+        // So we must do this before storage engine's other operation.
+        exec_env->set_storage_engine(_engine.get());
         exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter());
+        _engine->start_bg_threads();
     }
 
     void TearDown() override {
         ExecEnv* exec_env = doris::ExecEnv::GetInstance();
         exec_env->set_memtable_memory_limiter(nullptr);
+        exec_env->set_storage_engine(nullptr);
+        _engine.reset(nullptr);
         EXPECT_EQ(system("rm -rf ./data_test"), 0);
         
io::global_local_filesystem()->delete_directory(std::string(getenv("DORIS_HOME"))
 + "/" +
                                                         UNUSED_PREFIX);
diff --git a/be/test/olap/ordered_data_compaction_test.cpp 
b/be/test/olap/ordered_data_compaction_test.cpp
index ab0562a3ac..b53689c229 100644
--- a/be/test/olap/ordered_data_compaction_test.cpp
+++ b/be/test/olap/ordered_data_compaction_test.cpp
@@ -61,6 +61,7 @@
 #include "olap/tablet_meta.h"
 #include "olap/tablet_schema.h"
 #include "olap/utils.h"
+#include "runtime/exec_env.h"
 #include "util/uid_util.h"
 #include "vec/columns/column.h"
 #include "vec/core/block.h"
@@ -89,8 +90,7 @@ protected:
         _data_dir->update_capacity();
         doris::EngineOptions options;
         k_engine = new StorageEngine(options);
-        StorageEngine::_s_instance = k_engine;
-
+        ExecEnv::GetInstance()->set_storage_engine(k_engine);
         config::enable_ordered_data_compaction = true;
         config::ordered_data_compaction_min_segment_size = 10;
     }
@@ -100,6 +100,7 @@ protected:
             k_engine->stop();
             delete k_engine;
             k_engine = nullptr;
+            ExecEnv::GetInstance()->set_storage_engine(nullptr);
         }
     }
 
diff --git a/be/test/olap/rowid_conversion_test.cpp 
b/be/test/olap/rowid_conversion_test.cpp
index 95b34cf0f8..6592cb53dd 100644
--- a/be/test/olap/rowid_conversion_test.cpp
+++ b/be/test/olap/rowid_conversion_test.cpp
@@ -54,6 +54,7 @@
 #include "olap/tablet.h"
 #include "olap/tablet_meta.h"
 #include "olap/tablet_schema.h"
+#include "runtime/exec_env.h"
 #include "util/uid_util.h"
 #include "vec/columns/column.h"
 #include "vec/core/block.h"
@@ -77,7 +78,7 @@ protected:
                             .ok());
         doris::EngineOptions options;
         k_engine = new StorageEngine(options);
-        StorageEngine::_s_instance = k_engine;
+        ExecEnv::GetInstance()->set_storage_engine(k_engine);
     }
 
     void TearDown() override {
@@ -86,6 +87,7 @@ protected:
             k_engine->stop();
             delete k_engine;
             k_engine = nullptr;
+            ExecEnv::GetInstance()->set_storage_engine(nullptr);
         }
     }
 
diff --git a/be/test/olap/rowset/beta_rowset_test.cpp 
b/be/test/olap/rowset/beta_rowset_test.cpp
index 5e259745d6..81e587370a 100644
--- a/be/test/olap/rowset/beta_rowset_test.cpp
+++ b/be/test/olap/rowset/beta_rowset_test.cpp
@@ -100,13 +100,18 @@ public:
 
         doris::EngineOptions options;
         options.store_paths = paths;
-        Status s = doris::StorageEngine::open(options, &k_engine);
+        k_engine = std::make_unique<StorageEngine>(options);
+        Status s = k_engine->open();
         EXPECT_TRUE(s.ok()) << s.to_string();
+        ExecEnv::GetInstance()->set_storage_engine(k_engine.get());
 
         
EXPECT_TRUE(io::global_local_filesystem()->create_directory(kTestDir).ok());
     }
 
-    static void TearDownTestSuite() { k_engine.reset(); }
+    static void TearDownTestSuite() {
+        ExecEnv::GetInstance()->set_storage_engine(nullptr);
+        k_engine.reset();
+    }
 
 protected:
     OlapReaderStatistics _stats;
diff --git a/be/test/olap/rowset/rowset_meta_manager_test.cpp 
b/be/test/olap/rowset/rowset_meta_manager_test.cpp
index a747d1fa2c..044b95ff97 100644
--- a/be/test/olap/rowset/rowset_meta_manager_test.cpp
+++ b/be/test/olap/rowset/rowset_meta_manager_test.cpp
@@ -36,6 +36,7 @@
 #include "olap/olap_meta.h"
 #include "olap/options.h"
 #include "olap/storage_engine.h"
+#include "runtime/exec_env.h"
 #include "util/uid_util.h"
 
 using ::testing::_;
@@ -60,6 +61,7 @@ public:
         if (k_engine == nullptr) {
             k_engine = new StorageEngine(options);
         }
+        ExecEnv::GetInstance()->set_storage_engine(k_engine);
 
         std::string meta_path = "./meta";
         EXPECT_TRUE(std::filesystem::create_directory(meta_path));
@@ -83,6 +85,7 @@ public:
 
     virtual void TearDown() {
         SAFE_DELETE(_meta);
+        ExecEnv::GetInstance()->set_storage_engine(nullptr);
         SAFE_DELETE(k_engine);
         EXPECT_TRUE(std::filesystem::remove_all("./meta"));
         LOG(INFO) << "TearDown";
diff --git a/be/test/olap/tablet_cooldown_test.cpp 
b/be/test/olap/tablet_cooldown_test.cpp
index 03fa82c3cc..e475791da3 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -248,13 +248,17 @@ public:
 
         EngineOptions options;
         options.store_paths = paths;
-        doris::StorageEngine::open(options, &k_engine);
+        k_engine = std::make_unique<StorageEngine>(options);
+        auto st = k_engine->open();
+        EXPECT_TRUE(st.ok()) << st.to_string();
         ExecEnv* exec_env = doris::ExecEnv::GetInstance();
         exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter());
+        exec_env->set_storage_engine(k_engine.get());
     }
 
     static void TearDownTestSuite() {
         ExecEnv* exec_env = doris::ExecEnv::GetInstance();
+        exec_env->set_storage_engine(nullptr);
         exec_env->set_memtable_memory_limiter(nullptr);
         k_engine.reset();
     }
diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp
index 5954d0329f..a9a4bb940d 100644
--- a/be/test/olap/tablet_mgr_test.cpp
+++ b/be/test/olap/tablet_mgr_test.cpp
@@ -41,6 +41,7 @@
 #include "olap/tablet_manager.h"
 #include "olap/tablet_meta.h"
 #include "olap/tablet_meta_manager.h"
+#include "runtime/exec_env.h"
 #include "util/uid_util.h"
 
 using ::testing::_;
@@ -66,6 +67,7 @@ public:
         // won't open engine, options.path is needless
         options.backend_uid = UniqueId::gen_uid();
         k_engine = new StorageEngine(options);
+        ExecEnv::GetInstance()->set_storage_engine(k_engine);
         _data_dir = new DataDir(_engine_data_path, 1000000000);
         _data_dir->init();
         _tablet_mgr = k_engine->tablet_manager();
@@ -77,6 +79,7 @@ public:
         if (k_engine != nullptr) {
             k_engine->stop();
         }
+        ExecEnv::GetInstance()->set_storage_engine(nullptr);
         SAFE_DELETE(k_engine);
         _tablet_mgr = nullptr;
     }
diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp
index 838db9b0d6..3fc32d3b3d 100644
--- a/be/test/olap/tablet_test.cpp
+++ b/be/test/olap/tablet_test.cpp
@@ -34,6 +34,7 @@
 #include "olap/storage_policy.h"
 #include "olap/tablet_meta.h"
 #include "olap/utils.h"
+#include "runtime/exec_env.h"
 #include "testutil/mock_rowset.h"
 #include "util/time.h"
 #include "util/uid_util.h"
@@ -86,7 +87,7 @@ public:
 
         doris::EngineOptions options;
         k_engine = new StorageEngine(options);
-        StorageEngine::_s_instance = k_engine;
+        ExecEnv::GetInstance()->set_storage_engine(k_engine);
     }
 
     void TearDown() override {
@@ -95,6 +96,7 @@ public:
             k_engine->stop();
             delete k_engine;
             k_engine = nullptr;
+            ExecEnv::GetInstance()->set_storage_engine(nullptr);
         }
     }
 
diff --git a/be/test/olap/txn_manager_test.cpp 
b/be/test/olap/txn_manager_test.cpp
index 2d27576b5a..fe05e206c7 100644
--- a/be/test/olap/txn_manager_test.cpp
+++ b/be/test/olap/txn_manager_test.cpp
@@ -113,6 +113,7 @@ public:
         if (k_engine == nullptr) {
             k_engine = new StorageEngine(options);
         }
+        ExecEnv::GetInstance()->set_storage_engine(k_engine);
 
         std::string meta_path = "./meta";
         std::filesystem::remove_all("./meta");
diff --git a/be/test/runtime/external_scan_context_mgr_test.cpp 
b/be/test/runtime/external_scan_context_mgr_test.cpp
index ba4febda10..c313916476 100644
--- a/be/test/runtime/external_scan_context_mgr_test.cpp
+++ b/be/test/runtime/external_scan_context_mgr_test.cpp
@@ -38,13 +38,11 @@ public:
         _exec_env._fragment_mgr = fragment_mgr;
         _exec_env._result_queue_mgr = result_queue_mgr;
     }
-    virtual ~ExternalScanContextMgrTest() {
-        delete _exec_env._fragment_mgr;
-        delete _exec_env._result_queue_mgr;
-    }
+    ~ExternalScanContextMgrTest() = default;
 
 protected:
-    virtual void SetUp() {}
+    void SetUp() override { _exec_env.set_ready(); }
+    void TearDown() override { _exec_env.destroy(); }
 
 private:
     ExecEnv _exec_env;
diff --git a/be/test/runtime/load_stream_test.cpp 
b/be/test/runtime/load_stream_test.cpp
index dae8b93e19..f3c9b55cd7 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -29,6 +29,7 @@
 #include <unistd.h>
 
 #include <functional>
+#include <memory>
 
 #include "common/config.h"
 #include "common/status.h"
@@ -582,10 +583,10 @@ public:
 
         doris::EngineOptions options;
         options.store_paths = paths;
-        Status s = doris::StorageEngine::open(options, &k_engine);
+        k_engine = std::make_unique<StorageEngine>(options);
+        Status s = k_engine->open();
         EXPECT_TRUE(s.ok()) << s.to_string();
-
-        _env = doris::ExecEnv::GetInstance();
+        doris::ExecEnv::GetInstance()->set_storage_engine(k_engine.get());
 
         
EXPECT_TRUE(io::global_local_filesystem()->create_directory(zTestDir).ok());
 
@@ -611,6 +612,7 @@ public:
     }
 
     void TearDown() override {
+        ExecEnv::GetInstance()->set_storage_engine(nullptr);
         k_engine.reset();
         _server->Stop(1000);
         _load_stream_mgr.reset();
diff --git a/be/test/runtime/routine_load_task_executor_test.cpp 
b/be/test/runtime/routine_load_task_executor_test.cpp
index f1088300cd..8a8dcc4d67 100644
--- a/be/test/runtime/routine_load_task_executor_test.cpp
+++ b/be/test/runtime/routine_load_task_executor_test.cpp
@@ -93,7 +93,6 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
     task.__set_kafka_load_info(k_info);
 
     RoutineLoadTaskExecutor executor(&_env);
-
     // submit task
     Status st;
     st = executor.submit_task(task);
@@ -116,6 +115,8 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
     task.__set_kafka_load_info(k_info);
     st = executor.submit_task(task);
     EXPECT_TRUE(st.ok());
+
+    executor.stop();
 }
 
-} // namespace doris
+} // namespace doris
\ No newline at end of file
diff --git a/be/test/testutil/run_all_tests.cpp 
b/be/test/testutil/run_all_tests.cpp
index 2e735ce8fa..de062fa930 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -39,10 +39,13 @@
 int main(int argc, char** argv) {
     doris::ExecEnv::GetInstance()->init_mem_tracker();
     doris::thread_context()->thread_mem_tracker_mgr->init();
-    doris::CacheManager::create_global_instance();
-    doris::TabletSchemaCache::create_global_schema_cache();
-    doris::StoragePageCache::create_global_cache(1 << 30, 10, 0);
-    doris::SegmentLoader::create_global_instance(1000);
+    
doris::ExecEnv::GetInstance()->set_cache_manager(doris::CacheManager::create_global_instance());
+    doris::ExecEnv::GetInstance()->set_tablet_schema_cache(
+            doris::TabletSchemaCache::create_global_schema_cache());
+    doris::ExecEnv::GetInstance()->get_tablet_schema_cache()->start();
+    doris::ExecEnv::GetInstance()->set_storage_page_cache(
+            doris::StoragePageCache::create_global_cache(1 << 30, 10, 0));
+    doris::ExecEnv::GetInstance()->set_segment_loader(new 
doris::SegmentLoader(1000));
     std::string conf = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";
     if (!doris::config::init(conf.c_str(), false)) {
         fprintf(stderr, "error read config file. \n");
@@ -54,5 +57,7 @@ int main(int argc, char** argv) {
     doris::DiskInfo::init();
     doris::MemInfo::init();
     doris::BackendOptions::init();
-    return RUN_ALL_TESTS();
+    int res = RUN_ALL_TESTS();
+    doris::ExecEnv::GetInstance()->get_tablet_schema_cache()->stop();
+    return res;
 }
diff --git a/be/test/vec/olap/vertical_compaction_test.cpp 
b/be/test/vec/olap/vertical_compaction_test.cpp
index 14a202845a..74f11d7e7d 100644
--- a/be/test/vec/olap/vertical_compaction_test.cpp
+++ b/be/test/vec/olap/vertical_compaction_test.cpp
@@ -61,6 +61,7 @@
 #include "olap/tablet_meta.h"
 #include "olap/tablet_schema.h"
 #include "olap/utils.h"
+#include "runtime/exec_env.h"
 #include "util/uid_util.h"
 #include "vec/columns/column.h"
 #include "vec/core/block.h"
@@ -93,7 +94,7 @@ protected:
 
         doris::EngineOptions options;
         k_engine = new StorageEngine(options);
-        StorageEngine::_s_instance = k_engine;
+        ExecEnv::GetInstance()->set_storage_engine(k_engine);
     }
     void TearDown() override {
         SAFE_DELETE(_data_dir);
@@ -102,6 +103,7 @@ protected:
             k_engine->stop();
             delete k_engine;
             k_engine = nullptr;
+            ExecEnv::GetInstance()->set_storage_engine(nullptr);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to