This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new eaffb691d38 [branch-2.0](txn) be dead exceeds 5min abort its txns (#22781, #28662, #35342) (#39317) eaffb691d38 is described below commit eaffb691d38afe0995f073847572564df49b909e Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Wed Aug 14 15:31:51 2024 +0800 [branch-2.0](txn) be dead exceeds 5min abort its txns (#22781, #28662, #35342) (#39317) cherry-pick: #22781, #28662, #35342 --------- Co-authored-by: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> --- .../runtime/stream_load/stream_load_executor.cpp | 4 + docs/en/docs/admin-manual/config/fe-config.md | 10 ++ docs/zh-CN/docs/admin-manual/config/fe-config.md | 10 ++ .../main/java/org/apache/doris/common/Config.java | 14 +++ .../apache/doris/analysis/NativeInsertStmt.java | 5 +- .../java/org/apache/doris/common/ClientPool.java | 2 +- .../java/org/apache/doris/common/FeConstants.java | 1 - .../org/apache/doris/httpv2/rest/LoadAction.java | 6 ++ .../java/org/apache/doris/load/DeleteHandler.java | 5 +- .../apache/doris/load/loadv2/BrokerLoadJob.java | 5 +- .../org/apache/doris/load/loadv2/SparkLoadJob.java | 5 +- .../load/routineload/RoutineLoadTaskInfo.java | 5 +- .../doris/load/sync/canal/CanalSyncChannel.java | 7 +- .../org/apache/doris/nereids/txn/Transaction.java | 5 +- .../java/org/apache/doris/qe/SimpleScheduler.java | 3 +- .../java/org/apache/doris/qe/StmtExecutor.java | 8 +- .../java/org/apache/doris/service/ExecuteEnv.java | 7 ++ .../apache/doris/service/FrontendServiceImpl.java | 12 ++- .../java/org/apache/doris/system/HeartbeatMgr.java | 18 +++- .../doris/transaction/DatabaseTransactionMgr.java | 7 +- .../doris/transaction/GlobalTransactionMgr.java | 32 +++++-- .../apache/doris/transaction/TransactionState.java | 15 ++- .../apache/doris/clone/BeDownCancelCloneTest.java | 4 +- .../doris/cluster/DecommissionBackendTest.java | 2 +- .../org/apache/doris/qe/SimpleSchedulerTest.java | 8 +- .../transaction/DatabaseTransactionMgrTest.java | 10 +- .../transaction/GlobalTransactionMgrTest.java | 11 ++- .../doris/transaction/TransactionStateTest.java | 5 +- .../apache/doris/utframe/TestWithFeService.java | 2 +- gensrc/thrift/FrontendService.thrift | 2 + .../org/apache/doris/regression/suite/Suite.groovy | 62 ++++++++++++ .../suites/demo_p0/streamLoad_action.groovy | 5 + .../stream_load/test_coordidator_be_restart.groovy | 106 +++++++++++++++++++++ 33 files changed, 349 insertions(+), 54 deletions(-) diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 4a80eb35ca6..19d25e9ffa1 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -45,6 +45,7 @@ #include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_context.h" #include "thrift/protocol/TDebugProtocol.h" +#include "util/debug_points.h" #include "util/doris_metrics.h" #include "util/thrift_rpc_helper.h" #include "util/time.h" @@ -242,6 +243,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { request.__set_timeout(ctx->timeout_second); } request.__set_request_id(ctx->id.to_thrift()); + request.__set_backend_id(_exec_env->master_info()->backend_id); TLoadTxnBeginResult result; Status status; @@ -374,6 +376,8 @@ void StreamLoadExecutor::get_commit_request(StreamLoadContext* ctx, } Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { + DBUG_EXECUTE_IF("StreamLoadExecutor.commit_txn.block", DBUG_BLOCK); + DorisMetrics::instance()->stream_load_txn_commit_request_total->increment(1); TLoadTxnCommitRequest request; diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md index 1c2ab9939a1..fbab5cfcd15 100644 --- a/docs/en/docs/admin-manual/config/fe-config.md +++ b/docs/en/docs/admin-manual/config/fe-config.md @@ -587,6 +587,16 @@ Is it possible to configure dynamically: true Whether it is a configuration item unique to the Master FE node: true +### `abort_txn_after_lost_heartbeat_time_second` + +Abort transaction time after lost heartbeat. The default value is 300, which means transactions of be will be aborted after lost heartbeat 300s. + +Default: 300(s) + +Is it possible to configure dynamically: true + +Whether it is a configuration item unique to the Master FE node: true + #### `enable_access_file_without_broker` Default:false diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md index c93b2d17e01..60e2198b711 100644 --- a/docs/zh-CN/docs/admin-manual/config/fe-config.md +++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md @@ -587,6 +587,16 @@ FE向BE的BackendService发送rpc请求时的超时时间,单位:毫秒。 是否为 Master FE 节点独有的配置项:true +#### `abort_txn_after_lost_heartbeat_time_second` + +丢失be心跳后丢弃be事务的时间。默认时间为三百秒,当三百秒fe没有接收到be心跳时,会丢弃该be的所有事务。 + +默认值:300(秒) + +是否可以动态配置:true + +是否为 Master FE 节点独有的配置项:true + #### `enable_access_file_without_broker` 默认值:false diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index a5c0ad36ad8..44882f21768 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1812,6 +1812,20 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static long max_backend_heartbeat_failure_tolerance_count = 1; + /** + * Abort transaction time after lost heartbeat. + * The default value is 300s, which means transactions of be will be aborted after lost heartbeat 300s. + */ + @ConfField(mutable = true, masterOnly = true) + public static int abort_txn_after_lost_heartbeat_time_second = 300; + + /** + * Heartbeat interval in seconds. + * Default is 5, which means every 5 seconds, the master will send a heartbeat to all backends. + */ + @ConfField(mutable = false, masterOnly = false) + public static int heartbeat_interval_second = 5; + /** * The iceberg and hudi table will be removed in v1.3 * Use multi catalog instead. diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 9625db3ea5c..f95efcd3474 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -53,6 +53,7 @@ import org.apache.doris.planner.OlapTableSink; import org.apache.doris.planner.external.jdbc.JdbcTableSink; import org.apache.doris.qe.ConnectContext; import org.apache.doris.rewrite.ExprRewriter; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TUniqueId; @@ -358,7 +359,9 @@ public class NativeInsertStmt extends InsertStmt { LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING; transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), Lists.newArrayList(targetTable.getId()), label.getLabelName(), - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), sourceType, timeoutSecond); } isTransactionBegin = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java index 7308a225402..bdfffbe4802 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java @@ -27,7 +27,7 @@ import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig; public class ClientPool { static GenericKeyedObjectPoolConfig heartbeatConfig = new GenericKeyedObjectPoolConfig(); - static int heartbeatTimeoutMs = FeConstants.heartbeat_interval_second * 1000; + static int heartbeatTimeoutMs = Config.heartbeat_interval_second * 1000; static GenericKeyedObjectPoolConfig backendConfig = new GenericKeyedObjectPoolConfig(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java index 1afa12856f0..2f45d87e895 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java @@ -34,7 +34,6 @@ public class FeConstants { public static int shortkey_max_column_count = 3; public static int shortkey_maxsize_bytes = 36; - public static int heartbeat_interval_second = 5; public static int checkpoint_interval_second = 60; // 1 minutes // dpp version diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index b358ea60b9a..c021c62f367 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -22,6 +22,7 @@ import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.LoadException; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.httpv2.entity.RestBaseResult; import org.apache.doris.httpv2.exception.UnauthorizedException; @@ -229,6 +230,11 @@ public class LoadAction extends RestBaseController { } private TNetworkAddress selectRedirectBackend(String clusterName) throws LoadException { + long debugBackendId = DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId", -1L); + if (debugBackendId != -1L) { + Backend backend = Env.getCurrentSystemInfo().getBackend(debugBackendId); + return new TNetworkAddress(backend.getHost(), backend.getHttpPort()); + } String qualifiedUser = ConnectContext.get().getQualifiedUser(); Set<Tag> userTags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser); BeSelectionPolicy policy = new BeSelectionPolicy.Builder() diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java index b771eb8c9fa..70b3765227a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java @@ -69,6 +69,7 @@ import org.apache.doris.planner.RangePartitionPrunerV2; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.QueryStateException; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; @@ -245,7 +246,9 @@ public class DeleteHandler implements Writable { // begin txn here and generate txn id transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), Lists.newArrayList(olapTable.getId()), label, null, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), TransactionState.LoadJobSourceType.FRONTEND, jobId, Config.stream_load_default_timeout_second); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 656b9b9d699..4591bb91b03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -49,6 +49,7 @@ import org.apache.doris.load.EtlJobType; import org.apache.doris.load.FailMsg; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.BeginTransactionException; @@ -104,7 +105,9 @@ public class BrokerLoadJob extends BulkLoadJob { QuotaExceedException, MetaNotFoundException { transactionId = Env.getCurrentGlobalTransactionMgr() .beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id, getTimeout()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index 09a85d3dffe..878a400e281 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -61,6 +61,7 @@ import org.apache.doris.load.EtlJobType; import org.apache.doris.load.EtlStatus; import org.apache.doris.load.FailMsg; import org.apache.doris.qe.OriginStatement; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.sparkdpp.DppResult; import org.apache.doris.sparkdpp.EtlJobConfig; @@ -198,7 +199,9 @@ public class SparkLoadJob extends BulkLoadJob { QuotaExceedException, MetaNotFoundException { transactionId = Env.getCurrentGlobalTransactionMgr() .beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), LoadJobSourceType.FRONTEND, id, getTimeout()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index 10d57e66d67..ae2570224c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -27,6 +27,7 @@ import org.apache.doris.common.QuotaExceedException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TRoutineLoadTask; import org.apache.doris.transaction.BeginTransactionException; @@ -206,7 +207,9 @@ public abstract class RoutineLoadTaskInfo { try { txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(routineLoadJob.getDbId(), Lists.newArrayList(routineLoadJob.getTableId()), DebugUtil.printId(id), null, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob.getId(), timeoutMs / 1000); } catch (DuplicatedRequestException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java index 5b10ecea818..825d63292ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java @@ -32,6 +32,7 @@ import org.apache.doris.load.sync.SyncJob; import org.apache.doris.load.sync.model.Data; import org.apache.doris.proto.InternalService; import org.apache.doris.qe.InsertStreamTxnExecutor; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.task.SyncTask; import org.apache.doris.task.SyncTaskPool; @@ -133,8 +134,10 @@ public class CanalSyncChannel extends SyncChannel { try { long txnId = globalTransactionMgr.beginTransaction(db.getId(), Lists.newArrayList(tbl.getId()), label, - new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, - FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond); + new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), + sourceType, timeoutSecond); String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); request = new TStreamLoadPutRequest() .setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java index 994f5e36055..db6598a1e1a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java @@ -34,6 +34,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.Coordinator; import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.task.LoadEtlTask; import org.apache.doris.thrift.TQueryType; @@ -80,7 +81,9 @@ public class Transaction { this.coordinator = new Coordinator(ctx, null, planner, ctx.getStatsErrorEstimator()); this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( database.getId(), ImmutableList.of(table.getId()), labelName, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout()); this.createAt = System.currentTimeMillis(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java index ace88c8ac0d..48c7744576c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java @@ -19,7 +19,6 @@ package org.apache.doris.qe; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; import org.apache.doris.common.UserException; @@ -176,7 +175,7 @@ public class SimpleScheduler { return; } - blacklistBackends.put(backendID, Pair.of(FeConstants.heartbeat_interval_second + 1, reason)); + blacklistBackends.put(backendID, Pair.of(Config.heartbeat_interval_second + 1, reason)); LOG.warn("add backend {} to black list. reason: {}", backendID, reason); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 8dda65e81f7..f47121348f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -160,6 +160,7 @@ import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.util.InternalQueryBuffer; @@ -1908,9 +1909,10 @@ public class StmtExecutor { String label = txnEntry.getLabel(); if (Env.getCurrentEnv().isMaster()) { long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( - txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), - label, new TransactionState.TxnCoordinator( - TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), label, + new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), sourceType, timeoutSecond); txnConf.setTxnId(txnId); String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java b/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java index a7ac522b5bd..ecd544c8bff 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java @@ -26,10 +26,12 @@ public class ExecuteEnv { private static volatile ExecuteEnv INSTANCE; private MultiLoadMgr multiLoadMgr; private ConnectScheduler scheduler; + private long startupTime; private ExecuteEnv() { multiLoadMgr = new MultiLoadMgr(); scheduler = new ConnectScheduler(Config.qe_max_connection); + startupTime = System.currentTimeMillis(); } public static ExecuteEnv getInstance() { @@ -50,4 +52,9 @@ public class ExecuteEnv { public MultiLoadMgr getMultiLoadMgr() { return multiLoadMgr; } + + public long getStartupTime() { + return startupTime; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 36fdec157ac..a38921221fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1249,10 +1249,12 @@ public class FrontendServiceImpl implements FrontendService.Iface { OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl, TableType.OLAP); // begin long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second; + Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId()); + long startTime = backend != null ? backend.getLastStartTime() : 0; + TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, request.getBackendId(), clientIp, startTime); long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( db.getId(), Lists.newArrayList(table.getId()), request.getLabel(), request.getRequestId(), - new TxnCoordinator(TxnSourceType.BE, clientIp), - TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond); + txnCoord, TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond); TLoadTxnBeginResult result = new TLoadTxnBeginResult(); result.setTxnId(txnId).setDbId(db.getId()); return result; @@ -1356,10 +1358,12 @@ public class FrontendServiceImpl implements FrontendService.Iface { // step 5: get timeout long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second; + Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId()); + long startTime = backend != null ? backend.getLastStartTime() : 0; + TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, request.getBackendId(), clientIp, startTime); // step 6: begin transaction long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( - db.getId(), tableIdList, request.getLabel(), request.getRequestId(), - new TxnCoordinator(TxnSourceType.BE, clientIp), + db.getId(), tableIdList, request.getLabel(), request.getRequestId(), txnCoord, TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond); // step 7: return result diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 7c081c12cd0..f621ae1e322 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -77,7 +77,7 @@ public class HeartbeatMgr extends MasterDaemon { private static volatile AtomicReference<TMasterInfo> masterInfo = new AtomicReference<>(); public HeartbeatMgr(SystemInfoService nodeMgr, boolean needRegisterMetric) { - super("heartbeat mgr", FeConstants.heartbeat_interval_second * 1000); + super("heartbeat mgr", Config.heartbeat_interval_second * 1000); this.nodeMgr = nodeMgr; this.executor = ThreadPoolManager.newDaemonFixedThreadPool(Config.heartbeat_mgr_threads_num, Config.heartbeat_mgr_blocking_queue_size, "heartbeat-mgr-pool", needRegisterMetric); @@ -168,13 +168,21 @@ public class HeartbeatMgr extends MasterDaemon { BackendHbResponse hbResponse = (BackendHbResponse) response; Backend be = nodeMgr.getBackend(hbResponse.getBeId()); if (be != null) { + long oldStartTime = be.getLastStartTime(); boolean isChanged = be.handleHbResponse(hbResponse, isReplay); - if (hbResponse.getStatus() != HbStatus.OK) { + if (hbResponse.getStatus() == HbStatus.OK) { + long newStartTime = be.getLastStartTime(); + if (!isReplay && oldStartTime != newStartTime) { + Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeRestart( + be.getId(), be.getHost(), newStartTime); + } + } else { // invalid all connections cached in ClientPool ClientPool.backendPool.clearPool(new TNetworkAddress(be.getHost(), be.getBePort())); - if (!isReplay) { - Env.getCurrentEnv().getGlobalTransactionMgr() - .abortTxnWhenCoordinateBeDown(be.getHost(), 100); + if (!isReplay && System.currentTimeMillis() - be.getLastUpdateMs() + >= Config.abort_txn_after_lost_heartbeat_time_second * 1000L) { + Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeDown( + be.getId(), be.getHost(), 100); } } return isChanged; diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 7ef043136ba..d733d863f4e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -1733,13 +1733,16 @@ public class DatabaseTransactionMgr { return null; } - public List<Pair<Long, Long>> getTransactionIdByCoordinateBe(String coordinateHost, int limit) { + public List<Pair<Long, Long>> getPrepareTransactionIdByCoordinateBe(long coordinateBeId, + String coordinateHost, int limit) { ArrayList<Pair<Long, Long>> txnInfos = new ArrayList<>(); readLock(); try { idToRunningTransactionState.values().stream() .filter(t -> (t.getCoordinator().sourceType == TransactionState.TxnSourceType.BE - && t.getCoordinator().ip.equals(coordinateHost))) + && t.getTransactionStatus() == TransactionStatus.PREPARE + && t.getCoordinator().ip.equals(coordinateHost) + && (t.getCoordinator().id == 0 || t.getCoordinator().id == coordinateBeId))) .limit(limit) .forEach(t -> txnInfos.add(Pair.of(t.getDbId(), t.getTransactionId()))); } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 22a019c4c0d..35c2195eb33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -653,10 +653,12 @@ public class GlobalTransactionMgr implements Writable { } } - public List<Pair<Long, Long>> getTransactionIdByCoordinateBe(String coordinateHost, int limit) { + private List<Pair<Long, Long>> getPrepareTransactionIdByCoordinateBe(long coordinateBeId, + String coordinateHost, int limit) { ArrayList<Pair<Long, Long>> txnInfos = new ArrayList<>(); for (DatabaseTransactionMgr databaseTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) { - txnInfos.addAll(databaseTransactionMgr.getTransactionIdByCoordinateBe(coordinateHost, limit)); + txnInfos.addAll(databaseTransactionMgr.getPrepareTransactionIdByCoordinateBe( + coordinateBeId, coordinateHost, limit)); if (txnInfos.size() > limit) { break; } @@ -664,19 +666,33 @@ public class GlobalTransactionMgr implements Writable { return txnInfos.size() > limit ? new ArrayList<>(txnInfos.subList(0, limit)) : txnInfos; } + public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String coordinateHost, long beStartTime) { + List<Pair<Long, Long>> transactionIdByCoordinateBe + = getPrepareTransactionIdByCoordinateBe(coordinateBeId, coordinateHost, Integer.MAX_VALUE); + for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) { + try { + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(txnInfo.first); + TransactionState transactionState = dbTransactionMgr.getTransactionState(txnInfo.second); + long coordStartTime = transactionState.getCoordinator().startTime; + if (coordStartTime < beStartTime) { + dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE restart", null); + } + } catch (UserException e) { + LOG.warn("Abort txn on coordinate BE {} failed, msg={}", coordinateHost, e.getMessage()); + } + } + } + /** * If a Coordinate BE is down when running txn, the txn will remain in FE until killed by timeout * So when FE identify the Coordinate BE is down, FE should cancel it initiative */ - public void abortTxnWhenCoordinateBeDown(String coordinateHost, int limit) { - List<Pair<Long, Long>> transactionIdByCoordinateBe = getTransactionIdByCoordinateBe(coordinateHost, limit); + public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String coordinateHost, int limit) { + List<Pair<Long, Long>> transactionIdByCoordinateBe + = getPrepareTransactionIdByCoordinateBe(coordinateBeId, coordinateHost, limit); for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) { try { DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(txnInfo.first); - TransactionState transactionState = dbTransactionMgr.getTransactionState(txnInfo.second); - if (transactionState.getTransactionStatus() == TransactionStatus.PRECOMMITTED) { - continue; - } dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE is down", null); } catch (UserException e) { LOG.warn("Abort txn on coordinate BE {} failed, msg={}", coordinateHost, e.getMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index f9a094eceb9..cdef27c9359 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -162,15 +162,23 @@ public class TransactionState implements Writable { public static class TxnCoordinator { @SerializedName(value = "sourceType") public TxnSourceType sourceType; + // backendId for backend, 0 for frontend + @SerializedName(value = "id") + public long id = 0; @SerializedName(value = "ip") public String ip; + // frontend/backend start time + @SerializedName(value = "st") + public long startTime = 0; public TxnCoordinator() { } - public TxnCoordinator(TxnSourceType sourceType, String ip) { + public TxnCoordinator(TxnSourceType sourceType, long id, String ip, long startTime) { this.sourceType = sourceType; + this.id = id; this.ip = ip; + this.startTime = startTime; } @Override @@ -301,7 +309,8 @@ public class TransactionState implements Writable { this.transactionId = -1; this.label = ""; this.idToTableCommitInfos = Maps.newHashMap(); - this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, "127.0.0.1"); // mocked, to avoid NPE + // mocked, to avoid NPE + this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, 0, "127.0.0.1", System.currentTimeMillis()); this.transactionStatus = TransactionStatus.PREPARE; this.sourceType = LoadJobSourceType.FRONTEND; this.prepareTime = -1; @@ -721,7 +730,7 @@ public class TransactionState implements Writable { info.readFields(in); idToTableCommitInfos.put(info.getTableId(), info); } - txnCoordinator = new TxnCoordinator(TxnSourceType.valueOf(in.readInt()), Text.readString(in)); + txnCoordinator = new TxnCoordinator(TxnSourceType.valueOf(in.readInt()), 0, Text.readString(in), 0); transactionStatus = TransactionStatus.valueOf(in.readInt()); sourceType = LoadJobSourceType.valueOf(in.readInt()); prepareTime = in.readLong(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java index 4a413495e98..e288d046de1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java @@ -54,7 +54,7 @@ public class BeDownCancelCloneTest extends TestWithFeService { Config.disable_balance = true; Config.schedule_batch_size = 1000; Config.schedule_slot_num_per_hdd_path = 1000; - FeConstants.heartbeat_interval_second = 5; + Config.heartbeat_interval_second = 5; Config.max_backend_heartbeat_failure_tolerance_count = 1; Config.min_clone_task_timeout_sec = 20 * 60 * 1000; } @@ -114,7 +114,7 @@ public class BeDownCancelCloneTest extends TestWithFeService { params2.put("deadBeIds", String.valueOf(destBeId)); DebugPointUtil.addDebugPointWithParams("HeartbeatMgr.BackendHeartbeatHandler", params2); - Thread.sleep((FeConstants.heartbeat_interval_second + Thread.sleep((Config.heartbeat_interval_second * Config.max_backend_heartbeat_failure_tolerance_count + 4) * 1000L); destBe = Env.getCurrentSystemInfo().getBackend(destBeId); diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java index b917bc41a7e..ff5f5292be8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java @@ -66,7 +66,7 @@ public class DecommissionBackendTest extends TestWithFeService { Config.disable_balance = true; Config.schedule_batch_size = 1000; Config.schedule_slot_num_per_hdd_path = 1000; - FeConstants.heartbeat_interval_second = 5; + Config.heartbeat_interval_second = 5; } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java index 6ba2d271566..ac13900d2cf 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java @@ -17,7 +17,7 @@ package org.apache.doris.qe; -import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Config; import org.apache.doris.common.Reference; import org.apache.doris.common.UserException; import org.apache.doris.system.Backend; @@ -47,7 +47,7 @@ public class SimpleSchedulerTest { @BeforeClass public static void setUp() { SimpleScheduler.init(); - FeConstants.heartbeat_interval_second = 2; + Config.heartbeat_interval_second = 2; be1 = new Backend(1000L, "192.168.100.0", 9050); be2 = new Backend(1001L, "192.168.100.1", 9050); be3 = new Backend(1002L, "192.168.100.2", 9050); @@ -139,7 +139,7 @@ public class SimpleSchedulerTest { t3.join(); Assert.assertFalse(SimpleScheduler.isAvailable(be1)); - Thread.sleep((FeConstants.heartbeat_interval_second + 5) * 1000); + Thread.sleep((Config.heartbeat_interval_second + 5) * 1000); Assert.assertTrue(SimpleScheduler.isAvailable(be1)); } @@ -194,7 +194,7 @@ public class SimpleSchedulerTest { System.out.println(e.getMessage()); } - Thread.sleep((FeConstants.heartbeat_interval_second + 5) * 1000); + Thread.sleep((Config.heartbeat_interval_second + 5) * 1000); Assert.assertNotNull(SimpleScheduler.getHost(locations.get(0).backend_id, locations, backends, ref)); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java index 9108570e5e4..ea63a5e18b1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java @@ -57,8 +57,8 @@ public class DatabaseTransactionMgrTest { private static Env slaveEnv; private static Map<String, Long> LabelToTxnId; - private TransactionState.TxnCoordinator transactionSource = - new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe"); + private TransactionState.TxnCoordinator transactionSource = new TransactionState.TxnCoordinator( + TransactionState.TxnSourceType.FE, 0, "localfe", System.currentTimeMillis()); public static void setTransactionFinishPublish(TransactionState transactionState, List<Long> backendIds) { for (long backendId : backendIds) { @@ -118,7 +118,9 @@ public class DatabaseTransactionMgrTest { masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId1); labelToTxnId.put(CatalogTestUtil.testTxnLabel1, transactionId1); - TransactionState.TxnCoordinator beTransactionSource = new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.BE, "be1"); + // txn 2, 3, 4 + TransactionState.TxnCoordinator beTransactionSource = new TransactionState.TxnCoordinator( + TransactionState.TxnSourceType.BE, 0, "be1", System.currentTimeMillis()); long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1), CatalogTestUtil.testTxnLabel2, beTransactionSource, @@ -204,7 +206,7 @@ public class DatabaseTransactionMgrTest { @Test public void testGetTransactionIdByCoordinateBe() throws UserException { DatabaseTransactionMgr masterDbTransMgr = masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1); - List<Pair<Long, Long>> transactionInfoList = masterDbTransMgr.getTransactionIdByCoordinateBe("be1", 10); + List<Pair<Long, Long>> transactionInfoList = masterDbTransMgr.getPrepareTransactionIdByCoordinateBe(0, "be1", 10); Assert.assertEquals(3, transactionInfoList.size()); Assert.assertEquals(CatalogTestUtil.testDbId1, transactionInfoList.get(0).first.longValue()); Assert.assertEquals(TransactionStatus.PREPARE, diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index 414f5cf03c4..cc00237a4c4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -77,7 +77,8 @@ public class GlobalTransactionMgrTest { private static Env masterEnv; private static Env slaveEnv; - private TransactionState.TxnCoordinator transactionSource = new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe"); + private TransactionState.TxnCoordinator transactionSource = new TransactionState.TxnCoordinator( + TransactionState.TxnSourceType.FE, 0, "localfe", System.currentTimeMillis()); @Before public void setUp() throws InstantiationException, IllegalAccessException, IllegalArgumentException, @@ -323,7 +324,9 @@ public class GlobalTransactionMgrTest { Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo); TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null, - LoadJobSourceType.ROUTINE_LOAD_TASK, new TxnCoordinator(TxnSourceType.BE, "be1"), routineLoadJob.getId(), + LoadJobSourceType.ROUTINE_LOAD_TASK, + new TxnCoordinator(TxnSourceType.BE, 0, "be1", System.currentTimeMillis()), + routineLoadJob.getId(), Config.stream_load_default_timeout_second); transactionState.setTransactionStatus(TransactionStatus.PREPARE); masterTransMgr.getCallbackFactory().addCallback(routineLoadJob); @@ -395,7 +398,9 @@ public class GlobalTransactionMgrTest { Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo); TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null, - LoadJobSourceType.ROUTINE_LOAD_TASK, new TxnCoordinator(TxnSourceType.BE, "be1"), routineLoadJob.getId(), + LoadJobSourceType.ROUTINE_LOAD_TASK, + new TxnCoordinator(TxnSourceType.BE, 0, "be1", System.currentTimeMillis()), + routineLoadJob.getId(), Config.stream_load_default_timeout_second); transactionState.setTransactionStatus(TransactionStatus.PREPARE); masterTransMgr.getCallbackFactory().addCallback(routineLoadJob); diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java index c20b2097f8d..f08b7478d06 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java @@ -61,8 +61,9 @@ public class TransactionStateTest { UUID uuid = UUID.randomUUID(); TransactionState transactionState = new TransactionState(1000L, Lists.newArrayList(20000L, 20001L), 3000, "label123", new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()), - LoadJobSourceType.BACKEND_STREAMING, new TxnCoordinator(TxnSourceType.BE, "127.0.0.1"), 50000L, - 60 * 1000L); + LoadJobSourceType.BACKEND_STREAMING, + new TxnCoordinator(TxnSourceType.BE, 0, "127.0.0.1", System.currentTimeMillis()), + 50000L, 60 * 1000L); transactionState.write(out); out.flush(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index c61fcd28afe..59e4eae0a37 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -405,7 +405,7 @@ public abstract class TestWithFeService { } private void checkBEHeartbeat(List<Backend> bes) throws InterruptedException { - int maxTry = FeConstants.heartbeat_interval_second + 5; + int maxTry = Config.heartbeat_interval_second + 5; boolean allAlive = false; while (maxTry-- > 0 && !allAlive) { Thread.sleep(1000); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index da4c915e9b0..75a6537ee85 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -559,6 +559,7 @@ struct TLoadTxnBeginRequest { 10: optional i64 timeout 11: optional Types.TUniqueId request_id 12: optional string token + 13: optional i64 backend_id } struct TLoadTxnBeginResult { @@ -581,6 +582,7 @@ struct TBeginTxnRequest { 9: optional i64 timeout 10: optional Types.TUniqueId request_id 11: optional string token + 12: optional i64 backend_id } struct TBeginTxnResult { diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index afc23cc2cc0..9a14346af10 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -422,6 +422,41 @@ class Suite implements GroovyInterceptable { return result; } + long getTableId(String tableName) { + return getTableId(getDbName(), tableName) + } + + long getTableId(String dbName, String tableName) { + def dbInfo = sql "show proc '/dbs'" + for(List<Object> row : dbInfo) { + if (row[1].replace("default_cluster:", "").equals(dbName)) { + def tbInfo = sql "show proc '/dbs/${row[0]}' " + for (List<Object> tb : tbInfo) { + if (tb[1].equals(tableName)) { + return tb[0].toLong() + } + } + } + } + } + + long getDbId() { + return getDbId(getDbName()) + } + + long getDbId(String dbName) { + def dbInfo = sql "show proc '/dbs'" + for (List<Object> row : dbInfo) { + if (row[1].replace("default_cluster:", "").equals(dbName)) { + return row[0].toLong() + } + } + } + + String getDbName() { + return context.dbName + } + List<List<Object>> order_sql(String sqlStr) { return sql(sqlStr, true) } @@ -681,6 +716,33 @@ class Suite implements GroovyInterceptable { return hdfs.downLoad(label) } + void runStreamLoadExample(String tableName, String coordidateBeHostPort = "") { + def backends = sql_return_maparray "show backends" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + id int, + name varchar(255) + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_num" = "${backends.size()}" + ) + """ + + streamLoad { + table tableName + set 'column_separator', ',' + file context.config.dataPath + "/demo_p0/streamload_input.csv" + time 10000 + if (!coordidateBeHostPort.equals("")) { + def pos = coordidateBeHostPort.indexOf(':') + def host = coordidateBeHostPort.substring(0, pos) + def httpPort = coordidateBeHostPort.substring(pos + 1).toInteger() + directToBe host, httpPort + } + } + } + void streamLoad(Closure actionSupplier) { runAction(new StreamLoadAction(context), actionSupplier) } diff --git a/regression-test/suites/demo_p0/streamLoad_action.groovy b/regression-test/suites/demo_p0/streamLoad_action.groovy index a11aed7a1c5..b0ca182c8d9 100644 --- a/regression-test/suites/demo_p0/streamLoad_action.groovy +++ b/regression-test/suites/demo_p0/streamLoad_action.groovy @@ -124,6 +124,11 @@ suite("streamLoad_action") { LIMIT 5; """ + def tableName2 = "test_streamload_action2" + runStreamLoadExample(tableName2) + sql """ DROP TABLE ${tableName} """ + sql """ DROP TABLE ${tableName2}""" + sql """ DROP TABLE B """ } diff --git a/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy b/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy new file mode 100644 index 00000000000..cd2b8ea9e48 --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.http.NoHttpResponseException + +suite('test_coordidator_be_restart') { + def options = new ClusterOptions() + options.cloudMode = false + options.enableDebugPoints() + + docker(options) { + def db = context.config.getDbNameByFile(context.file) + def tableName1 = 'tbl_test_coordidator_be_restart_t1' + setFeConfig('abort_txn_after_lost_heartbeat_time_second', 3600) + + def dbId = getDbId() + + def txns = sql_return_maparray "show proc '/transactions/${dbId}/running'" + assertEquals(0, txns.size()) + txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'" + assertEquals(0, txns.size()) + + def coordinatorBe = cluster.getAllBackends().get(0) + def coordinatorBeHost = coordinatorBe.host + + GetDebugPoint().enableDebugPointForAllFEs('LoadAction.selectRedirectBackend.backendId', [value: coordinatorBe.backendId]) + GetDebugPoint().enableDebugPointForAllBEs('StreamLoadExecutor.commit_txn.block') + + thread { + try { + runStreamLoadExample(tableName1, coordinatorBe.host + ':' + coordinatorBe.httpPort) + } catch (NoHttpResponseException t) { + // be down will raise NoHttpResponseException + } + } + + sleep(5000) + txns = sql_return_maparray "show proc '/transactions/${dbId}/running'" + logger.info('running txns: ' + txns) + assertEquals(1, txns.size()) + for (def txn : txns) { + assertEquals('PREPARE', txn.TransactionStatus) + } + + txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'" + assertEquals(0, txns.size()) + + // coordinatorBe shutdown not abort txn because abort_txn_after_lost_heartbeat_time_second = 3600 + cluster.stopBackends(coordinatorBe.index) + def isDead = false + for (def i = 0; i < 10; i++) { + def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } + if (!be.Alive.toBoolean()) { + isDead = true + break + } + sleep 1000 + } + assertTrue(isDead) + sleep 5000 + txns = sql_return_maparray "show proc '/transactions/${dbId}/running'" + logger.info('running txns: ' + txns) + assertEquals(1, txns.size()) + for (def txn : txns) { + assertEquals('PREPARE', txn.TransactionStatus) + } + + // coordinatorBe restart, abort txn on it + cluster.startBackends(coordinatorBe.index) + def isAlive = false + for (def i = 0; i < 20; i++) { + def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } + if (be.Alive.toBoolean()) { + isAlive = true + break + } + sleep 1000 + } + assertTrue(isAlive) + sleep 5000 + txns = sql_return_maparray "show proc '/transactions/${dbId}/running'" + logger.info('running txns: ' + txns) + assertEquals(0, txns.size()) + txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'" + logger.info('finished txns: ' + txns) + assertEquals(1, txns.size()) + for (def txn : txns) { + assertEquals('ABORTED', txn.TransactionStatus) + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org