This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 0e63133c804 [Chore](job) Provides configuration of job execution queue 
size (#42253) (#42530)
0e63133c804 is described below

commit 0e63133c8048078a58eca66c511524635b312d45
Author: Calvin Kirs <k...@apache.org>
AuthorDate: Mon Oct 28 13:42:08 2024 +0800

    [Chore](job) Provides configuration of job execution queue size (#42253) (#42530)
    
    The default execution queue size is 1024. When a large number of tasks
    are dispatched, the queue can become full and further tasks are dropped
    with an error such as:
    
    `dispatch instant task failed, job id is xxx`
    
    To address this, you can set the parameters `insert_task_queue_size` and
    `mtmv_task_queue_size` in the `fe.conf` configuration file. The values
    should be powers of 2: any other positive value is rounded up to the next
    power of 2, and a value less than 1 falls back to 1024.
    
    **Keep in mind, increasing these values is recommended only when thread
    resources are limited; otherwise, you should consider increasing the
    number of task execution threads (`job_insert_task_consumer_thread_num` /
    `job_mtmv_task_consumer_thread_num`).**
    
    (cherry picked from commit f9ea8f8229e9f5514c1773bd25c3cc11985c63fb)
    
    ## Proposed changes
    
    Issue Number: #42253
    
    <!--Describe your changes.-->
---
 .../main/java/org/apache/doris/common/Config.java  | 26 +++++++++++-----
 .../org/apache/doris/job/base/AbstractJob.java     | 19 +++++++++++-
 .../main/java/org/apache/doris/job/base/Job.java   |  7 +++++
 .../doris/job/executor/TimerJobSchedulerTask.java  | 10 ++++--
 .../doris/job/extensions/insert/InsertJob.java     |  6 ++++
 .../apache/doris/job/extensions/mtmv/MTMVJob.java  |  7 +++++
 .../job/manager/TaskDisruptorGroupManager.java     | 24 +++++++++++++--
 .../apache/doris/job/scheduler/JobScheduler.java   |  4 +--
 .../job/manager/TaskDisruptorGroupManagerTest.java | 36 ++++++++++++++++++++++
 regression-test/pipeline/p0/conf/fe.conf           |  1 +
 10 files changed, 125 insertions(+), 15 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 101f662aff5..fe0d28aee4f 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1728,7 +1728,7 @@ public class Config extends ConfigBase {
      * corresponding type of job
      * The value should be greater than 0, if it is 0 or <=0, set it to 5
      */
-    @ConfField(description = {"用于分发定时任务的线程数",
+    @ConfField(masterOnly = true, description = {"用于分发定时任务的线程数",
             "The number of threads used to dispatch timer job."})
     public static int job_dispatch_timer_job_thread_num = 2;
 
@@ -1739,24 +1739,34 @@ public class Config extends ConfigBase {
      * {@code @dispatch_timer_job_thread_num}
      * The value should be greater than 0, if it is 0 or <=0, set it to 1024
      */
-    @ConfField(description = {"任务堆积时用于存放定时任务的队列大小", "The number of timer jobs that can be queued."})
+    @ConfField(masterOnly = true, description = {"任务堆积时用于存放定时任务的队列大小", "The number of timer jobs that can be queued."})
     public static int job_dispatch_timer_job_queue_size = 1024;
-    @ConfField(description = {"一个 Job 的 task 最大的持久化数量,超过这个限制将会丢弃旧的 task 记录, 如果值 < 1, 将不会持久化。",
+    @ConfField(masterOnly = true, description = {"一个 Job 的 task 最大的持久化数量,超过这个限制将会丢弃旧的 task 记录, 如果值 < 1, 将不会持久化。",
             "Maximum number of persistence allowed per task in a job,exceeding which old tasks will be discarded,"
                    + "If the value is less than 1, it will not be persisted." })
     public static int max_persistence_task_count = 100;
-    @ConfField(description = {"finished 状态的 job 最长保存时间,超过这个时间将会被删除, 单位:小时",
+    @ConfField(masterOnly = true, description = {"MV task 的等待队列大小,如果小于 1,则会使用 1024,如果不是 2 的幂,则会自动向上取整为"
+            + "不小于该值的最小 2 的幂次方数", "The size of the MV task's waiting queue. If the size is less than 1, 1024 will be used. "
+            + "If the size is not a power of two, it will be rounded up to the next power "
+            + "of two."})
+    public static int mtmv_task_queue_size = 1024;
+    @ConfField(masterOnly = true, description = {"Insert task 的等待队列大小,如果小于 1,则会使用 1024,如果不是 2 的幂,则会自动向上取整为"
+            + "不小于该值的最小 2 的幂次方数", "The size of the Insert task's waiting queue. If the size is less than 1, 1024 will be used."
+            + " If the size is not a power of two, it will be rounded up to the next power "
+            + "of two."})
+    public static int insert_task_queue_size = 1024;
+    @ConfField(masterOnly = true, description = {"finished 状态的 job 最长保存时间,超过这个时间将会被删除, 单位:小时",
             "The longest time to save the job in finished status, it will be deleted after this time. Unit: hour"})
     public static int finished_job_cleanup_threshold_time_hour = 24;
 
-    @ConfField(description = {"用于执行 Insert 任务的线程数,值应该大于0,否则默认为5",
+    @ConfField(masterOnly = true, description = {"用于执行 Insert 任务的线程数,值应该大于0,否则默认为10",
             "The number of threads used to consume Insert tasks, "
-                    + "the value should be greater than 0, if it is <=0, default is 5."})
+                    + "the value should be greater than 0, if it is <=0, default is 10."})
     public static int job_insert_task_consumer_thread_num = 10;
 
-    @ConfField(description = {"用于执行 MTMV 任务的线程数,值应该大于0,否则默认为5",
+    @ConfField(masterOnly = true, description = {"用于执行 MTMV 任务的线程数,值应该大于0,否则默认为10",
             "The number of threads used to consume mtmv tasks, "
-                    + "the value should be greater than 0, if it is <=0, default is 5."})
+                    + "the value should be greater than 0, if it is <=0, default is 10."})
     public static int job_mtmv_task_consumer_thread_num = 10;
 
     /* job test config */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 94a0b0146cd..62ac0c4d59d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -63,7 +63,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
             new Column("SucceedTaskCount", ScalarType.createStringType()),
             new Column("FailedTaskCount", ScalarType.createStringType()),
             new Column("CanceledTaskCount", ScalarType.createStringType())
-            );
+    );
     @SerializedName(value = "jid")
     private Long jobId;
 
@@ -415,6 +415,23 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
         return getCommonTvfInfo();
     }
 
+    /**
+     * Builds the error message reported when a task cannot be dispatched because the execution queue is full.
+     *
+     * @param taskId                  The ID of the task that could not be dispatched.
+     * @param queueConfigName         The fe.conf property that controls the queue size.
+     * @param executeThreadConfigName The fe.conf property that controls the number of consumer threads.
+     * @return A formatted error message that points the user at both properties.
+     */
+    protected String commonFormatMsgWhenExecuteQueueFull(Long taskId, String queueConfigName,
+                                                            String executeThreadConfigName) {
+        return String.format("Dispatch task failed, jobId: %d, jobName: %s, taskId: %d, the execution queue is full, "
+                        + "you can increase the queue size by setting the property "
+                        + "%s in the fe.conf file or increase the value of "
+                        + "the property %s in the fe.conf file", getJobId(), getJobName(), taskId, queueConfigName,
+                executeThreadConfigName);
+    }
+
     @Override
     public ShowResultSetMetaData getJobMetaData() {
         ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
index 1124e7f2d28..a7e75554c71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
@@ -153,4 +153,11 @@ public interface Job<T extends AbstractTask, C> {
      * @return TRow
      */
     TRow getTvfInfo();
+
+    /**
+     * Builds the error message reported when a task of this job cannot be dispatched because its queue is full.
+     * @param taskId the ID of the task that could not be dispatched
+     * @return an error message suitable for returning to the user
+     */
+    String formatMsgWhenExecuteQueueFull(Long taskId);
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
index 25bbccf3fa2..65a9cf2e091 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
@@ -45,11 +45,17 @@ public class TimerJobSchedulerTask<T extends AbstractJob> implements TimerTask {
                 return;
             }
             if (!dispatchDisruptor.publishEvent(this.job)) {
-                log.warn("dispatch timer job failed, job id is {}, job name is {}",
-                        this.job.getJobId(), this.job.getJobName());
+                log.warn("dispatch timer job failed, queue may be full. job id is {}, job name is {}, {}",
+                        this.job.getJobId(), this.job.getJobName(), getMsgWhenExecuteQueueFull());
             }
         } catch (Exception e) {
             log.warn("dispatch timer job error, task id is {}", this.job.getJobId(), e);
         }
     }
+
+    private String getMsgWhenExecuteQueueFull() {
+        return "you can increase the queue size by setting the property "
+                + "job_dispatch_timer_job_queue_size in the fe.conf file or increase the value of "
+                + "the property job_dispatch_timer_job_thread_num in the fe.conf file";
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 487591efc04..ce35227feb1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -522,6 +522,12 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
         }
     }
 
+    @Override
+    public String formatMsgWhenExecuteQueueFull(Long taskId) {
+        return commonFormatMsgWhenExecuteQueueFull(taskId, "insert_task_queue_size",
+                "job_insert_task_consumer_thread_num");
+    }
+
     private String getPriority() {
         return properties.getOrDefault(LoadStmt.PRIORITY, Priority.NORMAL.name());
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
index add191001f9..62c005ca280 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
@@ -218,6 +218,13 @@ public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> {
         return data;
     }
 
+    @Override
+    public String formatMsgWhenExecuteQueueFull(Long taskId) {
+        return commonFormatMsgWhenExecuteQueueFull(taskId, "mtmv_task_queue_size",
+                "job_mtmv_task_consumer_thread_num");
+
+    }
+
     @Override
     public TRow getTvfInfo() {
         TRow trow = new TRow();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
index b1ccb976443..cc82b59a36a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
@@ -65,8 +65,8 @@ public class TaskDisruptorGroupManager<T extends AbstractTask> {
     private static final int DISPATCH_MTMV_THREAD_NUM = Config.job_mtmv_task_consumer_thread_num > 0
             ? Config.job_mtmv_task_consumer_thread_num : DEFAULT_CONSUMER_THREAD_NUM;
 
-    private static final int DISPATCH_INSERT_TASK_QUEUE_SIZE = DEFAULT_RING_BUFFER_SIZE;
-    private static final int DISPATCH_MTMV_TASK_QUEUE_SIZE = DEFAULT_RING_BUFFER_SIZE;
+    private static final int DISPATCH_INSERT_TASK_QUEUE_SIZE = normalizeRingbufferSize(Config.insert_task_queue_size);
+    private static final int DISPATCH_MTMV_TASK_QUEUE_SIZE = normalizeRingbufferSize(Config.mtmv_task_queue_size);
 
 
     public void init() {
@@ -133,4 +133,24 @@ public class TaskDisruptorGroupManager<T extends AbstractTask> {
     }
 
 
+    /**
+     * Rounds the given size up to the smallest power of two that is greater than or equal to it,
+     * because ring buffer sizes must be powers of two. Sizes below 1 fall back to
+     * DEFAULT_RING_BUFFER_SIZE, and sizes above 2^30 are capped at 2^30 so the result never overflows.
+     *
+     * @param size The requested size, typically read from fe.conf.
+     * @return A positive power of two to use as the ring buffer size.
+     */
+    public static int normalizeRingbufferSize(int size) {
+        if (size < 1) {
+            return DEFAULT_RING_BUFFER_SIZE;
+        }
+        if (size > (1 << 30)) {
+            // The next power of two would be 2^31, which overflows int to a negative value.
+            return 1 << 30;
+        }
+        int highestBit = Integer.highestOneBit(size);
+        // Keep exact powers of two; otherwise round up to the next one.
+        return highestBit == size ? size : highestBit << 1;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index 862b85597cd..ea0c263a5ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -168,8 +168,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable {
         for (AbstractTask task : tasks) {
             if (!taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(),
                     job.getJobConfig())) {
-                throw new JobException("dispatch instant task failed, job id is "
-                        + job.getJobId() + ", task id is " + task.getTaskId());
+                // Most likely the execution queue is full; the message names the fe.conf settings to tune.
+                throw new JobException(job.formatMsgWhenExecuteQueueFull(task.getTaskId()));
             }
             log.info("dispatch instant job, job id is {}, job name is {}, task id is {}", job.getJobId(),
                     job.getJobName(), task.getTaskId());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/manager/TaskDisruptorGroupManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/manager/TaskDisruptorGroupManagerTest.java
new file mode 100644
index 00000000000..661bdcdcca2
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/job/manager/TaskDisruptorGroupManagerTest.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.manager;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TaskDisruptorGroupManagerTest {
+
+    @Test
+    public void testNormalizeRingbufferSize() {
+        Assertions.assertEquals(1024, TaskDisruptorGroupManager.normalizeRingbufferSize(1024));
+        Assertions.assertEquals(1024, TaskDisruptorGroupManager.normalizeRingbufferSize(-1));
+        Assertions.assertEquals(1024, TaskDisruptorGroupManager.normalizeRingbufferSize(0));
+        Assertions.assertEquals(16, TaskDisruptorGroupManager.normalizeRingbufferSize(15));
+        Assertions.assertEquals(1024, TaskDisruptorGroupManager.normalizeRingbufferSize(1023));
+        Assertions.assertEquals(1024, TaskDisruptorGroupManager.normalizeRingbufferSize(-8));
+        Assertions.assertEquals(2048, TaskDisruptorGroupManager.normalizeRingbufferSize(1025));
+        Assertions.assertEquals(4096, TaskDisruptorGroupManager.normalizeRingbufferSize(2049));
+    }
+}
diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf
index 40d5fbb792f..6f4d1e5f21f 100644
--- a/regression-test/pipeline/p0/conf/fe.conf
+++ b/regression-test/pipeline/p0/conf/fe.conf
@@ -107,6 +107,7 @@ label_keep_max_second = 300
 # job test configurations
 #allows the creation of jobs with an interval of second
 enable_job_schedule_second_for_test = true
+mtmv_task_queue_size = 4096
 
 enable_workload_group = true
 publish_topic_info_interval_ms = 1000


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to