This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-4.0-preview in repository https://gitbox.apache.org/repos/asf/doris.git
commit cb8e19992cf6dfd69739f5d1721329e112a6c7c6 Author: AlexYue <yj976240...@gmail.com> AuthorDate: Fri Apr 12 10:03:05 2024 +0800 [feature](Cloud) Alter be to fetch vault info after creating storage vault (#33558) --- be/src/cloud/cloud_internal_service.cpp | 10 +++++ be/src/cloud/cloud_internal_service.h | 5 +++ be/src/cloud/cloud_storage_engine.cpp | 48 ++++++++++++---------- be/src/cloud/cloud_storage_engine.h | 2 + .../main/java/org/apache/doris/catalog/Env.java | 2 +- .../org/apache/doris/catalog/StorageVaultMgr.java | 25 ++++++++++- .../org/apache/doris/rpc/BackendServiceClient.java | 5 +++ .../org/apache/doris/rpc/BackendServiceProxy.java | 14 +++++++ .../doris/cloud/catalog/HdfsStorageVaultTest.java | 3 +- gensrc/proto/internal_service.proto | 7 ++++ 10 files changed, 96 insertions(+), 25 deletions(-) diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp index 0ebbe1c20c6..aba178bb34d 100644 --- a/be/src/cloud/cloud_internal_service.cpp +++ b/be/src/cloud/cloud_internal_service.cpp @@ -17,6 +17,8 @@ #include "cloud/cloud_internal_service.h" +#include "cloud/cloud_storage_engine.h" + namespace doris { CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, ExecEnv* exec_env) @@ -24,4 +26,12 @@ CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, E CloudInternalServiceImpl::~CloudInternalServiceImpl() = default; +void CloudInternalServiceImpl::alter_vault_sync(google::protobuf::RpcController* controller, + const doris::PAlterVaultSyncRequest* request, + PAlterVaultSyncResponse* response, + google::protobuf::Closure* done) { + LOG(INFO) << "alter be to sync vault info from Meta Service"; + _engine.sync_storage_vault(); +} + } // namespace doris diff --git a/be/src/cloud/cloud_internal_service.h b/be/src/cloud/cloud_internal_service.h index 6399a8923fa..1bc6806c57f 100644 --- a/be/src/cloud/cloud_internal_service.h +++ b/be/src/cloud/cloud_internal_service.h @@ -31,6 +31,11 @@ public: // TODO(plat1ko): cloud internal service functions + void alter_vault_sync(google::protobuf::RpcController* controller, + const doris::PAlterVaultSyncRequest* request, + PAlterVaultSyncResponse* response, + google::protobuf::Closure* done) override; + private: [[maybe_unused]] CloudStorageEngine& _engine; }; diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index 96e336b3ef3..d3f4c05180c 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -267,32 +267,36 @@ Status CloudStorageEngine::start_bg_threads() { return Status::OK(); } +void CloudStorageEngine::sync_storage_vault() { + cloud::StorageVaultInfos vault_infos; + auto st = _meta_mgr->get_storage_vault_info(&vault_infos); + if (!st.ok()) { + LOG(WARNING) << "failed to get storage vault info. err=" << st; + return; + } + + CHECK(!vault_infos.empty()) << "no s3 infos"; + for (auto& [id, vault_info] : vault_infos) { + auto fs = get_filesystem(id); + auto st = (fs == nullptr) + ? std::visit(VaultCreateFSVisitor {id}, vault_info) + : std::visit(RefreshFSVaultVisitor {id, std::move(fs)}, vault_info); + if (!st.ok()) [[unlikely]] { + LOG(WARNING) << vault_process_error(id, vault_info, std::move(st)); + } + } + + if (auto& id = std::get<0>(vault_infos.back()); + latest_fs() == nullptr || latest_fs()->id() != id) { + set_latest_fs(get_filesystem(id)); + } +} + // We should enable_java_support if we want to use hdfs vault void CloudStorageEngine::_refresh_storage_vault_info_thread_callback() { while (!_stop_background_threads_latch.wait_for( std::chrono::seconds(config::refresh_s3_info_interval_s))) { - cloud::StorageVaultInfos vault_infos; - auto st = _meta_mgr->get_storage_vault_info(&vault_infos); - if (!st.ok()) { - LOG(WARNING) << "failed to get storage vault info. err=" << st; - continue; - } - - CHECK(!vault_infos.empty()) << "no s3 infos"; - for (auto& [id, vault_info] : vault_infos) { - auto fs = get_filesystem(id); - auto st = (fs == nullptr) - ? std::visit(VaultCreateFSVisitor {id}, vault_info) - : std::visit(RefreshFSVaultVisitor {id, std::move(fs)}, vault_info); - if (!st.ok()) [[unlikely]] { - LOG(WARNING) << vault_process_error(id, vault_info, std::move(st)); - } - } - - if (auto& id = std::get<0>(vault_infos.back()); - latest_fs() == nullptr || latest_fs()->id() != id) { - set_latest_fs(get_filesystem(id)); - } + sync_storage_vault(); } } diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h index 297050360df..e57d0059e97 100644 --- a/be/src/cloud/cloud_storage_engine.h +++ b/be/src/cloud/cloud_storage_engine.h @@ -109,6 +109,8 @@ public: std::shared_ptr<CloudCumulativeCompactionPolicy> cumu_compaction_policy( std::string_view compaction_policy); + void sync_storage_vault(); + private: void _refresh_storage_vault_info_thread_callback(); void _vacuum_stale_rowsets_thread_callback(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 3f8fafa477e..9497785faea 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -692,7 +692,7 @@ public class Env { this.brokerMgr = new BrokerMgr(); this.resourceMgr = new ResourceMgr(); - this.storageVaultMgr = new StorageVaultMgr(); + this.storageVaultMgr = new StorageVaultMgr(systemInfo); this.globalTransactionMgr = EnvFactory.getInstance().createGlobalTransactionMgr(this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java index ddf10a52fff..b473afd6840 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java @@ -24,12 +24,18 @@ import org.apache.doris.cloud.proto.Cloud.AlterObjStoreInfoRequest.Operation; import org.apache.doris.cloud.rpc.MetaServiceProxy; import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; +import org.apache.doris.proto.InternalService.PAlterVaultSyncRequest; +import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TNetworkAddress; import com.google.common.annotations.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -41,7 +47,12 @@ public class StorageVaultMgr { private ReadWriteLock rwLock = new ReentrantReadWriteLock(); - public StorageVaultMgr() { + private static final ExecutorService ALTER_BE_SYNC_THREAD_POOL = Executors.newFixedThreadPool(1); + + private final SystemInfoService systemInfoService; + + public StorageVaultMgr(SystemInfoService systemInfoService) { + this.systemInfoService = systemInfoService; } // TODO(ByteYue): The CreateStorageVault should only be handled by master @@ -125,6 +136,7 @@ public class StorageVaultMgr { MetaServiceProxy.getInstance().alterObjStoreInfo(requestBuilder.build()); if (response.getStatus().getCode() == Cloud.MetaServiceCode.ALREADY_EXISTED && hdfsStorageVault.ifNotExists()) { + ALTER_BE_SYNC_THREAD_POOL.execute(() -> alterSyncVaultTask()); return; } if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { @@ -136,4 +148,15 @@ public class StorageVaultMgr { throw new DdlException(e.getMessage()); } } + + private void alterSyncVaultTask() { + systemInfoService.getAllBackends().forEach(backend -> { + TNetworkAddress address = backend.getBrpcAddress(); + try { + BackendServiceProxy.getInstance().alterVaultSync(address, PAlterVaultSyncRequest.newBuilder().build()); + } catch (RpcException e) { + LOG.warn("failed to alter sync vault"); + } + }); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index 50afc7c96bb..4027cf6d951 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -182,6 +182,11 @@ public class BackendServiceClient { return stub.getWalQueueSize(request); } + public Future<InternalService.PAlterVaultSyncResponse> alterVaultSync( + InternalService.PAlterVaultSyncRequest request) { + return stub.alterVaultSync(request); + } + public void shutdown() { ConnectivityState state = channel.getState(false); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index e541b0eb689..9db06eac4de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -23,6 +23,8 @@ import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.metric.MetricRepo; import org.apache.doris.planner.PlanFragmentId; import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.InternalService.PAlterVaultSyncRequest; +import org.apache.doris.proto.InternalService.PAlterVaultSyncResponse; import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest; import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest; import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse; @@ -527,6 +529,18 @@ public class BackendServiceProxy { } } + public Future<PAlterVaultSyncResponse> alterVaultSync(TNetworkAddress address, + PAlterVaultSyncRequest request) throws RpcException { + try { + final BackendServiceClient client = getProxy(address); + return client.alterVaultSync(request); + } catch (Throwable e) { + LOG.warn("failed to alter vault sync from address={}:{}", address.getHostname(), + address.getPort(), e); + throw new RpcException(address.getHostname(), e.getMessage()); + } + } + public Future<InternalService.PFetchRemoteSchemaResponse> fetchRemoteTabletSchemaAsync( TNetworkAddress address, InternalService.PFetchRemoteSchemaRequest request) throws RpcException { try { diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/HdfsStorageVaultTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/HdfsStorageVaultTest.java index a77e86076d5..e3458e57a36 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/HdfsStorageVaultTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/HdfsStorageVaultTest.java @@ -31,6 +31,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; import org.apache.doris.rpc.RpcException; +import org.apache.doris.system.SystemInfoService; import com.google.common.collect.ImmutableMap; import mockit.Mock; @@ -44,7 +45,7 @@ import java.util.Map; import java.util.Set; public class HdfsStorageVaultTest { - private StorageVaultMgr mgr = new StorageVaultMgr(); + private StorageVaultMgr mgr = new StorageVaultMgr(new SystemInfoService()); @Before public void setUp() throws Exception { diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 1a6dad5521b..59abc9adfb7 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -930,6 +930,12 @@ message PFetchRemoteSchemaResponse { optional TabletSchemaPB merged_schema = 2; } +message PAlterVaultSyncRequest { +} + +message PAlterVaultSyncResponse { +} + service PBackendService { rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult); rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult); @@ -979,5 +985,6 @@ service PBackendService { rpc fetch_arrow_flight_schema(PFetchArrowFlightSchemaRequest) returns (PFetchArrowFlightSchemaResult); rpc fetch_remote_tablet_schema(PFetchRemoteSchemaRequest) returns (PFetchRemoteSchemaResponse); rpc test_jdbc_connection(PJdbcTestConnectionRequest) returns (PJdbcTestConnectionResult); + rpc alter_vault_sync(PAlterVaultSyncRequest) returns (PAlterVaultSyncResponse); }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org