This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit b6ca76e7d44411a3b888bd021e2d7a60b9348ecf
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Fri Feb 23 20:55:44 2024 +0800

    fix routine load job throw exception after commit (#31303)
---
 .../java/org/apache/doris/load/routineload/RoutineLoadJob.java | 10 ++++++++++
 .../org/apache/doris/load/routineload/RoutineLoadJobTest.java  |  1 -
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index a8fb467a9bf..8760dc4b71c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1018,6 +1018,16 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                 // find task in job
                 Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter(
                         entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
+                if (!routineLoadTaskInfoOptional.isPresent()) {
+                    // not find task in routineLoadTaskInfoList. this may happen in following case:
+                    //      the routine load job has been paused and before transaction committed.
+                    //      The routineLoadTaskInfoList will be cleared when job being paused.
+                    //      So the task can not be found here.
+                    // This is a normal case, we just print a log here to observe.
+                    LOG.info("Can not find task with transaction {} after committed, job: {}",
+                            txnState.getTransactionId(), id);
+                    return;
+                }
                 RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
                 taskBeId = routineLoadTaskInfo.getBeId();
                 executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED, null);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
index df7f3633816..8d903957453 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
@@ -172,7 +172,6 @@ public class RoutineLoadJobTest {
         Deencapsulation.setField(routineLoadJob, "progress", progress);
         try {
             routineLoadJob.afterCommitted(transactionState, true);
-            Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState());
         } catch (TransactionException e) {
             Assert.fail();
         }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org