This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5890d031c8b branch-3.0: [Job](Fix)Improve Event Publishing with Timeout #45103 (#45299)
5890d031c8b is described below

commit 5890d031c8b7a4565f87c6d370ca31322862df29
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Dec 12 09:52:42 2024 +0800

    branch-3.0: [Job](Fix)Improve Event Publishing with Timeout #45103 (#45299)
    
    Cherry-picked from #45103
    
    Co-authored-by: Calvin Kirs <guoqi...@selectdb.com>
---
 .../doris/job/disruptor/ExecuteTaskEvent.java      |  5 ++
 .../apache/doris/job/disruptor/TaskDisruptor.java  | 48 ++++++++----
 .../job/executor/DefaultTaskExecutorHandler.java   | 40 ++++------
 .../doris/job/executor/DispatchTaskHandler.java    |  7 +-
 .../apache/doris/job/executor/TaskProcessor.java   | 87 ++++++++++++++++++++++
 .../job/manager/TaskDisruptorGroupManager.java     | 48 ++++--------
 6 files changed, 158 insertions(+), 77 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/ExecuteTaskEvent.java b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/ExecuteTaskEvent.java
index 3d8f9ed1534..ac67715d2ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/ExecuteTaskEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/ExecuteTaskEvent.java
@@ -34,4 +34,9 @@ public class ExecuteTaskEvent<T extends AbstractTask> {
         return ExecuteTaskEvent::new;
     }
 
