This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new b979744325d branch-3.0: [fix](cloud-mow) FE should release mow lock when calculate delete bitmap catch exception (#43088) b979744325d is described below commit b979744325d47f5abf4cc446bc154b5e0d42d531 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Fri Nov 8 09:48:35 2024 +0800 branch-3.0: [fix](cloud-mow) FE should release mow lock when calculate delete bitmap catch exception (#43088) PR Body: Currently the MoW table lock is released on the meta service (MS) only when the txn is committed. However, if calculating the delete bitmap fails before the txn is committed, the lock is never released, so another load task hangs while acquiring the MoW lock until the lock held by the previous txn expires. Cherry-picked from #41759 Co-authored-by: huanghaibin <284824...@qq.com> --- be/src/cloud/cloud_meta_mgr.cpp | 22 ++++ be/src/cloud/cloud_meta_mgr.h | 3 + cloud/src/common/bvars.cpp | 2 + cloud/src/common/bvars.h | 1 + cloud/src/meta-service/meta_service.cpp | 52 ++++++++ cloud/src/meta-service/meta_service.h | 13 ++ .../apache/doris/cloud/rpc/MetaServiceClient.java | 11 ++ .../apache/doris/cloud/rpc/MetaServiceProxy.java | 6 + .../transaction/CloudGlobalTransactionMgr.java | 37 +++++- gensrc/proto/cloud.proto | 13 ++ .../test_cloud_mow_stream_load_with_timeout.out | 7 + .../test_cloud_mow_broker_load_with_retry.groovy | 2 +- .../test_cloud_mow_stream_load_with_timeout.groovy | 146 +++++++++++++++++++++ 13 files changed, 313 insertions(+), 2 deletions(-) diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index e198017f17a..ad7d06c2e63 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -293,6 +293,9 @@ static std::string debug_info(const Request& req) { return fmt::format(" tablet_id={}", req.rowset_meta().tablet_id()); } else if constexpr (is_any_v<Request, RemoveDeleteBitmapRequest>) { return fmt::format(" tablet_id={}", req.tablet_id()); 
+ } else if constexpr (is_any_v<Request, RemoveDeleteBitmapUpdateLockRequest>) { + return fmt::format(" table_id={}, tablet_id={}, lock_id={}", req.table_id(), + req.tablet_id(), req.lock_id()); } else { static_assert(!sizeof(Request)); } @@ -1112,6 +1115,28 @@ Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, in return st; } +// Asks the meta service to release the delete bitmap update lock (lock_id, initiator) held on this tablet's table. +Status CloudMetaMgr::remove_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id, + int64_t initiator) { + VLOG_DEBUG << "remove_delete_bitmap_update_lock , tablet_id: " << tablet.tablet_id() + << ",lock_id:" << lock_id; + RemoveDeleteBitmapUpdateLockRequest req; + RemoveDeleteBitmapUpdateLockResponse res; + req.set_cloud_unique_id(config::cloud_unique_id); + // Required: the meta service validates the lock and builds its key from table_id. + req.set_table_id(tablet.table_id()); + req.set_tablet_id(tablet.tablet_id()); + req.set_lock_id(lock_id); + req.set_initiator(initiator); + auto st = retry_rpc("remove delete bitmap update lock", req, &res, + &MetaService_Stub::remove_delete_bitmap_update_lock); + if (!st.ok()) { + LOG(WARNING) << "remove delete bitmap update lock fail,tablet_id=" << tablet.tablet_id() + << " lock_id=" << lock_id << " st=" << st.to_string(); + } + return st; +} + Status CloudMetaMgr::remove_old_version_delete_bitmap( int64_t tablet_id, const std::vector<std::tuple<std::string, uint64_t, uint64_t>>& to_delete) { diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h index 79cdb3fd3d1..0134469407a 100644 --- a/be/src/cloud/cloud_meta_mgr.h +++ b/be/src/cloud/cloud_meta_mgr.h @@ -101,6 +101,9 @@ public: Status get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id, int64_t initiator); + Status remove_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id, + int64_t initiator); + Status remove_old_version_delete_bitmap( int64_t tablet_id, const std::vector<std::tuple<std::string, uint64_t, uint64_t>>& to_delete); diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp index f053c1877fb..f9b11aa85b4 100644 --- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp @@ -74,6 +74,8 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap("ms", "get_delete_bitmap" BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock("ms", "get_delete_bitmap_update_lock"); BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap("ms", "remove_delete_bitmap"); +BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap_update_lock( + "ms", "remove_delete_bitmap_update_lock"); BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance"); BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", "get_rl_task_commit_attach"); BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress("ms", "reset_rl_progress"); diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h index 2a9efe35302..4848ec4b456 100644 --- a/cloud/src/common/bvars.h +++ b/cloud/src/common/bvars.h @@ -173,6 +173,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_update_delete_bitmap; extern BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap; extern BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock; extern BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap; +extern BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap_update_lock; extern BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status; extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status; extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance; diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 107ca9c0447..8212ab991b5 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -2095,6 +2095,58 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl } } +void MetaServiceImpl::remove_delete_bitmap_update_lock( + google::protobuf::RpcController* controller, + const RemoveDeleteBitmapUpdateLockRequest* request, + RemoveDeleteBitmapUpdateLockResponse* response, ::google::protobuf::Closure* done) { + 
RPC_PREPROCESS(remove_delete_bitmap_update_lock); + std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; + if (cloud_unique_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "cloud unique id not set"; + return; + } + + instance_id = get_instance_id(resource_mgr_, cloud_unique_id); + if (instance_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "empty instance_id"; + LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id; + return; + } + + RPC_RATE_LIMIT(remove_delete_bitmap_update_lock) + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::CREATE>(err); + msg = "failed to init txn"; + return; + } + if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, request->table_id(), + request->lock_id(), request->initiator())) { + LOG(WARNING) << "failed to check delete bitmap tablet lock" + << " table_id=" << request->table_id() << " tablet_id=" << request->tablet_id() + << " request lock_id=" << request->lock_id() + << " request initiator=" << request->initiator() << " msg " << msg; + return; + } + std::string lock_key = + meta_delete_bitmap_update_lock_key({instance_id, request->table_id(), -1}); + txn->remove(lock_key); + err = txn->commit(); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::COMMIT>(err); + ss << "failed to remove delete bitmap tablet lock , err=" << err; + msg = ss.str(); + return; + } + + LOG(INFO) << "remove delete bitmap table lock table_id=" << request->table_id() + << " tablet_id=" << request->tablet_id() << " lock_id=" << request->lock_id() + << ", key=" << hex(lock_key) << ", initiator=" << request->initiator(); +} + void MetaServiceImpl::remove_delete_bitmap(google::protobuf::RpcController* controller, const RemoveDeleteBitmapRequest* request, RemoveDeleteBitmapResponse* response, diff --git a/cloud/src/meta-service/meta_service.h 
b/cloud/src/meta-service/meta_service.h index f60d795949b..55e8626b6bf 100644 --- a/cloud/src/meta-service/meta_service.h +++ b/cloud/src/meta-service/meta_service.h @@ -274,6 +274,11 @@ public: RemoveDeleteBitmapResponse* response, ::google::protobuf::Closure* done) override; + void remove_delete_bitmap_update_lock(google::protobuf::RpcController* controller, + const RemoveDeleteBitmapUpdateLockRequest* request, + RemoveDeleteBitmapUpdateLockResponse* response, + ::google::protobuf::Closure* done) override; + // cloud control get cluster's status by this api void get_cluster_status(google::protobuf::RpcController* controller, const GetClusterStatusRequest* request, @@ -647,6 +652,14 @@ public: call_impl(&cloud::MetaService::remove_delete_bitmap, controller, request, response, done); } + void remove_delete_bitmap_update_lock(google::protobuf::RpcController* controller, + const RemoveDeleteBitmapUpdateLockRequest* request, + RemoveDeleteBitmapUpdateLockResponse* response, + ::google::protobuf::Closure* done) override { + call_impl(&cloud::MetaService::remove_delete_bitmap_update_lock, controller, request, + response, done); + } + // cloud control get cluster's status by this api void get_cluster_status(google::protobuf::RpcController* controller, const GetClusterStatusRequest* request, diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java index c4d28fb3bc2..904f3ec15d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java @@ -345,6 +345,17 @@ public class MetaServiceClient { return blockingStub.getDeleteBitmapUpdateLock(request); } + public Cloud.RemoveDeleteBitmapUpdateLockResponse removeDeleteBitmapUpdateLock( + Cloud.RemoveDeleteBitmapUpdateLockRequest request) { + if (!request.hasCloudUniqueId()) { + 
Cloud.RemoveDeleteBitmapUpdateLockRequest.Builder builder = Cloud.RemoveDeleteBitmapUpdateLockRequest + .newBuilder(); + builder.mergeFrom(request); + return blockingStub.removeDeleteBitmapUpdateLock(builder.setCloudUniqueId(Config.cloud_unique_id).build()); + } + return blockingStub.removeDeleteBitmapUpdateLock(request); + } + public Cloud.GetInstanceResponse getInstance(Cloud.GetInstanceRequest request) { if (!request.hasCloudUniqueId()) { Cloud.GetInstanceRequest.Builder builder = Cloud.GetInstanceRequest.newBuilder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java index 6ed0eb81b78..d7f718e3ca4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java @@ -335,6 +335,12 @@ public class MetaServiceProxy { return w.executeRequest((client) -> client.getDeleteBitmapUpdateLock(request)); } + public Cloud.RemoveDeleteBitmapUpdateLockResponse removeDeleteBitmapUpdateLock( + Cloud.RemoveDeleteBitmapUpdateLockRequest request) + throws RpcException { + return w.executeRequest((client) -> client.removeDeleteBitmapUpdateLock(request)); + } + public Cloud.AlterObjStoreInfoResponse alterObjStoreInfo(Cloud.AlterObjStoreInfoRequest request) throws RpcException { return w.executeRequest((client) -> client.alterObjStoreInfo(request)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 131473470ab..9a7ee5bc86e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -56,6 +56,8 @@ import org.apache.doris.cloud.proto.Cloud.LoadJobSourceTypePB; import 
org.apache.doris.cloud.proto.Cloud.MetaServiceCode; import org.apache.doris.cloud.proto.Cloud.PrecommitTxnRequest; import org.apache.doris.cloud.proto.Cloud.PrecommitTxnResponse; +import org.apache.doris.cloud.proto.Cloud.RemoveDeleteBitmapUpdateLockRequest; +import org.apache.doris.cloud.proto.Cloud.RemoveDeleteBitmapUpdateLockResponse; import org.apache.doris.cloud.proto.Cloud.SubTxnInfo; import org.apache.doris.cloud.proto.Cloud.TableStatsPB; import org.apache.doris.cloud.proto.Cloud.TabletIndexPB; @@ -648,7 +650,13 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos = getCalcDeleteBitmapInfo( backendToPartitionTablets, partitionVersions, baseCompactionCnts, cumulativeCompactionCnts, cumulativePoints); - sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos); + try { + sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos); + } catch (UserException | RuntimeException e) { + LOG.warn("failed to sendCalcDeleteBitmaptask for txn=" + transactionId + ",exception=" + e.getMessage()); + removeDeleteBitmapUpdateLock(tableToPartitions, transactionId); + throw e; + } } private void getPartitionInfo(List<OlapTable> tableList, @@ -869,6 +877,38 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { } } + /** + * Best-effort release of the delete bitmap update lock held by {@code transactionId} on every + * table in {@code tableToPartitions}. Failures (non-OK status or an exception from the meta + * service call) are logged and swallowed rather than thrown. + */ + private void removeDeleteBitmapUpdateLock(Map<Long, Set<Long>> tableToPartitions, long transactionId) { + for (Map.Entry<Long, Set<Long>> entry : tableToPartitions.entrySet()) { + RemoveDeleteBitmapUpdateLockRequest.Builder builder = RemoveDeleteBitmapUpdateLockRequest.newBuilder(); + builder.setTableId(entry.getKey()) + .setLockId(transactionId) + .setInitiator(-1); + final RemoveDeleteBitmapUpdateLockRequest request = builder.build(); + RemoveDeleteBitmapUpdateLockResponse response = null; + try { + response = MetaServiceProxy.getInstance().removeDeleteBitmapUpdateLock(request); + if (LOG.isDebugEnabled()) { + LOG.debug("remove delete 
bitmap lock, transactionId={}, Request: {}, Response: {}", + transactionId, request, response); + } + Preconditions.checkNotNull(response); + Preconditions.checkNotNull(response.getStatus()); + if (response.getStatus().getCode() != MetaServiceCode.OK) { + LOG.warn("remove delete bitmap lock failed, transactionId={}, response:{}", + transactionId, response); + } + } catch (Exception e) { + LOG.warn("ignore remove delete bitmap lock exception, transactionId={}, exception={}", + transactionId, e); + } + } + } + private void sendCalcDeleteBitmaptask(long dbId, long transactionId, Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos) throws UserException { diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 0c478255503..8d93b973b96 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -1463,6 +1463,18 @@ message GetDeleteBitmapUpdateLockResponse { repeated int64 cumulative_points = 4; } +message RemoveDeleteBitmapUpdateLockRequest { + optional string cloud_unique_id = 1; // For auth + optional int64 table_id = 2; + optional int64 tablet_id = 3; + optional int64 lock_id = 4; + optional int64 initiator = 5; +} + +message RemoveDeleteBitmapUpdateLockResponse { + optional MetaServiceResponseStatus status = 1; +} + message GetRLTaskCommitAttachRequest { optional string cloud_unique_id = 1; // For auth optional int64 db_id = 2; @@ -1574,6 +1586,7 @@ service MetaService { rpc update_delete_bitmap(UpdateDeleteBitmapRequest) returns(UpdateDeleteBitmapResponse); rpc get_delete_bitmap(GetDeleteBitmapRequest) returns(GetDeleteBitmapResponse); rpc get_delete_bitmap_update_lock(GetDeleteBitmapUpdateLockRequest) returns(GetDeleteBitmapUpdateLockResponse); + rpc remove_delete_bitmap_update_lock(RemoveDeleteBitmapUpdateLockRequest) returns(RemoveDeleteBitmapUpdateLockResponse); rpc remove_delete_bitmap(RemoveDeleteBitmapRequest) returns(RemoveDeleteBitmapResponse); // routine load progress diff --git 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.out new file mode 100644 index 00000000000..b8b3ea3ecca --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- + +-- !sql -- +5 e 90 +6 f 100 + diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.groovy index 035a6307d46..1bd4d87742c 100644 --- a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.groovy +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.groovy @@ -176,7 +176,7 @@ suite("test_cloud_mow_broker_load_with_retry", "nonConcurrent") { ++i } } finally { - GetDebugPoint().disableDebugPointForAllFEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + GetDebugPoint().disableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") sql "DROP TABLE IF EXISTS ${table};" GetDebugPoint().clearDebugPointsForAllBEs() } diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy new file mode 100644 index 00000000000..122503b1611 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_cloud_mow_stream_load_with_timeout", "nonConcurrent") { + if (!isCloudMode()) { + return + } + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + def backendId_to_params = [string: [:]] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def set_be_param = { paramName, paramValue -> + // for eache be node, set paramName=paramValue + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)) + assertTrue(out.contains("OK")) + } + } + + def reset_be_param = { paramName -> + // for eache be node, reset paramName to default + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def original_value = backendId_to_params.get(id).get(paramName) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value)) + assertTrue(out.contains("OK")) + } + } + + def get_be_param = { paramName -> + // for eache be node, get param value by default + def paramValue 
= "" + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + // get the config value from be + def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName)) + assertTrue(code == 0) + assertTrue(out.contains(paramName)) + // parsing + def resultList = parseJson(out)[0] + assertTrue(resultList.size() == 4) + // get original value + paramValue = resultList[2] + backendId_to_params.get(id, [:]).put(paramName, paramValue) + } + } + + def customFeConfig = [ + calculate_delete_bitmap_task_timeout_seconds: 2, + meta_service_rpc_retry_times : 5 + ] + + // store the original value + get_be_param("mow_stream_load_commit_retry_times") + // disable retry to make this problem more clear + set_be_param("mow_stream_load_commit_retry_times", "1") + + + def tableName = "tbl_basic" + setFeConfigTemporary(customFeConfig) { + try { + // create table + sql """ drop table if exists ${tableName}; """ + + sql """ + CREATE TABLE `${tableName}` ( + `id` int(11) NOT NULL, + `name` varchar(1100) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_num" = "1" + ); + """ + // this streamLoad will fail on calculate delete bitmap timeout + GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.contains("Timeout")) + } + } + qt_sql """ select * from ${tableName} order by id""" + + // this streamLoad 
will success because of removing timeout simulation + GetDebugPoint().disableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + qt_sql """ select * from ${tableName} order by id""" + } finally { + reset_be_param("mow_stream_load_commit_retry_times") + GetDebugPoint().disableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + sql "DROP TABLE IF EXISTS ${tableName};" + GetDebugPoint().clearDebugPointsForAllBEs() + } + + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org