This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b6ca76e7d44411a3b888bd021e2d7a60b9348ecf
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Fri Feb 23 20:55:44 2024 +0800

    fix routine load job throw exception after commit (#31303)
---
 .../java/org/apache/doris/load/routineload/RoutineLoadJob.java | 10 ++++++++++
 .../org/apache/doris/load/routineload/RoutineLoadJobTest.java  |  1 -
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index a8fb467a9bf..8760dc4b71c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1018,6 +1018,16 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                 // find task in job
                 Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter(
                         entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
+                if (!routineLoadTaskInfoOptional.isPresent()) {
+                    // not find task in routineLoadTaskInfoList. this may happen in following case:
+                    //      the routine load job has been paused and before transaction committed.
+                    //      The routineLoadTaskInfoList will be cleared when job being paused.
+                    //      So the task can not be found here.
+                    // This is a normal case, we just print a log here to observe.
+                    LOG.info("Can not find task with transaction {} after committed, job: {}",
+                            txnState.getTransactionId(), id);
+                    return;
+                }
                 RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
                 taskBeId = routineLoadTaskInfo.getBeId();
                 executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED, null);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
index df7f3633816..8d903957453 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
@@ -172,7 +172,6 @@ public class RoutineLoadJobTest {
         Deencapsulation.setField(routineLoadJob, "progress", progress);
         try {
             routineLoadJob.afterCommitted(transactionState, true);
-            Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState());
         } catch (TransactionException e) {
             Assert.fail();
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to