This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git

commit cb8e19992cf6dfd69739f5d1721329e112a6c7c6
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Fri Apr 12 10:03:05 2024 +0800

    [feature](Cloud) Alter be to fetch vault info after creating storage vault 
(#33558)
---
 be/src/cloud/cloud_internal_service.cpp            | 10 +++++
 be/src/cloud/cloud_internal_service.h              |  5 +++
 be/src/cloud/cloud_storage_engine.cpp              | 48 ++++++++++++----------
 be/src/cloud/cloud_storage_engine.h                |  2 +
 .../main/java/org/apache/doris/catalog/Env.java    |  2 +-
 .../org/apache/doris/catalog/StorageVaultMgr.java  | 25 ++++++++++-
 .../org/apache/doris/rpc/BackendServiceClient.java |  5 +++
 .../org/apache/doris/rpc/BackendServiceProxy.java  | 14 +++++++
 .../doris/cloud/catalog/HdfsStorageVaultTest.java  |  3 +-
 gensrc/proto/internal_service.proto                |  7 ++++
 10 files changed, 96 insertions(+), 25 deletions(-)

diff --git a/be/src/cloud/cloud_internal_service.cpp 
b/be/src/cloud/cloud_internal_service.cpp
index 0ebbe1c20c6..aba178bb34d 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -17,6 +17,8 @@
 
 #include "cloud/cloud_internal_service.h"
 
+#include "cloud/cloud_storage_engine.h"
+
 namespace doris {
 
 CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, 
ExecEnv* exec_env)
@@ -24,4 +26,12 @@ 
CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, E
 
 CloudInternalServiceImpl::~CloudInternalServiceImpl() = default;
 
+void 
CloudInternalServiceImpl::alter_vault_sync(google::protobuf::RpcController* 
controller,
+                                                const 
doris::PAlterVaultSyncRequest* request,
+                                                PAlterVaultSyncResponse* 
response,
+                                                google::protobuf::Closure* 
done) {
+    LOG(INFO) << "alter be to sync vault info from Meta Service";
+    _engine.sync_storage_vault();
+}
+
 } // namespace doris
diff --git a/be/src/cloud/cloud_internal_service.h 
b/be/src/cloud/cloud_internal_service.h
index 6399a8923fa..1bc6806c57f 100644
--- a/be/src/cloud/cloud_internal_service.h
+++ b/be/src/cloud/cloud_internal_service.h
@@ -31,6 +31,11 @@ public:
 
     // TODO(plat1ko): cloud internal service functions
 
+    void alter_vault_sync(google::protobuf::RpcController* controller,
+                          const doris::PAlterVaultSyncRequest* request,
+                          PAlterVaultSyncResponse* response,
+                          google::protobuf::Closure* done) override;
+
 private:
     [[maybe_unused]] CloudStorageEngine& _engine;
 };
diff --git a/be/src/cloud/cloud_storage_engine.cpp 
b/be/src/cloud/cloud_storage_engine.cpp
index 96e336b3ef3..d3f4c05180c 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -267,32 +267,36 @@ Status CloudStorageEngine::start_bg_threads() {
     return Status::OK();
 }
 
+void CloudStorageEngine::sync_storage_vault() {
+    cloud::StorageVaultInfos vault_infos;
+    auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
+    if (!st.ok()) {
+        LOG(WARNING) << "failed to get storage vault info. err=" << st;
+        return;
+    }
+
+    CHECK(!vault_infos.empty()) << "no s3 infos";
+    for (auto& [id, vault_info] : vault_infos) {
+        auto fs = get_filesystem(id);
+        auto st = (fs == nullptr)
+                          ? std::visit(VaultCreateFSVisitor {id}, vault_info)
+                          : std::visit(RefreshFSVaultVisitor {id, 
std::move(fs)}, vault_info);
+        if (!st.ok()) [[unlikely]] {
+            LOG(WARNING) << vault_process_error(id, vault_info, std::move(st));
+        }
+    }
+
+    if (auto& id = std::get<0>(vault_infos.back());
+        latest_fs() == nullptr || latest_fs()->id() != id) {
+        set_latest_fs(get_filesystem(id));
+    }
+}
+
 // We should enable_java_support if we want to use hdfs vault
 void CloudStorageEngine::_refresh_storage_vault_info_thread_callback() {
     while (!_stop_background_threads_latch.wait_for(
             std::chrono::seconds(config::refresh_s3_info_interval_s))) {
-        cloud::StorageVaultInfos vault_infos;
-        auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
-        if (!st.ok()) {
-            LOG(WARNING) << "failed to get storage vault info. err=" << st;
-            continue;
-        }
-
-        CHECK(!vault_infos.empty()) << "no s3 infos";
-        for (auto& [id, vault_info] : vault_infos) {
-            auto fs = get_filesystem(id);
-            auto st = (fs == nullptr)
-                              ? std::visit(VaultCreateFSVisitor {id}, 
vault_info)
-                              : std::visit(RefreshFSVaultVisitor {id, 
std::move(fs)}, vault_info);
-            if (!st.ok()) [[unlikely]] {
-                LOG(WARNING) << vault_process_error(id, vault_info, 
std::move(st));
-            }
-        }
-
-        if (auto& id = std::get<0>(vault_infos.back());
-            latest_fs() == nullptr || latest_fs()->id() != id) {
-            set_latest_fs(get_filesystem(id));
-        }
+        sync_storage_vault();
     }
 }
 
diff --git a/be/src/cloud/cloud_storage_engine.h 
b/be/src/cloud/cloud_storage_engine.h
index 297050360df..e57d0059e97 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -109,6 +109,8 @@ public:
     std::shared_ptr<CloudCumulativeCompactionPolicy> cumu_compaction_policy(
             std::string_view compaction_policy);
 
