This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new eaffb691d38 [branch-2.0](txn) be dead exceeds 5min abort its txns 
(#22781,  #28662, #35342) (#39317)
eaffb691d38 is described below

commit eaffb691d38afe0995f073847572564df49b909e
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Wed Aug 14 15:31:51 2024 +0800

    [branch-2.0](txn) be dead exceeds 5min abort its txns (#22781,  #28662, 
#35342) (#39317)
    
    cherry-pick:  #22781,  #28662, #35342
    
    ---------
    
    Co-authored-by: HHoflittlefish777 
<77738092+hhoflittlefish...@users.noreply.github.com>
---
 .../runtime/stream_load/stream_load_executor.cpp   |   4 +
 docs/en/docs/admin-manual/config/fe-config.md      |  10 ++
 docs/zh-CN/docs/admin-manual/config/fe-config.md   |  10 ++
 .../main/java/org/apache/doris/common/Config.java  |  14 +++
 .../apache/doris/analysis/NativeInsertStmt.java    |   5 +-
 .../java/org/apache/doris/common/ClientPool.java   |   2 +-
 .../java/org/apache/doris/common/FeConstants.java  |   1 -
 .../org/apache/doris/httpv2/rest/LoadAction.java   |   6 ++
 .../java/org/apache/doris/load/DeleteHandler.java  |   5 +-
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |   5 +-
 .../org/apache/doris/load/loadv2/SparkLoadJob.java |   5 +-
 .../load/routineload/RoutineLoadTaskInfo.java      |   5 +-
 .../doris/load/sync/canal/CanalSyncChannel.java    |   7 +-
 .../org/apache/doris/nereids/txn/Transaction.java  |   5 +-
 .../java/org/apache/doris/qe/SimpleScheduler.java  |   3 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |   8 +-
 .../java/org/apache/doris/service/ExecuteEnv.java  |   7 ++
 .../apache/doris/service/FrontendServiceImpl.java  |  12 ++-
 .../java/org/apache/doris/system/HeartbeatMgr.java |  18 +++-
 .../doris/transaction/DatabaseTransactionMgr.java  |   7 +-
 .../doris/transaction/GlobalTransactionMgr.java    |  32 +++++--
 .../apache/doris/transaction/TransactionState.java |  15 ++-
 .../apache/doris/clone/BeDownCancelCloneTest.java  |   4 +-
 .../doris/cluster/DecommissionBackendTest.java     |   2 +-
 .../org/apache/doris/qe/SimpleSchedulerTest.java   |   8 +-
 .../transaction/DatabaseTransactionMgrTest.java    |  10 +-
 .../transaction/GlobalTransactionMgrTest.java      |  11 ++-
 .../doris/transaction/TransactionStateTest.java    |   5 +-
 .../apache/doris/utframe/TestWithFeService.java    |   2 +-
 gensrc/thrift/FrontendService.thrift               |   2 +
 .../org/apache/doris/regression/suite/Suite.groovy |  62 ++++++++++++
 .../suites/demo_p0/streamLoad_action.groovy        |   5 +
 .../stream_load/test_coordidator_be_restart.groovy | 106 +++++++++++++++++++++
 33 files changed, 349 insertions(+), 54 deletions(-)

diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp 
b/be/src/runtime/stream_load/stream_load_executor.cpp
index 4a80eb35ca6..19d25e9ffa1 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -45,6 +45,7 @@
 #include "runtime/stream_load/new_load_stream_mgr.h"
 #include "runtime/stream_load/stream_load_context.h"
 #include "thrift/protocol/TDebugProtocol.h"
+#include "util/debug_points.h"
 #include "util/doris_metrics.h"
 #include "util/thrift_rpc_helper.h"
 #include "util/time.h"
@@ -242,6 +243,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* 
ctx) {
         request.__set_timeout(ctx->timeout_second);
     }
     request.__set_request_id(ctx->id.to_thrift());
+    request.__set_backend_id(_exec_env->master_info()->backend_id);
 
     TLoadTxnBeginResult result;
     Status status;
@@ -374,6 +376,8 @@ void 
StreamLoadExecutor::get_commit_request(StreamLoadContext* ctx,
 }
 
 Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
+    DBUG_EXECUTE_IF("StreamLoadExecutor.commit_txn.block", DBUG_BLOCK);
+
     
DorisMetrics::instance()->stream_load_txn_commit_request_total->increment(1);
 
     TLoadTxnCommitRequest request;
diff --git a/docs/en/docs/admin-manual/config/fe-config.md 
b/docs/en/docs/admin-manual/config/fe-config.md
index 1c2ab9939a1..fbab5cfcd15 100644
--- a/docs/en/docs/admin-manual/config/fe-config.md
+++ b/docs/en/docs/admin-manual/config/fe-config.md
@@ -587,6 +587,16 @@ Is it possible to configure dynamically: true
 
 Whether it is a configuration item unique to the Master FE node: true
 
+#### `abort_txn_after_lost_heartbeat_time_second`
+
+The time, in seconds, that a BE may go without a heartbeat before its transactions are aborted. With the default value of 300, 
transactions coordinated by a BE are aborted once its heartbeat has been lost for 300 seconds.
+
+Default: 300(s)
+
+Is it possible to configure dynamically: true
+
+Whether it is a configuration item unique to the Master FE node: true
+
 #### `enable_access_file_without_broker`
 
 Default:false
diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md 
b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index c93b2d17e01..60e2198b711 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -587,6 +587,16 @@ FE向BE的BackendService发送rpc请求时的超时时间,单位:毫秒。
 
 是否为 Master FE 节点独有的配置项:true
 
+#### `abort_txn_after_lost_heartbeat_time_second`
+
+丢失be心跳后丢弃be事务的时间。默认时间为三百秒,当三百秒fe没有接收到be心跳时,会丢弃该be的所有事务。
+
+默认值:300(秒)
+
+是否可以动态配置:true
+
+是否为 Master FE 节点独有的配置项:true
+
 #### `enable_access_file_without_broker`
 
 默认值:false
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index a5c0ad36ad8..44882f21768 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1812,6 +1812,20 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static long max_backend_heartbeat_failure_tolerance_count = 1;
 
+    /**
+     * The time, in seconds, that a BE may go without a heartbeat before its transactions are aborted.
+     * The default value is 300, which means transactions coordinated by a BE are 
aborted once its heartbeat has been lost for 300 seconds.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int abort_txn_after_lost_heartbeat_time_second = 300;
+
+    /**
+     * Heartbeat interval in seconds.
+     * Default is 5, which means every 5 seconds, the master will send a 
heartbeat to all backends.
+     */
+    @ConfField(mutable = false, masterOnly = false)
+    public static int heartbeat_interval_second = 5;
+
     /**
      * The iceberg and hudi table will be removed in v1.3
      * Use multi catalog instead.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 9625db3ea5c..f95efcd3474 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -53,6 +53,7 @@ import org.apache.doris.planner.OlapTableSink;
 import org.apache.doris.planner.external.jdbc.JdbcTableSink;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.rewrite.ExprRewriter;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TUniqueId;
@@ -358,7 +359,9 @@ public class NativeInsertStmt extends InsertStmt {
                 LoadJobSourceType sourceType = 
LoadJobSourceType.INSERT_STREAMING;
                 transactionId = 
Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
                         Lists.newArrayList(targetTable.getId()), 
label.getLabelName(),
-                        new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                        new TxnCoordinator(TxnSourceType.FE, 0,
+                                FrontendOptions.getLocalHostAddress(),
+                                ExecuteEnv.getInstance().getStartupTime()),
                         sourceType, timeoutSecond);
             }
             isTransactionBegin = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
index 7308a225402..bdfffbe4802 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
@@ -27,7 +27,7 @@ import 
org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
 
 public class ClientPool {
     static GenericKeyedObjectPoolConfig heartbeatConfig = new 
GenericKeyedObjectPoolConfig();
-    static int heartbeatTimeoutMs = FeConstants.heartbeat_interval_second * 
1000;
+    static int heartbeatTimeoutMs = Config.heartbeat_interval_second * 1000;
 
     static GenericKeyedObjectPoolConfig backendConfig = new 
GenericKeyedObjectPoolConfig();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 1afa12856f0..2f45d87e895 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -34,7 +34,6 @@ public class FeConstants {
     public static int shortkey_max_column_count = 3;
     public static int shortkey_maxsize_bytes = 36;
 
-    public static int heartbeat_interval_second = 5;
     public static int checkpoint_interval_second = 60; // 1 minutes
 
     // dpp version
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index b358ea60b9a..c021c62f367 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -22,6 +22,7 @@ import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.LoadException;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
 import org.apache.doris.httpv2.entity.RestBaseResult;
 import org.apache.doris.httpv2.exception.UnauthorizedException;
@@ -229,6 +230,11 @@ public class LoadAction extends RestBaseController {
     }
 
     private TNetworkAddress selectRedirectBackend(String clusterName) throws 
LoadException {
+        long debugBackendId = 
DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId",
 -1L);
+        if (debugBackendId != -1L) {
+            Backend backend = 
Env.getCurrentSystemInfo().getBackend(debugBackendId);
+            return new TNetworkAddress(backend.getHost(), 
backend.getHttpPort());
+        }
         String qualifiedUser = ConnectContext.get().getQualifiedUser();
         Set<Tag> userTags = 
Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
         BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
index b771eb8c9fa..70b3765227a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
@@ -69,6 +69,7 @@ import org.apache.doris.planner.RangePartitionPrunerV2;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTaskExecutor;
@@ -245,7 +246,9 @@ public class DeleteHandler implements Writable {
                 // begin txn here and generate txn id
                 transactionId = 
Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
                         Lists.newArrayList(olapTable.getId()), label, null,
-                        new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                        new TxnCoordinator(TxnSourceType.FE, 0,
+                                FrontendOptions.getLocalHostAddress(),
+                                ExecuteEnv.getInstance().getStartupTime()),
                         TransactionState.LoadJobSourceType.FRONTEND, jobId, 
Config.stream_load_default_timeout_second);
 
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 656b9b9d699..4591bb91b03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -49,6 +49,7 @@ import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.BeginTransactionException;
@@ -104,7 +105,9 @@ public class BrokerLoadJob extends BulkLoadJob {
             QuotaExceedException, MetaNotFoundException {
         transactionId = Env.getCurrentGlobalTransactionMgr()
                 .beginTransaction(dbId, 
Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
-                        new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                        new TxnCoordinator(TxnSourceType.FE, 0,
+                                FrontendOptions.getLocalHostAddress(),
+                                ExecuteEnv.getInstance().getStartupTime()),
                         TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id,
                         getTimeout());
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 09a85d3dffe..878a400e281 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -61,6 +61,7 @@ import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.EtlStatus;
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.sparkdpp.DppResult;
 import org.apache.doris.sparkdpp.EtlJobConfig;
@@ -198,7 +199,9 @@ public class SparkLoadJob extends BulkLoadJob {
             QuotaExceedException, MetaNotFoundException {
         transactionId = Env.getCurrentGlobalTransactionMgr()
                 .beginTransaction(dbId, 
Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
-                        new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                        new TxnCoordinator(TxnSourceType.FE, 0,
+                                FrontendOptions.getLocalHostAddress(),
+                                ExecuteEnv.getInstance().getStartupTime()),
                         LoadJobSourceType.FRONTEND, id, getTimeout());
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 10d57e66d67..ae2570224c4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.QuotaExceedException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.transaction.BeginTransactionException;
@@ -206,7 +207,9 @@ public abstract class RoutineLoadTaskInfo {
         try {
             txnId = 
Env.getCurrentGlobalTransactionMgr().beginTransaction(routineLoadJob.getDbId(),
                     Lists.newArrayList(routineLoadJob.getTableId()), 
DebugUtil.printId(id), null,
-                    new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                    new TxnCoordinator(TxnSourceType.FE, 0,
+                            FrontendOptions.getLocalHostAddress(),
+                            ExecuteEnv.getInstance().getStartupTime()),
                     TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, 
routineLoadJob.getId(),
                     timeoutMs / 1000);
         } catch (DuplicatedRequestException e) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
index 5b10ecea818..825d63292ef 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
@@ -32,6 +32,7 @@ import org.apache.doris.load.sync.SyncJob;
 import org.apache.doris.load.sync.model.Data;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.qe.InsertStreamTxnExecutor;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.task.SyncTask;
 import org.apache.doris.task.SyncTaskPool;
@@ -133,8 +134,10 @@ public class CanalSyncChannel extends SyncChannel {
                 try {
                     long txnId = 
globalTransactionMgr.beginTransaction(db.getId(),
                             Lists.newArrayList(tbl.getId()), label,
-                        new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE,
-                            FrontendOptions.getLocalHostAddress()), 
sourceType, timeoutSecond);
+                            new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
+                                    FrontendOptions.getLocalHostAddress(),
+                                    ExecuteEnv.getInstance().getStartupTime()),
+                            sourceType, timeoutSecond);
                     String token = 
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
                     request = new TStreamLoadPutRequest()
                         
.setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java
index 994f5e36055..db6598a1e1a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/txn/Transaction.java
@@ -34,6 +34,7 @@ import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.Coordinator;
 import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.task.LoadEtlTask;
 import org.apache.doris.thrift.TQueryType;
@@ -80,7 +81,9 @@ public class Transaction {
         this.coordinator = new Coordinator(ctx, null, planner, 
ctx.getStatsErrorEstimator());
         this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
                 database.getId(), ImmutableList.of(table.getId()), labelName,
-                new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                new TxnCoordinator(TxnSourceType.FE, 0,
+                        FrontendOptions.getLocalHostAddress(),
+                        ExecuteEnv.getInstance().getStartupTime()),
                 LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout());
         this.createAt = System.currentTimeMillis();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
index ace88c8ac0d..48c7744576c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java
@@ -19,7 +19,6 @@ package org.apache.doris.qe;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.Reference;
 import org.apache.doris.common.UserException;
@@ -176,7 +175,7 @@ public class SimpleScheduler {
             return;
         }
 
-        blacklistBackends.put(backendID, 
Pair.of(FeConstants.heartbeat_interval_second + 1, reason));
+        blacklistBackends.put(backendID, 
Pair.of(Config.heartbeat_interval_second + 1, reason));
         LOG.warn("add backend {} to black list. reason: {}", backendID, 
reason);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 8dda65e81f7..f47121348f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -160,6 +160,7 @@ import org.apache.doris.rewrite.ExprRewriter;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.statistics.ResultRow;
 import org.apache.doris.statistics.util.InternalQueryBuffer;
@@ -1908,9 +1909,10 @@ public class StmtExecutor {
         String label = txnEntry.getLabel();
         if (Env.getCurrentEnv().isMaster()) {
             long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
-                    txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
-                    label, new TransactionState.TxnCoordinator(
-                            TransactionState.TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                    txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), 
label,
+                    new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
+                            FrontendOptions.getLocalHostAddress(),
+                            ExecuteEnv.getInstance().getStartupTime()),
                     sourceType, timeoutSecond);
             txnConf.setTxnId(txnId);
             String token = 
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java
index a7ac522b5bd..ecd544c8bff 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java
@@ -26,10 +26,12 @@ public class ExecuteEnv {
     private static volatile ExecuteEnv INSTANCE;
     private MultiLoadMgr multiLoadMgr;
     private ConnectScheduler scheduler;
+    private long startupTime;
 
     private ExecuteEnv() {
         multiLoadMgr = new MultiLoadMgr();
         scheduler = new ConnectScheduler(Config.qe_max_connection);
+        startupTime = System.currentTimeMillis();
     }
 
     public static ExecuteEnv getInstance() {
@@ -50,4 +52,9 @@ public class ExecuteEnv {
     public MultiLoadMgr getMultiLoadMgr() {
         return multiLoadMgr;
     }
+
+    public long getStartupTime() {
+        return startupTime;
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 36fdec157ac..a38921221fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1249,10 +1249,12 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl, 
TableType.OLAP);
         // begin
         long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : 
Config.stream_load_default_timeout_second;
+        Backend backend = 
Env.getCurrentSystemInfo().getBackend(request.getBackendId());
+        long startTime = backend != null ? backend.getLastStartTime() : 0;
+        TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, 
request.getBackendId(), clientIp, startTime);
         long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
                 db.getId(), Lists.newArrayList(table.getId()), 
request.getLabel(), request.getRequestId(),
-                new TxnCoordinator(TxnSourceType.BE, clientIp),
-                TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, 
timeoutSecond);
+                txnCoord, 
TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond);
         TLoadTxnBeginResult result = new TLoadTxnBeginResult();
         result.setTxnId(txnId).setDbId(db.getId());
         return result;
@@ -1356,10 +1358,12 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         // step 5: get timeout
         long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : 
Config.stream_load_default_timeout_second;
 
+        Backend backend = 
Env.getCurrentSystemInfo().getBackend(request.getBackendId());
+        long startTime = backend != null ? backend.getLastStartTime() : 0;
+        TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, 
request.getBackendId(), clientIp, startTime);
         // step 6: begin transaction
         long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
-                db.getId(), tableIdList, request.getLabel(), 
request.getRequestId(),
-                new TxnCoordinator(TxnSourceType.BE, clientIp),
+                db.getId(), tableIdList, request.getLabel(), 
request.getRequestId(), txnCoord,
                 TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, 
timeoutSecond);
 
         // step 7: return result
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 7c081c12cd0..f621ae1e322 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -77,7 +77,7 @@ public class HeartbeatMgr extends MasterDaemon {
     private static volatile AtomicReference<TMasterInfo> masterInfo = new 
AtomicReference<>();
 
     public HeartbeatMgr(SystemInfoService nodeMgr, boolean needRegisterMetric) 
{
-        super("heartbeat mgr", FeConstants.heartbeat_interval_second * 1000);
+        super("heartbeat mgr", Config.heartbeat_interval_second * 1000);
         this.nodeMgr = nodeMgr;
         this.executor = 
ThreadPoolManager.newDaemonFixedThreadPool(Config.heartbeat_mgr_threads_num,
                 Config.heartbeat_mgr_blocking_queue_size, 
"heartbeat-mgr-pool", needRegisterMetric);
@@ -168,13 +168,21 @@ public class HeartbeatMgr extends MasterDaemon {
                 BackendHbResponse hbResponse = (BackendHbResponse) response;
                 Backend be = nodeMgr.getBackend(hbResponse.getBeId());
                 if (be != null) {
+                    long oldStartTime = be.getLastStartTime();
                     boolean isChanged = be.handleHbResponse(hbResponse, 
isReplay);
-                    if (hbResponse.getStatus() != HbStatus.OK) {
+                    if (hbResponse.getStatus() == HbStatus.OK) {
+                        long newStartTime = be.getLastStartTime();
+                        if (!isReplay && oldStartTime != newStartTime) {
+                            
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeRestart(
+                                    be.getId(), be.getHost(), newStartTime);
+                        }
+                    } else {
                         // invalid all connections cached in ClientPool
                         ClientPool.backendPool.clearPool(new 
TNetworkAddress(be.getHost(), be.getBePort()));
-                        if (!isReplay) {
-                            Env.getCurrentEnv().getGlobalTransactionMgr()
-                                    
.abortTxnWhenCoordinateBeDown(be.getHost(), 100);
+                        if (!isReplay && System.currentTimeMillis() - 
be.getLastUpdateMs()
+                                >= 
Config.abort_txn_after_lost_heartbeat_time_second * 1000L) {
+                            
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeDown(
+                                    be.getId(), be.getHost(), 100);
                         }
                     }
                     return isChanged;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 7ef043136ba..d733d863f4e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -1733,13 +1733,16 @@ public class DatabaseTransactionMgr {
         return null;
     }
 
-    public List<Pair<Long, Long>> getTransactionIdByCoordinateBe(String 
coordinateHost, int limit) {
+    public List<Pair<Long, Long>> getPrepareTransactionIdByCoordinateBe(long 
coordinateBeId,
+            String coordinateHost, int limit) {
         ArrayList<Pair<Long, Long>> txnInfos = new ArrayList<>();
         readLock();
         try {
             idToRunningTransactionState.values().stream()
                     .filter(t -> (t.getCoordinator().sourceType == 
TransactionState.TxnSourceType.BE
-                            && t.getCoordinator().ip.equals(coordinateHost)))
+                            && t.getTransactionStatus() == 
TransactionStatus.PREPARE
+                            && t.getCoordinator().ip.equals(coordinateHost)
+                            && (t.getCoordinator().id == 0 || 
t.getCoordinator().id == coordinateBeId)))
                     .limit(limit)
                     .forEach(t -> txnInfos.add(Pair.of(t.getDbId(), 
t.getTransactionId())));
         } finally {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 22a019c4c0d..35c2195eb33 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -653,10 +653,12 @@ public class GlobalTransactionMgr implements Writable {
         }
     }
 
-    public List<Pair<Long, Long>> getTransactionIdByCoordinateBe(String 
coordinateHost, int limit) {
+    private List<Pair<Long, Long>> getPrepareTransactionIdByCoordinateBe(long 
coordinateBeId,
+            String coordinateHost, int limit) {
         ArrayList<Pair<Long, Long>> txnInfos = new ArrayList<>();
         for (DatabaseTransactionMgr databaseTransactionMgr : 
dbIdToDatabaseTransactionMgrs.values()) {
-            
txnInfos.addAll(databaseTransactionMgr.getTransactionIdByCoordinateBe(coordinateHost,
 limit));
+            
txnInfos.addAll(databaseTransactionMgr.getPrepareTransactionIdByCoordinateBe(
+                        coordinateBeId, coordinateHost, limit));
             if (txnInfos.size() > limit) {
                 break;
             }
@@ -664,19 +666,33 @@ public class GlobalTransactionMgr implements Writable {
         return txnInfos.size() > limit ? new ArrayList<>(txnInfos.subList(0, 
limit)) : txnInfos;
     }
 
+    public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String 
coordinateHost, long beStartTime) {
+        List<Pair<Long, Long>> transactionIdByCoordinateBe
+                = getPrepareTransactionIdByCoordinateBe(coordinateBeId, 
coordinateHost, Integer.MAX_VALUE);
+        for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) {
+            try {
+                DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(txnInfo.first);
+                TransactionState transactionState = 
dbTransactionMgr.getTransactionState(txnInfo.second);
+                long coordStartTime = 
transactionState.getCoordinator().startTime;
+                if (coordStartTime < beStartTime) {
+                    dbTransactionMgr.abortTransaction(txnInfo.second, 
"coordinate BE restart", null);
+                }
+            } catch (UserException e) {
+                LOG.warn("Abort txn on coordinate BE {} failed, msg={}", 
coordinateHost, e.getMessage());
+            }
+        }
+    }
+
     /**
      * If a Coordinate BE is down when running txn, the txn will remain in FE 
until killed by timeout
      * So when FE identify the Coordinate BE is down, FE should cancel it 
initiative
      */
-    public void abortTxnWhenCoordinateBeDown(String coordinateHost, int limit) 
{
-        List<Pair<Long, Long>> transactionIdByCoordinateBe = 
getTransactionIdByCoordinateBe(coordinateHost, limit);
+    public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String 
coordinateHost, int limit) {
+        List<Pair<Long, Long>> transactionIdByCoordinateBe
+                = getPrepareTransactionIdByCoordinateBe(coordinateBeId, 
coordinateHost, limit);
         for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) {
             try {
                 DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(txnInfo.first);
-                TransactionState transactionState = 
dbTransactionMgr.getTransactionState(txnInfo.second);
-                if (transactionState.getTransactionStatus() == 
TransactionStatus.PRECOMMITTED) {
-                    continue;
-                }
                 dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate 
BE is down", null);
             } catch (UserException e) {
                 LOG.warn("Abort txn on coordinate BE {} failed, msg={}", 
coordinateHost, e.getMessage());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index f9a094eceb9..cdef27c9359 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -162,15 +162,23 @@ public class TransactionState implements Writable {
     public static class TxnCoordinator {
         @SerializedName(value = "sourceType")
         public TxnSourceType sourceType;
+        // backendId for backend, 0 for frontend
+        @SerializedName(value = "id")
+        public long id = 0;
         @SerializedName(value = "ip")
         public String ip;
+        // frontend/backend start time
+        @SerializedName(value = "st")
+        public long startTime = 0;
 
         public TxnCoordinator() {
         }
 
-        public TxnCoordinator(TxnSourceType sourceType, String ip) {
+        public TxnCoordinator(TxnSourceType sourceType, long id, String ip, 
long startTime) {
             this.sourceType = sourceType;
+            this.id = id;
             this.ip = ip;
+            this.startTime = startTime;
         }
 
         @Override
@@ -301,7 +309,8 @@ public class TransactionState implements Writable {
         this.transactionId = -1;
         this.label = "";
         this.idToTableCommitInfos = Maps.newHashMap();
-        this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, 
"127.0.0.1"); // mocked, to avoid NPE
+        // mocked, to avoid NPE
+        this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, 0, 
"127.0.0.1", System.currentTimeMillis());
         this.transactionStatus = TransactionStatus.PREPARE;
         this.sourceType = LoadJobSourceType.FRONTEND;
         this.prepareTime = -1;
@@ -721,7 +730,7 @@ public class TransactionState implements Writable {
             info.readFields(in);
             idToTableCommitInfos.put(info.getTableId(), info);
         }
-        txnCoordinator = new 
TxnCoordinator(TxnSourceType.valueOf(in.readInt()), Text.readString(in));
+        txnCoordinator = new 
TxnCoordinator(TxnSourceType.valueOf(in.readInt()), 0, Text.readString(in), 0);
         transactionStatus = TransactionStatus.valueOf(in.readInt());
         sourceType = LoadJobSourceType.valueOf(in.readInt());
         prepareTime = in.readLong();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java
index 4a413495e98..e288d046de1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java
@@ -54,7 +54,7 @@ public class BeDownCancelCloneTest extends TestWithFeService {
         Config.disable_balance = true;
         Config.schedule_batch_size = 1000;
         Config.schedule_slot_num_per_hdd_path = 1000;
-        FeConstants.heartbeat_interval_second = 5;
+        Config.heartbeat_interval_second = 5;
         Config.max_backend_heartbeat_failure_tolerance_count = 1;
         Config.min_clone_task_timeout_sec = 20 * 60 * 1000;
     }
@@ -114,7 +114,7 @@ public class BeDownCancelCloneTest extends 
TestWithFeService {
         params2.put("deadBeIds", String.valueOf(destBeId));
         
DebugPointUtil.addDebugPointWithParams("HeartbeatMgr.BackendHeartbeatHandler", 
params2);
 
-        Thread.sleep((FeConstants.heartbeat_interval_second
+        Thread.sleep((Config.heartbeat_interval_second
                 * Config.max_backend_heartbeat_failure_tolerance_count + 4) * 
1000L);
 
         destBe = Env.getCurrentSystemInfo().getBackend(destBeId);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
index b917bc41a7e..ff5f5292be8 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
@@ -66,7 +66,7 @@ public class DecommissionBackendTest extends 
TestWithFeService {
         Config.disable_balance = true;
         Config.schedule_batch_size = 1000;
         Config.schedule_slot_num_per_hdd_path = 1000;
-        FeConstants.heartbeat_interval_second = 5;
+        Config.heartbeat_interval_second = 5;
     }
 
     @Test
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
index 6ba2d271566..ac13900d2cf 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.doris.qe;
 
-import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.Reference;
 import org.apache.doris.common.UserException;
 import org.apache.doris.system.Backend;
@@ -47,7 +47,7 @@ public class SimpleSchedulerTest {
     @BeforeClass
     public static void setUp() {
         SimpleScheduler.init();
-        FeConstants.heartbeat_interval_second = 2;
+        Config.heartbeat_interval_second = 2;
         be1 = new Backend(1000L, "192.168.100.0", 9050);
         be2 = new Backend(1001L, "192.168.100.1", 9050);
         be3 = new Backend(1002L, "192.168.100.2", 9050);
@@ -139,7 +139,7 @@ public class SimpleSchedulerTest {
         t3.join();
 
         Assert.assertFalse(SimpleScheduler.isAvailable(be1));
-        Thread.sleep((FeConstants.heartbeat_interval_second + 5) * 1000);
+        Thread.sleep((Config.heartbeat_interval_second + 5) * 1000);
         Assert.assertTrue(SimpleScheduler.isAvailable(be1));
     }
 
@@ -194,7 +194,7 @@ public class SimpleSchedulerTest {
             System.out.println(e.getMessage());
         }
 
-        Thread.sleep((FeConstants.heartbeat_interval_second + 5) * 1000);
+        Thread.sleep((Config.heartbeat_interval_second + 5) * 1000);
         
Assert.assertNotNull(SimpleScheduler.getHost(locations.get(0).backend_id, 
locations, backends, ref));
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index 9108570e5e4..ea63a5e18b1 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -57,8 +57,8 @@ public class DatabaseTransactionMgrTest {
     private static Env slaveEnv;
     private static Map<String, Long> LabelToTxnId;
 
-    private TransactionState.TxnCoordinator transactionSource =
-            new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe");
+    private TransactionState.TxnCoordinator transactionSource = new 
TransactionState.TxnCoordinator(
+            TransactionState.TxnSourceType.FE, 0, "localfe", 
System.currentTimeMillis());
 
     public static void setTransactionFinishPublish(TransactionState 
transactionState, List<Long> backendIds) {
         for (long backendId : backendIds) {
@@ -118,7 +118,9 @@ public class DatabaseTransactionMgrTest {
         masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId1);
         labelToTxnId.put(CatalogTestUtil.testTxnLabel1, transactionId1);
 
-        TransactionState.TxnCoordinator beTransactionSource = new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.BE, "be1");
+        // txn 2, 3, 4
+        TransactionState.TxnCoordinator beTransactionSource = new 
TransactionState.TxnCoordinator(
+                TransactionState.TxnSourceType.BE, 0, "be1", 
System.currentTimeMillis());
         long transactionId2 = 
masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(CatalogTestUtil.testTableId1),
                 CatalogTestUtil.testTxnLabel2,
                 beTransactionSource,
@@ -204,7 +206,7 @@ public class DatabaseTransactionMgrTest {
     @Test
     public void testGetTransactionIdByCoordinateBe() throws UserException {
         DatabaseTransactionMgr masterDbTransMgr = 
masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1);
-        List<Pair<Long, Long>> transactionInfoList = 
masterDbTransMgr.getTransactionIdByCoordinateBe("be1", 10);
+        List<Pair<Long, Long>> transactionInfoList = 
masterDbTransMgr.getPrepareTransactionIdByCoordinateBe(0, "be1", 10);
         Assert.assertEquals(3, transactionInfoList.size());
         Assert.assertEquals(CatalogTestUtil.testDbId1, 
transactionInfoList.get(0).first.longValue());
         Assert.assertEquals(TransactionStatus.PREPARE,
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 414f5cf03c4..cc00237a4c4 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -77,7 +77,8 @@ public class GlobalTransactionMgrTest {
     private static Env masterEnv;
     private static Env slaveEnv;
 
-    private TransactionState.TxnCoordinator transactionSource = new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe");
+    private TransactionState.TxnCoordinator transactionSource = new 
TransactionState.TxnCoordinator(
+            TransactionState.TxnSourceType.FE, 0, "localfe", 
System.currentTimeMillis());
 
     @Before
     public void setUp() throws InstantiationException, IllegalAccessException, 
IllegalArgumentException,
@@ -323,7 +324,9 @@ public class GlobalTransactionMgrTest {
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
         TransactionState transactionState = new TransactionState(1L, 
Lists.newArrayList(1L), 1L, "label", null,
-                LoadJobSourceType.ROUTINE_LOAD_TASK, new 
TxnCoordinator(TxnSourceType.BE, "be1"), routineLoadJob.getId(),
+                LoadJobSourceType.ROUTINE_LOAD_TASK,
+                new TxnCoordinator(TxnSourceType.BE, 0, "be1", 
System.currentTimeMillis()),
+                routineLoadJob.getId(),
                 Config.stream_load_default_timeout_second);
         transactionState.setTransactionStatus(TransactionStatus.PREPARE);
         masterTransMgr.getCallbackFactory().addCallback(routineLoadJob);
@@ -395,7 +398,9 @@ public class GlobalTransactionMgrTest {
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
         TransactionState transactionState = new TransactionState(1L, 
Lists.newArrayList(1L), 1L, "label", null,
-                LoadJobSourceType.ROUTINE_LOAD_TASK, new 
TxnCoordinator(TxnSourceType.BE, "be1"), routineLoadJob.getId(),
+                LoadJobSourceType.ROUTINE_LOAD_TASK,
+                new TxnCoordinator(TxnSourceType.BE, 0, "be1", 
System.currentTimeMillis()),
+                routineLoadJob.getId(),
                 Config.stream_load_default_timeout_second);
         transactionState.setTransactionStatus(TransactionStatus.PREPARE);
         masterTransMgr.getCallbackFactory().addCallback(routineLoadJob);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
index c20b2097f8d..f08b7478d06 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
@@ -61,8 +61,9 @@ public class TransactionStateTest {
         UUID uuid = UUID.randomUUID();
         TransactionState transactionState = new TransactionState(1000L, 
Lists.newArrayList(20000L, 20001L),
                 3000, "label123", new TUniqueId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits()),
-                LoadJobSourceType.BACKEND_STREAMING, new 
TxnCoordinator(TxnSourceType.BE, "127.0.0.1"), 50000L,
-                60 * 1000L);
+                LoadJobSourceType.BACKEND_STREAMING,
+                new TxnCoordinator(TxnSourceType.BE, 0, "127.0.0.1", 
System.currentTimeMillis()),
+                50000L, 60 * 1000L);
 
         transactionState.write(out);
         out.flush();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index c61fcd28afe..59e4eae0a37 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -405,7 +405,7 @@ public abstract class TestWithFeService {
     }
 
     private void checkBEHeartbeat(List<Backend> bes) throws 
InterruptedException {
-        int maxTry = FeConstants.heartbeat_interval_second + 5;
+        int maxTry = Config.heartbeat_interval_second + 5;
         boolean allAlive = false;
         while (maxTry-- > 0 && !allAlive) {
             Thread.sleep(1000);
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index da4c915e9b0..75a6537ee85 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -559,6 +559,7 @@ struct TLoadTxnBeginRequest {
     10: optional i64 timeout
     11: optional Types.TUniqueId request_id
     12: optional string token
+    13: optional i64 backend_id
 }
 
 struct TLoadTxnBeginResult {
@@ -581,6 +582,7 @@ struct TBeginTxnRequest {
     9: optional i64 timeout
     10: optional Types.TUniqueId request_id
     11: optional string token
+    12: optional i64 backend_id
 }
 
 struct TBeginTxnResult {
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index afc23cc2cc0..9a14346af10 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -422,6 +422,41 @@ class Suite implements GroovyInterceptable {
         return result;
     }
 
+    long getTableId(String tableName) {
+        return getTableId(getDbName(), tableName)
+    }
+
+    long getTableId(String dbName, String tableName) {
+        def dbInfo = sql "show proc '/dbs'"
+        for(List<Object> row : dbInfo) {
+            if (row[1].replace("default_cluster:", "").equals(dbName)) {
+                def tbInfo = sql "show proc '/dbs/${row[0]}' "
+                for (List<Object> tb : tbInfo) {
+                    if (tb[1].equals(tableName)) {
+                        return tb[0].toLong()
+                    }
+                }
+            }
+        }
+    }
+
+    long getDbId() {
+        return getDbId(getDbName())
+    }
+
+    long getDbId(String dbName) {
+        def dbInfo = sql "show proc '/dbs'"
+        for (List<Object> row : dbInfo) {
+            if (row[1].replace("default_cluster:", "").equals(dbName)) {
+                return row[0].toLong()
+            }
+        }
+    }
+
+    String getDbName() {
+        return context.dbName
+    }
+
     List<List<Object>> order_sql(String sqlStr) {
         return sql(sqlStr,  true)
     }
@@ -681,6 +716,33 @@ class Suite implements GroovyInterceptable {
         return hdfs.downLoad(label)
     }
 
+    void runStreamLoadExample(String tableName, String coordidateBeHostPort = 
"") {
+        def backends = sql_return_maparray "show backends"
+        sql """
+                CREATE TABLE IF NOT EXISTS ${tableName} (
+                    id int,
+                    name varchar(255)
+                )
+                DISTRIBUTED BY HASH(id) BUCKETS 1
+                PROPERTIES (
+                  "replication_num" = "${backends.size()}"
+                )
+            """
+
+        streamLoad {
+            table tableName
+            set 'column_separator', ','
+            file context.config.dataPath + "/demo_p0/streamload_input.csv"
+            time 10000
+            if (!coordidateBeHostPort.equals("")) {
+                def pos = coordidateBeHostPort.indexOf(':')
+                def host = coordidateBeHostPort.substring(0, pos)
+                def httpPort = coordidateBeHostPort.substring(pos + 
1).toInteger()
+                directToBe host, httpPort
+            }
+        }
+    }
+
     void streamLoad(Closure actionSupplier) {
         runAction(new StreamLoadAction(context), actionSupplier)
     }
diff --git a/regression-test/suites/demo_p0/streamLoad_action.groovy 
b/regression-test/suites/demo_p0/streamLoad_action.groovy
index a11aed7a1c5..b0ca182c8d9 100644
--- a/regression-test/suites/demo_p0/streamLoad_action.groovy
+++ b/regression-test/suites/demo_p0/streamLoad_action.groovy
@@ -124,6 +124,11 @@ suite("streamLoad_action") {
         LIMIT 5;
     """
 
+    def tableName2 = "test_streamload_action2"
+    runStreamLoadExample(tableName2)
+
     sql """ DROP TABLE ${tableName} """
+    sql """ DROP TABLE ${tableName2}"""
+
     sql """ DROP TABLE B """
 }
diff --git 
a/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy 
b/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy
new file mode 100644
index 00000000000..cd2b8ea9e48
--- /dev/null
+++ 
b/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.http.NoHttpResponseException
+
+suite('test_coordidator_be_restart') {
+    def options = new ClusterOptions()
+    options.cloudMode = false
+    options.enableDebugPoints()
+
+    docker(options) {
+        def db = context.config.getDbNameByFile(context.file)
+        def tableName1 = 'tbl_test_coordidator_be_restart_t1'
+        setFeConfig('abort_txn_after_lost_heartbeat_time_second', 3600)
+
+        def dbId = getDbId()
+
+        def txns = sql_return_maparray "show proc 
'/transactions/${dbId}/running'"
+        assertEquals(0, txns.size())
+        txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'"
+        assertEquals(0, txns.size())
+
+        def coordinatorBe = cluster.getAllBackends().get(0)
+        def coordinatorBeHost = coordinatorBe.host
+
+        
GetDebugPoint().enableDebugPointForAllFEs('LoadAction.selectRedirectBackend.backendId',
 [value: coordinatorBe.backendId])
+        
GetDebugPoint().enableDebugPointForAllBEs('StreamLoadExecutor.commit_txn.block')
+
+        thread {
+            try {
+                runStreamLoadExample(tableName1, coordinatorBe.host + ':' + 
coordinatorBe.httpPort)
+            } catch (NoHttpResponseException t) {
+            // the BE going down will raise NoHttpResponseException
+            }
+        }
+
+        sleep(5000)
+        txns = sql_return_maparray "show proc '/transactions/${dbId}/running'"
+        logger.info('running txns: ' + txns)
+        assertEquals(1, txns.size())
+        for (def txn : txns) {
+            assertEquals('PREPARE', txn.TransactionStatus)
+        }
+
+        txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'"
+        assertEquals(0, txns.size())
+
+        // shutting down coordinatorBe does not abort the txn because 
abort_txn_after_lost_heartbeat_time_second = 3600
+        cluster.stopBackends(coordinatorBe.index)
+        def isDead = false
+        for (def i = 0; i < 10; i++) {
+            def be = sql_return_maparray('show backends').find { it.Host == 
coordinatorBeHost }
+            if (!be.Alive.toBoolean()) {
+                isDead = true
+                break
+            }
+            sleep 1000
+        }
+        assertTrue(isDead)
+        sleep 5000
+        txns = sql_return_maparray "show proc '/transactions/${dbId}/running'"
+        logger.info('running txns: ' + txns)
+        assertEquals(1, txns.size())
+        for (def txn : txns) {
+            assertEquals('PREPARE', txn.TransactionStatus)
+        }
+
+        // coordinatorBe restart, abort txn on it
+        cluster.startBackends(coordinatorBe.index)
+        def isAlive = false
+        for (def i = 0; i < 20; i++) {
+            def be = sql_return_maparray('show backends').find { it.Host == 
coordinatorBeHost }
+            if (be.Alive.toBoolean()) {
+                isAlive = true
+                break
+            }
+            sleep 1000
+        }
+        assertTrue(isAlive)
+        sleep 5000
+        txns = sql_return_maparray "show proc '/transactions/${dbId}/running'"
+        logger.info('running txns: ' + txns)
+        assertEquals(0, txns.size())
+        txns = sql_return_maparray "show proc '/transactions/${dbId}/finished'"
+        logger.info('finished txns: ' + txns)
+        assertEquals(1, txns.size())
+        for (def txn : txns) {
+            assertEquals('ABORTED', txn.TransactionStatus)
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to