This is an automated email from the ASF dual-hosted git repository. kangkaisen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 1ebd156 [Feature]Add fetch/update/clear proto of fe&be for cache (#4190) 1ebd156 is described below commit 1ebd156b99d1e1eaaedbf8ca0e9d3708438e4b11 Author: HaiBo Li <liha...@vip.126.com> AuthorDate: Fri Jul 31 13:23:24 2020 +0800 [Feature]Add fetch/update/clear proto of fe&be for cache (#4190) --- .../org/apache/doris/rpc/BackendServiceProxy.java | 42 +++++++++++++++ .../java/org/apache/doris/rpc/PBackendService.java | 14 +++++ gensrc/proto/internal_service.proto | 62 ++++++++++++++++++++++ gensrc/proto/palo_internal_service.proto | 3 ++ 4 files changed, 121 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index d9a90de..7a01b7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -28,6 +28,11 @@ import org.apache.doris.proto.PProxyRequest; import org.apache.doris.proto.PProxyResult; import org.apache.doris.proto.PTriggerProfileReportResult; import org.apache.doris.proto.PUniqueId; +import org.apache.doris.proto.PUpdateCacheRequest; +import org.apache.doris.proto.PCacheResponse; +import org.apache.doris.proto.PFetchCacheRequest; +import org.apache.doris.proto.PFetchCacheResult; +import org.apache.doris.proto.PClearCacheRequest; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TUniqueId; @@ -164,6 +169,43 @@ public class BackendServiceProxy { } } + public Future<PCacheResponse> updateCache( + TNetworkAddress address, PUpdateCacheRequest request) throws RpcException{ + try { + PBackendService service = getProxy(address); + return service.updateCache(request); + } catch (Throwable e) { + LOG.warn("update cache catch a exception, address={}:{}", + address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + + public Future<PFetchCacheResult> fetchCache( + TNetworkAddress address, PFetchCacheRequest request) throws RpcException { + try { + PBackendService service = getProxy(address); + return service.fetchCache(request); + } catch (Throwable e) { + LOG.warn("fetch cache catch a exception, address={}:{}", + address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + + public Future<PCacheResponse> clearCache( + TNetworkAddress address, PClearCacheRequest request) throws RpcException { + try { + PBackendService service = getProxy(address); + return service.clearCache(request); + } catch (Throwable e) { + LOG.warn("clear cache catch a exception, address={}:{}", + address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + + public Future<PTriggerProfileReportResult> triggerProfileReportAsync( TNetworkAddress address, PTriggerProfileReportRequest request) throws RpcException { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java index 38bc2e7..3e68009 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java @@ -24,6 +24,11 @@ import org.apache.doris.proto.PFetchDataResult; import org.apache.doris.proto.PProxyRequest; import org.apache.doris.proto.PProxyResult; import org.apache.doris.proto.PTriggerProfileReportResult; +import org.apache.doris.proto.PUpdateCacheRequest; +import org.apache.doris.proto.PClearCacheRequest; +import org.apache.doris.proto.PCacheResponse; +import org.apache.doris.proto.PFetchCacheRequest; +import org.apache.doris.proto.PFetchCacheResult; import com.baidu.jprotobuf.pbrpc.ProtobufRPC; @@ -43,6 +48,15 @@ public interface PBackendService { attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 86400000) Future<PFetchDataResult> fetchDataAsync(PFetchDataRequest request); + @ProtobufRPC(serviceName = "PBackendService", methodName = "update_cache", onceTalkTimeout = 10000) + Future<PCacheResponse> updateCache(PUpdateCacheRequest request); + + @ProtobufRPC(serviceName = "PBackendService", methodName = "fetch_cache", onceTalkTimeout = 10000) + Future<PFetchCacheResult> fetchCache(PFetchCacheRequest request); + + @ProtobufRPC(serviceName = "PBackendService", methodName = "clear_cache", onceTalkTimeout = 10000) + Future<PCacheResponse> clearCache(PClearCacheRequest request); + @ProtobufRPC(serviceName = "PBackendService", methodName = "trigger_profile_report", attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 10000) Future<PTriggerProfileReportResult> triggerProfileReport(PTriggerProfileReportRequest request); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 80f3be9d..cc0d08b 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -147,6 +147,65 @@ message PFetchDataResult { optional PQueryStatistics query_statistics = 4; }; +//Add message definition to fetch and update cache +enum PCacheStatus { + DEFAULT = 0; + CACHE_OK = 1; + PARAM_ERROR = 2; + SIZE_OVER_LIMIT = 3; + NO_SQL_KEY = 4; + NO_PARTITION_KEY = 5; + INVALID_KEY_RANGE = 6; + DATA_OVERDUE = 7; + EMPTY_DATA = 8; +}; + +message PCacheParam { + required int64 partition_key = 1; + optional int64 last_version = 2; + optional int64 last_version_time = 3; +}; + +message PCacheValue { + required PCacheParam param = 1; + required int32 data_size = 2; + repeated bytes row = 3; +}; + +//for update&clear return +message PCacheResponse { + required PCacheStatus status = 1; +}; + +message PUpdateCacheRequest{ + required PUniqueId sql_key = 1; + repeated PCacheValue value = 2; +}; + +message PFetchCacheRequest { + required PUniqueId sql_key = 1; + repeated PCacheParam param = 2; +}; + +message PFetchCacheResult { + required PCacheStatus status = 1; + repeated PCacheValue value = 2; +}; + +enum PClearType { + CLEAR_ALL = 0; + PRUNE_CACHE = 1; + CLEAR_BEFORE_TIME = 2; + CLEAR_SQL_KEY = 3; +}; + +message PClearCacheRequest { + required PClearType clear_type = 1; + optional int64 before_time = 2; + optional PUniqueId sql_key = 3; +}; +//End cache proto definition + message PTriggerProfileReportRequest { repeated PUniqueId instance_ids = 1; }; @@ -195,5 +254,8 @@ service PBackendService { rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns (PTabletWriterCancelResult); rpc trigger_profile_report(PTriggerProfileReportRequest) returns (PTriggerProfileReportResult); rpc get_info(PProxyRequest) returns (PProxyResult); + rpc update_cache(PUpdateCacheRequest) returns (PCacheResponse); + rpc fetch_cache(PFetchCacheRequest) returns (PFetchCacheResult); + rpc clear_cache(PClearCacheRequest) returns (PCacheResponse); }; diff --git a/gensrc/proto/palo_internal_service.proto b/gensrc/proto/palo_internal_service.proto index 3adc1ec..31adb76 100644 --- a/gensrc/proto/palo_internal_service.proto +++ b/gensrc/proto/palo_internal_service.proto @@ -36,4 +36,7 @@ service PInternalService { rpc tablet_writer_cancel(doris.PTabletWriterCancelRequest) returns (doris.PTabletWriterCancelResult); rpc trigger_profile_report(doris.PTriggerProfileReportRequest) returns (doris.PTriggerProfileReportResult); rpc get_info(doris.PProxyRequest) returns (doris.PProxyResult); + rpc update_cache(doris.PUpdateCacheRequest) returns (doris.PCacheResponse); + rpc fetch_cache(doris.PFetchCacheRequest) returns (doris.PFetchCacheResult); + rpc clear_cache(doris.PClearCacheRequest) returns (doris.PCacheResponse); }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org