This is an automated email from the ASF dual-hosted git repository.

lichaoyong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2334f5d  Fix some problem related with publish version task (#4089)
2334f5d is described below

commit 2334f5d9978ce23df667ab4303f968427af54d82
Author: caiconghui <55968745+caicong...@users.noreply.github.com>
AuthorDate: Thu Jul 23 07:06:02 2020 -0500

    Fix some problem related with publish version task (#4089)
    
    This PR mainly does the following three things:
    1. Add the thread name to FE logs to make problems easier to trace.
    2. Add the agent_task_resend_wait_time_ms config to avoid sending duplicate 
agent tasks to BE.
    3. Stop updating the replica version when the new version is lower 
than the replica version in FE.
---
 docs/en/administrator-guide/config/fe_config.md    | 10 +++++++
 docs/zh-CN/administrator-guide/config/fe_config.md |  8 ++++++
 .../java/org/apache/doris/catalog/Replica.java     | 31 +++++++++++++++-------
 .../main/java/org/apache/doris/common/Config.java  |  8 ++++++
 .../java/org/apache/doris/common/Log4jConfig.java  |  4 +--
 .../org/apache/doris/master/ReportHandler.java     | 23 +++++++++++-----
 .../main/java/org/apache/doris/task/AgentTask.java | 16 +++++++++--
 .../org/apache/doris/task/PublishVersionTask.java  |  4 +--
 .../doris/transaction/DatabaseTransactionMgr.java  |  5 ++--
 .../doris/transaction/PublishVersionDaemon.java    |  4 ++-
 10 files changed, 87 insertions(+), 26 deletions(-)

diff --git a/docs/en/administrator-guide/config/fe_config.md 
b/docs/en/administrator-guide/config/fe_config.md
index 4f81d17..7612067 100644
--- a/docs/en/administrator-guide/config/fe_config.md
+++ b/docs/en/administrator-guide/config/fe_config.md
@@ -110,6 +110,16 @@ There are two ways to configure FE configuration items:
 
 ## Configurations
 
+### `agent_task_resend_wait_time_ms`
+
+This configuration decides whether to resend an agent task whose create_time 
is set: ReportHandler resends such a task only when current_time - create_time > 
agent_task_resend_wait_time_ms.
+
+This configuration is currently used mainly to solve the problem of 
`PUBLISH_VERSION` agent tasks being sent repeatedly. Its default value is 5000 (ms), 
which is an experimental value.
+ 
+Because there is a delay between adding an agent task to AgentTaskQueue and 
submitting it to BE, increasing the value of this configuration can effectively 
prevent agent tasks from being sent repeatedly.
+
+However, a larger value also lengthens the time before an agent task that failed 
to be submitted, or failed to execute, is executed again.
+
 ### `alter_table_timeout_second`
 
 ### `async_load_task_pool_size`
diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md 
b/docs/zh-CN/administrator-guide/config/fe_config.md
index dcc70c2..ec41490 100644
--- a/docs/zh-CN/administrator-guide/config/fe_config.md
+++ b/docs/zh-CN/administrator-guide/config/fe_config.md
@@ -110,6 +110,14 @@ FE 的配置项有两种方式进行配置:
 
 ## 配置项列表
 
+### `agent_task_resend_wait_time_ms`
+
+当代理任务的创建时间被设置的时候,此配置将决定是否重新发送代理任务, 当且仅当当前时间减去创建时间大于 
`agent_task_resend_wait_time_ms` 时,ReportHandler可以重新发送代理任务。 
+  
+该配置目前主要用来解决`PUBLISH_VERSION`代理任务的重复发送问题, 
目前该配置的默认值是5000,是个实验值,由于把代理任务提交到代理任务队列和提交到be存在一定的时间延迟,所以调大该配置的值可以有效解决代理任务的重复发送问题,
+
+但同时会导致提交失败或者执行失败的代理任务再次被执行的时间延长。  
+    
 ### `alter_table_timeout_second`
 
 ### `async_load_task_pool_size`
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index c9cab33..0e813f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -73,16 +73,16 @@ public class Replica implements Writable {
     private long backendId;
     // the version could be queried
     @SerializedName(value = "version")
-    private long version;
+    private volatile long version;
     @SerializedName(value = "versionHash")
     private long versionHash;
     private int schemaHash = -1;
     @SerializedName(value = "dataSize")
-    private long dataSize = 0;
+    private volatile long dataSize = 0;
     @SerializedName(value = "rowCount")
-    private long rowCount = 0;
+    private volatile long rowCount = 0;
     @SerializedName(value = "state")
-    private ReplicaState state;
+    private volatile ReplicaState state;
 
     // the last load failed version
     @SerializedName(value = "lastFailedVersion")
@@ -318,12 +318,23 @@ public class Replica implements Writable {
             long lastFailedVersion, long lastFailedVersionHash, 
             long lastSuccessVersion, long lastSuccessVersionHash, 
             long newDataSize, long newRowCount) {
-        LOG.debug("before update: {}", this.toString());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("before update: {}", this.toString());
+        }
 
         if (newVersion < this.version) {
-            // yiguolei: could not find any reason why new version less than 
this.version should run???
-            LOG.warn("replica {} on backend {}'s new version {} is lower than 
meta version {}",
-                    id, backendId, newVersion, this.version);
+            // This case means that replica meta version has been updated by 
ReportHandler before
+            // For example, the publish version daemon has already sent some 
publish version tasks to one be to publish version 2, 3, 4, 5, 6,
+            // and the be finish all publish version tasks, the be's replica 
version is 6 now, but publish version daemon need to wait
+            // for other be to finish most of publish version tasks to update 
replica version in fe.
+            // At the moment, the replica version in fe is 4, when 
ReportHandler sync tablet, it find reported replica version in be is 6 and then
+            // set version to 6 for replica in fe. And then publish version 
daemon try to finish txn, and use visible version(5)
+            // to update replica. Finally, it find the newer version(5) is 
lower than replica version(6) in fe.
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("replica {} on backend {}'s new version {} is lower 
than meta version {},"
+                        + "not to continue to update replica", id, backendId, 
newVersion, this.version);
+            }
+            return;
         }
 
         this.version = newVersion;
@@ -383,7 +394,9 @@ public class Replica implements Writable {
             }
         }
 
-        LOG.debug("after update {}", this.toString());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("after update {}", this.toString());
+        }
     }
     
     public synchronized void updateLastFailedVersion(long lastFailedVersion, 
long lastFailedVersionHash) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 44dfe12..714c4ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1117,5 +1117,13 @@ public class Config extends ConfigBase {
      */
     @ConfField
     public static String thrift_server_type = ThriftServer.THREAD_POOL;
+
+    /**
+     * This config will decide whether to resend agent task when create_time 
for agent_task is set,
+     * only when current_time - create_time > agent_task_resend_wait_time_ms 
can ReportHandler do resend agent task
+     */
+    @ConfField (mutable = true, masterOnly = true)
+    public static long agent_task_resend_wait_time_ms = 5000;
+
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Log4jConfig.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Log4jConfig.java
index 5912cd0..c3109bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Log4jConfig.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Log4jConfig.java
@@ -42,7 +42,7 @@ public class Log4jConfig extends XmlConfiguration {
             "  <Appenders>\n" + 
             "    <RollingFile name=\"Sys\" fileName=\"${sys_log_dir}/fe.log\" 
filePattern=\"${sys_log_dir}/fe.log.${sys_file_pattern}-%i\">\n" + 
             "      <PatternLayout charset=\"UTF-8\">\n" + 
-            "        <Pattern>%d{yyyy-MM-dd HH:mm:ss,SSS} %p %tid 
[%C{1}.%M():%L] %m%n</Pattern>\n" + 
+            "        <Pattern>%d{yyyy-MM-dd HH:mm:ss,SSS} %p (%t|%tid) 
[%C{1}.%M():%L] %m%n</Pattern>\n" +
             "      </PatternLayout>\n" + 
             "      <Policies>\n" + 
             "        <TimeBasedTriggeringPolicy/>\n" + 
@@ -57,7 +57,7 @@ public class Log4jConfig extends XmlConfiguration {
             "    </RollingFile>\n" + 
             "    <RollingFile name=\"SysWF\" 
fileName=\"${sys_log_dir}/fe.warn.log\" 
filePattern=\"${sys_log_dir}/fe.warn.log.${sys_file_pattern}-%i\">\n" + 
             "      <PatternLayout charset=\"UTF-8\">\n" + 
-            "        <Pattern>%d{yyyy-MM-dd HH:mm:ss,SSS} %p %tid 
[%C{1}.%M():%L] %m%n</Pattern>\n" + 
+            "        <Pattern>%d{yyyy-MM-dd HH:mm:ss,SSS} %p (%t|%tid) 
[%C{1}.%M():%L] %m%n</Pattern>\n" +
             "      </PatternLayout>\n" + 
             "      <Policies>\n" + 
             "        <TimeBasedTriggeringPolicy/>\n" + 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 8d40812..3b826ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -328,17 +328,20 @@ public class ReportHandler extends Daemon {
         LOG.info("begin to handle task report from backend {}", backendId);
         long start = System.currentTimeMillis();
 
-        for (TTaskType type : runningTasks.keySet()) {
-            Set<Long> taskSet = runningTasks.get(type);
-            if (!taskSet.isEmpty()) {
-                String signatures = StringUtils.join(taskSet, ", ");
-                LOG.debug("backend task[{}]: {}", type.name(), signatures);
+        if (LOG.isDebugEnabled()) {
+            for (TTaskType type : runningTasks.keySet()) {
+                Set<Long> taskSet = runningTasks.get(type);
+                if (!taskSet.isEmpty()) {
+                    String signatures = StringUtils.join(taskSet, ", ");
+                    LOG.debug("backend task[{}]: {}", type.name(), signatures);
+                }
             }
         }
 
         List<AgentTask> diffTasks = AgentTaskQueue.getDiffTasks(backendId, 
runningTasks);
 
         AgentBatchTask batchTask = new AgentBatchTask();
+        long taskReportTime = System.currentTimeMillis();
         for (AgentTask task : diffTasks) {
             // these tasks no need to do diff
             // 1. CREATE
@@ -350,7 +353,12 @@ public class ReportHandler extends Daemon {
                     || task.getTaskType() == TTaskType.CHECK_CONSISTENCY) {
                 continue;
             }
-            batchTask.addTask(task);
+
+            // to escape sending duplicate agent task to be
+            if (task.shouldResend(taskReportTime)) {
+                batchTask.addTask(task);
+            }
+
         }
 
         LOG.debug("get {} diff task(s) to resend", batchTask.getTaskNum());
@@ -742,10 +750,11 @@ public class ReportHandler extends Daemon {
     private static void handleRepublishVersionInfo(Map<Long, 
ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish, 
             long backendId) {
         AgentBatchTask batchTask = new AgentBatchTask();
+        long createPublishVersionTaskTime = System.currentTimeMillis();
         for (Long dbId : transactionsToPublish.keySet()) {
             ListMultimap<Long, TPartitionVersionInfo> map = 
transactionsToPublish.get(dbId);
             for (long txnId : map.keySet()) {
-                PublishVersionTask task = new PublishVersionTask(backendId, 
txnId, dbId, map.get(txnId));
+                PublishVersionTask task = new PublishVersionTask(backendId, 
txnId, dbId, map.get(txnId), createPublishVersionTaskTime);
                 batchTask.addTask(task);
                 // add to AgentTaskQueue for handling finish report.
                 AgentTaskQueue.addTask(task);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
index 9ce7557..77d3807 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.task;
 
+import org.apache.doris.common.Config;
 import org.apache.doris.thrift.TResourceInfo;
 import org.apache.doris.thrift.TTaskType;
 
@@ -39,9 +40,10 @@ public abstract class AgentTask {
     // some of are not.
     // so whether the task is finished depends on caller's logic, not the 
value of this member.
     protected boolean isFinished = false;
+    protected long createTime;
 
     public AgentTask(TResourceInfo resourceInfo, long backendId, TTaskType 
taskType,
-                     long dbId, long tableId, long partitionId, long indexId, 
long tabletId, long signature) {
+                     long dbId, long tableId, long partitionId, long indexId, 
long tabletId, long signature, long createTime) {
         this.backendId = backendId;
         this.signature = signature;
         this.taskType = taskType;
@@ -55,11 +57,17 @@ public abstract class AgentTask {
         this.resourceInfo = resourceInfo;
 
         this.failedTimes = 0;
+        this.createTime = createTime;
     }
     
     public AgentTask(TResourceInfo resourceInfo, long backendId, TTaskType 
taskType,
             long dbId, long tableId, long partitionId, long indexId, long 
tabletId) {
-        this(resourceInfo, backendId, taskType, dbId, tableId, partitionId, 
indexId, tabletId, tabletId);
+        this(resourceInfo, backendId, taskType, dbId, tableId, partitionId, 
indexId, tabletId, tabletId, -1);
+    }
+
+    public AgentTask(TResourceInfo resourceInfo, long backendId, TTaskType 
taskType,
+                     long dbId, long tableId, long partitionId, long indexId, 
long tabletId, long signature) {
+        this(resourceInfo, backendId, taskType, dbId, tableId, partitionId, 
indexId, tabletId, signature, -1);
     }
 
     public long getSignature() {
@@ -122,6 +130,10 @@ public abstract class AgentTask {
         return isFinished;
     }
 
+    public boolean shouldResend(long currentTimeMillis) {
+        return createTime == -1 || currentTimeMillis - createTime > 
Config.agent_task_resend_wait_time_ms;
+    }
+
     @Override
     public String toString() {
         return "[" + taskType + "], signature: " + signature + ", backendId: " 
+ backendId + ", tablet id: " + tabletId;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
index 9dbb47b..7323bd8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
@@ -36,8 +36,8 @@ public class PublishVersionTask extends AgentTask {
     private boolean isFinished;
 
     public PublishVersionTask(long backendId, long transactionId, long dbId,
-            List<TPartitionVersionInfo> partitionVersionInfos) {
-        super(null, backendId, TTaskType.PUBLISH_VERSION, dbId, -1L, -1L, -1L, 
-1L, transactionId);
+            List<TPartitionVersionInfo> partitionVersionInfos, long 
createTime) {
+        super(null, backendId, TTaskType.PUBLISH_VERSION, dbId, -1L, -1L, -1L, 
-1L, transactionId, createTime);
         this.transactionId = transactionId;
         this.partitionVersionInfos = partitionVersionInfos;
         this.errorTablets = new ArrayList<Long>();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index aa15bc0..4aad965 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -721,9 +721,8 @@ public class DatabaseTransactionMgr {
                                                 + " and its version not equal 
to partition commit version or commit version - 1"
                                                 + " if its not a upgrate 
stage, its a fatal error. ", transactionState, replica);
                                     }
-                                } else if (replica.getVersion() == 
partitionCommitInfo.getVersion()
-                                        && replica.getVersionHash() == 
partitionCommitInfo.getVersionHash()) {
-                                    // the replica's version and version hash 
is equal to current transaction partition's version and version hash
+                                } else if (replica.getVersion() >= 
partitionCommitInfo.getVersion()) {
+                                    // the replica's version is larger than or 
equal to current transaction partition's version
                                     // the replica is normal, then remove it 
from error replica ids
                                     errorReplicaIds.remove(replica.getId());
                                     ++healthReplicaNum;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 2371fee..9e68bee 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -76,6 +76,7 @@ public class PublishVersionDaemon extends MasterDaemon {
             LOG.warn("some transaction state need to publish, but no backend 
exists");
             return;
         }
+        long createPublishVersionTaskTime = System.currentTimeMillis();
         // every backend-transaction identified a single task
         AgentBatchTask batchTask = new AgentBatchTask();
         // traverse all ready transactions and dispatch the publish version 
task to all backends
@@ -113,7 +114,8 @@ public class PublishVersionDaemon extends MasterDaemon {
                 PublishVersionTask task = new PublishVersionTask(backendId,
                         transactionState.getTransactionId(),
                         transactionState.getDbId(),
-                        partitionVersionInfos);
+                        partitionVersionInfos,
+                        createPublishVersionTaskTime);
                 // add to AgentTaskQueue for handling finish report.
                 // not check return value, because the add will success
                 AgentTaskQueue.addTask(task);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to