This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c3183f5eb [Feature](Job)Provide unified internal Job scheduling 
(#21113)
2c3183f5eb is described below

commit 2c3183f5eb6a82b01fd23dbf55ded65ff53a3ca9
Author: Calvin Kirs <acm_mas...@163.com>
AuthorDate: Fri Jun 30 16:43:20 2023 +0800

    [Feature](Job)Provide unified internal Job scheduling (#21113)
    
    We use the time wheel algorithm to complete the scheduling and triggering 
of periodic tasks. The implementation of the time wheel algorithm refers to 
netty's HashedWheelTimer.
    We will periodically (10 minutes by default) put the events that need to be 
triggered in the future cycle into the time wheel for periodic scheduling. In 
order to ensure the efficient triggering of tasks and avoid task blocking and 
subsequent task scheduling delays, we use Disruptor to implement the production 
and consumption model.
    When the task expires and needs to be triggered, the task will be put into 
the RingBuffer of the Disruptor, and then the consumer thread will consume the 
task.
    Consumers need to register for events, and each registration must provide 
an event executor. An event executor is a functional interface with a single 
method that executes the event.
    If it is a single event, the event definition will be deleted after the 
scheduling is completed; if it is a periodic event, it will be put back into 
the time wheel according to the periodic scheduling after the scheduling is 
completed.
---
 .../apache/doris/scheduler/AsyncJobRegister.java   |  82 +++++++
 .../apache/doris/scheduler/JobRegisterFactory.java |  41 ++++
 .../doris/scheduler/constants/JobStatus.java       |  38 +++
 .../doris/scheduler/constants/SystemJob.java       |  42 ++++
 .../scheduler/disruptor/TimerTaskDisruptor.java    | 133 +++++++++++
 .../doris/scheduler/disruptor/TimerTaskEvent.java  |  38 +++
 .../disruptor/TimerTaskExpirationHandler.java      | 125 ++++++++++
 .../doris/scheduler/executor/JobExecutor.java      |  37 +++
 .../doris/scheduler/job/AsyncJobManager.java       | 262 +++++++++++++++++++++
 .../apache/doris/scheduler/job/DorisTimerTask.java |  58 +++++
 .../java/org/apache/doris/scheduler/job/Job.java   | 148 ++++++++++++
 .../doris/scheduler/registry/JobRegister.java      | 111 +++++++++
 .../scheduler/disruptor/AsyncJobManagerTest.java   | 117 +++++++++
 .../disruptor/TimerTaskDisruptorTest.java          |  77 ++++++
 fe/pom.xml                                         |   8 +-
 15 files changed, 1316 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/AsyncJobRegister.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/AsyncJobRegister.java
new file mode 100644
index 0000000000..59c64906e8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/AsyncJobRegister.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler;
+
+import org.apache.doris.scheduler.executor.JobExecutor;
+import org.apache.doris.scheduler.job.AsyncJobManager;
+import org.apache.doris.scheduler.job.Job;
+import org.apache.doris.scheduler.registry.JobRegister;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+
+/**
+ * {@link JobRegister} implementation that schedules jobs asynchronously.
+ *
+ * <p>Timed events are registered on a Netty time wheel so that they fire close to their due time. A fired
+ * event is published through a Disruptor producer and picked up by a Disruptor consumer. Because production
+ * and consumption are decoupled, strict timing accuracy is not guaranteed.
+ *
+ * <p>Every operation is forwarded to the {@link AsyncJobManager} owned by this instance.
+ */
+@Slf4j
+public class AsyncJobRegister implements JobRegister {
+
+    private final AsyncJobManager asyncJobManager;
+
+    public AsyncJobRegister() {
+        asyncJobManager = new AsyncJobManager();
+    }
+
+    /**
+     * Registers a job without an explicit start or end timestamp.
+     */
+    @Override
+    public Long registerJob(String name, Long intervalMs, JobExecutor executor) {
+        return registerJob(name, intervalMs, null, null, executor);
+    }
+
+    /**
+     * Registers a job with a start timestamp and no end timestamp.
+     */
+    @Override
+    public Long registerJob(String name, Long intervalMs, Long startTimeStamp, JobExecutor executor) {
+        return registerJob(name, intervalMs, startTimeStamp, null, executor);
+    }
+
+    /**
+     * Builds a {@link Job} from the arguments and hands it to the manager.
+     *
+     * @return whatever {@link AsyncJobManager#registerJob(Job)} returns for the new job
+     */
+    @Override
+    public Long registerJob(String name, Long intervalMs, Long startTimeStamp, Long endTimeStamp,
+                                 JobExecutor executor) {
+        return asyncJobManager.registerJob(new Job(name, intervalMs, startTimeStamp, endTimeStamp, executor));
+    }
+
+    @Override
+    public Boolean pauseJob(Long jobId) {
+        return asyncJobManager.pauseJob(jobId);
+    }
+
+    @Override
+    public Boolean stopJob(Long jobId) {
+        return asyncJobManager.stopJob(jobId);
+    }
+
+    @Override
+    public Boolean resumeJob(Long jobId) {
+        return asyncJobManager.resumeJob(jobId);
+    }
+
+    /**
+     * Closes the underlying manager.
+     */
+    @Override
+    public void close() throws IOException {
+        asyncJobManager.close();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/JobRegisterFactory.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/JobRegisterFactory.java
new file mode 100644
index 0000000000..2613a0302c
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/JobRegisterFactory.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler;
+
+import org.apache.doris.scheduler.registry.JobRegister;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Factory that hands out the shared {@link JobRegister} instance.
+ *
+ * <p>The instance is created lazily on first use. Creation is serialized with a lock rather than a bare
+ * compare-and-set: constructing an {@link AsyncJobRegister} starts the scheduler's timer and Disruptor
+ * threads, so an instance that loses a compare-and-set race would keep those threads alive without ever
+ * being closed.
+ */
+public class JobRegisterFactory {
+    private static final AtomicReference<JobRegister> INSTANCE = new AtomicReference<>();
+
+    /**
+     * Returns the shared {@link JobRegister}, creating it on the first call.
+     *
+     * @return the single register instance, never {@code null}
+     */
+    public static JobRegister getInstance() {
+        JobRegister instance = INSTANCE.get();
+        if (instance != null) {
+            // Fast path: already initialized, no locking needed.
+            return instance;
+        }
+        synchronized (INSTANCE) {
+            // Re-check under the lock so that exactly one AsyncJobRegister is ever constructed.
+            instance = INSTANCE.get();
+            if (instance == null) {
+                instance = new AsyncJobRegister();
+                INSTANCE.set(instance);
+            }
+            return instance;
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java
new file mode 100644
index 0000000000..5c4af0b649
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.constants;
+
+public enum JobStatus {
+
+    /**
+     * The initial state of a job.
+     * A job in this state can be started.
+     */
+    RUNNING,
+    /**
+     * Entered when an execution of the job encounters an exception or when the job is suspended manually.
+     * A paused job can be resumed.
+     */
+    PAUSED,
+    /**
+     * Entered when the job is stopped manually.
+     * A stopped job cannot be resumed.
+     */
+    STOPPED,
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/SystemJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/SystemJob.java
new file mode 100644
index 0000000000..f24f6e4e19
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/SystemJob.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.constants;
+
+import lombok.Getter;
+
+/**
+ * Jobs that belong to the scheduler itself.
+ * According to the original note, they are started together with the scheduler.
+ */
+public enum SystemJob {
+
+    /**
+     * The system cycle-scheduler job (description "system_scheduler_event_job", id 1).
+     * An event carrying this id is treated as a system event by TimerTaskExpirationHandler, which responds
+     * by calling AsyncJobManager#batchSchedulerTasks().
+     */
+    SYSTEM_SCHEDULER_JOB("system_scheduler_event_job", 1L);
+
+    @Getter
+    private final String description;
+    @Getter
+    private final Long id;
+
+    SystemJob(String description, Long id) {
+        this.description = description;
+        this.id = id;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptor.java
new file mode 100644
index 0000000000..98a2736542
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptor.java
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.disruptor;
+
+import org.apache.doris.scheduler.job.AsyncJobManager;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventTranslatorTwoArg;
+import com.lmax.disruptor.TimeoutException;
+import com.lmax.disruptor.WorkHandler;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import com.lmax.disruptor.util.DaemonThreadFactory;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Closeable;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class represents a disruptor for processing event tasks consumed by a 
Disruptor.
+ *
+ * <p>The work handler retrieves the associated event job and executes it if 
it is running.
+ * If the event job is not running, the work handler logs an error message. If 
the event job execution fails,
+ * the work handler logs an error message and pauses the event job.
+ *
+ * <p>The work handler also handles system events by scheduling batch 
scheduler tasks.
+ */
+@Slf4j
+public class TimerTaskDisruptor implements Closeable {
+
+    private final Disruptor<TimerTaskEvent> disruptor;
+    private static final int DEFAULT_RING_BUFFER_SIZE = 1024;
+
+    /**
+     * The default timeout for {@link #close()} in seconds.
+     */
+    private static final int DEFAULT_CLOSE_WAIT_TIME_SECONDS = 5;
+
+    /**
+     * The default number of consumers to create for each {@link Disruptor} 
instance.
+     */
+    private static final int DEFAULT_CONSUMER_COUNT = 
System.getProperty("event.task.disruptor.consumer.count")
+            == null ? Runtime.getRuntime().availableProcessors()
+            : 
Integer.parseInt(System.getProperty("event.task.disruptor.consumer.count"));
+
+    /**
+     * Whether this disruptor has been closed.
+     * if true, then we can't publish any more events.
+     */
+    private boolean isClosed = false;
+
+    /**
+     * The default {@link EventTranslatorTwoArg} to use for {@link 
#tryPublish(Long, Long)}.
+     * This is used to avoid creating a new object for each publish.
+     */
+    private static final EventTranslatorTwoArg<TimerTaskEvent, Long, Long> 
TRANSLATOR
+            = (event, sequence, jobId, taskId) -> {
+                event.setJobId(jobId);
+                event.setTaskId(taskId);
+            };
+
+    public TimerTaskDisruptor(AsyncJobManager asyncJobManager) {
+        ThreadFactory producerThreadFactory = DaemonThreadFactory.INSTANCE;
+        disruptor = new Disruptor<>(TimerTaskEvent.FACTORY, 
DEFAULT_RING_BUFFER_SIZE, producerThreadFactory,
+                ProducerType.SINGLE, new BlockingWaitStrategy());
+        WorkHandler<TimerTaskEvent>[] workers = new 
TimerTaskExpirationHandler[DEFAULT_CONSUMER_COUNT];
+        for (int i = 0; i < DEFAULT_CONSUMER_COUNT; i++) {
+            workers[i] = new TimerTaskExpirationHandler(asyncJobManager);
+        }
+        disruptor.handleEventsWithWorkerPool(workers);
+        disruptor.start();
+    }
+
+    /**
+     * Publishes an event to the disruptor.
+     *
+     * @param eventId event job id
+     * @param taskId  event task id
+     */
+    public void tryPublish(Long eventId, Long taskId) {
+        if (isClosed) {
+            log.info("tryPublish failed, disruptor is closed, eventId: {}", 
eventId);
+            return;
+        }
+        try {
+            disruptor.publishEvent(TRANSLATOR, eventId, taskId);
+        } catch (Exception e) {
+            log.error("tryPublish failed, eventId: {}", eventId, e);
+        }
+    }
+
+    public boolean tryPublish(TimerTaskEvent timerTaskEvent) {
+        if (isClosed) {
+            log.info("tryPublish failed, disruptor is closed, eventJobId: {}", 
timerTaskEvent.getJobId());
+            return false;
+        }
+        try {
+            disruptor.publishEvent(TRANSLATOR, timerTaskEvent.getJobId(), 
timerTaskEvent.getTaskId());
+            return true;
+        } catch (Exception e) {
+            log.error("tryPublish failed, eventJobId: {}", 
timerTaskEvent.getJobId(), e);
+            return false;
+        }
+    }
+
+
+    @Override
+    public void close() {
+        try {
+            isClosed = true;
+            // we can wait for 5 seconds, so that backlog can be committed
+            disruptor.shutdown(DEFAULT_CLOSE_WAIT_TIME_SECONDS, 
TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            log.warn("close disruptor failed", e);
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskEvent.java
new file mode 100644
index 0000000000..3c1cfe440d
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskEvent.java
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.disruptor;
+
+import com.lmax.disruptor.EventFactory;
+import lombok.Data;
+
+/**
+ * This class represents an event task that can be produced and consumed by the Disruptor.
+ * The event task contains the ID of the event job ({@code jobId}) and the ID of the event task itself
+ * ({@code taskId}). Both start out as {@code null} on a freshly created instance and are filled in by the
+ * translator in {@link TimerTaskDisruptor} when an event is published.
+ * The class also provides an event factory, {@code FACTORY}, to create instances of {@link TimerTaskEvent};
+ * {@link TimerTaskDisruptor} passes it to the Disruptor it creates.
+ * <p>
+ * it's used by {@link TimerTaskDisruptor} and {@link TimerTaskExpirationHandler}
+ */
+@Data
+public class TimerTaskEvent {
+
+    private Long jobId;
+
+    private Long taskId;
+
+    public static final EventFactory<TimerTaskEvent> FACTORY = 
TimerTaskEvent::new;
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskExpirationHandler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskExpirationHandler.java
new file mode 100644
index 0000000000..8c4a5db681
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskExpirationHandler.java
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.disruptor;
+
+import org.apache.doris.scheduler.constants.SystemJob;
+import org.apache.doris.scheduler.job.AsyncJobManager;
+import org.apache.doris.scheduler.job.Job;
+
+import com.lmax.disruptor.WorkHandler;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Objects;
+
+/**
+ * This class represents a work handler for processing event tasks consumed by a Disruptor.
+ * The work handler retrieves the associated event job and executes it if it is running.
+ * If the event job is missing or not running, the work handler logs a message and skips the event.
+ * If the event job execution fails, the work handler logs an error message and pauses the event job.
+ * The work handler also handles system events by scheduling batch scheduler tasks.
+ */
+@Slf4j
+public class TimerTaskExpirationHandler implements WorkHandler<TimerTaskEvent> {
+
+    /**
+     * The event job manager used to retrieve and execute event jobs.
+     */
+    private final AsyncJobManager asyncJobManager;
+
+    /**
+     * Constructs a new {@link TimerTaskExpirationHandler} instance with the specified event job manager.
+     *
+     * @param asyncJobManager The event job manager used to retrieve and execute event jobs.
+     */
+    public TimerTaskExpirationHandler(AsyncJobManager asyncJobManager) {
+        this.asyncJobManager = asyncJobManager;
+    }
+
+    /**
+     * Dispatches an event: system events trigger the batch scheduler, every other event is processed as a
+     * job task by {@link #onEventTask(TimerTaskEvent)}.
+     *
+     * @param event The event task to be processed.
+     */
+    @Override
+    public void onEvent(TimerTaskEvent event) {
+        if (checkIsSystemEvent(event)) {
+            onSystemEvent();
+            return;
+        }
+        onEventTask(event);
+    }
+
+    /**
+     * Processes an event task by retrieving the associated event job and executing it if it is running.
+     * Events without a job id, for an unknown job, or for a job that is not running are logged and skipped.
+     * If the execution throws, the error is logged and the job is paused with the exception message.
+     *
+     * @param timerTaskEvent The event task to be processed.
+     */
+    @SuppressWarnings("checkstyle:UnusedLocalVariable")
+    public void onEventTask(TimerTaskEvent timerTaskEvent) {
+        // Keep the id boxed: the event field is a nullable Long, and unboxing a null id would throw a
+        // NullPointerException outside of the try block below.
+        Long jobId = timerTaskEvent.getJobId();
+        if (jobId == null) {
+            log.warn("Event job id is null, taskId: {}", timerTaskEvent.getTaskId());
+            return;
+        }
+        Job job = asyncJobManager.getJob(jobId);
+        if (job == null) {
+            log.info("Event job is null, eventJobId: {}", jobId);
+            return;
+        }
+        if (!job.isRunning()) {
+            log.info("Event job is not running, eventJobId: {}", jobId);
+            return;
+        }
+        log.debug("Event job is running, eventJobId: {}", jobId);
+        // NOTE(review): an expired job is paused here but is still executed once below - confirm that this
+        // final run is intended.
+        checkJobIsExpired(job);
+        try {
+            // TODO: We should record the result of the event task.
+            //Object result = job.getExecutor().execute();
+            job.getExecutor().execute();
+            job.setLatestCompleteExecuteTimestamp(System.currentTimeMillis());
+        } catch (Exception e) {
+            log.error("Event job execute failed, jobId: {}", jobId, e);
+            job.pause(e.getMessage());
+        }
+    }
+
+    /**
+     * Handles a system event by scheduling batch scheduler tasks.
+     * A failure is logged and not propagated.
+     */
+    private void onSystemEvent() {
+        try {
+            asyncJobManager.batchSchedulerTasks();
+        } catch (Exception e) {
+            log.error("System batch scheduler execute failed", e);
+        }
+    }
+
+    /**
+     * Checks whether the specified event task is a system event, i.e. whether its job id equals the id of
+     * {@link SystemJob#SYSTEM_SCHEDULER_JOB}.
+     *
+     * @param event The event task to be checked.
+     * @return true if the event task is a system event, false otherwise.
+     */
+    private boolean checkIsSystemEvent(TimerTaskEvent event) {
+        return Objects.equals(event.getJobId(), SystemJob.SYSTEM_SCHEDULER_JOB.getId());
+    }
+
+    /**
+     * Pauses the job if it reports itself as expired.
+     *
+     * @param job The job to check.
+     */
+    private void checkJobIsExpired(Job job) {
+        if (job.isExpired()) {
+            job.pause();
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java
new file mode 100644
index 0000000000..cd96b6a6e4
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.executor;
+
+/**
+ * This interface represents a callback for an event registration. All event registrations
+ * must implement this interface to provide an execution method.
+ *
+ * @param <T> The result type of the event job execution.
+ */
+@FunctionalInterface
+public interface JobExecutor<T> {
+
+    /**
+     * Executes the event job and returns the result.
+     * Exceptions thrown by this method are caught by the caller (TimerTaskExpirationHandler logs them and
+     * pauses the job), so there is no need to define or throw them separately.
+     * That caller currently discards the returned value.
+     *
+     * @return The result of the event job execution.
+     */
+    T execute();
+}
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/AsyncJobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/AsyncJobManager.java
new file mode 100644
index 0000000000..e0944bf24a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/AsyncJobManager.java
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.job;
+
+import org.apache.doris.scheduler.disruptor.TimerTaskDisruptor;
+
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class AsyncJobManager implements Closeable {
+
+    private final ConcurrentHashMap<Long, Job> jobMap = new 
ConcurrentHashMap<>(128);
+
+    private long lastBatchSchedulerTimestamp;
+
+    /**
+     * batch scheduler interval time
+     */
+    private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = 10 * 60 
* 1000L;
+
+    private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;
+
+
+    private boolean isClosed = false;
+
+    /**
+     * key: jobid
+     * value: timeout list  for one job
+     * it's used to cancel task, if task has started, it can't be canceled
+     */
+    private final ConcurrentHashMap<Long, Map<Long, Timeout>> jobTimeoutMap =
+            new ConcurrentHashMap<>(128);
+
+    /**
+     * scheduler tasks, it's used to scheduler job
+     */
+    private final HashedWheelTimer dorisTimer = new HashedWheelTimer(1, 
TimeUnit.SECONDS,
+            660);
+
+    /**
+     * Producer and Consumer model
+     * disruptor is used to handle task
+     * disruptor will start a thread pool to handle task
+     */
+    private final TimerTaskDisruptor disruptor;
+
+    public AsyncJobManager() {
+        dorisTimer.start();
+        this.disruptor = new TimerTaskDisruptor(this);
+        this.lastBatchSchedulerTimestamp = System.currentTimeMillis();
+        batchSchedulerTasks();
+        cycleSystemSchedulerTasks();
+    }
+
+    public Long registerJob(Job job) {
+        if (!job.checkJobParam()) {
+            log.warn("registerJob failed, job: {} param is invalid", job);
+            return null;
+        }
+        if (job.getStartTimestamp() != 0L) {
+            job.setNextExecuteTimestamp(job.getStartTimestamp() + 
job.getIntervalMilliSeconds());
+        } else {
+            job.setNextExecuteTimestamp(System.currentTimeMillis() + 
job.getIntervalMilliSeconds());
+        }
+
+        if (job.getNextExecuteTimestamp() < 
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp) {
+            List<Long> executeTimestamp = findTasksBetweenTime(job, 
System.currentTimeMillis(),
+                    BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + 
lastBatchSchedulerTimestamp,
+                    job.getNextExecuteTimestamp());
+            if (!executeTimestamp.isEmpty()) {
+                for (Long timestamp : executeTimestamp) {
+                    putOneTask(job.getJobId(), timestamp);
+                }
+            }
+        }
+
+        jobMap.putIfAbsent(job.getJobId(), job);
+        return job.getJobId();
+    }
+
+    public void unregisterJob(Long jobId) {
+        jobMap.remove(jobId);
+    }
+
+    public boolean pauseJob(Long jobId) {
+        if (jobMap.get(jobId) == null) {
+            log.warn("pauseJob failed, jobId: {} not exist", jobId);
+            return false;
+        }
+        cancelJobAllTask(jobId);
+        jobMap.get(jobId).pause();
+        return true;
+    }
+
+    public boolean resumeJob(Long jobId) {
+        if (jobMap.get(jobId) == null) {
+            log.warn("resumeJob failed, jobId: {} not exist", jobId);
+            return false;
+        }
+        jobMap.get(jobId).resume();
+        return true;
+    }
+
+    public boolean stopJob(Long jobId) {
+        if (jobMap.get(jobId) == null) {
+            log.warn("stopJob failed, jobId: {} not exist", jobId);
+            return false;
+        }
+        cancelJobAllTask(jobId);
+        jobMap.get(jobId).stop();
+        return true;
+    }
+
+    public Job getJob(Long jobId) {
+        return jobMap.get(jobId);
+    }
+
+    public Map<Long, Job> getAllJob() {
+        return jobMap;
+    }
+
+    public boolean batchSchedulerTasks() {
+        executeJobIdsWithinLastTenMinutesWindow();
+        return true;
+    }
+
+    public List<Long> findTasksBetweenTime(Job job, Long startTime, Long 
endTime, Long nextExecuteTime) {
+        List<Long> jobExecuteTimes = new ArrayList<>();
+        if (System.currentTimeMillis() < startTime) {
+            return jobExecuteTimes;
+        }
+        while (endTime >= nextExecuteTime) {
+            if (job.isTaskTimeExceeded()) {
+                break;
+            }
+            jobExecuteTimes.add(nextExecuteTime);
+            nextExecuteTime = job.getExecuteTimestampAndGeneratorNext();
+        }
+        return jobExecuteTimes;
+    }
+
+    /**
+     * We will get the task in the next time window, and then hand it over to 
the time wheel for timing trigger
+     */
+    private void executeJobIdsWithinLastTenMinutesWindow() {
+        if (jobMap.isEmpty()) {
+            return;
+        }
+        jobMap.forEach((k, v) -> {
+            if (v.isRunning() && (v.getNextExecuteTimestamp() + 
v.getIntervalMilliSeconds()
+                    < lastBatchSchedulerTimestamp + 
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS)) {
+                List<Long> executeTimes = findTasksBetweenTime(v, 
lastBatchSchedulerTimestamp,
+                        lastBatchSchedulerTimestamp + 
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS,
+                        v.getNextExecuteTimestamp());
+                if (!executeTimes.isEmpty()) {
+                    for (Long executeTime : executeTimes) {
+                        putOneTask(v.getJobId(), executeTime);
+                    }
+                }
+            }
+        });
+        this.lastBatchSchedulerTimestamp += 
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
+    }
+
+    /**
+     * We will cycle system scheduler tasks every 10 minutes.
+     * Jobs will be re-registered after the task is completed
+     */
+    private void cycleSystemSchedulerTasks() {
+        dorisTimer.newTimeout(timeout -> {
+            batchSchedulerTasks();
+            cycleSystemSchedulerTasks();
+        }, BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS, TimeUnit.MILLISECONDS);
+    }
+
+    public void putOneTask(Long jobId, Long startExecuteTime) {
+        DorisTimerTask task = new DorisTimerTask(jobId, startExecuteTime, 
disruptor);
+        if (isClosed) {
+            log.info("putOneTask failed, scheduler is closed, jobId: {}", 
task.getJobId());
+            return;
+        }
+        long delay = getDelaySecond(task.getStartTimestamp());
+        Timeout timeout = dorisTimer.newTimeout(task, delay, TimeUnit.SECONDS);
+        if (timeout == null) {
+            log.error("putOneTask failed, jobId: {}", task.getJobId());
+            return;
+        }
+        if (jobTimeoutMap.containsKey(task.getJobId())) {
+            jobTimeoutMap.get(task.getJobId()).put(task.getTaskId(), timeout);
+            return;
+        }
+        Map<Long, Timeout> timeoutMap = new ConcurrentHashMap<>();
+        timeoutMap.put(task.getTaskId(), timeout);
+        jobTimeoutMap.put(task.getJobId(), timeoutMap);
+    }
+
+    // cancel all task for one job
+    // if task has started, it can't be canceled
+    public void cancelJobAllTask(Long jobId) {
+        if (!jobTimeoutMap.containsKey(jobId)) {
+            return;
+        }
+
+        jobTimeoutMap.get(jobId).values().forEach(timeout -> {
+            if (!timeout.isExpired() || timeout.isCancelled()) {
+                timeout.cancel();
+            }
+        });
+    }
+
+    public void stopTask(Long jobId, Long taskId) {
+        if (!jobTimeoutMap.containsKey(jobId)) {
+            return;
+        }
+        cancelJobAllTask(jobId);
+        jobTimeoutMap.get(jobId).remove(taskId);
+    }
+
+    // get delay time, if startTimestamp is less than now, return 0
+    private long getDelaySecond(long startTimestamp) {
+        long delay = 0;
+        long now = System.currentTimeMillis();
+        if (startTimestamp > now) {
+            delay = startTimestamp - now;
+        } else {
+            log.warn("startTimestamp is less than now, startTimestamp: {}, 
now: {}", startTimestamp, now);
+        }
+        return delay / 1000;
+    }
+
+    @Override
+    public void close() throws IOException {
+        isClosed = true;
+        dorisTimer.stop();
+        disruptor.close();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/DorisTimerTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/DorisTimerTask.java
new file mode 100644
index 0000000000..7522548ad6
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/DorisTimerTask.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.job;
+
+import org.apache.doris.scheduler.disruptor.TimerTaskDisruptor;
+
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import lombok.Getter;
+
+import java.util.UUID;
+
+/**
+ * A {@link TimerTask} that can be scheduled by a Netty timer.
+ * When the timer fires, the task hands the job id and its own task id to the
+ * {@link TimerTaskDisruptor}; if the timeout has been cancelled, nothing is published.
+ */
+@Getter
+public class DorisTimerTask implements TimerTask {
+
+    /** Id of the job this task was created for. */
+    private final Long jobId;
+
+    // more fields should be added here and record in feature
+    /** Id of this task, taken from the most significant bits of a random UUID. */
+    private final Long taskId;
+
+    /** Start timestamp supplied by the creator of the task. */
+    private final Long startTimestamp;
+
+    /** Disruptor that receives the (jobId, taskId) pair when the task fires. */
+    private final TimerTaskDisruptor timerTaskDisruptor;
+
+    /**
+     * Creates a timer task for one trigger of a job.
+     *
+     * @param jobId              id of the job to trigger
+     * @param startTimestamp     start timestamp recorded on the task
+     * @param timerTaskDisruptor disruptor used to publish the trigger
+     */
+    public DorisTimerTask(Long jobId, Long startTimestamp, TimerTaskDisruptor timerTaskDisruptor) {
+        this.jobId = jobId;
+        this.taskId = UUID.randomUUID().getMostSignificantBits();
+        this.startTimestamp = startTimestamp;
+        this.timerTaskDisruptor = timerTaskDisruptor;
+    }
+
+    /**
+     * Publishes this task to the disruptor unless the timeout was cancelled.
+     *
+     * @param timeout handle of the timeout that fired
+     */
+    @Override
+    public void run(Timeout timeout) {
+        if (!timeout.isCancelled()) {
+            timerTaskDisruptor.tryPublish(jobId, taskId);
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
new file mode 100644
index 0000000000..6923e2277f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.job;
+
+import org.apache.doris.scheduler.constants.JobStatus;
+import org.apache.doris.scheduler.executor.JobExecutor;
+
+import lombok.Data;
+
+import java.util.UUID;
+
+/**
+ * Job is the core of the scheduler module, which is used to store the Job information of the job module.
+ * We can use the jobId to uniquely identify a Job.
+ * The jobName is used to identify the job, which is not unique.
+ * The jobStatus is used to identify the status of the Job, which is used to control the execution of the
+ * job.
+ * Timestamps are compared against {@link System#currentTimeMillis()}; a start or end timestamp
+ * of 0 means "not set".
+ */
+@Data
+public class Job {
+
+    /** Id of the job, taken from the most significant bits of a random UUID. */
+    private Long jobId = UUID.randomUUID().getMostSignificantBits();
+
+    private String jobName;
+
+    /**
+     * The status of the job, which is used to control the execution of the job.
+     *
+     * @see JobStatus
+     */
+    private JobStatus jobStatus = JobStatus.RUNNING;
+
+    /**
+     * The executor of the job.
+     *
+     * @see JobExecutor
+     */
+    private JobExecutor executor;
+
+    private String user;
+
+    /** Message recorded by {@link #pause(String)}. */
+    private String errMsg;
+
+    /** Interval between two executions, in milliseconds. */
+    private Long intervalMilliSeconds;
+
+    private Long updateTime;
+
+    /** Timestamp of the next planned execution; has no default and stays null until it is set. */
+    private Long nextExecuteTimestamp;
+
+    /** Start timestamp, 0 when not set. */
+    private Long startTimestamp = 0L;
+
+    /** End timestamp, 0 when the job has no end. */
+    private Long endTimestamp = 0L;
+
+    private Long firstExecuteTimestamp = 0L;
+
+    private Long latestStartExecuteTimestamp = 0L;
+
+    private Long latestCompleteExecuteTimestamp = 0L;
+
+    /**
+     * Creates a job in {@link JobStatus#RUNNING} state.
+     *
+     * @param jobName              job name, not unique
+     * @param intervalMilliSeconds interval between executions, in milliseconds
+     * @param startTimestamp       start timestamp, {@code null} is stored as 0
+     * @param endTimestamp         end timestamp, {@code null} is stored as 0
+     * @param executor             executor that runs the job
+     */
+    public Job(String jobName, Long intervalMilliSeconds, Long startTimestamp, Long endTimestamp,
+               JobExecutor executor) {
+        this.jobName = jobName;
+        this.executor = executor;
+        this.intervalMilliSeconds = intervalMilliSeconds;
+        this.startTimestamp = null == startTimestamp ? 0L : startTimestamp;
+        this.endTimestamp = null == endTimestamp ? 0L : endTimestamp;
+    }
+
+    /** @return true if the job status is {@link JobStatus#RUNNING} */
+    public boolean isRunning() {
+        return jobStatus == JobStatus.RUNNING;
+    }
+
+    /** @return true if the job status is {@link JobStatus#STOPPED} */
+    public boolean isStopped() {
+        return jobStatus == JobStatus.STOPPED;
+    }
+
+    /**
+     * Checks whether an execution planned at the given time would fall after the end timestamp.
+     *
+     * @param nextExecuteTimestamp planned execution timestamp
+     * @return false when no end timestamp is set, otherwise whether the given timestamp is past the end
+     */
+    public boolean isExpired(long nextExecuteTimestamp) {
+        if (endTimestamp == 0L) {
+            return false;
+        }
+        return nextExecuteTimestamp > endTimestamp;
+    }
+
+    /**
+     * Checks whether the job has run out of time: either the current time has reached the end
+     * timestamp, or the next planned execution lies after it.
+     *
+     * @return false when no end timestamp is set, otherwise whether the time window is exceeded
+     */
+    public boolean isTaskTimeExceeded() {
+        if (endTimestamp == 0L) {
+            return false;
+        }
+        if (System.currentTimeMillis() >= endTimestamp) {
+            return true;
+        }
+        // nextExecuteTimestamp has no default value; a job that has not been scheduled yet is
+        // treated as not exceeded instead of failing with a NullPointerException on unboxing.
+        return nextExecuteTimestamp != null && nextExecuteTimestamp > endTimestamp;
+    }
+
+    /**
+     * Checks whether the current time has reached the end timestamp.
+     *
+     * @return false when no end timestamp is set, otherwise whether now is at or past the end
+     */
+    public boolean isExpired() {
+        if (endTimestamp == 0L) {
+            return false;
+        }
+        return System.currentTimeMillis() >= endTimestamp;
+    }
+
+    /**
+     * Records the current next-execute timestamp as the latest start time and advances the
+     * next-execute timestamp by one interval.
+     * Both {@code nextExecuteTimestamp} and {@code intervalMilliSeconds} must be non-null when this
+     * is called, otherwise unboxing fails.
+     *
+     * @return the new next-execute timestamp
+     */
+    public Long getExecuteTimestampAndGeneratorNext() {
+        this.latestStartExecuteTimestamp = nextExecuteTimestamp;
+        // todo The problem of delay should be considered. If it is greater than the ten-minute time window,
+        //  should the task be lost or executed on a new time window?
+        this.nextExecuteTimestamp = latestStartExecuteTimestamp + intervalMilliSeconds;
+        return nextExecuteTimestamp;
+    }
+
+    /** Sets the job status to {@link JobStatus#PAUSED}. */
+    public void pause() {
+        this.jobStatus = JobStatus.PAUSED;
+    }
+
+    /**
+     * Sets the job status to {@link JobStatus#PAUSED} and records the reason.
+     *
+     * @param errMsg message stored in {@code errMsg}
+     */
+    public void pause(String errMsg) {
+        this.jobStatus = JobStatus.PAUSED;
+        this.errMsg = errMsg;
+    }
+
+    /** Sets the job status to {@link JobStatus#RUNNING}. */
+    public void resume() {
+        this.jobStatus = JobStatus.RUNNING;
+    }
+
+    /** Sets the job status to {@link JobStatus#STOPPED}. */
+    public void stop() {
+        this.jobStatus = JobStatus.STOPPED;
+    }
+
+    /**
+     * Validates the job definition.
+     * A start or end timestamp that is set must not lie in the past, the interval must be
+     * positive, and an executor must be present.
+     *
+     * @return true if all checks pass
+     */
+    public boolean checkJobParam() {
+        if (startTimestamp != 0L && startTimestamp < System.currentTimeMillis()) {
+            return false;
+        }
+        if (endTimestamp != 0L && endTimestamp < System.currentTimeMillis()) {
+            return false;
+        }
+        if (intervalMilliSeconds == null || intervalMilliSeconds <= 0L) {
+            return false;
+        }
+        return null != executor;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/JobRegister.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/JobRegister.java
new file mode 100644
index 0000000000..ebb6b0d590
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/JobRegister.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.registry;
+
+import org.apache.doris.scheduler.executor.JobExecutor;
+
+import java.io.IOException;
+
+/**
+ * This interface provides a contract for registering timed scheduling events.
+ * The implementation should trigger events in a timely manner using a specific algorithm.
+ * The execution of the events may be asynchronous and does not guarantee strict timing accuracy.
+ */
+public interface JobRegister {
+
+    /**
+     * Register a job.
+     *
+     * @param name       job name, it's not unique
+     * @param intervalMs job interval, unit: ms
+     * @param executor   job executor, see {@link JobExecutor}
+     * @return job id
+     */
+    Long registerJob(String name, Long intervalMs, JobExecutor executor);
+
+    /**
+     * Register a job with a start time.
+     *
+     * @param name           job name, it's not unique
+     * @param intervalMs     job interval, unit: ms
+     * @param startTimeStamp job start time stamp, unit: ms
+     *                       if startTimeStamp is null, the job will start immediately in the next cycle
+     *                       startTimeStamp should be greater than current time
+     * @param executor       job executor, see {@link JobExecutor}
+     * @return job id
+     */
+    Long registerJob(String name, Long intervalMs, Long startTimeStamp, JobExecutor executor);
+
+
+    /**
+     * Register a job with a start time and an end time.
+     *
+     * @param name           job name, it's not unique
+     * @param intervalMs     job interval, unit: ms
+     * @param startTimeStamp job start time stamp, unit: ms
+     *                       if startTimeStamp is null, the job will start immediately in the next cycle
+     *                       startTimeStamp should be greater than current time
+     * @param endTimeStamp   job end time stamp, unit: ms
+     *                       if endTimeStamp is null, the job will never stop
+     *                       endTimeStamp must be greater than startTimeStamp and endTimeStamp should be greater
+     *                       than current time
+     * @param executor       job executor, see {@link JobExecutor}
+     * @return job id
+     */
+    Long registerJob(String name, Long intervalMs, Long startTimeStamp, Long endTimeStamp,
+                          JobExecutor executor);
+
+    /**
+     * If the job is running, pause it.
+     * Pause means the job will not be executed in the next cycle, but the current cycle will not be interrupted.
+     * It can be resumed by {@link #resumeJob(Long)}.
+     *
+     * @param jobId job id
+     *              if jobId does not exist, return false
+     * @return true if pause success, false if pause failed
+     */
+    Boolean pauseJob(Long jobId);
+
+    /**
+     * If the job is running, stop it.
+     * Stop means the job will not be executed in the next cycle and the current cycle will be interrupted.
+     * A stopped job cannot be resumed; if you want to run it again, you should register it again.
+     * Stopped jobs are deleted.
+     *
+     * @param jobId job id
+     * @return true if stop success, false if stop failed
+     */
+    Boolean stopJob(Long jobId);
+
+    /**
+     * If the job is paused, resume it.
+     *
+     * @param jobId job id
+     * @return true if resume success, false if resume failed
+     */
+    Boolean resumeJob(Long jobId);
+
+    /**
+     * Close the job scheduler register.
+     * Close means the job scheduler register will not accept new jobs.
+     * Jobs that have not reached the trigger time will not be executed. Jobs that have reached the trigger time will
+     * have an execution time of 5 seconds, and will not be executed if the time exceeds.
+     *
+     * @throws IOException if closing the underlying resources fails
+     */
+    void close() throws IOException;
+
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/AsyncJobManagerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/AsyncJobManagerTest.java
new file mode 100644
index 0000000000..dceb8049cd
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/AsyncJobManagerTest.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.disruptor;
+
+import org.apache.doris.scheduler.executor.JobExecutor;
+import org.apache.doris.scheduler.job.AsyncJobManager;
+import org.apache.doris.scheduler.job.Job;
+
+import lombok.extern.slf4j.Slf4j;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Timing-based tests for {@link AsyncJobManager}: a job with a 6 second interval is registered
+ * and the number of executions observed within a time window is checked.
+ */
+@Slf4j
+public class AsyncJobManagerTest {
+
+    AsyncJobManager asyncJobManager;
+
+    // shared with the static executor below; reset before every test
+    private static final AtomicInteger testExecuteCount = new AtomicInteger(0);
+
+    Job job = new Job("test", 6000L, null,
+            null, new TestExecutor());
+
+    @BeforeEach
+    public void init() {
+        testExecuteCount.set(0);
+        asyncJobManager = new AsyncJobManager();
+    }
+
+    @Test
+    public void testCycleScheduler() {
+        asyncJobManager.registerJob(job);
+        //consider the time of the first execution and give some buffer time
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> testExecuteCount.get() >= 3);
+    }
+
+    @Test
+    public void testCycleSchedulerAndStop() {
+        asyncJobManager.registerJob(job);
+        long startTime = System.currentTimeMillis();
+        Awaitility.await().atMost(8, TimeUnit.SECONDS).until(() -> testExecuteCount.get() >= 1);
+        asyncJobManager.unregisterJob(job.getJobId());
+        //consider the time of the first execution and give some buffer time
+        // wait past the point where a second execution would have happened
+        Awaitility.await().atMost(30, TimeUnit.SECONDS)
+                .until(() -> System.currentTimeMillis() >= startTime + 13000L);
+        Assertions.assertEquals(1, testExecuteCount.get());
+
+    }
+
+    @Test
+    public void testCycleSchedulerWithIncludeStartTimeAndEndTime() {
+        job.setStartTimestamp(System.currentTimeMillis() + 6000L);
+        long endTimestamp = System.currentTimeMillis() + 19000L;
+        job.setEndTimestamp(endTimestamp);
+        asyncJobManager.registerJob(job);
+        //consider the time of the first execution and give some buffer time
+
+        Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> System.currentTimeMillis()
+                >= endTimestamp + 12000L);
+        Assertions.assertEquals(2, testExecuteCount.get());
+    }
+
+    @Test
+    public void testCycleSchedulerWithIncludeEndTime() {
+        long endTimestamp = System.currentTimeMillis() + 13000;
+        job.setEndTimestamp(endTimestamp);
+        asyncJobManager.registerJob(job);
+        //consider the time of the first execution and give some buffer time
+        Awaitility.await().atMost(36, TimeUnit.SECONDS).until(() -> System.currentTimeMillis()
+                >= endTimestamp + 12000L);
+        Assertions.assertEquals(2, testExecuteCount.get());
+    }
+
+    @Test
+    public void testCycleSchedulerWithIncludeStartTime() {
+
+        long startTimestamp = System.currentTimeMillis() + 6000L;
+        job.setStartTimestamp(startTimestamp);
+        asyncJobManager.registerJob(job);
+        //consider the time of the first execution and give some buffer time
+        Awaitility.await().atMost(14, TimeUnit.SECONDS).until(() -> System.currentTimeMillis()
+                >= startTimestamp + 7000L);
+        Assertions.assertEquals(1, testExecuteCount.get());
+    }
+
+    @AfterEach
+    public void after() throws IOException {
+        asyncJobManager.close();
+    }
+
+    /** Executor that only counts how often it has been invoked. */
+    // static: it touches only static state, so it needs no reference to the enclosing test instance
+    static class TestExecutor implements JobExecutor<Boolean> {
+        @Override
+        public Boolean execute() {
+            log.info("test execute count:{}", testExecuteCount.incrementAndGet());
+            return true;
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptorTest.java
new file mode 100644
index 0000000000..1630b1f864
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptorTest.java
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.disruptor;
+
+import org.apache.doris.scheduler.executor.JobExecutor;
+import org.apache.doris.scheduler.job.AsyncJobManager;
+import org.apache.doris.scheduler.job.Job;
+
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Tested;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Verifies that an event published through {@link TimerTaskDisruptor#tryPublish} reaches the
+ * consumer and that the job's executor is invoked.
+ */
+public class TimerTaskDisruptorTest {
+
+    @Tested
+    private TimerTaskDisruptor timerTaskDisruptor;
+
+    @Injectable
+    private AsyncJobManager asyncJobManager;
+
+    // written by the executor and polled by the Awaitility thread, so it must be volatile
+    private static volatile boolean testEventExecuteFlag = false;
+
+    @BeforeEach
+    public void init() {
+        // reset so that a previous run cannot satisfy the wait condition
+        testEventExecuteFlag = false;
+        timerTaskDisruptor = new TimerTaskDisruptor(asyncJobManager);
+    }
+
+    @Test
+    void testPublishEventAndConsumer() {
+        Job job = new Job("test", 6000L, null,
+                null, new TestExecutor());
+        new Expectations() {{
+                asyncJobManager.getJob(anyLong);
+                result = job;
+            }};
+        timerTaskDisruptor.tryPublish(job.getJobId(), UUID.randomUUID().getMostSignificantBits());
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> testEventExecuteFlag);
+        Assertions.assertTrue(testEventExecuteFlag);
+    }
+
+
+    /** Executor that records that it has been invoked. */
+    // static: it touches only static state, so it needs no reference to the enclosing test instance
+    static class TestExecutor implements JobExecutor<Boolean> {
+        @Override
+        public Boolean execute() {
+            testEventExecuteFlag = true;
+            return true;
+        }
+    }
+
+    @AfterEach
+    public void after() {
+        timerTaskDisruptor.close();
+    }
+}
diff --git a/fe/pom.xml b/fe/pom.xml
index 12fe1b7231..048257fdb5 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -310,7 +310,7 @@ under the License.
         <vesoft.client.version>3.0.0</vesoft.client.version>
         <!-- paimon -->
         <paimon.version>0.4.0-incubating</paimon.version>
-        <disruptor.version>3.3.4</disruptor.version>
+        <disruptor.version>3.4.4</disruptor.version>
     </properties>
     <profiles>
         <profile>
@@ -1432,6 +1432,12 @@ under the License.
             <groupId>org.jmockit</groupId>
             <artifactId>jmockit</artifactId>
         </dependency>
+        <!-- should be used in test scope -->
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+        </dependency>
+
     </dependencies>
     <reporting>
         <plugins>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to