This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c57145b  [Bug] Fix bug that routine load may lose some data (#5093)
c57145b is described below

commit c57145b4c2633c0152fc02995c8c1912f59d25a6
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Wed Dec 23 09:33:52 2020 +0800

    [Bug] Fix bug that routine load may lose some data (#5093)
    
    In the previous implementation, whether a subtask ended in the commit or abort state,
    we would try to update the job progress, such as the consumed offset of Kafka.
    Under normal circumstances, an aborted transaction does not consume any data
    and all progress is 0, so even if we update the progress, it remains unchanged.
    However, under high cluster load, a subtask may fail halfway through execution on the BE side.
    In that case, although the task is aborted, part of the progress has already been updated,
    causing the next subtask to skip that data when consuming, which results in data loss.
---
 be/src/common/config.h                             | 14 +++---
 be/src/exec/olap_scanner.cpp                       |  2 +-
 be/src/olap/rowset/alpha_rowset.cpp                |  3 +-
 be/src/runtime/stream_load/stream_load_context.h   |  3 ++
 .../org/apache/doris/catalog/TabletStatMgr.java    |  8 +--
 .../main/java/org/apache/doris/common/Config.java  |  3 +-
 .../doris/load/routineload/KafkaProgress.java      | 17 +++++--
 .../load/routineload/KafkaRoutineLoadJob.java      | 58 +++++++++++-----------
 .../routineload/RLTaskTxnCommitAttachment.java     |  4 ++
 .../doris/load/routineload/RoutineLoadJob.java     | 21 +++++---
 .../load/routineload/RoutineLoadProgress.java      |  2 +-
 .../load/routineload/RoutineLoadTaskScheduler.java | 12 +++--
 .../doris/transaction/DatabaseTransactionMgr.java  | 11 ++--
 .../apache/doris/transaction/TransactionState.java |  5 +-
 .../doris/load/routineload/RoutineLoadJobTest.java | 24 ++++++---
 .../transaction/GlobalTransactionMgrTest.java      | 29 +++++------
 16 files changed, 126 insertions(+), 90 deletions(-)
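
The essence of the fix, before diving into the diff: the job progress (the consumed
Kafka offset) is now advanced only when it is safe to do so. A minimal sketch of the
decision logic follows; TxnOutcome, AbortReason and ProgressGate are illustrative
names, not the actual Doris classes:

    // Sketch only: mirrors the intent of the new checkCommitInfo(), not its exact code.
    enum TxnOutcome { COMMITTED, ABORTED }
    enum AbortReason { NO_PARTITIONS, OFFSET_OUT_OF_RANGE, OTHER }

    final class ProgressGate {
        static boolean shouldUpdateProgress(TxnOutcome outcome, AbortReason reason) {
            if (outcome == TxnOutcome.COMMITTED) {
                return true; // committed txn: data is durable, advance the offset
            }
            // Aborted with "all partitions have no load data": the rows were read
            // but all filtered out, so advance the offset to skip the bad rows.
            if (reason == AbortReason.NO_PARTITIONS) {
                return true;
            }
            // Any other abort (e.g. a BE that failed halfway) must NOT advance the
            // offset, otherwise the next subtask would skip unconsumed data.
            return false;
        }
    }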

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 93f6bfe..6bef85a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -36,11 +36,11 @@ CONF_Int32(brpc_port, "8060");
 // the number of bthreads for brpc, the default value is set to -1, which means the number of bthreads is #cpu-cores
 CONF_Int32(brpc_num_threads, "-1")
 
-        // Declare a selection strategy for those servers have many ips.
-        // Note that there should at most one ip match this list.
-        // this is a list in semicolon-delimited format, in CIDR notation, e.g. 10.10.10.0/24
-        // If no ip match this rule, will choose one randomly.
-        CONF_String(priority_networks, "");
+// Declare a selection strategy for those servers have many ips.
+// Note that there should at most one ip match this list.
+// this is a list in semicolon-delimited format, in CIDR notation, e.g. 10.10.10.0/24
+// If no ip match this rule, will choose one randomly.
+CONF_String(priority_networks, "");
 
 ////
 //// tcmalloc gc parameter
@@ -259,7 +259,7 @@ CONF_Int64(index_stream_cache_capacity, "10737418240");
 // CONF_Int64(max_packed_row_block_size, "20971520");
 
 // Cache for storage page size
-CONF_String(storage_page_cache_limit, "20G");
+CONF_String(storage_page_cache_limit, "20%");
 // whether to disable page cache feature in storage
 CONF_Bool(disable_storage_page_cache, "false");
 
@@ -372,7 +372,7 @@ CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60");
 // or encounter 'tablet writer write failed' error when loading.
 // CONF_Int32(tablet_writer_rpc_timeout_sec, "600");
 // OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc.
-CONF_mInt32(olap_table_sink_send_interval_ms, "10");
+CONF_mInt32(olap_table_sink_send_interval_ms, "1");
 
 // Fragment thread pool
 CONF_Int32(fragment_pool_thread_num_min, "64");
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 5a959e5..05d5bf7 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -257,7 +257,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
     int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num;
     {
         SCOPED_TIMER(_parent->_scan_timer);
-        SCOPED_TIMER(_parent->_scan_cpu_timer);
+        SCOPED_CPU_TIMER(_parent->_scan_cpu_timer);
         while (true) {
             // Batch is full, break
             if (batch->is_full()) {
diff --git a/be/src/olap/rowset/alpha_rowset.cpp b/be/src/olap/rowset/alpha_rowset.cpp
index f1e7af3..9ff8178 100644
--- a/be/src/olap/rowset/alpha_rowset.cpp
+++ b/be/src/olap/rowset/alpha_rowset.cpp
@@ -330,8 +330,7 @@ OLAPStatus AlphaRowset::init() {
             // table value column, so when first start the two number is not the same,
             // it causes start failed. When `expect_zone_maps_num > zone_maps_size` it may be the first start after upgrade
             if (expect_zone_maps_num > zone_maps_size) {
-                LOG(WARNING)
-                        << "tablet: " << _rowset_meta->tablet_id() << " expect zone map size is "
+                VLOG(1) << "tablet: " << _rowset_meta->tablet_id() << " expect zone map size is "
                         << expect_zone_maps_num << ", actual num is " << zone_maps_size
                         << ". If this is not the first start after upgrade, please pay attention!";
             }
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index 1537a17..c0d61ab 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -45,6 +45,9 @@ public:
               topic(t_info.topic),
               begin_offset(t_info.partition_begin_offset),
               properties(t_info.properties) {
+        // The offset(begin_offset) sent from FE is the starting offset,
+        // and the offset(cmt_offset) reported by BE to FE is the consumed offset,
+        // so we need to minus 1 here.
         for (auto& p : t_info.partition_begin_offset) {
             cmt_offset[p.first] = p.second - 1;
         }
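
The minus-1 above encodes a convention: begin_offset sent by the FE is the next
offset to start consuming from, while cmt_offset reported back is the last offset
already consumed. A hedged restatement of the same arithmetic in Java (the real
code above is C++; the names are borrowed for illustration only):

    import java.util.HashMap;
    import java.util.Map;

    final class OffsetBookkeeping {
        // begin_offset (from FE) = next offset to start from;
        // cmt_offset (reported to FE) = last offset actually consumed.
        static Map<Integer, Long> initCmtOffset(Map<Integer, Long> beginOffset) {
            Map<Integer, Long> cmtOffset = new HashMap<>();
            for (Map.Entry<Integer, Long> p : beginOffset.entrySet()) {
                cmtOffset.put(p.getKey(), p.getValue() - 1); // nothing consumed yet
            }
            return cmtOffset;
        }
    }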
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
index 25501bd..a63784b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
@@ -28,11 +28,11 @@ import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TTabletStat;
 import org.apache.doris.thrift.TTabletStatResult;
 
-import com.google.common.collect.ImmutableMap;
-
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.google.common.collect.ImmutableMap;
+
 import java.util.List;
 import java.util.Map;
 
@@ -61,7 +61,7 @@ public class TabletStatMgr extends MasterDaemon {
                 client = ClientPool.backendPool.borrowObject(address);
                 TTabletStatResult result = client.getTabletStat();
 
-                LOG.info("get tablet stat from backend: {}, num: {}", 
backend.getId(), result.getTabletsStatsSize());
+                LOG.debug("get tablet stat from backend: {}, num: {}", 
backend.getId(), result.getTabletsStatsSize());
                 updateTabletStat(backend.getId(), result);
 
                 ok = true;
@@ -112,7 +112,7 @@ public class TabletStatMgr extends MasterDaemon {
                             index.setRowCount(indexRowCount);
                         } // end for indices
                     } // end for partitions
-                    LOG.info("finished to set row num for table: {} in 
database: {}",
+                    LOG.debug("finished to set row num for table: {} in 
database: {}",
                              table.getName(), db.getFullName());
                 }
             } finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 83e5237..c3ba37a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -68,7 +68,8 @@ public class Config extends ConfigBase {
     public static String sys_log_dir = PaloFe.DORIS_HOME_DIR + "/log";
     @ConfField public static String sys_log_level = "INFO"; 
     @ConfField public static int sys_log_roll_num = 10;
-    @ConfField public static String[] sys_log_verbose_modules = {"org.apache.thrift", "org.apache.doris.thrift", "org.apache.doris.http", "org.apache.doris.service.FrontendServiceImpl"};
+    @ConfField
+    public static String[] sys_log_verbose_modules = {};
     @ConfField public static String sys_log_roll_interval = "DAY";
     @ConfField public static String sys_log_delete_age = "7d";
     @Deprecated
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
index 34a5a4e..6d704d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -19,8 +19,12 @@ package org.apache.doris.load.routineload;
 
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.thrift.TKafkaRLTaskProgress;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.base.Joiner;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
@@ -38,6 +42,8 @@ import java.util.Map;
  */
 // {"partitionIdToOffset": {}}
 public class KafkaProgress extends RoutineLoadProgress {
+    private static final Logger LOG = LogManager.getLogger(KafkaProgress.class);
+
     public static final String OFFSET_BEGINNING = "OFFSET_BEGINNING"; // -2
     public static final String OFFSET_END = "OFFSET_END"; // -1
     // OFFSET_ZERO is just for show info, if user specified offset is 0
@@ -47,7 +53,7 @@ public class KafkaProgress extends RoutineLoadProgress {
     public static final long OFFSET_END_VAL = -1;
 
     // (partition id, begin offset)
-    // the offset the next msg to be consumed
+    // the offset saved here is the next offset need to be consumed
     private Map<Integer, Long> partitionIdToOffset = Maps.newConcurrentMap();
 
     public KafkaProgress() {
@@ -101,12 +107,13 @@ public class KafkaProgress extends RoutineLoadProgress {
             } else if (entry.getValue() == -2) {
                 showPartitionIdToOffset.put(entry.getKey(), OFFSET_BEGINNING);
             } else {
+                // The offset saved in partitionIdToOffset is the next offset to be consumed.
+                // So here we minus 1 to return the "already consumed" offset.
                 showPartitionIdToOffset.put(entry.getKey(), "" + (entry.getValue() - 1));
             }
         }
     }
 
-
     // modify the partition offset of this progress.
     // throw exception is the specified partition does not exist in progress.
     public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
@@ -138,11 +145,13 @@ public class KafkaProgress extends RoutineLoadProgress {
     }
 
     @Override
-    public void update(RoutineLoadProgress progress) {
-        KafkaProgress newProgress = (KafkaProgress) progress;
+    public void update(RLTaskTxnCommitAttachment attachment) {
+        KafkaProgress newProgress = (KafkaProgress) attachment.getProgress();
         // + 1 to point to the next msg offset to be consumed
         newProgress.partitionIdToOffset.entrySet().stream()
                 .forEach(entity -> this.partitionIdToOffset.put(entity.getKey(), entity.getValue() + 1));
+        LOG.debug("update kafka progress: {}, task: {}, job: {}",
+                newProgress.toJsonString(), DebugUtil.printId(attachment.getTaskId()), attachment.getJobId());
     }
 
     @Override
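
The "+ 1" in update() above is the mirror image of the minus-1 on the BE side: the
BE reports the last consumed offset per partition, and the FE stores the next offset
to be consumed. A self-contained sketch of this convention (simplified names, not
the real class):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class ProgressSketch {
        // value = next offset to be consumed, matching partitionIdToOffset's meaning
        private final Map<Integer, Long> partitionIdToOffset = new ConcurrentHashMap<>();

        void update(Map<Integer, Long> consumedOffsets) {
            // consumed offset N  =>  the next task starts at N + 1
            consumedOffsets.forEach((partition, consumed) ->
                    partitionIdToOffset.put(partition, consumed + 1));
        }

        long nextOffset(int partition) {
            return partitionIdToOffset.getOrDefault(partition, 0L);
        }
    }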
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index a9e6b78..a72944a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -42,17 +42,19 @@ import org.apache.doris.common.util.SmallFileMgr;
 import org.apache.doris.common.util.SmallFileMgr.SmallFile;
 import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
 import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -145,7 +147,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         }
     }
 
-
     @Override
     public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserException {
         List<RoutineLoadTaskInfo> result = new ArrayList<>();
@@ -198,46 +199,45 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         return currentTaskConcurrentNum;
     }
 
-    // case1: BE execute the task successfully and commit it to FE, but failed on FE(such as db renamed, not found),
-    //        after commit failed, BE try to rollback this txn, and loaded rows in its attachment is larger than 0.
-    //        In this case, FE should not update the progress.
-    //
-    // case2: partitionIdToOffset must be not empty when loaded rows > 0
-    //        be commit txn but fe throw error when committing txn,
-    //        fe rollback txn without partitionIdToOffset by itself
-    //        this task should not be commit
-    //        otherwise currentErrorNum and currentTotalNum is updated when progress is not updated
+    // Through the transaction status and attachment information, to determine whether the progress needs to be updated.
     @Override
     protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment,
-                                      TransactionStatus txnStatus) {
-        if (rlTaskTxnCommitAttachment.getLoadedRows() > 0 && txnStatus == TransactionStatus.ABORTED) {
-            // case 1
-            return false;
+                                      TransactionState txnState,
+                                      TransactionState.TxnStatusChangeReason txnStatusChangeReason) {
+        if (txnState.getTransactionStatus() == TransactionStatus.COMMITTED) {
+            // For committed txn, update the progress.
+            return true;
         }
 
-        if (rlTaskTxnCommitAttachment.getLoadedRows() > 0
-                && (!((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).hasPartition())) {
-            // case 2
-            LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()))
-                    .add("job_id", id)
-                    .add("loaded_rows", rlTaskTxnCommitAttachment.getLoadedRows())
-                    .add("progress_partition_offset_size", 0)
-                    .add("msg", "commit attachment info is incorrect"));
-            return false;
+        if (txnStatusChangeReason != null && txnStatusChangeReason == TransactionState.TxnStatusChangeReason.NO_PARTITIONS) {
+            // Because the max_filter_ratio of routine load task is always 1.
+            // Therefore, under normal circumstances, routine load task will not return the error "too many filtered rows".
+            // If no data is imported, the error "all partitions have no load data" may only be returned.
+            // In this case, the status of the transaction is ABORTED,
+            // but we still need to update the offset to skip these error lines.
+            Preconditions.checkState(txnState.getTransactionStatus() == TransactionStatus.ABORTED, txnState.getTransactionStatus());
+            return true;
         }
-        return true;
+
+        // Running here, the status of the transaction should be ABORTED,
+        // and it is caused by other errors. In this case, we should not update the offset.
+        LOG.debug("no need to update the progress of kafka routine load. txn status: {}, " +
+                        "txnStatusChangeReason: {}, task: {}, job: {}",
+                txnState.getTransactionStatus(), txnStatusChangeReason,
+                DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()), id);
+        return false;
     }
 
     @Override
     protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
         super.updateProgress(attachment);
-        this.progress.update(attachment.getProgress());
+        this.progress.update(attachment);
     }
 
     @Override
     protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
         super.replayUpdateProgress(attachment);
-        this.progress.update(attachment.getProgress());
+        this.progress.update(attachment);
     }
 
     @Override
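
Taken together, the new checkCommitInfo() has exactly three outcomes. A quick sanity
check against the illustrative ProgressGate sketch shown after the diffstat (run with
java -ea; again, these are not the real FE classes):

    public class ProgressGateDemo {
        public static void main(String[] args) {
            // 1. Committed txn: progress advances.
            assert ProgressGate.shouldUpdateProgress(TxnOutcome.COMMITTED, null);
            // 2. Aborted because all rows were filtered out: advance to skip them.
            assert ProgressGate.shouldUpdateProgress(TxnOutcome.ABORTED, AbortReason.NO_PARTITIONS);
            // 3. Aborted for any other reason (e.g. BE failed mid-task): do not
            //    advance, so the data is re-consumed instead of being lost.
            assert !ProgressGate.shouldUpdateProgress(TxnOutcome.ABORTED, AbortReason.OTHER);
        }
    }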
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java
index 70d67e7..73417ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java
@@ -67,6 +67,10 @@ public class RLTaskTxnCommitAttachment extends TxnCommitAttachment {
         }
     }
 
+    public long getJobId() {
+        return jobId;
+    }
+
     public TUniqueId getTaskId() {
         return taskId;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index e82d234..f1d858b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -69,6 +69,10 @@ import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
 import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -77,8 +81,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -878,8 +880,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                         entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
                 RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
                 taskBeId = routineLoadTaskInfo.getBeId();
-                executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED);
+                executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED, null);
                 ++committedTaskNum;
+                LOG.debug("routine load task committed. task id: {}, job id: {}", txnState.getLabel(), id);
             }
         } catch (Throwable e) {
             LOG.warn("after committed failed", e);
@@ -989,8 +992,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                                       .build());
                 }
                 ++abortedTaskNum;
+                TransactionState.TxnStatusChangeReason txnStatusChangeReason = null;
                 if (txnStatusChangeReasonString != null) {
-                    TransactionState.TxnStatusChangeReason txnStatusChangeReason =
+                    txnStatusChangeReason =
                             TransactionState.TxnStatusChangeReason.fromString(txnStatusChangeReasonString);
                     if (txnStatusChangeReason != null) {
                         switch (txnStatusChangeReason) {
@@ -1009,7 +1013,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                     // TODO(ml): use previous be id depend on change reason
                 }
                 // step2: commit task , update progress, maybe create a new task
-                executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.ABORTED);
+                executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.ABORTED, txnStatusChangeReason);
             }
         } catch (Exception e) {
             String msg = "be " + taskBeId + " abort task " + 
txnState.getLabel() + " failed with error " + e.getMessage();
@@ -1037,7 +1041,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
 
     // check task exists or not before call method
     private void executeTaskOnTxnStatusChanged(RoutineLoadTaskInfo routineLoadTaskInfo, TransactionState txnState,
-            TransactionStatus txnStatus) throws UserException {
+            TransactionStatus txnStatus, TransactionState.TxnStatusChangeReason txnStatusChangeReason) throws UserException {
         // step0: get progress from transaction state
         RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment();
         if (rlTaskTxnCommitAttachment == null) {
@@ -1049,7 +1053,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                                           + " maybe task was aborted by master when timeout")
                                   .build());
             }
-        } else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState.getTransactionStatus())) {
+        } else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState, txnStatusChangeReason)) {
             // step2: update job progress
             updateProgress(rlTaskTxnCommitAttachment);
         }
@@ -1256,7 +1260,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
 
     // check the correctness of commit info
     protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment,
-            TransactionStatus txnStatus);
+                                               TransactionState txnState,
+                                               TransactionState.TxnStatusChangeReason txnStatusChangeReason);
 
     protected abstract String getStatistic();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
index c8948d4..bf746a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
@@ -37,7 +37,7 @@ public abstract class RoutineLoadProgress implements Writable {
         this.loadDataSourceType = loadDataSourceType;
     }
 
-    abstract void update(RoutineLoadProgress progress);
+    abstract void update(RLTaskTxnCommitAttachment attachment);
 
     abstract String toJsonString();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index d66476f..e353c79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -36,13 +36,13 @@ import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -130,6 +130,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
             if (!allocateTaskToBe(routineLoadTaskInfo)) {
                 // allocate failed, push it back to the queue to wait next scheduling
                 needScheduleTasksQueue.put(routineLoadTaskInfo);
+                return;
             }
         } catch (UserException e) {
             routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).
@@ -152,6 +153,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
                 // set BE id to -1 to release the BE slot
                 routineLoadTaskInfo.setBeId(-1);
                 needScheduleTasksQueue.put(routineLoadTaskInfo);
+                return;
             }
         } catch (Exception e) {
             // exception happens, PAUSE the job
@@ -196,6 +198,10 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
             submitTask(routineLoadTaskInfo.getBeId(), tRoutineLoadTask);
             LOG.debug("send routine load task cost(ms): {}, job id: {}", 
                     (System.currentTimeMillis() - startTime), 
routineLoadTaskInfo.getJobId());
+            if (tRoutineLoadTask.isSetKafkaLoadInfo()) {
+                LOG.debug("send kafka routine load task {} with partition 
offset: {}, job: {}",
+                        tRoutineLoadTask.label, 
tRoutineLoadTask.kafka_load_info.partition_begin_offset, 
tRoutineLoadTask.getJobId());
+            }
         } catch (LoadException e) {
             // submit task failed (such as TOO_MANY_TASKS error), but txn has already begun.
             // Here we will still set the ExecuteStartTime of this task, which means
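
The two added return statements in the scheduler hunks above are the point: once a
task has been pushed back to needScheduleTasksQueue, the method must stop handling
it. A minimal sketch of the pattern, with hypothetical names:

    import java.util.concurrent.LinkedBlockingQueue;

    final class SchedulerSketch {
        private final LinkedBlockingQueue<Object> needSchedule = new LinkedBlockingQueue<>();

        void scheduleOne(Object task) throws InterruptedException {
            if (!tryAllocate(task)) {
                needSchedule.put(task); // give the task back to the queue ...
                return;                 // ... and stop handling it in this round
            }
            submit(task);
        }

        private boolean tryAllocate(Object task) { return false; } // stub
        private void submit(Object task) {}                        // stub
    }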
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index ed404d5..85ed3a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -54,6 +54,10 @@ import org.apache.doris.task.PublishVersionTask;
 import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.thrift.TUniqueId;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -61,10 +65,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -291,7 +291,8 @@ public class DatabaseTransactionMgr {
             checkRunningTxnExceedLimit(sourceType);
 
             long tid = idGenerator.getNextTransactionId();
-            LOG.info("begin transaction: txn id {} with label {} from 
coordinator {}", tid, label, coordinator);
+            LOG.info("begin transaction: txn id {} with label {} from 
coordinator {}, listner id: {}",
+                    tid, label, coordinator, listenerId);
             TransactionState transactionState = new TransactionState(dbId, 
tableIdList, tid, label, requestId, sourceType,
                     coordinator, listenerId, timeoutSecond * 1000);
             transactionState.setPrepareTime(System.currentTimeMillis());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 6d161f5..0559195 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -100,7 +100,8 @@ public class TransactionState implements Writable {
         DB_DROPPED,
         TIMEOUT,
         OFFSET_OUT_OF_RANGE,
-        PAUSE;
+        PAUSE,
+        NO_PARTITIONS;
 
         public static TxnStatusChangeReason fromString(String reasonString) {
             for (TxnStatusChangeReason txnStatusChangeReason : TxnStatusChangeReason.values()) {
@@ -116,6 +117,8 @@ public class TransactionState implements Writable {
             switch (this) {
                 case OFFSET_OUT_OF_RANGE:
                     return "Offset out of range";
+                case NO_PARTITIONS:
+                    return "all partitions have no load data";
                 default:
                     return this.name();
             }
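
NO_PARTITIONS is matched back from the abort message reported by the BE. A hedged
sketch of a fromString-style lookup over human-readable messages (the real matching
in TransactionState may differ in detail):

    enum ReasonSketch {
        NO_PARTITIONS("all partitions have no load data"),
        OFFSET_OUT_OF_RANGE("Offset out of range");

        final String message;
        ReasonSketch(String message) { this.message = message; }

        // Illustrative only: scan the abort message for a known reason text.
        static ReasonSketch fromString(String reasonString) {
            if (reasonString == null) {
                return null;
            }
            for (ReasonSketch r : values()) {
                if (reasonString.contains(r.message)) {
                    return r;
                }
            }
            return null;
        }
    }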
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
index 3ea320b..9a29542 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.load.routineload;
 
-
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.catalog.Catalog;
@@ -28,16 +27,18 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.common.util.KafkaUtil;
 import org.apache.doris.persist.EditLog;
+import org.apache.doris.thrift.TKafkaRLTaskProgress;
 import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
 
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
 import org.apache.kafka.common.PartitionInfo;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import java.util.List;
 import java.util.Map;
 
@@ -96,12 +97,19 @@ public class RoutineLoadJobTest {
 
     @Test
     public void testAfterAborted(@Injectable TransactionState transactionState,
-                                 @Injectable KafkaTaskInfo routineLoadTaskInfo,
-                                 @Injectable KafkaProgress progress) throws UserException {
+                                 @Injectable KafkaTaskInfo routineLoadTaskInfo) throws UserException {
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Lists.newArrayList();
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
         long txnId = 1L;
 
+        RLTaskTxnCommitAttachment attachment = new RLTaskTxnCommitAttachment();
+        TKafkaRLTaskProgress tKafkaRLTaskProgress = new TKafkaRLTaskProgress();
+        tKafkaRLTaskProgress.partitionCmtOffset = Maps.newHashMap();
+        KafkaProgress kafkaProgress = new KafkaProgress(tKafkaRLTaskProgress);
+        Deencapsulation.setField(attachment, "progress", kafkaProgress);
+
+        KafkaProgress currentProgress = new KafkaProgress(tKafkaRLTaskProgress);
+
         new Expectations() {
             {
                 transactionState.getTransactionId();
@@ -112,7 +120,7 @@ public class RoutineLoadJobTest {
                 result = txnId;
                 transactionState.getTxnCommitAttachment();
                 minTimes = 0;
-                result = new RLTaskTxnCommitAttachment();
+                result = attachment;
                 routineLoadTaskInfo.getPartitions();
                 minTimes = 0;
                 result = Lists.newArrayList();
@@ -129,7 +137,7 @@ public class RoutineLoadJobTest {
         RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
         Deencapsulation.setField(routineLoadJob, "state", 
RoutineLoadJob.JobState.RUNNING);
         Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", 
routineLoadTaskInfoList);
-        Deencapsulation.setField(routineLoadJob, "progress", progress);
+        Deencapsulation.setField(routineLoadJob, "progress", currentProgress);
         routineLoadJob.afterAborted(transactionState, true, 
txnStatusChangeReasonString);
 
         Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, routineLoadJob.getState());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 65864e5..ac736dc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -17,10 +17,6 @@
 
 package org.apache.doris.transaction;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.CatalogTestUtil;
 import org.apache.doris.catalog.FakeCatalog;
@@ -49,31 +45,31 @@ import org.apache.doris.thrift.TLoadSourceType;
 import org.apache.doris.thrift.TRLTaskTxnCommitAttachment;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
-import org.apache.doris.transaction.TransactionState.TxnSourceType;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import org.apache.doris.transaction.TransactionState.TxnSourceType;
 
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
-import mockit.Injectable;
-import mockit.Mocked;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import mockit.Injectable;
+import mockit.Mocked;
+
 public class GlobalTransactionMgrTest {
 
     private static FakeEditLog fakeEditLog;
@@ -345,7 +341,7 @@ public class GlobalTransactionMgrTest {
         rlTaskTxnCommitAttachment.setLoadSourceType(TLoadSourceType.KAFKA);
         TKafkaRLTaskProgress tKafkaRLTaskProgress = new TKafkaRLTaskProgress();
         Map<Integer, Long> kafkaProgress = Maps.newHashMap();
-        kafkaProgress.put(1, 10L);
+        kafkaProgress.put(1, 100L); // start from 0, so rows number is 101, and consumed offset is 100
         tKafkaRLTaskProgress.setPartitionCmtOffset(kafkaProgress);
         rlTaskTxnCommitAttachment.setKafkaRLTaskProgress(tKafkaRLTaskProgress);
         TxnCommitAttachment txnCommitAttachment = new RLTaskTxnCommitAttachment(rlTaskTxnCommitAttachment);
@@ -358,7 +354,7 @@ public class GlobalTransactionMgrTest {
 
         Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(routineLoadJob, "currentTotalRows"));
         Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(routineLoadJob, "currentErrorRows"));
-        Assert.assertEquals(Long.valueOf(11L), ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1));
+        Assert.assertEquals(Long.valueOf(101L), ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1));
         // todo(ml): change to assert queue
         // Assert.assertEquals(1, routineLoadManager.getNeedScheduleTasksQueue().size());
         // Assert.assertNotEquals("label", routineLoadManager.getNeedScheduleTasksQueue().peek().getId());
@@ -411,7 +407,7 @@ public class GlobalTransactionMgrTest {
         rlTaskTxnCommitAttachment.setLoadSourceType(TLoadSourceType.KAFKA);
         TKafkaRLTaskProgress tKafkaRLTaskProgress = new TKafkaRLTaskProgress();
         Map<Integer, Long> kafkaProgress = Maps.newHashMap();
-        kafkaProgress.put(1, 10L);
+        kafkaProgress.put(1, 110L); // start from 0, so rows number is 111, consumed offset is 110
         tKafkaRLTaskProgress.setPartitionCmtOffset(kafkaProgress);
         rlTaskTxnCommitAttachment.setKafkaRLTaskProgress(tKafkaRLTaskProgress);
         TxnCommitAttachment txnCommitAttachment = new RLTaskTxnCommitAttachment(rlTaskTxnCommitAttachment);
@@ -422,9 +418,10 @@
         Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState);
         masterTransMgr.commitTransaction(1L, 1L, transTablets, txnCommitAttachment);
 
+        // current total rows and error rows will be reset after job pause, so here they should be 0.
         Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentTotalRows"));
         Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentErrorRows"));
-        Assert.assertEquals(Long.valueOf(11L),
+        Assert.assertEquals(Long.valueOf(111L),
                 ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1));
         // todo(ml): change to assert queue
         // Assert.assertEquals(0, routineLoadManager.getNeedScheduleTasksQueue().size());
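
For readers checking the updated numbers in these tests: with Kafka offsets starting
at 0, a task that reports committed offset 100 has consumed 101 rows (offsets 0
through 100), and the job then stores 101 as the next offset to fetch. The arithmetic
in miniature:

    import java.util.HashMap;
    import java.util.Map;

    public class OffsetMathDemo {
        public static void main(String[] args) {
            long reportedCmtOffset = 100L;              // last consumed offset
            long rowsConsumed = reportedCmtOffset + 1;  // offsets 0..100 -> 101 rows
            Map<Integer, Long> jobProgress = new HashMap<>();
            jobProgress.put(1, reportedCmtOffset + 1);  // store next offset to consume
            System.out.println(rowsConsumed + " rows, next offset " + jobProgress.get(1));
            // prints: 101 rows, next offset 101
        }
    }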


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