+    public void clear() {
+        this.task = null;
+        this.jobConfig = null;
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
index 2b2e3df0418..9fb9d94e8df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
@@ -19,7 +19,6 @@ package org.apache.doris.job.disruptor;
 
 import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.EventTranslatorVararg;
-import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.WorkHandler;
 import com.lmax.disruptor.dsl.Disruptor;
@@ -28,6 +27,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Utility class for creating and managing a Disruptor instance.
@@ -73,20 +73,42 @@ public class TaskDisruptor<T> {
      */
     public boolean publishEvent(Object... args) {
         try {
-            RingBuffer<T> ringBuffer = disruptor.getRingBuffer();
-            // Check if the RingBuffer has enough capacity to reserve 10 slots 
for tasks
-            // If there is insufficient capacity (less than 10 slots available)
-            // log a warning and drop the current task
-            if (!ringBuffer.hasAvailableCapacity(10)) {
-                LOG.warn("ring buffer has no available capacity,task will be 
dropped,"
-                        + "please check the task queue size.");
-                return false;
+            // Set the timeout to 1 second, converted to nanoseconds for 
precision
+            long timeoutInNanos = TimeUnit.SECONDS.toNanos(1);  // Timeout set 
to 1 second
+            long startTime = System.nanoTime();  // Record the start time
+
+            // Loop until the timeout is reached
+            while (System.nanoTime() - startTime < timeoutInNanos) {
+                // Check if there is enough remaining capacity in the ring 
buffer
+                // Adjusting to check if the required capacity is available 
(instead of hardcoding 1)
+                if (disruptor.getRingBuffer().remainingCapacity() > 1) {
+                    // Publish the event if there is enough capacity
+                    disruptor.getRingBuffer().publishEvent(eventTranslator, 
args);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("publishEvent success,the remaining buffer 
size is {}",
+                                disruptor.getRingBuffer().remainingCapacity());
+                    }
+                    return true;
+                }
+
+                // Wait for a short period before retrying
+                try {
+                    Thread.sleep(10);  // Adjust the wait time as needed 
(maybe increase if not high-frequency)
+                } catch (InterruptedException e) {
+                    // Log the exception and return false if interrupted
+                    Thread.currentThread().interrupt();  // Restore interrupt 
status
+                    LOG.warn("Thread interrupted while waiting to publish 
event", e);
+                    return false;
+                }
             }
-            ringBuffer.publishEvent(eventTranslator, args);
-            return true;
+
+            // Timeout reached without publishing the event
+            LOG.warn("Failed to publish event within the specified timeout (1 
second)."
+                            + "Queue may be full. the remaining buffer size is 
{}",
+                    disruptor.getRingBuffer().remainingCapacity());
         } catch (Exception e) {
-            LOG.warn("Failed to publish event", e);
-            // Handle the exception, e.g., retry or alert
+            // Catching general exceptions to handle unexpected errors
+            LOG.warn("Failed to publish event due to an unexpected error", e);
         }
         return false;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java
index befa8cc35fc..cdfe7c0fe08 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java
@@ -36,35 +36,23 @@ public class DefaultTaskExecutorHandler<T extends 
AbstractTask> implements WorkH
 
     @Override
     public void onEvent(ExecuteTaskEvent<T> executeTaskEvent) {
-        T task = executeTaskEvent.getTask();
-        if (null == task) {
-            log.warn("task is null, ignore,maybe task has been canceled");
-            return;
-        }
-        if (task.isCancelled()) {
-            log.info("task is canceled, ignore. task id is {}", 
task.getTaskId());
-            return;
-        }
-        log.info("start to execute task, task id is {}", task.getTaskId());
-        try {
-            task.runTask();
-        } catch (Exception e) {
-            //if task.onFail() throw exception, we will catch it here
-            log.warn("task before error, task id is {}", task.getTaskId(), e);
-        }
-        //todo we need discuss whether we need to use semaphore to control the 
concurrent task num
-        /* Semaphore semaphore = null;
-        // get token
         try {
-            int maxConcurrentTaskNum = 
executeTaskEvent.getJobConfig().getMaxConcurrentTaskNum();
-            semaphore = TaskTokenManager.tryAcquire(task.getJobId(), 
maxConcurrentTaskNum);
+            T task = executeTaskEvent.getTask();
+            if (null == task) {
+                log.warn("task is null, ignore,maybe task has been canceled");
+                return;
+            }
+            if (task.isCancelled()) {
+                log.info("task is canceled, ignore. task id is {}", 
task.getTaskId());
+                return;
+            }
+            log.info("start to execute task, task id is {}", task.getTaskId());
             task.runTask();
         } catch (Exception e) {
-            task.onFail();
-            log.error("execute task error, task id is {}", task.getTaskId(), 
e);
+            log.error("execute task error, task id is {}", 
executeTaskEvent.getTask().getTaskId(), e);
         } finally {
-            if (null != semaphore) {
-                semaphore.release();
-            }*/
+            executeTaskEvent.clear();
+        }
+
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
index d93393aa0ef..b8f726c4a0c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
@@ -21,7 +21,6 @@ import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.common.TaskType;
-import org.apache.doris.job.disruptor.TaskDisruptor;
 import org.apache.doris.job.disruptor.TimerJobEvent;
 import org.apache.doris.job.task.AbstractTask;
 
@@ -40,9 +39,9 @@ import java.util.Map;
 @Log4j2
 public class DispatchTaskHandler<T extends AbstractJob> implements 
WorkHandler<TimerJobEvent<T>> {
 
-    private final Map<JobType, TaskDisruptor<T>> disruptorMap;
+    private final Map<JobType, TaskProcessor> disruptorMap;
 
-    public DispatchTaskHandler(Map<JobType, TaskDisruptor<T>> disruptorMap) {
+    public DispatchTaskHandler(Map<JobType, TaskProcessor> disruptorMap) {
         this.disruptorMap = disruptorMap;
     }
 
@@ -66,7 +65,7 @@ public class DispatchTaskHandler<T extends AbstractJob> 
implements WorkHandler<T
                 }
                 JobType jobType = event.getJob().getJobType();
                 for (AbstractTask task : tasks) {
-                    if (!disruptorMap.get(jobType).publishEvent(task, 
event.getJob().getJobConfig())) {
+                    if (!disruptorMap.get(jobType).addTask(task)) {
                         task.cancel();
                         continue;
                     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TaskProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TaskProcessor.java
new file mode 100644
index 00000000000..d9d3f25dcd8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TaskProcessor.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.executor;
+
+import org.apache.doris.job.task.AbstractTask;
+
+import lombok.extern.log4j.Log4j2;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+@Log4j2
+public class TaskProcessor {
+    private ExecutorService executor;
+
+    public  TaskProcessor(int numberOfThreads, int queueSize, ThreadFactory 
threadFactory) {
+        this.executor = new ThreadPoolExecutor(
+                numberOfThreads,
+                numberOfThreads,
+                0L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<>(queueSize),
+                threadFactory,
+                new ThreadPoolExecutor.AbortPolicy()
+        );
+    }
+
+    public boolean addTask(AbstractTask task) {
+        try {
+            executor.execute(() -> runTask(task));
+            log.info("Add task to executor, task id: {}", task.getTaskId());
+            return true;
+        } catch (RejectedExecutionException e) {
+            log.warn("Failed to add task to executor, task id: {}", 
task.getTaskId(), e);
+            return false;
+        }
+    }
+
+    public void shutdown() {
+        log.info("Shutting down executor service...");
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            executor.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+        log.info("Executor service shut down successfully.");
+    }
+
+    private void runTask(AbstractTask task) {
+        try {
+            if (task == null) {
+                log.warn("Task is null, ignore. Maybe it has been canceled.");
+                return;
+            }
+            if (task.isCancelled()) {
+                log.info("Task is canceled, ignore. Task id: {}", 
task.getTaskId());
+                return;
+            }
+            log.info("Start to execute task, task id: {}", task.getTaskId());
+            task.runTask();
+        } catch (Exception e) {
+            log.warn("Execute task error, task id: {}", task.getTaskId(), e);
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
index cc82b59a36a..e77dfbadcb3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
@@ -22,13 +22,10 @@ import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.base.JobExecutionConfiguration;
 import org.apache.doris.job.common.JobType;
-import org.apache.doris.job.disruptor.ExecuteTaskEvent;
 import org.apache.doris.job.disruptor.TaskDisruptor;
 import org.apache.doris.job.disruptor.TimerJobEvent;
-import org.apache.doris.job.executor.DefaultTaskExecutorHandler;
 import org.apache.doris.job.executor.DispatchTaskHandler;
-import org.apache.doris.job.extensions.insert.InsertTask;
-import org.apache.doris.job.extensions.mtmv.MTMVTask;
+import org.apache.doris.job.executor.TaskProcessor;
 import org.apache.doris.job.task.AbstractTask;
 
 import com.lmax.disruptor.EventFactory;
@@ -44,7 +41,7 @@ import java.util.concurrent.TimeUnit;
 
 public class TaskDisruptorGroupManager<T extends AbstractTask> {
 
-    private final Map<JobType, TaskDisruptor<T>> disruptorMap = new 
EnumMap<>(JobType.class);
+    private final Map<JobType, TaskProcessor> disruptorMap = new 
EnumMap<>(JobType.class);
 
     @Getter
     private TaskDisruptor<TimerJobEvent<AbstractJob>> dispatchDisruptor;
@@ -92,44 +89,27 @@ public class TaskDisruptorGroupManager<T extends 
AbstractTask> {
     }
 
     private void registerInsertDisruptor() {
-        EventFactory<ExecuteTaskEvent<InsertTask>> insertEventFactory = 
ExecuteTaskEvent.factory();
         ThreadFactory insertTaskThreadFactory = new 
CustomThreadFactory("insert-task-execute");
-        WorkHandler[] insertTaskExecutorHandlers = new 
WorkHandler[DISPATCH_INSERT_THREAD_NUM];
-        for (int i = 0; i < DISPATCH_INSERT_THREAD_NUM; i++) {
-            insertTaskExecutorHandlers[i] = new 
DefaultTaskExecutorHandler<InsertTask>();
-        }
-        EventTranslatorVararg<ExecuteTaskEvent<InsertTask>> eventTranslator =
-                (event, sequence, args) -> {
-                    event.setTask((InsertTask) args[0]);
-                    event.setJobConfig((JobExecutionConfiguration) args[1]);
-                };
-        TaskDisruptor insertDisruptor = new 
TaskDisruptor<>(insertEventFactory, DISPATCH_INSERT_TASK_QUEUE_SIZE,
-                insertTaskThreadFactory, new 
LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS),
-                insertTaskExecutorHandlers, eventTranslator);
-        disruptorMap.put(JobType.INSERT, insertDisruptor);
+
+
+        TaskProcessor insertTaskProcessor = new 
TaskProcessor(DISPATCH_INSERT_THREAD_NUM,
+                DISPATCH_INSERT_TASK_QUEUE_SIZE, insertTaskThreadFactory);
+        disruptorMap.put(JobType.INSERT, insertTaskProcessor);
     }
 
     private void registerMTMVDisruptor() {
-        EventFactory<ExecuteTaskEvent<MTMVTask>> mtmvEventFactory = 
ExecuteTaskEvent.factory();
+
         ThreadFactory mtmvTaskThreadFactory = new 
CustomThreadFactory("mtmv-task-execute");
-        WorkHandler[] insertTaskExecutorHandlers = new 
WorkHandler[DISPATCH_MTMV_THREAD_NUM];
-        for (int i = 0; i < DISPATCH_MTMV_THREAD_NUM; i++) {
-            insertTaskExecutorHandlers[i] = new 
DefaultTaskExecutorHandler<MTMVTask>();
-        }
-        EventTranslatorVararg<ExecuteTaskEvent<MTMVTask>> eventTranslator =
-                (event, sequence, args) -> {
-                    event.setTask((MTMVTask) args[0]);
-                    event.setJobConfig((JobExecutionConfiguration) args[1]);
-                };
-        TaskDisruptor mtmvDisruptor = new TaskDisruptor<>(mtmvEventFactory, 
DISPATCH_MTMV_TASK_QUEUE_SIZE,
-                mtmvTaskThreadFactory, new LiteTimeoutBlockingWaitStrategy(10, 
TimeUnit.MILLISECONDS),
-                insertTaskExecutorHandlers, eventTranslator);
-        disruptorMap.put(JobType.MV, mtmvDisruptor);
+        TaskProcessor mtmvTaskProcessor = new 
TaskProcessor(DISPATCH_MTMV_THREAD_NUM,
+                DISPATCH_MTMV_TASK_QUEUE_SIZE, mtmvTaskThreadFactory);
+        disruptorMap.put(JobType.MV, mtmvTaskProcessor);
     }
 
     public boolean dispatchInstantTask(AbstractTask task, JobType jobType,
                                        JobExecutionConfiguration 
jobExecutionConfiguration) {
-        return disruptorMap.get(jobType).publishEvent(task, 
jobExecutionConfiguration);
+
+
+        return disruptorMap.get(jobType).addTask(task);
     }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to