This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
     new c57145b  [Bug] Fix bug that routine load may lose some data (#5093)
c57145b is described below

commit c57145b4c2633c0152fc02995c8c1912f59d25a6
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Wed Dec 23 09:33:52 2020 +0800

    [Bug] Fix bug that routine load may lose some data (#5093)

    In the previous implementation, we tried to update the job progress
    (such as the consumed Kafka offset) regardless of whether a subtask
    ended in the commit or the abort state. Under normal circumstances an
    aborted transaction has consumed no data and all of its progress is 0,
    so even if we updated the progress, it remained unchanged. However,
    when the cluster load is high, a subtask may fail halfway through its
    execution on the BE side. In that case the task is aborted, but part
    of the progress has already been updated, causing the next subtask to
    skip that data instead of consuming it, which results in data loss.
---
 be/src/common/config.h                              | 14 +++---
 be/src/exec/olap_scanner.cpp                        |  2 +-
 be/src/olap/rowset/alpha_rowset.cpp                 |  3 +-
 be/src/runtime/stream_load/stream_load_context.h    |  3 ++
 .../org/apache/doris/catalog/TabletStatMgr.java     |  8 +--
 .../main/java/org/apache/doris/common/Config.java   |  3 +-
 .../doris/load/routineload/KafkaProgress.java       | 17 +++++--
 .../load/routineload/KafkaRoutineLoadJob.java       | 58 +++++++++++-----------
 .../routineload/RLTaskTxnCommitAttachment.java      |  4 ++
 .../doris/load/routineload/RoutineLoadJob.java      | 21 +++++---
 .../load/routineload/RoutineLoadProgress.java       |  2 +-
 .../load/routineload/RoutineLoadTaskScheduler.java  | 12 +++--
 .../doris/transaction/DatabaseTransactionMgr.java   | 11 ++--
 .../apache/doris/transaction/TransactionState.java  |  5 +-
 .../doris/load/routineload/RoutineLoadJobTest.java  | 24 ++++++---
 .../transaction/GlobalTransactionMgrTest.java       | 29 +++++------
 16 files changed, 126 insertions(+), 90 deletions(-)
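The heart of the fix is the rewritten KafkaRoutineLoadJob.checkCommitInfo() further down in this patch. As a rough, self-contained sketch of the rule it now applies (the class and enums below are simplified stand-ins for illustration, not the actual Doris types):

    public class ProgressUpdateRule {
        enum TransactionStatus { COMMITTED, ABORTED }
        enum TxnStatusChangeReason { OFFSET_OUT_OF_RANGE, NO_PARTITIONS, TIMEOUT }

        // Mirrors the new checkCommitInfo() decision, stripped of logging
        // and Preconditions checks.
        static boolean shouldUpdateProgress(TransactionStatus status, TxnStatusChangeReason reason) {
            if (status == TransactionStatus.COMMITTED) {
                // Normal path: the txn committed, so the consumed offsets are safe to advance.
                return true;
            }
            // Routine load always runs with max_filter_ratio = 1, so an abort with
            // reason NO_PARTITIONS means every row was filtered; advancing the
            // offset deliberately skips those error lines.
            return status == TransactionStatus.ABORTED && reason == TxnStatusChangeReason.NO_PARTITIONS;
        }

        public static void main(String[] args) {
            // The bug this commit fixes: an abort caused by a BE failing mid-task
            // must no longer move the offset forward.
            System.out.println(shouldUpdateProgress(TransactionStatus.ABORTED, null));    // false
            System.out.println(shouldUpdateProgress(TransactionStatus.ABORTED,
                    TxnStatusChangeReason.NO_PARTITIONS));                                // true
            System.out.println(shouldUpdateProgress(TransactionStatus.COMMITTED, null));  // true
        }
    }

Only a commit, or an abort proving that every row was filtered out, may move the offsets forward; any other abort leaves them untouched so the next subtask re-consumes the same range.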
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 93f6bfe..6bef85a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -36,11 +36,11 @@ CONF_Int32(brpc_port, "8060");
 // the number of bthreads for brpc, the default value is set to -1, which means the number of bthreads is #cpu-cores
 CONF_Int32(brpc_num_threads, "-1")
 
-    // Declare a selection strategy for those servers have many ips.
-    // Note that there should at most one ip match this list.
-    // this is a list in semicolon-delimited format, in CIDR notation, e.g. 10.10.10.0/24
-    // If no ip match this rule, will choose one randomly.
-    CONF_String(priority_networks, "");
+// Declare a selection strategy for those servers have many ips.
+// Note that there should at most one ip match this list.
+// this is a list in semicolon-delimited format, in CIDR notation, e.g. 10.10.10.0/24
+// If no ip match this rule, will choose one randomly.
+CONF_String(priority_networks, "");
 
 ////
 //// tcmalloc gc parameter
@@ -259,7 +259,7 @@ CONF_Int64(index_stream_cache_capacity, "10737418240");
 // CONF_Int64(max_packed_row_block_size, "20971520");
 
 // Cache for storage page size
-CONF_String(storage_page_cache_limit, "20G");
+CONF_String(storage_page_cache_limit, "20%");
 // whether to disable page cache feature in storage
 CONF_Bool(disable_storage_page_cache, "false");
 
@@ -372,7 +372,7 @@ CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60");
 // or encounter 'tablet writer write failed' error when loading.
 // CONF_Int32(tablet_writer_rpc_timeout_sec, "600");
 // OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc.
-CONF_mInt32(olap_table_sink_send_interval_ms, "10");
+CONF_mInt32(olap_table_sink_send_interval_ms, "1");
 
 // Fragment thread pool
 CONF_Int32(fragment_pool_thread_num_min, "64");
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 5a959e5..05d5bf7 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -257,7 +257,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
     int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num;
     {
         SCOPED_TIMER(_parent->_scan_timer);
-        SCOPED_TIMER(_parent->_scan_cpu_timer);
+        SCOPED_CPU_TIMER(_parent->_scan_cpu_timer);
         while (true) {
             // Batch is full, break
             if (batch->is_full()) {
diff --git a/be/src/olap/rowset/alpha_rowset.cpp b/be/src/olap/rowset/alpha_rowset.cpp
index f1e7af3..9ff8178 100644
--- a/be/src/olap/rowset/alpha_rowset.cpp
+++ b/be/src/olap/rowset/alpha_rowset.cpp
@@ -330,8 +330,7 @@ OLAPStatus AlphaRowset::init() {
     // table value column, so when first start the two number is not the same,
     // it causes start failed. When `expect_zone_maps_num > zone_maps_size` it may be the first start after upgrade
     if (expect_zone_maps_num > zone_maps_size) {
-        LOG(WARNING)
-                << "tablet: " << _rowset_meta->tablet_id() << " expect zone map size is "
+        VLOG(1) << "tablet: " << _rowset_meta->tablet_id() << " expect zone map size is "
                 << expect_zone_maps_num << ", actual num is " << zone_maps_size
                 << ". If this is not the first start after upgrade, please pay attention!";
     }
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index 1537a17..c0d61ab 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -45,6 +45,9 @@ public:
               topic(t_info.topic),
               begin_offset(t_info.partition_begin_offset),
               properties(t_info.properties) {
+        // The offset (begin_offset) sent from FE is the starting offset,
+        // and the offset (cmt_offset) reported by BE to FE is the consumed
+        // offset, so we need to subtract 1 here.
         for (auto& p : t_info.partition_begin_offset) {
             cmt_offset[p.first] = p.second - 1;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
index 25501bd..a63784b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
@@ -28,11 +28,11 @@ import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TTabletStat;
 import org.apache.doris.thrift.TTabletStatResult;
 
-import com.google.common.collect.ImmutableMap;
-
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.google.common.collect.ImmutableMap;
+
 import java.util.List;
 import java.util.Map;
 
@@ -61,7 +61,7 @@ public class TabletStatMgr extends MasterDaemon {
                 client = ClientPool.backendPool.borrowObject(address);
                 TTabletStatResult result = client.getTabletStat();
-                LOG.info("get tablet stat from backend: {}, num: {}", backend.getId(), result.getTabletsStatsSize());
+                LOG.debug("get tablet stat from backend: {}, num: {}", backend.getId(), result.getTabletsStatsSize());
                 updateTabletStat(backend.getId(), result);
                 ok = true;
@@ -112,7 +112,7 @@ public class TabletStatMgr extends MasterDaemon {
                     index.setRowCount(indexRowCount);
                 } // end for indices
             } // end for partitions
-            LOG.info("finished to set row num for table: {} in database: {}",
+            LOG.debug("finished to set row num for table: {} in database: {}",
                     table.getName(), db.getFullName());
         }
     } finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 83e5237..c3ba37a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -68,7 +68,8 @@ public class Config extends ConfigBase {
     public static String sys_log_dir = PaloFe.DORIS_HOME_DIR + "/log";
     @ConfField public static String sys_log_level = "INFO";
     @ConfField public static int sys_log_roll_num = 10;
-    @ConfField public static String[] sys_log_verbose_modules = {"org.apache.thrift", "org.apache.doris.thrift", "org.apache.doris.http", "org.apache.doris.service.FrontendServiceImpl"};
+    @ConfField
+    public static String[] sys_log_verbose_modules = {};
     @ConfField public static String sys_log_roll_interval = "DAY";
     @ConfField public static String sys_log_delete_age = "7d";
     @Deprecated
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
index 34a5a4e..6d704d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -19,8 +19,12 @@ package org.apache.doris.load.routineload;
 
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.thrift.TKafkaRLTaskProgress;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.base.Joiner;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
@@ -38,6 +42,8 @@ import java.util.Map;
  */
 // {"partitionIdToOffset": {}}
 public class KafkaProgress extends RoutineLoadProgress {
+    private static final Logger LOG = LogManager.getLogger(KafkaProgress.class);
+
     public static final String OFFSET_BEGINNING = "OFFSET_BEGINNING"; // -2
     public static final String OFFSET_END = "OFFSET_END"; // -1
     // OFFSET_ZERO is just for show info, if user specified offset is 0
@@ -47,7 +53,7 @@ public class KafkaProgress extends RoutineLoadProgress {
     public static final long OFFSET_END_VAL = -1;
 
     // (partition id, begin offset)
-    // the offset the next msg to be consumed
+    // the offset saved here is the next offset that needs to be consumed
     private Map<Integer, Long> partitionIdToOffset = Maps.newConcurrentMap();
 
     public KafkaProgress() {
@@ -101,12 +107,13 @@ public class KafkaProgress extends RoutineLoadProgress {
             } else if (entry.getValue() == -2) {
                 showPartitionIdToOffset.put(entry.getKey(), OFFSET_BEGINNING);
             } else {
+                // The offset saved in partitionIdToOffset is the next offset to be consumed.
+                // So here we subtract 1 to return the "already consumed" offset.
                 showPartitionIdToOffset.put(entry.getKey(), "" + (entry.getValue() - 1));
             }
         }
     }
 
-
     // modify the partition offset of this progress.
     // throw exception is the specified partition does not exist in progress.
     public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
@@ -138,11 +145,13 @@ public class KafkaProgress extends RoutineLoadProgress {
     }
 
     @Override
-    public void update(RoutineLoadProgress progress) {
-        KafkaProgress newProgress = (KafkaProgress) progress;
+    public void update(RLTaskTxnCommitAttachment attachment) {
+        KafkaProgress newProgress = (KafkaProgress) attachment.getProgress();
         // + 1 to point to the next msg offset to be consumed
         newProgress.partitionIdToOffset.entrySet().stream()
                 .forEach(entity -> this.partitionIdToOffset.put(entity.getKey(), entity.getValue() + 1));
+        LOG.debug("update kafka progress: {}, task: {}, job: {}",
+                newProgress.toJsonString(), DebugUtil.printId(attachment.getTaskId()), attachment.getJobId());
     }
 
     @Override
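Together with the stream_load_context.h change above, these hunks describe one consistent convention: the FE stores the next offset to consume, while the BE reports the last offset it actually consumed. A small self-contained illustration of the round trip (hypothetical class and values, not Doris code):

    import java.util.HashMap;
    import java.util.Map;

    public class OffsetRoundTrip {
        public static void main(String[] args) {
            Map<Integer, Long> feNextOffset = new HashMap<>();
            feNextOffset.put(0, 100L);              // FE progress: next msg to consume is 100

            long beginOffset = feNextOffset.get(0); // FE sends begin_offset = 100 to the BE
            long cmtOffset = beginOffset - 1;       // BE init: nothing consumed yet -> 99

            cmtOffset = 149L;                       // BE consumed offsets 100..149 and commits

            // On a committed txn, KafkaProgress.update() stores last-consumed + 1
            // as the new "next offset to be consumed".
            feNextOffset.put(0, cmtOffset + 1);     // 150

            // SHOW ROUTINE LOAD subtracts 1 again to display the consumed offset.
            System.out.println("consumed offset shown: " + (feNextOffset.get(0) - 1)); // 149
        }
    }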
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index a9e6b78..a72944a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -42,17 +42,19 @@ import org.apache.doris.common.util.SmallFileMgr;
 import org.apache.doris.common.util.SmallFileMgr.SmallFile;
 import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
 import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -145,7 +147,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         }
     }
 
-
     @Override
     public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserException {
         List<RoutineLoadTaskInfo> result = new ArrayList<>();
@@ -198,46 +199,45 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         return currentTaskConcurrentNum;
     }
 
-    // case1: BE execute the task successfully and commit it to FE, but failed on FE(such as db renamed, not found),
-    // after commit failed, BE try to rollback this txn, and loaded rows in its attachment is larger than 0.
-    // In this case, FE should not update the progress.
-    //
-    // case2: partitionIdToOffset must be not empty when loaded rows > 0
-    // be commit txn but fe throw error when committing txn,
-    // fe rollback txn without partitionIdToOffset by itself
-    // this task should not be commit
-    // otherwise currentErrorNum and currentTotalNum is updated when progress is not updated
+    // Use the transaction status and attachment information to determine whether the progress needs to be updated.
     @Override
     protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment,
-                                      TransactionStatus txnStatus) {
-        if (rlTaskTxnCommitAttachment.getLoadedRows() > 0 && txnStatus == TransactionStatus.ABORTED) {
-            // case 1
-            return false;
+                                      TransactionState txnState,
+                                      TransactionState.TxnStatusChangeReason txnStatusChangeReason) {
+        if (txnState.getTransactionStatus() == TransactionStatus.COMMITTED) {
+            // For a committed txn, update the progress.
+            return true;
         }
-        if (rlTaskTxnCommitAttachment.getLoadedRows() > 0
-                && (!((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).hasPartition())) {
-            // case 2
-            LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()))
-                    .add("job_id", id)
-                    .add("loaded_rows", rlTaskTxnCommitAttachment.getLoadedRows())
-                    .add("progress_partition_offset_size", 0)
-                    .add("msg", "commit attachment info is incorrect"));
-            return false;
+        if (txnStatusChangeReason != null && txnStatusChangeReason == TransactionState.TxnStatusChangeReason.NO_PARTITIONS) {
+            // Because the max_filter_ratio of a routine load task is always 1, under normal
+            // circumstances a routine load task will not return the error "too many filtered rows".
+            // If no data is loaded at all, only the error "all partitions have no load data" may be returned.
+            // In this case the status of the transaction is ABORTED,
+            // but we still need to update the offset to skip these error lines.
+            Preconditions.checkState(txnState.getTransactionStatus() == TransactionStatus.ABORTED, txnState.getTransactionStatus());
+            return true;
         }
-        return true;
+
+        // Reaching here, the status of the transaction should be ABORTED,
+        // caused by some other error. In this case, we should not update the offset.
+        LOG.debug("no need to update the progress of kafka routine load. txn status: {}, " +
+                "txnStatusChangeReason: {}, task: {}, job: {}",
+                txnState.getTransactionStatus(), txnStatusChangeReason,
+                DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()), id);
+        return false;
     }
 
     @Override
     protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
         super.updateProgress(attachment);
-        this.progress.update(attachment.getProgress());
+        this.progress.update(attachment);
     }
 
     @Override
     protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
         super.replayUpdateProgress(attachment);
-        this.progress.update(attachment.getProgress());
+        this.progress.update(attachment);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java
index 70d67e7..73417ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java
@@ -67,6 +67,10 @@ public class RLTaskTxnCommitAttachment extends TxnCommitAttachment {
         }
     }
 
+    public long getJobId() {
+        return jobId;
+    }
+
     public TUniqueId getTaskId() {
         return taskId;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index e82d234..f1d858b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -69,6 +69,10 @@ import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
 import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -77,8 +81,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -878,8 +880,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                     entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
             RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
             taskBeId = routineLoadTaskInfo.getBeId();
-            executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED);
+            executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED, null);
             ++committedTaskNum;
+            LOG.debug("routine load task committed. task id: {}, job id: {}", txnState.getLabel(), id);
         }
     } catch (Throwable e) {
         LOG.warn("after committed failed", e);
@@ -989,8 +992,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                             .build());
                 }
                 ++abortedTaskNum;
+                TransactionState.TxnStatusChangeReason txnStatusChangeReason = null;
                 if (txnStatusChangeReasonString != null) {
-                    TransactionState.TxnStatusChangeReason txnStatusChangeReason =
+                    txnStatusChangeReason =
                             TransactionState.TxnStatusChangeReason.fromString(txnStatusChangeReasonString);
                     if (txnStatusChangeReason != null) {
                         switch (txnStatusChangeReason) {
@@ -1009,7 +1013,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                                 // TODO(ml): use previous be id depend on change reason
                         }
                     }
                     // step2: commit task , update progress, maybe create a new task
-                    executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.ABORTED);
+                    executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.ABORTED, txnStatusChangeReason);
                 }
             } catch (Exception e) {
                 String msg = "be " + taskBeId + " abort task " + txnState.getLabel() + " failed with error " + e.getMessage();
@@ -1037,7 +1041,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     // check task exists or not before call method
     private void executeTaskOnTxnStatusChanged(RoutineLoadTaskInfo routineLoadTaskInfo, TransactionState txnState,
-                                               TransactionStatus txnStatus) throws UserException {
+                                               TransactionStatus txnStatus, TransactionState.TxnStatusChangeReason txnStatusChangeReason) throws UserException {
         // step0: get progress from transaction state
         RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment();
         if (rlTaskTxnCommitAttachment == null) {
@@ -1049,7 +1053,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                             + " maybe task was aborted by master when timeout")
                     .build());
             }
-        } else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState.getTransactionStatus())) {
+        } else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState, txnStatusChangeReason)) {
             // step2: update job progress
             updateProgress(rlTaskTxnCommitAttachment);
         }
@@ -1256,7 +1260,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     // check the correctness of commit info
     protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment,
-                                               TransactionStatus txnStatus);
+                                               TransactionState txnState,
+                                               TransactionState.TxnStatusChangeReason txnStatusChangeReason);
 
     protected abstract String getStatistic();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
index c8948d4..bf746a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
@@ -37,7 +37,7 @@ public abstract class RoutineLoadProgress implements Writable {
         this.loadDataSourceType = loadDataSourceType;
     }
 
-    abstract void update(RoutineLoadProgress progress);
+    abstract void update(RLTaskTxnCommitAttachment attachment);
 
     abstract String toJsonString();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index d66476f..e353c79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -36,13 +36,13 @@ import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -130,6 +130,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
             if (!allocateTaskToBe(routineLoadTaskInfo)) {
                 // allocate failed, push it back to the queue to wait next scheduling
                 needScheduleTasksQueue.put(routineLoadTaskInfo);
+                return;
             }
         } catch (UserException e) {
             routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).
@@ -152,6 +153,7 @@
                 // set BE id to -1 to release the BE slot
                 routineLoadTaskInfo.setBeId(-1);
                 needScheduleTasksQueue.put(routineLoadTaskInfo);
+                return;
             }
         } catch (Exception e) {
             // exception happens, PAUSE the job
@@ -196,6 +198,10 @@
             submitTask(routineLoadTaskInfo.getBeId(), tRoutineLoadTask);
             LOG.debug("send routine load task cost(ms): {}, job id: {}",
                     (System.currentTimeMillis() - startTime), routineLoadTaskInfo.getJobId());
+            if (tRoutineLoadTask.isSetKafkaLoadInfo()) {
+                LOG.debug("send kafka routine load task {} with partition offset: {}, job: {}",
+                        tRoutineLoadTask.label, tRoutineLoadTask.kafka_load_info.partition_begin_offset, tRoutineLoadTask.getJobId());
+            }
         } catch (LoadException e) {
             // submit task failed (such as TOO_MANY_TASKS error), but txn has already begun.
             // Here we will still set the ExecuteStartTime of this task, which means
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index ed404d5..85ed3a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -54,6 +54,10 @@ import org.apache.doris.task.PublishVersionTask;
 import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.thrift.TUniqueId;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -61,10 +65,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -291,7 +291,8 @@ public class DatabaseTransactionMgr {
         checkRunningTxnExceedLimit(sourceType);
 
         long tid = idGenerator.getNextTransactionId();
-        LOG.info("begin transaction: txn id {} with label {} from coordinator {}", tid, label, coordinator);
+        LOG.info("begin transaction: txn id {} with label {} from coordinator {}, listener id: {}",
+                tid, label, coordinator, listenerId);
         TransactionState transactionState = new TransactionState(dbId, tableIdList, tid, label, requestId, sourceType,
                 coordinator, listenerId, timeoutSecond * 1000);
         transactionState.setPrepareTime(System.currentTimeMillis());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 6d161f5..0559195 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -100,7 +100,8 @@ public class TransactionState implements Writable {
         DB_DROPPED,
         TIMEOUT,
         OFFSET_OUT_OF_RANGE,
-        PAUSE;
+        PAUSE,
+        NO_PARTITIONS;
 
         public static TxnStatusChangeReason fromString(String reasonString) {
             for (TxnStatusChangeReason txnStatusChangeReason : TxnStatusChangeReason.values()) {
@@ -116,6 +117,8 @@
             switch (this) {
                 case OFFSET_OUT_OF_RANGE:
                     return "Offset out of range";
+                case NO_PARTITIONS:
+                    return "all partitions have no load data";
                 default:
                     return this.name();
             }
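As a sketch of how the new constant is meant to round-trip (the body of the real fromString() is not shown in this hunk; the contains()-based matching below and the example abort message are assumptions for illustration only):

    public class ReasonParsing {
        enum TxnStatusChangeReason {
            OFFSET_OUT_OF_RANGE,
            NO_PARTITIONS;

            // Assumed behavior: match the BE-reported reason string against
            // each constant's toString() representation.
            static TxnStatusChangeReason fromString(String reasonString) {
                for (TxnStatusChangeReason reason : values()) {
                    if (reasonString.contains(reason.toString())) {
                        return reason;
                    }
                }
                return null;
            }

            @Override
            public String toString() {
                switch (this) {
                    case OFFSET_OUT_OF_RANGE:
                        return "Offset out of range";
                    case NO_PARTITIONS:
                        return "all partitions have no load data";
                    default:
                        return this.name();
                }
            }
        }

        public static void main(String[] args) {
            // A hypothetical abort message as a BE might report it:
            System.out.println(TxnStatusChangeReason.fromString(
                    "[ANALYSIS_ERROR]all partitions have no load data"));     // NO_PARTITIONS
            System.out.println(TxnStatusChangeReason.fromString("disk full")); // null
        }
    }

A NO_PARTITIONS result is the one abort reason that still lets checkCommitInfo() advance the offsets, skipping rows that were entirely filtered out.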
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
index 3ea320b..9a29542 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.load.routineload;
 
-
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.catalog.Catalog;
@@ -28,16 +27,18 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.common.util.KafkaUtil;
 import org.apache.doris.persist.EditLog;
+import org.apache.doris.thrift.TKafkaRLTaskProgress;
 import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
 
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
 import org.apache.kafka.common.PartitionInfo;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import java.util.List;
 import java.util.Map;
 
@@ -96,12 +97,19 @@ public class RoutineLoadJobTest {
 
     @Test
     public void testAfterAborted(@Injectable TransactionState transactionState,
-                                 @Injectable KafkaTaskInfo routineLoadTaskInfo,
-                                 @Injectable KafkaProgress progress) throws UserException {
+                                 @Injectable KafkaTaskInfo routineLoadTaskInfo) throws UserException {
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Lists.newArrayList();
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
         long txnId = 1L;
 
+        RLTaskTxnCommitAttachment attachment = new RLTaskTxnCommitAttachment();
+        TKafkaRLTaskProgress tKafkaRLTaskProgress = new TKafkaRLTaskProgress();
+        tKafkaRLTaskProgress.partitionCmtOffset = Maps.newHashMap();
+        KafkaProgress kafkaProgress = new KafkaProgress(tKafkaRLTaskProgress);
+        Deencapsulation.setField(attachment, "progress", kafkaProgress);
+
+        KafkaProgress currentProgress = new KafkaProgress(tKafkaRLTaskProgress);
+
         new Expectations() {
             {
                 transactionState.getTransactionId();
@@ -112,7 +120,7 @@ public class RoutineLoadJobTest {
                 result = txnId;
                 transactionState.getTxnCommitAttachment();
                 minTimes = 0;
-                result = new RLTaskTxnCommitAttachment();
+                result = attachment;
                 routineLoadTaskInfo.getPartitions();
                 minTimes = 0;
                 result = Lists.newArrayList();
@@ -129,7 +137,7 @@ public class RoutineLoadJobTest {
         RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
         Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);
         Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", routineLoadTaskInfoList);
-        Deencapsulation.setField(routineLoadJob, "progress", progress);
+        Deencapsulation.setField(routineLoadJob, "progress", currentProgress);
 
         routineLoadJob.afterAborted(transactionState, true, txnStatusChangeReasonString);
 
         Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, routineLoadJob.getState());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 65864e5..ac736dc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -17,10 +17,6 @@
 
 package org.apache.doris.transaction;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.CatalogTestUtil;
 import org.apache.doris.catalog.FakeCatalog;
@@ -49,31 +45,31 @@ import org.apache.doris.thrift.TLoadSourceType;
 import org.apache.doris.thrift.TRLTaskTxnCommitAttachment;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
-import org.apache.doris.transaction.TransactionState.TxnSourceType;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import org.apache.doris.transaction.TransactionState.TxnSourceType;
 
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
-import mockit.Injectable;
-import mockit.Mocked;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import mockit.Injectable;
+import mockit.Mocked;
+
 public class GlobalTransactionMgrTest {
 
     private static FakeEditLog fakeEditLog;
@@ -345,7 +341,7 @@
         rlTaskTxnCommitAttachment.setLoadSourceType(TLoadSourceType.KAFKA);
         TKafkaRLTaskProgress tKafkaRLTaskProgress = new TKafkaRLTaskProgress();
         Map<Integer, Long> kafkaProgress = Maps.newHashMap();
-        kafkaProgress.put(1, 10L);
+        kafkaProgress.put(1, 100L); // offsets start from 0, so the number of rows is 101 and the consumed offset is 100
         tKafkaRLTaskProgress.setPartitionCmtOffset(kafkaProgress);
         rlTaskTxnCommitAttachment.setKafkaRLTaskProgress(tKafkaRLTaskProgress);
         TxnCommitAttachment txnCommitAttachment = new RLTaskTxnCommitAttachment(rlTaskTxnCommitAttachment);
@@ -358,7 +354,7 @@
 
         Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(routineLoadJob, "currentTotalRows"));
         Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(routineLoadJob, "currentErrorRows"));
-        Assert.assertEquals(Long.valueOf(11L), ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1));
+        Assert.assertEquals(Long.valueOf(101L), ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1));
         // todo(ml): change to assert queue
         // Assert.assertEquals(1, routineLoadManager.getNeedScheduleTasksQueue().size());
         // Assert.assertNotEquals("label", routineLoadManager.getNeedScheduleTasksQueue().peek().getId());
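The expected values follow directly from the KafkaProgress bookkeeping shown earlier; a tiny worked example (hypothetical standalone class, values taken from the test comment above):

    public class OffsetMath {
        public static void main(String[] args) {
            long cmtOffset = 100L;              // last offset the BE reports as consumed
            long consumedRows = cmtOffset + 1;  // offsets 0..100 -> 101 rows in total
            long nextOffset = cmtOffset + 1;    // what KafkaProgress.update() stores
            System.out.println(consumedRows + " rows consumed, progress now at offset " + nextOffset);
        }
    }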
@@ -411,7 +407,7 @@
         rlTaskTxnCommitAttachment.setLoadSourceType(TLoadSourceType.KAFKA);
         TKafkaRLTaskProgress tKafkaRLTaskProgress = new TKafkaRLTaskProgress();
         Map<Integer, Long> kafkaProgress = Maps.newHashMap();
-        kafkaProgress.put(1, 10L);
+        kafkaProgress.put(1, 110L); // offsets start from 0, so the number of rows is 111 and the consumed offset is 110
         tKafkaRLTaskProgress.setPartitionCmtOffset(kafkaProgress);
         rlTaskTxnCommitAttachment.setKafkaRLTaskProgress(tKafkaRLTaskProgress);
         TxnCommitAttachment txnCommitAttachment = new RLTaskTxnCommitAttachment(rlTaskTxnCommitAttachment);
@@ -422,9 +418,10 @@
         Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1),
                 "idToRunningTransactionState", idToTransactionState);
         masterTransMgr.commitTransaction(1L, 1L, transTablets, txnCommitAttachment);
 
+        // current total rows and error rows will be reset after the job is paused, so here they should be 0.
         Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentTotalRows"));
         Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentErrorRows"));
-        Assert.assertEquals(Long.valueOf(11L),
+        Assert.assertEquals(Long.valueOf(111L),
                 ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1));
         // todo(ml): change to assert queue
         // Assert.assertEquals(0, routineLoadManager.getNeedScheduleTasksQueue().size());

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org