+    void sync_storage_vault();
+
 private:
     void _refresh_storage_vault_info_thread_callback();
     void _vacuum_stale_rowsets_thread_callback();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 3f8fafa477e..9497785faea 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -692,7 +692,7 @@ public class Env {
 
         this.brokerMgr = new BrokerMgr();
         this.resourceMgr = new ResourceMgr();
-        this.storageVaultMgr = new StorageVaultMgr();
+        this.storageVaultMgr = new StorageVaultMgr(systemInfo);
 
         this.globalTransactionMgr = 
EnvFactory.getInstance().createGlobalTransactionMgr(this);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java
index ddf10a52fff..b473afd6840 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java
@@ -24,12 +24,18 @@ import 
org.apache.doris.cloud.proto.Cloud.AlterObjStoreInfoRequest.Operation;
 import org.apache.doris.cloud.rpc.MetaServiceProxy;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.proto.InternalService.PAlterVaultSyncRequest;
+import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TNetworkAddress;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -41,7 +47,12 @@ public class StorageVaultMgr {
 
     private ReadWriteLock rwLock = new ReentrantReadWriteLock();
 
-    public StorageVaultMgr() {
+    private static final ExecutorService ALTER_BE_SYNC_THREAD_POOL = 
Executors.newFixedThreadPool(1);
+
+    private final SystemInfoService systemInfoService;
+
+    public StorageVaultMgr(SystemInfoService systemInfoService) {
+        this.systemInfoService = systemInfoService;
     }
 
     // TODO(ByteYue): The CreateStorageVault should only be handled by master
@@ -125,6 +136,7 @@ public class StorageVaultMgr {
                     
MetaServiceProxy.getInstance().alterObjStoreInfo(requestBuilder.build());
             if (response.getStatus().getCode() == 
Cloud.MetaServiceCode.ALREADY_EXISTED
                     && hdfsStorageVault.ifNotExists()) {
+                ALTER_BE_SYNC_THREAD_POOL.execute(() -> alterSyncVaultTask());
                 return;
             }
             if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
@@ -136,4 +148,15 @@ public class StorageVaultMgr {
             throw new DdlException(e.getMessage());
         }
     }
+
+    private void alterSyncVaultTask() {
+        systemInfoService.getAllBackends().forEach(backend -> {
+            TNetworkAddress address = backend.getBrpcAddress();
+            try {
+                BackendServiceProxy.getInstance().alterVaultSync(address, 
PAlterVaultSyncRequest.newBuilder().build());
+            } catch (RpcException e) {
+                LOG.warn("failed to alter sync vault");
+            }
+        });
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 50afc7c96bb..4027cf6d951 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -182,6 +182,11 @@ public class BackendServiceClient {
         return stub.getWalQueueSize(request);
     }
 
+    public Future<InternalService.PAlterVaultSyncResponse> alterVaultSync(
+            InternalService.PAlterVaultSyncRequest request) {
+        return stub.alterVaultSync(request);
+    }
+
 
     public void shutdown() {
         ConnectivityState state = channel.getState(false);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index e541b0eb689..9db06eac4de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -23,6 +23,8 @@ import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PAlterVaultSyncRequest;
+import org.apache.doris.proto.InternalService.PAlterVaultSyncResponse;
 import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
 import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
 import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse;
@@ -527,6 +529,18 @@ public class BackendServiceProxy {
         }
     }
 
+    public Future<PAlterVaultSyncResponse> alterVaultSync(TNetworkAddress 
address,
+            PAlterVaultSyncRequest request) throws RpcException {
+        try {
+            final BackendServiceClient client = getProxy(address);
+            return client.alterVaultSync(request);
+        } catch (Throwable e) {
+            LOG.warn("failed to alter vault sync from address={}:{}", 
address.getHostname(),
+                    address.getPort(), e);
+            throw new RpcException(address.getHostname(), e.getMessage());
+        }
+    }
+
     public Future<InternalService.PFetchRemoteSchemaResponse> 
fetchRemoteTabletSchemaAsync(
             TNetworkAddress address, InternalService.PFetchRemoteSchemaRequest 
request) throws RpcException {
         try {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/HdfsStorageVaultTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/HdfsStorageVaultTest.java
index a77e86076d5..e3458e57a36 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/HdfsStorageVaultTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/HdfsStorageVaultTest.java
@@ -31,6 +31,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.SystemInfoService;
 
 import com.google.common.collect.ImmutableMap;
 import mockit.Mock;
@@ -44,7 +45,7 @@ import java.util.Map;
 import java.util.Set;
 
 public class HdfsStorageVaultTest {
-    private StorageVaultMgr mgr = new StorageVaultMgr();
+    private StorageVaultMgr mgr = new StorageVaultMgr(new SystemInfoService());
 
     @Before
     public void setUp() throws Exception {
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 1a6dad5521b..59abc9adfb7 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -930,6 +930,12 @@ message PFetchRemoteSchemaResponse {
     optional TabletSchemaPB merged_schema = 2;
 }
 
+message PAlterVaultSyncRequest {
+}
+
+message PAlterVaultSyncResponse {
+}
+
 service PBackendService {
     rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
     rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult);
@@ -979,5 +985,6 @@ service PBackendService {
     rpc fetch_arrow_flight_schema(PFetchArrowFlightSchemaRequest) returns 
(PFetchArrowFlightSchemaResult);
     rpc fetch_remote_tablet_schema(PFetchRemoteSchemaRequest) returns 
(PFetchRemoteSchemaResponse);
     rpc test_jdbc_connection(PJdbcTestConnectionRequest) returns 
(PJdbcTestConnectionResult);
+    rpc alter_vault_sync(PAlterVaultSyncRequest) returns 
(PAlterVaultSyncResponse);
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to