This is an automated email from the ASF dual-hosted git repository.

gaoxingcun pushed a commit to branch reconfiguration_scheduling
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git

commit d43fbad9086cea2e8c2e5e6859e4e900d5c9beca
Author: TJxiaobao <[email protected]>
AuthorDate: Wed Sep 3 12:58:37 2025 +0800

    modified:modify some architecture directories
---
 docs/ARCHITECTURE_COMPARISON.md                    | 255 ++++++++++
 docs/SCHEDULING_ARCHITECTURE.md                    | 533 +++++++++++++++++++++
 examples/main_simulation.go                        | 289 +++++++++++
 pkg/banner/embed.go                                |  24 +-
 pkg/collector/basic/database/jdbc_auto_register.go |  36 ++
 pkg/collector/bootstrap.go                         | 119 +++--
 pkg/collector/collect_service.go                   | 248 +++++++++-
 .../common/dispatcher/entrance/collect_server.go   |  51 +-
 pkg/collector/common/timer/timer_dispatcher.go     | 203 +++++++-
 pkg/collector/config/config.go                     | 118 -----
 pkg/collector/config/config_test.go                |  45 --
 pkg/collector/registry.go                          |  39 +-
 pkg/collector/registry/registry_center.go          | 251 ++++++++++
 pkg/collector/server/server.go                     |  69 ---
 pkg/collector/server/server_test.go                |   1 -
 pkg/types/job/job_types.go                         |  17 +
 16 files changed, 1995 insertions(+), 303 deletions(-)

diff --git a/docs/ARCHITECTURE_COMPARISON.md b/docs/ARCHITECTURE_COMPARISON.md
new file mode 100644
index 0000000..972d942
--- /dev/null
+++ b/docs/ARCHITECTURE_COMPARISON.md
@@ -0,0 +1,255 @@
+# HertzBeat Go vs. Java: Architecture Comparison at a Glance
+
+## 🔄 Scheduling Flow Comparison
+
+### Java version scheduling flow
+```mermaid
+graph TD
+    A[Manager scheduler] --> B[Consistent-hash assignment]
+    B --> C[Network communication ClusterMsg]
+    C --> D[Collector receives]
+    D --> E[TimerDispatcher]
+    E --> F[WheelTimerTask]
+    F --> G[CommonDispatcher]
+    G --> H[MetricsCollectorQueue]
+    H --> I[WorkerPool thread pool]
+    I --> J[MetricsCollect]
+    J --> K[CollectStrategyFactory]
+    K --> L[SPI-loaded collector]
+    L --> M[Concrete collector JDBC/HTTP/SSH]
+    M --> N[Result returned]
+```
+
+### Go version scheduling flow
+```mermaid
+graph TD
+    A1[Manager scheduler] --> B1[Network communication TBD]
+    B1 --> C1[CollectServer receives]
+    C1 --> D1[TimerDispatcher]
+    D1 --> E1[WheelTimerTask]
+    E1 --> F1[DispatchMetricsTask]
+    F1 --> G1[WorkerPool goroutine pool]
+    G1 --> H1[MetricsCollect]
+    H1 --> I1[CollectService]
+    I1 --> J1[CollectorRegistry]
+    J1 --> K1[Auto-registered collectors]
+    K1 --> L1[Concrete collector e.g. JDBC]
+    L1 --> M1[Result returned]
+```
+
+## 📊 Core Differences at a Glance
+
+| Dimension | Java version | Go version | Notes |
+|------|----------|---------|------|
+| **Network communication** | ✅ Full ClusterMsg protocol | ⚠️ To be completed (currently simulated) | Pending in the Go version |
+| **Task scheduling** | TimerDispatcher + CommonDispatcher | TimerDispatcher dispatches directly | Go version is simpler |
+| **Worker pool** | ThreadPoolExecutor | Goroutine pool | Go version is lighter |
+| **Task queue** | Priority queue MetricsCollectorQueue | Plain channel queue | Java version is more complex |
+| **Collector registration** | Java SPI auto-discovery | init() functions + registry center | Each has its merits |
+| **Concurrency model** | Thread pool (~2 MB/thread) | Goroutine pool (~2 KB/goroutine) | Go version is more efficient |
+| **Memory footprint** | Larger (JVM overhead) | Smaller (native binary) | Clear win for Go |
+| **Startup speed** | Slower (JVM + class loading) | Fast (statically compiled) | Clear win for Go |
+| **Timeout handling** | Separate ScheduledExecutor | Integrated into TimerDispatcher | Go version is more unified |
+| **Retry mechanism** | Priority-queue based | Exponential backoff | Different implementations |
+
+## 🎯 Key Techniques Compared
+
+### 1. Concurrency
+
+#### Java version
+```java
+// Thread pool configuration
+ThreadPoolExecutor workerExecutor = new ThreadPoolExecutor(
+    coreSize,           // core pool size
+    maxSize,            // maximum pool size
+    10, TimeUnit.SECONDS, // idle timeout
+    new SynchronousQueue<>(), // hand-off queue
+    threadFactory,      // thread factory
+    new ThreadPoolExecutor.AbortPolicy() // rejection policy
+);
+```
+
+#### Go version
+```go
+// Goroutine pool configuration
+type WorkerPool struct {
+    config     WorkerPoolConfig
+    taskQueue  chan Task        // task channel
+    workers    sync.Map         // worker goroutine map
+    ctx        context.Context  // context for cancellation
+}
+
+// Dynamic goroutine management
+func (wp *WorkerPool) adjustWorkerCount() {
+    if queuedTasks > currentWorkers*2 {
+        wp.addWorker(false) // add a non-core worker
+    }
+}
+```
+
+### 2. Collector Registration
+
+#### Java version (SPI mechanism)
+```java
+// META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect
+org.apache.hertzbeat.collector.collect.database.JdbcCommonCollect
+org.apache.hertzbeat.collector.collect.http.HttpCollectImpl
+
+// Automatic loading
+ServiceLoader<AbstractCollect> loader = ServiceLoader.load(AbstractCollect.class);
+for (AbstractCollect collect : loader) {
+    COLLECT_STRATEGY.put(collect.supportProtocol(), collect);
+}
+```
+
+#### Go version (registry-center mechanism)
+```go
+// Automatic registration
+func init() {
+    registry.RegisterCollectorFactory(
+        "jdbc",
+        func(logger logger.Logger) basic.AbstractCollector {
+            return NewJDBCCollector(logger)
+        },
+        registry.WithPriority(10),
+    )
+}
+
+// Unified registration
+func RegisterBuiltinCollectors(service *CollectService, logger logger.Logger) {
+    collectors, _ := registry.GetGlobalCenter().CreateCollectors(logger)
+    for protocol, collector := range collectors {
+        service.RegisterCollector(protocol, collector)
+    }
+}
+```
+
+### 3. Task Dispatch Strategy
+
+#### Java version (multi-layer dispatch)
+```java
+// 1. TimerDispatcher schedules
+wheelTimer.newTimeout(timerJob, interval, TimeUnit.SECONDS);
+
+// 2. CommonDispatcher dispatches
+MetricsCollect metricsCollect = jobRequestQueue.getJob();
+workerPool.executeJob(metricsCollect);
+
+// 3. WorkerPool executes
+workerExecutor.execute(runnable);
+```
+
+#### Go version (direct dispatch)
+```go
+// 1. TimerDispatcher schedules
+timeout := td.wheelTimer.NewTimeout(timerTask, delay)
+
+// 2. Dispatch directly to the WorkerPool
+for _, metric := range job.Metrics {
+    metricsCollect := worker.NewMetricsCollect(...)
+    td.workerPool.Submit(metricsCollect)
+}
+```
+
+## 🚀 Performance Characteristics
+
+### Startup performance
+| Metric | Java version | Go version | Comparison |
+|------|----------|--------|------|
+| Startup time | ~3-5 s | ~0.1-0.5 s | Go ~10x faster |
+| Memory footprint | ~128 MB+ | ~20-50 MB | Go saves 60%+ |
+| Binary size | ~50 MB+ | ~10-20 MB | Go 50%+ smaller |
+
+### Runtime performance
+| Metric | Java version | Go version | Comparison |
+|------|----------|--------|------|
+| Goroutine/thread overhead | ~2 MB per thread | ~2 KB per goroutine | Go saves 99.9% |
+| Context switching | Heavyweight | Lightweight | Go is faster |
+| GC latency | Can be long | Usually <1 ms | Go is more stable |
+
+## 🔧 Developer Experience
+
+### Effort to add a new collector
+
+#### Java version
+1. ✅ Implement the AbstractCollect interface
+2. ✅ Register it in META-INF/services
+3. ✅ Auto-discovered; no code changes needed
+
+**Summary**: simple to configure, but tied to the SPI mechanism
+
+#### Go version
+1. ✅ Implement the AbstractCollector interface
+2. ✅ Add an init() registration function
+3. ✅ Add an import in registry.go
+
+**Summary**: requires a manual import, but is more flexible and controllable
+
+### Debugging and monitoring
+
+#### Java version
+```java
+// Rich JVM tooling
+- Profilers such as JProfiler and VisualVM
+- JMX monitoring metrics
+- Mature APM tool support
+```
+
+#### Go version
+```go
+// Native Go tooling
+- go tool pprof for profiling
+- runtime.ReadMemStats() for monitoring
+- Prometheus metrics integration
+```
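+
+On the Go side, a profiling endpoint can be exposed with the standard library alone. A minimal sketch (the listen address is an illustrative choice, not a collector default):
+
+```go
+package main
+
+import (
+    "log"
+    "net/http"
+    _ "net/http/pprof" // registers /debug/pprof/* handlers on the default mux
+)
+
+func main() {
+    // Inspect with e.g.: go tool pprof http://localhost:6060/debug/pprof/heap
+    log.Println(http.ListenAndServe("localhost:6060", nil))
+}
+```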
+
+## 🎯 Which Version to Choose
+
+### Scenarios favoring the Java version
+- ✅ Enterprise environments with high maturity requirements
+- ✅ Need for a rich third-party library ecosystem
+- ✅ Teams stronger in Java
+- ✅ CPU-bound collection workloads
+
+### Scenarios favoring the Go version
+- ✅ Cloud-native, resource-sensitive environments
+- ✅ Highly concurrent, IO-bound collection
+- ✅ Fast startup and deployment requirements
+- ✅ Containerized microservice architectures
+
+## 📈 Outlook
+
+### Enduring strengths of the Java version
+- Mature ecosystem
+- Complete enterprise feature set
+- Strong community support
+
+### Direction of the Go version
+- Complete the network communication layer
+- More protocol collectors
+- Stronger cloud-native features
+- Widening performance lead
+
+---
+
+## 📋 Quick Reference
+
+### Core class mapping
+| Java class | Go counterpart | Role |
+|--------|--------|------|
+| `TimerDispatcher` | `TimerDispatcher` | Timing-wheel scheduling |
+| `CommonDispatcher` | `DispatchMetricsTask` | Task dispatch |
+| `WorkerPool` | `WorkerPool` | Worker pool |
+| `MetricsCollect` | `MetricsCollect` | Collection task |
+| `CollectStrategyFactory` | `CollectService` | Collector management |
+| `AbstractCollect` | `AbstractCollector` | Collector interface |
+
+### Key configuration parameters
+| Setting | Java default | Go default | Description |
+|--------|------------|----------|------|
+| Core workers | CPU cores | CPU cores | Base concurrency |
+| Max workers | CPU cores × 16 | CPU cores × 4 | Maximum concurrency |
+| Task queue | SynchronousQueue | 1000 | Queue capacity |
+| Timing-wheel size | 512 | 512 | Scheduling granularity |
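+
+The Go defaults in this table could be captured in a small constructor; a minimal sketch (field names follow the WorkerPoolConfig shown earlier, the values are assumptions read off the table, and the authoritative defaults live in pkg/collector/worker):
+
+```go
+package worker
+
+import (
+    "runtime"
+    "time"
+)
+
+type WorkerPoolConfig struct {
+    CoreSize, MaxSize, QueueSize int
+    IdleTimeout                  time.Duration
+}
+
+// DefaultWorkerPoolConfig mirrors the Go defaults above (sketch only).
+func DefaultWorkerPoolConfig() WorkerPoolConfig {
+    return WorkerPoolConfig{
+        CoreSize:    runtime.NumCPU(),     // core workers: CPU cores
+        MaxSize:     runtime.NumCPU() * 4, // max workers: CPU cores × 4
+        QueueSize:   1000,                 // task queue capacity
+        IdleTimeout: 60 * time.Second,     // reclaim idle non-core workers
+    }
+}
+```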
+
+This comparison is meant to help developers quickly understand the differences between the two versions and pick the right one for development and deployment.
diff --git a/docs/SCHEDULING_ARCHITECTURE.md b/docs/SCHEDULING_ARCHITECTURE.md
new file mode 100644
index 0000000..193c1f0
--- /dev/null
+++ b/docs/SCHEDULING_ARCHITECTURE.md
@@ -0,0 +1,533 @@
+# HertzBeat Go Collector Scheduling Architecture
+
+## 📋 Overview
+
+This document explains the scheduling architecture of the Go version of the HertzBeat collector in detail, covering a comparison with the Java version, the core components, and a walkthrough of the scheduling flow.
+
+## 🏗️ Overall Architecture
+
+### Java version architecture
+```
+Manager scheduler → consistent hashing → network communication → Collector
+    ↓
+TimerDispatcher → WheelTimerTask → CommonDispatcher
+    ↓
+MetricsCollectorQueue → WorkerPool → MetricsCollect
+    ↓
+CollectStrategyFactory (SPI) → concrete collector → result
+```
+
+### Go version architecture
+```
+Manager scheduler → network communication (TBD) → CollectServer
+    ↓
+TimerDispatcher → WheelTimerTask → DispatchMetricsTask
+    ↓
+WorkerPool → MetricsCollect → CollectService
+    ↓
+CollectorRegistry (manual registration) → concrete collector → result
+```
+
+## 🔄 Scheduling Flow in Detail
+
+### 1. Task reception
+
+#### Java version
+```java
+// The manager sends the task
+ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
+    .setType(ClusterMsg.MessageType.ISSUE_CYCLIC_TASK)
+    .setDirection(ClusterMsg.Direction.REQUEST)
+    .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(job)))
+    .build();
+manageServer.sendMsg(node.getIdentity(), message);
+
+// The collector receives the task
+@Override
+public void response(ClusterMsg.Message message) {
+    Job job = JsonUtil.fromJson(message.getMsg().toStringUtf8(), Job.class);
+    collectJobService.addAsyncCollectJob(job);
+}
+```
+
+#### Go version
+```go
+// The manager sends the task (currently simulated)
+func (cs *CollectServer) ReceiveJob(job *jobtypes.Job, eventListener timer.CollectResponseEventListener) error {
+    if !cs.isStarted {
+        return fmt.Errorf("collect server is not started")
+    }
+
+    // Add directly to the scheduler
+    return cs.timerDispatcher.AddJob(job, eventListener)
+}
+```
+
+**Key differences**:
+- Java: a complete network protocol stack
+- Go: the network layer is pending; a simulated interface is used for now
+
+### 2. Task scheduling
+
+#### Java version
+```java
+// TimerDispatcher schedules
+@Override
+public void addJob(Job addJob, CollectResponseEventListener eventListener) {
+    WheelTimerTask timerJob = new WheelTimerTask(addJob);
+    if (addJob.isCyclic()) {
+        Timeout timeout = wheelTimer.newTimeout(timerJob, addJob.getInterval(), TimeUnit.SECONDS);
+        currentCyclicTaskMap.put(addJob.getId(), timeout);
+    }
+}
+
+// WheelTimerTask runs
+@Override
+public void run(Timeout timeout) throws Exception {
+    // dispatch to the CommonDispatcher
+    metricsTaskDispatch.dispatchMetricsTask(this);
+}
+```
+
+#### Go version
+```go
+// TimerDispatcher schedules
+func (td *TimerDispatcher) AddJob(job *jobtypes.Job, eventListener CollectResponseEventListener) error {
+    timerTask := NewWheelTimerTask(job, td.metricsDispatcher, td.logger)
+
+    var delay time.Duration
+    if job.DefaultInterval > 0 {
+        delay = time.Duration(job.DefaultInterval) * time.Second
+    }
+
+    timeout := td.wheelTimer.NewTimeout(timerTask, delay)
+
+    if job.IsCyclic {
+        td.cyclicTasks.Store(job.ID, timeout)
+    } else {
+        td.tempTasks.Store(job.ID, timeout)
+    }
+    return nil
+}
+
+// DispatchMetricsTask runs
+func (td *TimerDispatcher) DispatchMetricsTask(timeout *jobtypes.Timeout) error {
+    if task, ok := timeout.Task().(*WheelTimerTask); ok {
+        job := task.GetJob()
+
+        // create one collection task per metric
+        for _, metric := range job.Metrics {
+            metricsCollect := worker.NewMetricsCollect(
+                &metric, timeout, td.collectDispatcher,
+                "collector-go", td.collectService, td.logger,
+            )
+            td.workerPool.Submit(metricsCollect)
+        }
+    }
+    return nil
+}
+```
+
+**Key differences**:
+- Java: CommonDispatcher acts as an intermediate layer
+- Go: the TimerDispatcher dispatches tasks straight to the WorkerPool
+
+### 3. Task dispatch
+
+#### Java version
+```java
+// CommonDispatcher loop
+public void run() {
+    while (!Thread.currentThread().isInterrupted()) {
+        MetricsCollect metricsCollect = jobRequestQueue.getJob();
+        if (metricsCollect != null) {
+            workerPool.executeJob(metricsCollect);
+        }
+    }
+}
+
+// backed by a priority queue
+private final MetricsCollectorQueue jobRequestQueue;
+```
+
+#### Go version
+```go
+// The WorkerPool accepts tasks directly
+func (wp *WorkerPool) Submit(task Task) error {
+    select {
+    case wp.taskQueue <- task:
+        atomic.AddInt64(&wp.stats.QueuedTasks, 1)
+        return nil
+    default:
+        return fmt.Errorf("task queue is full")
+    }
+}
+
+// A worker runs a task
+func (w *worker) executeTask(task Task) {
+    timeout := task.Timeout()
+    ctx, cancel := context.WithTimeout(w.ctx, timeout)
+    defer cancel()
+
+    done := make(chan error, 1)
+    go func() {
+        done <- task.Execute()
+    }()
+
+    select {
+    case err := <-done:
+        _ = err // task finished; handle the result
+    case <-ctx.Done():
+        // task timed out
+    }
+}
+```
+
+**Key differences**:
+- Java: a dedicated priority queue plus a dispatcher thread
+- Go: a plain buffered channel plus a goroutine pool
+
+### 4. Collector invocation
+
+#### Java version (SPI auto-discovery)
+```java
+// CollectStrategyFactory uses SPI
+@Override
+public void run(String... args) throws Exception {
+    ServiceLoader<AbstractCollect> loader = ServiceLoader.load(AbstractCollect.class);
+    for (AbstractCollect collect : loader) {
+        COLLECT_STRATEGY.put(collect.supportProtocol(), collect);
+    }
+}
+
+// META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect
+org.apache.hertzbeat.collector.collect.database.JdbcCommonCollect
+org.apache.hertzbeat.collector.collect.http.HttpCollectImpl
+// ... other collectors
+```
+
+#### Go version (manual registration + new auto-registration mechanism)
+```go
+// Traditional manual registration
+func RegisterBuiltinCollectors(service *CollectService, logger logger.Logger) {
+    jdbcCollector := database.NewJDBCCollector(logger)
+    service.RegisterCollector(jdbcCollector.SupportProtocol(), jdbcCollector)
+}
+
+// New auto-registration mechanism
+// pkg/collector/basic/database/jdbc_auto_register.go
+func init() {
+    registry.RegisterCollectorFactory(
+        "jdbc",
+        func(logger logger.Logger) basic.AbstractCollector {
+            return NewJDBCCollector(logger)
+        },
+        registry.WithPriority(10),
+    )
+}
+```
+
+**Key differences**:
+- Java: auto-discovery via the standard Java SPI mechanism
+- Go: auto-registration via init() functions plus a registry center
+
+## 🔧 Core Components
+
+### 1. TimerDispatcher (timing-wheel scheduler)
+
+#### Shared traits
+- Both are built on a HashedWheelTimer
+- Both support cyclic and one-shot tasks
+- Both manage task timeouts
+
+#### Differences
+| Trait | Java version | Go version |
+|------|---------|--------|
+| Timeout monitoring | Separate ScheduledExecutor | Integrated into the TimerDispatcher |
+| Task storage | ConcurrentHashMap | sync.Map |
+| Retry mechanism | Priority-queue based | Exponential backoff (see the sketch below) |
+| Event listening | EventListener interface | CollectResponseEventListener |
+
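+A minimal sketch of the exponential-backoff side (the helper and its parameters are illustrative, not the collector's actual API):
+
+```go
+package timer
+
+import (
+    "math/rand"
+    "time"
+)
+
+// retryDelay returns the delay before the n-th retry (0-based):
+// exponential backoff with jitter, capped at maxDelay.
+func retryDelay(attempt int, base, maxDelay time.Duration) time.Duration {
+    d := base << uint(attempt)  // base * 2^attempt
+    if d <= 0 || d > maxDelay { // <= 0 guards against shift overflow
+        d = maxDelay
+    }
+    if j := int64(d) / 5; j > 0 { // up to 20% jitter against retry storms
+        d += time.Duration(rand.Int63n(j))
+    }
+    return d
+}
+```
+
+A timed-out metrics task would then be re-queued on the timing wheel with retryDelay(info.RetryCount, time.Second, time.Minute) until MaxRetries is exhausted.
+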
+### 2. WorkerPool (worker pool)
+
+#### Java version
+```java
+// Built on ThreadPoolExecutor
+private ThreadPoolExecutor workerExecutor;
+
+// Dynamic thread-pool configuration
+int coreSize = Math.max(2, Runtime.getRuntime().availableProcessors());
+int maxSize = Runtime.getRuntime().availableProcessors() * 16;
+workerExecutor = new ThreadPoolExecutor(coreSize, maxSize, 10, TimeUnit.SECONDS,
+    new SynchronousQueue<>(), threadFactory, new ThreadPoolExecutor.AbortPolicy());
+```
+
+#### Go version
+```go
+// Built on a goroutine pool
+type WorkerPool struct {
+    config     WorkerPoolConfig
+    taskQueue  chan Task
+    workers    sync.Map
+    ctx        context.Context
+    cancel     context.CancelFunc
+}
+
+// Dynamic goroutine management
+func (wp *WorkerPool) adjustWorkerCount() {
+    queuedTasks := atomic.LoadInt64(&wp.stats.QueuedTasks)
+    currentWorkers := wp.workerCount.Load()
+
+    if queuedTasks > int64(currentWorkers)*2 && currentWorkers < int32(wp.config.MaxSize) {
+        wp.addWorker(false) // add a non-core worker
+    }
+}
+```
+
+#### Differences
+| Trait | Java version | Go version |
+|------|---------|--------|
+| Concurrency model | Thread pool | Goroutine pool |
+| Queue type | SynchronousQueue | Buffered channel |
+| Rejection policy | AbortPolicy | Submit returns an error when the queue is full |
+| Resource management | JVM thread management | Go runtime |
+
+### 3. Collector registration
+
+#### Java version (SPI mechanism)
+```java
+// Pros:
+- auto-discovery, no manual registration
+- standard Java mechanism, mature and stable
+- supports plugin-style extension
+
+// Cons:
+- everything is loaded at startup
+- hard to control dynamically
+- depends on classpath configuration
+```
+
+#### Go version (registry-center mechanism)
+```go
+// Pros:
+- priority control
+- enable/disable collectors at runtime
+- avoids import cycles
+- better error handling
+
+// Cons:
+- packages must be imported manually
+- slightly more involved registration flow
+```
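+
+A minimal sketch of what such a registry center can look like (the names mirror the registry package used above, but the internals here are an illustration, not the actual implementation):
+
+```go
+package registry
+
+import "sync"
+
+// Stand-ins for the real interfaces.
+type Logger interface{ Info(msg string, kv ...interface{}) }
+type AbstractCollector interface{ SupportProtocol() string }
+
+// Factory builds a collector for one protocol.
+type Factory func(logger Logger) AbstractCollector
+
+type entry struct {
+    factory  Factory
+    priority int // could arbitrate duplicate protocol registrations
+    enabled  bool
+}
+
+var (
+    mu      sync.RWMutex
+    entries = map[string]entry{} // protocol -> factory entry
+)
+
+type Option func(*entry)
+
+func WithPriority(p int) Option { return func(e *entry) { e.priority = p } }
+
+// RegisterCollectorFactory is typically called from a package init().
+func RegisterCollectorFactory(protocol string, f Factory, opts ...Option) {
+    e := entry{factory: f, enabled: true}
+    for _, opt := range opts {
+        opt(&e)
+    }
+    mu.Lock()
+    entries[protocol] = e
+    mu.Unlock()
+}
+
+// CreateCollectors instantiates every enabled collector.
+func CreateCollectors(logger Logger) map[string]AbstractCollector {
+    mu.RLock()
+    defer mu.RUnlock()
+    out := make(map[string]AbstractCollector, len(entries))
+    for protocol, e := range entries {
+        if e.enabled {
+            out[protocol] = e.factory(logger)
+        }
+    }
+    return out
+}
+```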
+
+## 📊 Performance Analysis
+
+### 1. Memory usage
+
+#### Java version
+- **Strength**: mature JVM memory management
+- **Weakness**: large baseline memory footprint
+- **Trait**: GC pressure, but mature tuning tools
+
+#### Go version
+- **Strength**: smaller memory footprint
+- **Weakness**: some resources must be managed by hand
+- **Trait**: low GC latency, high memory efficiency
+
+### 2. Concurrency performance
+
+#### Java version
+- **Thread overhead**: ~2 MB of stack per thread
+- **Context switching**: relatively heavy
+- **Best for**: CPU-bound tasks
+
+#### Go version
+- **Goroutine overhead**: ~2 KB of stack each
+- **Context switching**: very light
+- **Best for**: IO-bound tasks
+
+### 3. Startup speed
+
+#### Java version
+- **JVM startup**: relatively slow
+- **Class loading**: SPI must scan the classpath
+- **Warm-up**: JIT compilation takes time
+
+#### Go version
+- **Startup**: very fast
+- **Static linking**: no external dependencies
+- **Immediate performance**: no warm-up needed
+
+## 🔮 Roadmap
+
+### Short term (done)
+- ✅ Core scheduling framework
+- ✅ JDBC collector
+- ✅ Auto-registration mechanism
+- ✅ Timeout monitoring and retries
+
+### Mid term (in progress)
+- 🔄 Complete the network communication layer
+- 🔄 More protocol collectors (HTTP, SSH, SNMP, ...)
+- 🔄 Cluster mode support
+- 🔄 Config-file-driven collector management
+
+### Long term (planned)
+- 📋 Plugin architecture
+- 📋 Hot reload
+- 📋 Cloud-native deployment support
+- 📋 AI-driven scheduling
+
+## 🛠️ Development Guide
+
+### 1. Adding a new collector
+
+```go
+// Step 1: implement the collector interface
+type RedisCollector struct {
+    logger logger.Logger
+}
+
+func (rc *RedisCollector) SupportProtocol() string {
+    return "redis"
+}
+
+func (rc *RedisCollector) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRepMetricsData {
+    // collection logic goes here
+    return nil
+}
+
+// Step 2: add auto-registration
+func init() {
+    registry.RegisterCollectorFactory("redis",
+        func(logger logger.Logger) basic.AbstractCollector {
+            return NewRedisCollector(logger)
+        },
+        registry.WithPriority(15),
+    )
+}
+
+// Step 3: add the import in registry.go
+_ "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/basic/redis"
+```
+
+### 2. Debugging and monitoring
+
+```go
+// Inspect scheduler state
+stats := timerDispatcher.GetStats()
+logger.Info("scheduler state",
+    "cyclicJobs", stats.CyclicJobs,
+    "tempJobs", stats.TempJobs,
+    "executedJobs", stats.ExecutedJobs)
+
+// Inspect worker pool state
+poolStats, _ := workerPool.GetStats()
+logger.Info("worker pool state",
+    "activeWorkers", poolStats.ActiveWorkers,
+    "queuedTasks", poolStats.QueuedTasks)
+```
+
+### 3. Performance tuning tips
+
+#### Scheduler tuning
+```go
+// 1. Choose sensible timing-wheel parameters
+timerDispatcher := timer.NewTimerDispatcher(logger)
+timerDispatcher.SetWheelSize(512)  // size according to task volume
+timerDispatcher.SetTickDuration(100 * time.Millisecond)
+
+// 2. Batch task submission
+func (td *TimerDispatcher) BatchDispatch(jobs []*jobtypes.Job) error {
+    for _, job := range jobs {
+        td.AddJob(job, nil)
+    }
+    return nil
+}
+```
+
+#### Worker pool tuning
+```go
+// 1. Size the pool for the workload
+config := worker.WorkerPoolConfig{
+    CoreSize:    runtime.NumCPU(),          // CPU-bound baseline
+    MaxSize:     runtime.NumCPU() * 4,      // can go higher for IO-bound work
+    QueueSize:   1000,                      // size according to available memory
+    IdleTimeout: 60 * time.Second,
+}
+
+// 2. Task prioritization
+type PriorityTask struct {
+    Priority int
+    Task     Task
+}
+```
+
+## 📝 Best Practices
+
+### 1. Error handling
+```go
+// Error handling inside a collector
+func (jc *JDBCCollector) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRepMetricsData {
+    defer func() {
+        if r := recover(); r != nil {
+            jc.logger.Error(fmt.Errorf("panic in JDBC collect: %v", r), "collector panic")
+        }
+    }()
+
+    // actual collection logic...
+}
+```
+
+### 2. Resource management
+```go
+// Connection pool management
+type JDBCCollector struct {
+    connectionPool map[string]*sql.DB
+    poolMutex      sync.RWMutex
+}
+
+func (jc *JDBCCollector) getConnection(dsn string) (*sql.DB, error) {
+    jc.poolMutex.RLock()
+    if conn, exists := jc.connectionPool[dsn]; exists {
+        jc.poolMutex.RUnlock()
+        return conn, nil
+    }
+    jc.poolMutex.RUnlock()
+
+    // Create the connection under the write lock, re-checking the map so
+    // that two racing goroutines don't both open one (sketch; the driver
+    // name below is illustrative)
+    jc.poolMutex.Lock()
+    defer jc.poolMutex.Unlock()
+    if conn, exists := jc.connectionPool[dsn]; exists {
+        return conn, nil
+    }
+    conn, err := sql.Open("mysql", dsn)
+    if err != nil {
+        return nil, err
+    }
+    jc.connectionPool[dsn] = conn
+    return conn, nil
+}
+```
+
+### 3. Configuration management
+```go
+// Supports config files and environment variables
+type CollectorConfig struct {
+    JDBC struct {
+        MaxConnections    int           `yaml:"max_connections" env:"JDBC_MAX_CONNECTIONS"`
+        ConnectionTimeout time.Duration `yaml:"connection_timeout" env:"JDBC_CONNECTION_TIMEOUT"`
+        QueryTimeout      time.Duration `yaml:"query_timeout" env:"JDBC_QUERY_TIMEOUT"`
+    } `yaml:"jdbc"`
+}
+```
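+
+Loading such a config could look like the sketch below. It assumes gopkg.in/yaml.v3 for the YAML side and applies one env override by hand, since the env tags above imply a helper library that is not shown; it reuses the CollectorConfig type from the block above:
+
+```go
+package config
+
+import (
+    "os"
+    "time"
+
+    "gopkg.in/yaml.v3"
+)
+
+// LoadCollectorConfig reads the YAML file and lets environment variables
+// override individual values (sketch; error handling is abbreviated).
+func LoadCollectorConfig(path string) (*CollectorConfig, error) {
+    data, err := os.ReadFile(path)
+    if err != nil {
+        return nil, err
+    }
+    cfg := &CollectorConfig{}
+    if err := yaml.Unmarshal(data, cfg); err != nil {
+        return nil, err
+    }
+    // Manual env override, matching the env tag on QueryTimeout.
+    if v := os.Getenv("JDBC_QUERY_TIMEOUT"); v != "" {
+        if d, err := time.ParseDuration(v); err == nil {
+            cfg.JDBC.QueryTimeout = d
+        }
+    }
+    return cfg, nil
+}
+```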
+
+## 🎯 Summary
+
+While staying true to the core design of the Java version, the scheduling architecture of the Go HertzBeat collector takes full advantage of Go's strengths:
+
+### Core strengths
+1. **High concurrency**: the goroutine model suits IO-bound collection tasks
+2. **Low resource usage**: smaller memory footprint and faster startup
+3. **Lean architecture**: fewer intermediate layers, a more direct scheduling path
+4. **Type safety**: compile-time type checks reduce runtime errors
+
+### Main differences
+1. **Network layer**: pending; a simulated interface is used for now
+2. **Collector registration**: SPI replaced by init() functions plus a registry center
+3. **Concurrency model**: thread pool replaced by a goroutine pool
+4. **Error handling**: more explicit error handling
+
+### Direction
+The Go version will keep its performance and footprint advantages while gradually adding network communication, cluster support, and other enterprise features, aiming for feature parity with the Java version at better performance.
+
+---
+
+*Document version: v1.0*  
+*Last updated: January 2024*  
+*Maintainer: HertzBeat Go Team*
diff --git a/examples/main_simulation.go b/examples/main_simulation.go
new file mode 100644
index 0000000..2756b22
--- /dev/null
+++ b/examples/main_simulation.go
@@ -0,0 +1,289 @@
+package main
+
+import (
+       "context"
+       "flag"
+       "fmt"
+       "os"
+       "os/signal"
+       "syscall"
+       "time"
+
+       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector"
+       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/dispatcher/entrance"
+       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types"
+       jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job"
+)
+
+// go build -ldflags "-X main.Version=x.y.z"
+var (
+       ConfPath   string
+       Version    string
+       Simulation bool // new: enables manager-task simulation mode
+)
+
+func init() {
+       flag.StringVar(&ConfPath, "conf", "hertzbeat-collector.yaml", "path to config file")
+       flag.BoolVar(&Simulation, "simulation", true, "enable manager task simulation mode")
+}
+
+func main() {
+       flag.Parse()
+
+       // Initialize the logger
+       log := logger.DefaultLogger(os.Stdout, types.LogLevelInfo)
+       log.Info("🚀 starting HertzBeat Collector", "version", Version, "simulation", Simulation)
+       if Simulation {
+               // Simulation mode: start the full CollectServer and simulate the manager sending tasks
+               runSimulationMode(log)
+       } else {
+               // Normal mode: start the standard collector service
+               if err := collector.Bootstrap(ConfPath, Version); err != nil {
+                       log.Error(err, "❌ failed to start collector")
+                       os.Exit(1)
+               }
+       }
+}
+
+// runSimulationMode simulates the manager dispatching collection tasks
+func runSimulationMode(log logger.Logger) {
+       log.Info("🎭 simulation mode: simulating the manager sending JDBC collection tasks")
+
+       // 1. Create the CollectServer (the full collection architecture)
+       config := entrance.DefaultServerConfig()
+       collectServer, err := entrance.NewCollectServer(config, log)
+       if err != nil {
+               log.Error(err, "❌ failed to create CollectServer")
+               return
+       }
+
+       // 2. Start the CollectServer
+       if err := collectServer.Start(); err != nil {
+               log.Error(err, "❌ failed to start CollectServer")
+               return
+       }
+       defer collectServer.Stop()
+
+       log.Info("✅ CollectServer started, ready to receive manager tasks")
+
+       // 3. Build the simulated manager task
+       jdbcJob := createManagerJDBCTask()
+
+       // 4. Start the task simulator
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       // goroutine that simulates the manager sending tasks
+       go simulateManagerTasks(ctx, collectServer, jdbcJob, log)
+
+       // 5. Wait for an interrupt signal
+       log.Info("🔄 simulator running (simulating a manager dispatch every 60s); press Ctrl+C to stop...")
+       sigCh := make(chan os.Signal, 1)
+       signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
+       <-sigCh
+
+       log.Info("📴 received stop signal, shutting down...")
+       cancel()
+       time.Sleep(2 * time.Second)
+       log.Info("👋 simulator stopped")
+}
+
+// createManagerJDBCTask builds a JDBC collection task as if it came from the manager
+func createManagerJDBCTask() *jobtypes.Job {
+       return &jobtypes.Job{
+               ID:              1001,
+               TenantID:        1,
+               MonitorID:       2001,
+               App:             "mysql",
+               Category:        "database",
+               IsCyclic:        true,
+               DefaultInterval: 30, // 30-second interval
+               Timestamp:       time.Now().Unix(),
+
+               // Task metadata as sent by the manager
+               Metadata: map[string]string{
+                       "instancename":       "production MySQL instance",
+                       "instancehost":       "localhost:43306",
+                       "manager_node":       "hertzbeat-manager-001",
+                       "task_source":        "manager_scheduler",
+                       "collector_assigned": "auto",
+               },
+
+               Labels: map[string]string{
+                       "env":        "production",
+                       "database":   "mysql",
+                       "region":     "localhost",
+                       "managed_by": "hertzbeat",
+                       "priority":   "high",
+               },
+
+               Annotations: map[string]string{
+                       "description":   "MySQL database performance monitoring",
+                       "created_by":    "manager-scheduler",
+                       "task_type":     "cyclic_monitoring",
+                       "alert_enabled": "true",
+               },
+
+               // JDBC metric configuration
+               Metrics: []jobtypes.Metrics{
+                       {
+                               Name:     "mysql_basic_info",
+                               Priority: 0,
+                               Protocol: "jdbc",
+                               Host:     "localhost",
+                               Port:     "3306",
+                               Timeout:  "15s",
+                               Interval: 30,
+                               Fields: []jobtypes.Field{
+                                       {Field: "database_name", Type: 1, Label: true},
+                                       {Field: "version", Type: 1, Label: false},
+                                       {Field: "uptime_seconds", Type: 0, Label: false},
+                                       {Field: "server_id", Type: 0, Label: false},
+                               },
+                               JDBC: &jobtypes.JDBCProtocol{
+                                       Host:      "localhost",
+                                       Port:      "3306",
+                                       Platform:  "mysql",
+                                       Username:  "root",
+                                       Password:  "password", // change to the real password
+                                       Database:  "mysql",
+                                       Timeout:   "15",
+                                       QueryType: "oneRow",
+                                       SQL: `SELECT
+                                               DATABASE() as database_name,
+                                               VERSION() as version,
+                                               (SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME='Uptime') as uptime_seconds,
+                                               @@server_id as server_id`,
+                               },
+                       },
+               },
+       }
+}
+
+// simulateManagerTasks periodically simulates the manager dispatching collection tasks
+func simulateManagerTasks(ctx context.Context, collectServer *entrance.CollectServer, job *jobtypes.Job, log logger.Logger) {
+       log.Info("📡 starting simulated manager scheduling", "jobId", job.ID, "interval", "60s")
+
+       // Response listener for results (simulates sending them back to the manager)
+       eventListener := &ManagerResponseSimulator{
+               logger:    log,
+               jobID:     job.ID,
+               monitorID: job.MonitorID,
+       }
+
+       // Send the first task right away (simulates the manager's initial scheduling)
+       log.Info("📤 manager sending initial task", "timestamp", time.Now().Format("15:04:05"))
+       if err := collectServer.ReceiveJob(job, eventListener); err != nil {
+               log.Error(err, "❌ failed to receive initial task")
+               return
+       }
+
+       // Simulate the manager periodically re-scheduling / updating the task
+       ticker := time.NewTicker(60 * time.Second) // one simulated manager dispatch per minute
+       defer ticker.Stop()
+
+       taskSequence := 1
+
+       for {
+               select {
+               case <-ticker.C:
+                       taskSequence++
+
+                       // Build a task update (simulates manager re-scheduling or a config change;
+                       // note this is a shallow copy, so the maps are shared with the original job)
+                       updatedJob := *job
+                       updatedJob.Timestamp = time.Now().Unix()
+                       updatedJob.ID = job.ID + int64(taskSequence)
+
+                       // Refresh the task metadata (simulates the manager's bookkeeping)
+                       updatedJob.Metadata["task_sequence"] = fmt.Sprintf("%d", taskSequence)
+                       updatedJob.Metadata["last_scheduled"] = time.Now().Format(time.RFC3339)
+                       updatedJob.Metadata["scheduler_version"] = "v1.0.0"
+
+                       log.Info("📤 manager sending task update",
+                               "sequence", taskSequence,
+                               "jobId", updatedJob.ID,
+                               "timestamp", time.Now().Format("15:04:05"))
+
+                       // Hand the updated task to the collector
+                       if err := collectServer.ReceiveJob(&updatedJob, eventListener); err != nil {
+                               log.Error(err, "❌ failed to receive task update", "sequence", taskSequence)
+                       }
+
+               case <-ctx.Done():
+                       log.Info("🛑 stopping simulated manager scheduling")
+                       return
+               }
+       }
+}
+
+// ManagerResponseSimulator simulates the manager-side handler that receives collection results
+type ManagerResponseSimulator struct {
+       logger    logger.Logger
+       jobID     int64
+       monitorID int64
+}
+
+// Response handles collection results (simulating the send back to the manager)
+func (mrs *ManagerResponseSimulator) Response(metricsData []interface{}) {
+       mrs.logger.Info("📨 collection finished; preparing to send results to the manager",
+               "jobId", mrs.jobID,
+               "monitorId", mrs.monitorID,
+               "metricsCount", len(metricsData))
+
+       // Walk the result of every collected metric
+       for i, data := range metricsData {
+               if collectData, ok := data.(*jobtypes.CollectRepMetricsData); ok {
+                       mrs.logger.Info("📊 processing collected metric",
+                               "metricIndex", i+1,
+                               "metricName", collectData.Metrics,
+                               "code", collectData.Code,
+                               "time", collectData.Time)
+
+                       if collectData.Code == 200 {
+                               mrs.logger.Info("✅ metric collected successfully",
+                                       "metric", collectData.Metrics,
+                                       "fieldsCount", len(collectData.Fields),
+                                       "valuesCount", len(collectData.Values))
+
+                               // Print the collected rows (the data that would go to the manager)
+                               for rowIndex, valueRow := range collectData.Values {
+                                       mrs.logger.Info("📈 collected row",
+                                               "metric", collectData.Metrics,
+                                               "row", rowIndex+1,
+                                               "values", valueRow.Columns)
+                               }
+                       } else {
+                               mrs.logger.Error(fmt.Errorf("collection failed"), "❌ metric collection error",
+                                       "metric", collectData.Metrics,
+                                       "code", collectData.Code,
+                                       "message", collectData.Msg)
+                       }
+               }
+       }
+
+       // Simulate sending an HTTP response to the manager
+       mrs.simulateHTTPResponseToManager(metricsData)
+}
+
+// simulateHTTPResponseToManager simulates sending the collected results back to the manager over HTTP
+func (mrs *ManagerResponseSimulator) simulateHTTPResponseToManager(metricsData []interface{}) {
+       // Simulated HTTP request parameters
+       managerEndpoint := "http://hertzbeat-manager:1157/api/collector/collect"
+       collectorIdentity := "collector-go-001"
+
+       mrs.logger.Info("🚀 simulating an HTTP request with the collected results",
+               "endpoint", managerEndpoint,
+               "collectorId", collectorIdentity,
+               "jobId", mrs.jobID,
+               "dataCount", len(metricsData),
+               "timestamp", time.Now().Format("15:04:05"))
+
+       // A real HTTP client call could go here, e.g.:
+       // httpClient.Post(managerEndpoint, jsonData)
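+       //
+       // A minimal sketch with the standard library (an assumption: the payload
+       // is just the JSON-encoded metricsData; the manager's real API may differ):
+       //
+       //   payload, _ := json.Marshal(metricsData)
+       //   resp, err := http.Post(managerEndpoint, "application/json", bytes.NewReader(payload))
+       //   if err == nil {
+       //           _ = resp.Body.Close()
+       //   }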
+
+       mrs.logger.Info("📤 采集结果已发送到Manager (模拟成功)",
+               "status", "200 OK",
+               "responseTime", "15ms")
+}
diff --git a/pkg/banner/embed.go b/pkg/banner/embed.go
index 0da67df..cf4c31e 100644
--- a/pkg/banner/embed.go
+++ b/pkg/banner/embed.go
@@ -5,19 +5,23 @@ import (
        "os"
        "strconv"
        "text/template"
-
-       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/server"
 )
 
 //go:embed banner.txt
 var EmbedLogo embed.FS
 
+// LoggerInterface defines the interface for logging
+type LoggerInterface interface {
+       Error(err error, msg string)
+       Info(msg string, keysAndValues ...interface{})
+}
+
 type Banner struct {
-       server *server.CollectorServer
+       logger LoggerInterface
 }
 
-func New(server *server.CollectorServer) *Banner {
-       return &Banner{server: server}
+func New(logger LoggerInterface) *Banner {
+       return &Banner{logger: logger}
 }
 
 type bannerVars struct {
@@ -28,15 +32,19 @@ type bannerVars struct {
 }
 
 func (b *Banner) PrintBanner(appName, port string) error {
+       return b.PrintBannerWithVersion(appName, port, "unknown")
+}
+
+func (b *Banner) PrintBannerWithVersion(appName, port, version string) error {
        data, err := EmbedLogo.ReadFile("banner.txt")
        if err != nil {
-               b.server.Logger.Error(err, "read banner file failed")
+               b.logger.Error(err, "read banner file failed")
                return err
        }
 
        tmpl, err := template.New("banner").Parse(string(data))
        if err != nil {
-               b.server.Logger.Error(err, "parse banner template failed")
+               b.logger.Error(err, "parse banner template failed")
                return err
        }
 
@@ -44,7 +52,7 @@ func (b *Banner) PrintBanner(appName, port string) error {
                CollectorName: appName,
                ServerPort:    port,
                Pid:           strconv.Itoa(os.Getpid()),
-               Version:       b.server.Version,
+               Version:       version,
        }
 
        err = tmpl.Execute(os.Stdout, vars)
diff --git a/pkg/collector/basic/database/jdbc_auto_register.go b/pkg/collector/basic/database/jdbc_auto_register.go
new file mode 100644
index 0000000..9eac413
--- /dev/null
+++ b/pkg/collector/basic/database/jdbc_auto_register.go
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package database
+
+import (
+       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/basic"
+       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/registry"
+       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger"
+)
+
+// init runs automatically when this package is imported and registers the JDBC collector
+func init() {
+       // Register the JDBC collector factory with the global registry center
+       registry.RegisterCollectorFactory(
+               "jdbc", // protocol name
+               func(logger logger.Logger) basic.AbstractCollector {
+                       return NewJDBCCollector(logger)
+               },
+               registry.WithPriority(10), // high priority: database collection matters
+       )
+}
diff --git a/pkg/collector/bootstrap.go b/pkg/collector/bootstrap.go
index 999f9fe..024b0a9 100644
--- a/pkg/collector/bootstrap.go
+++ b/pkg/collector/bootstrap.go
@@ -7,47 +7,27 @@ import (
        "syscall"
 
        "hertzbeat.apache.org/hertzbeat-collector-go/pkg/banner"
-       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/config"
-       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/server"
+       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types"
 )
 
 func Bootstrap(confPath, version string) error {
+       // Initialize logger
+       log := logger.DefaultLogger(os.Stdout, types.LogLevelInfo)
+       log.Info("starting HertzBeat Collector Go", "version", version)
 
-       // Init collector server
-       cs := server.NewCollectorServer(version)
-
-       // Load HertzBeat collector config
-       loader := config.New(confPath, cs, nil)
-       cfg, err := loader.LoadConfig()
-       if err != nil {
-               cs.Logger.Error(err, "load collector config failed")
-               return err
-       }
-       err = loader.ValidateConfig(cfg)
-       if err != nil {
-               cs.Logger.Error(err, "validate collector config failed")
-               return err
-       }
-
-       // todo: optimize log init eg. dynamic update log level
-
-       // render banner
-       err = banner.New(cs).PrintBanner(cfg.Collector.Info.Name, cfg.Collector.Info.Port)
+       // Print banner
+       bannerPrinter := banner.New(&BannerAdapter{logger: log})
+       err := bannerPrinter.PrintBannerWithVersion("HertzBeat-Collector-Go", 
"1159", version)
        if err != nil {
-               cs.Logger.Error(err, "print banner failed")
+               log.Error(err, "failed to print banner")
                return err
        }
 
-       // Load collector job
-
-       // check collector server
-       err = cs.Validate()
-       if err != nil {
-               cs.Logger.Error(err, "validate collector server failed")
-               return err
-       }
+       // Create and initialize the collector application
+       app := NewCollectorApp(confPath, version, log)
 
-       // Start collector server
+       // Setup graceful shutdown
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
 
@@ -55,17 +35,82 @@ func Bootstrap(confPath, version string) error {
                sigCh := make(chan os.Signal, 1)
                signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
                <-sigCh
+               log.Info("received shutdown signal")
                cancel()
        }()
 
-       err = cs.Start(ctx)
-       if err != nil {
-               cs.Logger.Error(err, "start collector server failed")
+       // Start the application
+       if err := app.Start(); err != nil {
+               log.Error(err, "failed to start collector application")
+               return err
+       }
+
+       // Wait for shutdown signal
+       <-ctx.Done()
+
+       // Graceful shutdown
+       log.Info("shutting down collector application")
+       if err := app.Stop(); err != nil {
+               log.Error(err, "error during shutdown")
                return err
        }
 
-       // shutdown collector server
-       _ = cs.Close()
+       log.Info("HertzBeat Collector Go stopped successfully")
+       return nil
+}
+
+// CollectorApp wraps the complete collector application
+type CollectorApp struct {
+       confPath string
+       version  string
+       logger   logger.Logger
+
+       // Core components
+       collectService *CollectService
+}
+
+// NewCollectorApp creates a new collector application
+func NewCollectorApp(confPath, version string, logger logger.Logger) *CollectorApp {
+       return &CollectorApp{
+               confPath: confPath,
+               version:  version,
+               logger:   logger.WithName("collector-app"),
+       }
+}
+
+// Start starts the collector application
+func (app *CollectorApp) Start() error {
+       app.logger.Info("initializing collector application", "version", 
app.version)
+
+       // Initialize collect service with enhanced scheduling
+       app.collectService = NewCollectService(app.logger)
+
+       // Register built-in collectors
+       RegisterBuiltinCollectors(app.collectService, app.logger)
+
+       app.logger.Info("collector application started successfully")
+       return nil
+}
+
+// Stop stops the collector application
+func (app *CollectorApp) Stop() error {
+       app.logger.Info("stopping collector application")
 
+       // TODO: Implement graceful shutdown of components
+
+       app.logger.Info("collector application stopped successfully")
        return nil
 }
+
+// BannerAdapter adapts logger interface for banner printer
+type BannerAdapter struct {
+       logger logger.Logger
+}
+
+func (ba *BannerAdapter) Error(err error, msg string) {
+       ba.logger.Error(err, msg)
+}
+
+func (ba *BannerAdapter) Info(msg string, keysAndValues ...interface{}) {
+       ba.logger.Info(msg, keysAndValues...)
+}
diff --git a/pkg/collector/collect_service.go b/pkg/collector/collect_service.go
index 0f970be..9862b03 100644
--- a/pkg/collector/collect_service.go
+++ b/pkg/collector/collect_service.go
@@ -2,23 +2,30 @@ package collector
 
 import (
        "fmt"
+       "time"
 
        "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/basic"
+       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/timer"
        "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger"
        jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job"
 )
 
 // CollectService manages all collectors and provides a unified interface
 type CollectService struct {
-       registry *basic.CollectorRegistry
-       logger   logger.Logger
+       registry        *basic.CollectorRegistry
+       logger          logger.Logger
+       timerDispatcher *timer.TimerDispatcher // Optional: for advanced task management
+       defaultPriority int                    // Default task priority
+       timeoutRetries  int                    // Default retry count for timeouts
 }
 
 // NewCollectService creates a new collect service
 func NewCollectService(logger logger.Logger) *CollectService {
        service := &CollectService{
-               registry: basic.NewCollectorRegistry(logger),
-               logger:   logger.WithName("collect-service"),
+               registry:        basic.NewCollectorRegistry(logger),
+               logger:          logger.WithName("collect-service"),
+               defaultPriority: 5, // Medium priority (1-10 scale)
+               timeoutRetries:  3, // Default retry count
        }
 
        // Note: Collectors should be registered externally to avoid circular dependencies
@@ -27,6 +34,32 @@ func NewCollectService(logger logger.Logger) *CollectService {
        return service
 }
 
+// SetTimerDispatcher sets the timer dispatcher for advanced task management
+func (cs *CollectService) SetTimerDispatcher(dispatcher *timer.TimerDispatcher) {
+       cs.timerDispatcher = dispatcher
+       cs.logger.Info("timer dispatcher configured for collect service")
+}
+
+// SetDefaultPriority sets the default priority for collection tasks
+func (cs *CollectService) SetDefaultPriority(priority int) {
+       if priority < 1 || priority > 10 {
+               cs.logger.Info("invalid priority, using default", "provided", 
priority, "default", cs.defaultPriority)
+               return
+       }
+       cs.defaultPriority = priority
+       cs.logger.Info("default priority updated", "priority", priority)
+}
+
+// SetTimeoutRetries sets the default retry count for timeout handling
+func (cs *CollectService) SetTimeoutRetries(retries int) {
+       if retries < 0 || retries > 10 {
+               cs.logger.Info("invalid retry count, using default", 
"provided", retries, "default", cs.timeoutRetries)
+               return
+       }
+       cs.timeoutRetries = retries
+       cs.logger.Info("timeout retries updated", "retries", retries)
+}
+
 // RegisterCollector registers a collector with the service
 func (cs *CollectService) RegisterCollector(protocol string, collector basic.AbstractCollector) {
        cs.registry.Register(protocol, collector)
@@ -100,8 +133,209 @@ func (cs *CollectService) GetCollector(protocol string) (basic.AbstractCollector
 
 // DispatchMetricsTask implements MetricsTaskDispatcher interface
 func (cs *CollectService) DispatchMetricsTask(timeout *jobtypes.Timeout) error {
-       cs.logger.Info("dispatching metrics task", "timeout", timeout)
-       // TODO: Implement actual metrics task dispatching logic
-       // This would typically involve creating MetricsCollect tasks and submitting them to a worker pool
+       if timeout == nil {
+               cs.logger.Error(fmt.Errorf("timeout is nil"), "failed to 
dispatch metrics task")
+               return fmt.Errorf("timeout cannot be nil")
+       }
+
+       cs.logger.Info("dispatching metrics task")
+
+       // Extract job information from timeout task
+       var job *jobtypes.Job
+       if wheelTask, ok := timeout.Task().(*timer.WheelTimerTask); ok {
+               job = wheelTask.GetJob()
+       } else {
+               cs.logger.Error(fmt.Errorf("timeout task is not a 
WheelTimerTask"), "failed to dispatch metrics task")
+               return fmt.Errorf("timeout task must be a WheelTimerTask")
+       }
+
+       if job == nil {
+               cs.logger.Error(fmt.Errorf("job is nil"), "failed to dispatch 
metrics task")
+               return fmt.Errorf("job cannot be nil")
+       }
+
+       // Get metrics to collect for this job
+       metricsSet := job.GetNextCollectMetrics()
+       if len(metricsSet) == 0 {
+               cs.logger.Info("no metrics to collect for job", "job_id", 
job.ID, "app", job.App)
+               return nil
+       }
+
+       cs.logger.Info("creating metrics collection tasks",
+               "job_id", job.ID,
+               "app", job.App,
+               "metrics_count", len(metricsSet))
+
+       // Create and submit collection tasks for each metrics
+       var lastError error
+       successCount := 0
+
+       for _, metrics := range metricsSet {
+               if err := cs.createAndSubmitTask(metrics, timeout, job); err != nil {
+                       cs.logger.Error(err, "failed to create metrics collection task",
+                               "metrics", metrics.Name,
+                               "protocol", metrics.Protocol)
+                       lastError = err
+               } else {
+                       successCount++
+               }
+       }
+
+       cs.logger.Info("metrics tasks dispatched",
+               "total", len(metricsSet),
+               "success", successCount,
+               "failed", len(metricsSet)-successCount)
+
+       // Return error only if all tasks failed
+       if successCount == 0 && lastError != nil {
+               return fmt.Errorf("all metrics tasks failed: %w", lastError)
+       }
+
+       return nil
+}
+
+// createAndSubmitTask creates a metrics collection task and submits it to the worker pool
+func (cs *CollectService) createAndSubmitTask(metrics *jobtypes.Metrics, timeout *jobtypes.Timeout, job *jobtypes.Job) error {
+       // Validate metrics
+       if metrics == nil {
+               return fmt.Errorf("metrics is nil")
+       }
+
+       // Check if we have a collector for this protocol
+       if _, exists := cs.registry.GetCollector(metrics.Protocol); !exists {
+               return fmt.Errorf("no collector found for protocol: %s", 
metrics.Protocol)
+       }
+
+       cs.logger.Info("creating metrics collection task",
+               "metrics", metrics.Name,
+               "protocol", metrics.Protocol,
+               "host", metrics.Host,
+               "port", metrics.Port)
+
+       // Calculate task priority based on metrics configuration
+       taskPriority := cs.calculateTaskPriority(metrics, job)
+
+       // Add timeout monitoring if timer dispatcher is available
+       if cs.timerDispatcher != nil {
+               cs.timerDispatcher.AddMetricsTimeout(metrics, timeout)
+               defer cs.timerDispatcher.RemoveMetricsTimeout(metrics, timeout)
+       }
+
+       // Perform collection with priority and timeout handling
+       start := time.Now()
+       result := cs.collectWithPriority(metrics, timeout, taskPriority)
+       duration := time.Since(start)
+
+       if result == nil {
+               return fmt.Errorf("collection returned nil result")
+       }
+
+       cs.logger.Info("metrics collection completed",
+               "metrics", metrics.Name,
+               "protocol", metrics.Protocol,
+               "priority", taskPriority,
+               "duration", duration,
+               "code", result.Code,
+               "message", result.Msg)
+
+       // TODO: Dispatch collected data to appropriate handlers
+       // This would typically involve calling collectDataDispatch.dispatchCollectData()
+
        return nil
 }
+
+// calculateTaskPriority calculates the priority for a metrics collection task
+func (cs *CollectService) calculateTaskPriority(metrics *jobtypes.Metrics, job *jobtypes.Job) int {
+       priority := cs.defaultPriority
+
+       // Adjust priority based on metrics configuration
+       if metrics.Priority > 0 {
+               priority = metrics.Priority
+       }
+
+       // Adjust priority based on protocol criticality
+       switch metrics.Protocol {
+       case "icmp", "ping":
+               priority += 2 // High priority for availability checks
+       case "http", "https":
+               priority += 1 // Medium-high priority for web services
+       case "snmp":
+               priority -= 1 // Lower priority for SNMP (usually less time-sensitive)
+       }
+
+       // Ensure priority stays within valid range
+       if priority < 1 {
+               priority = 1
+       } else if priority > 10 {
+               priority = 10
+       }
+
+       return priority
+}
+
+// collectWithPriority performs metrics collection with priority and timeout handling
+func (cs *CollectService) collectWithPriority(metrics *jobtypes.Metrics, timeout *jobtypes.Timeout, priority int) *jobtypes.CollectRepMetricsData {
+       // Create channel for result
+       resultChan := make(chan *jobtypes.CollectRepMetricsData, 1)
+
+       // Calculate timeout duration based on priority (higher priority gets more time)
+       baseTimeout := 60 * time.Second
+       if metrics.Timeout != "" {
+               if duration, err := time.ParseDuration(metrics.Timeout); err == nil {
+                       baseTimeout = duration
+               }
+       }
+
+       // Adjust timeout based on priority
+       timeoutDuration := baseTimeout
+       if priority >= 8 {
+               timeoutDuration = baseTimeout + (baseTimeout / 2) // 50% more time for high priority
+       } else if priority <= 3 {
+               timeoutDuration = baseTimeout - (baseTimeout / 4) // 25% less time for low priority
+       }
+
+       cs.logger.Info("starting prioritized collection",
+               "metrics", metrics.Name,
+               "protocol", metrics.Protocol,
+               "priority", priority,
+               "timeout", timeoutDuration)
+
+       // Perform collection in goroutine
+       go func() {
+               defer func() {
+                       if r := recover(); r != nil {
+                               cs.logger.Error(fmt.Errorf("panic in 
collection: %v", r), "metrics collection panicked",
+                                       "metrics", metrics.Name, "protocol", 
metrics.Protocol, "priority", priority)
+                               resultChan <- &jobtypes.CollectRepMetricsData{
+                                       Code: 500,
+                                       Msg:  fmt.Sprintf("collection panicked: 
%v", r),
+                               }
+                       }
+               }()
+
+               // Add priority-based delay for lower priority tasks (simple congestion control)
+               if priority <= 3 {
+                       time.Sleep(time.Duration(5-priority) * 100 * time.Millisecond)
+               }
+
+               result := cs.Collect(metrics)
+               resultChan <- result
+       }()
+
+       // Wait for result or timeout
+       select {
+       case result := <-resultChan:
+               return result
+       case <-time.After(timeoutDuration):
+               cs.logger.Info("metrics collection timed out",
+                       "metrics", metrics.Name,
+                       "protocol", metrics.Protocol,
+                       "priority", priority,
+                       "timeout", timeoutDuration)
+
+               return &jobtypes.CollectRepMetricsData{
+                       Code: 504, // Gateway Timeout
+                       Msg:  fmt.Sprintf("collection timed out after %v 
(priority: %d)", timeoutDuration, priority),
+               }
+       }
+}
diff --git a/pkg/collector/common/dispatcher/entrance/collect_server.go b/pkg/collector/common/dispatcher/entrance/collect_server.go
index c657f81..8e66e87 100644
--- a/pkg/collector/common/dispatcher/entrance/collect_server.go
+++ b/pkg/collector/common/dispatcher/entrance/collect_server.go
@@ -5,9 +5,11 @@ import (
        "sync"
        "time"
 
+       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector"
        "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/timer"
        "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/worker"
        "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger"
+       jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job"
 )
 
 // CollectServer manages the complete collection infrastructure including
@@ -20,6 +22,7 @@ type CollectServer struct {
        timerDispatcher   *timer.TimerDispatcher
        workerPool        *worker.WorkerPool
        collectJobService *CollectJobService
+       collectService    *collector.CollectService
        networkClient     *NetworkClient
 
        // Component states
@@ -138,6 +141,11 @@ func (cs *CollectServer) initializeComponents() error {
        // Initialize timer dispatcher
        cs.timerDispatcher = timer.NewTimerDispatcher(cs.logger)
 
+       // Initialize collect service with enhanced scheduling capabilities
+       cs.collectService = collector.NewCollectService(cs.logger)
+       cs.collectService.SetTimerDispatcher(cs.timerDispatcher)
+       collector.RegisterBuiltinCollectors(cs.collectService, cs.logger)
+
        // Initialize collect job service
        cs.collectJobService = NewCollectJobService(
                cs.timerDispatcher,
@@ -163,14 +171,18 @@ func (cs *CollectServer) wireComponents() {
        // Set network client in collect job service
        cs.collectJobService.SetNetworkClient(cs.networkClient)
 
+       // Wire collect service with timer dispatcher for enhanced scheduling
+       cs.timerDispatcher.SetCollectService(cs.collectService)
+       cs.timerDispatcher.SetCollectDataDispatcher(cs.collectJobService)
+
        // Register message processors
        cs.registerMessageProcessors()
 
-       // Set collect job service as the metrics task dispatcher
-       cs.timerDispatcher.SetMetricsTaskDispatcher(cs.collectJobService)
+       // TimerDispatcher implements MetricsTaskDispatcher itself, so we use it as its own dispatcher
+       cs.timerDispatcher.SetMetricsDispatcher(cs.timerDispatcher)
 
-       // Set collect job service as the collect data dispatcher
-       // cs.timerDispatcher.SetCollectDataDispatcher(cs.collectJobService) // TODO: Fix interface compatibility
+       // TODO: Set collect service as the primary metrics task dispatcher (enhanced version)
+       // cs.timerDispatcher.SetMetricsDispatcher(cs.collectService)
 }
 
 // registerMessageProcessors registers all message processors with the network client.
@@ -287,6 +299,37 @@ func (cs *CollectServer) IsStarted() bool {
        return cs.isStarted
 }
 
+// ReceiveJob simulates receiving a job from the manager and adds it to the dispatcher.
+// This method is used for testing and simulation purposes when network communication is not available.
+func (cs *CollectServer) ReceiveJob(job *jobtypes.Job, eventListener timer.CollectResponseEventListener) error {
+       if !cs.isStarted {
+               return fmt.Errorf("collect server is not started")
+       }
+
+       cs.logger.Info("received job from manager (simulated)",
+               "jobId", job.ID,
+               "monitorId", job.MonitorID,
+               "app", job.App,
+               "category", job.Category,
+               "isCyclic", job.IsCyclic,
+               "interval", job.DefaultInterval)
+
+       // Add the job to the timer dispatcher for scheduling
+       if err := cs.timerDispatcher.AddJob(job, eventListener); err != nil {
+               cs.logger.Error(err, "failed to add received job to timer 
dispatcher",
+                       "jobId", job.ID,
+                       "monitorId", job.MonitorID)
+               return fmt.Errorf("failed to schedule received job: %w", err)
+       }
+
+       cs.logger.Info("job successfully scheduled for execution",
+               "jobId", job.ID,
+               "monitorId", job.MonitorID,
+               "nextExecution", "immediate")
+
+       return nil
+}
+
 // GetCollectJobService returns the collect job service.
 func (cs *CollectServer) GetCollectJobService() *CollectJobService {
        return cs.collectJobService
diff --git a/pkg/collector/common/timer/timer_dispatcher.go b/pkg/collector/common/timer/timer_dispatcher.go
index 1f78db5..215a406 100644
--- a/pkg/collector/common/timer/timer_dispatcher.go
+++ b/pkg/collector/common/timer/timer_dispatcher.go
@@ -42,6 +42,12 @@ type TimerDispatcher struct {
        tempTasks      sync.Map // map[int64]*jobtypes.Timeout
        eventListeners sync.Map // map[int64]CollectResponseEventListener
 
+       // Timeout monitoring
+       timeoutMonitor       sync.Map // map[string]*MetricsTimeoutInfo
+       timeoutCheckInterval time.Duration
+       timeoutCheckTicker   *time.Ticker
+       stopTimeoutMonitor   chan struct{}
+
        // State management
        started atomic.Bool
 
@@ -53,15 +59,27 @@ type TimerDispatcher struct {
        logger            logger.Logger
 }
 
+// MetricsTimeoutInfo tracks timeout information for metrics collection tasks
+type MetricsTimeoutInfo struct {
+       StartTime   time.Time
+       Metrics     *jobtypes.Metrics
+       Timeout     *jobtypes.Timeout
+       MaxDuration time.Duration
+       RetryCount  int
+       MaxRetries  int
+}
+
 // NewTimerDispatcher creates a new timer dispatcher
 func NewTimerDispatcher(logger logger.Logger) *TimerDispatcher {
        // Create worker pool with default configuration
        workerConfig := worker.DefaultWorkerPoolConfig()
 
        td := &TimerDispatcher{
-               wheelTimer: NewTimerWheel(logger.WithName("timer-wheel")),
-               workerPool: worker.NewPriorityWorkerPool(workerConfig, logger),
-               logger:     logger.WithName("timer-dispatcher"),
+               wheelTimer:           NewTimerWheel(logger.WithName("timer-wheel")),
+               workerPool:           worker.NewPriorityWorkerPool(workerConfig, logger),
+               timeoutCheckInterval: 10 * time.Second, // check for timeouts every 10 seconds
+               stopTimeoutMonitor:   make(chan struct{}),
+               logger:               logger.WithName("timer-dispatcher"),
        }
 
        td.started.Store(true)
@@ -99,6 +117,9 @@ func (td *TimerDispatcher) Start() error {
                return fmt.Errorf("failed to start timer wheel: %w", err)
        }
 
+       // Start timeout monitoring
+       td.startTimeoutMonitoring()
+
        td.logger.Info("timer dispatcher started successfully")
        return nil
 }
@@ -110,6 +131,9 @@ func (td *TimerDispatcher) Stop() error {
        // Mark as offline
        td.GoOffline()
 
+       // Stop timeout monitoring
+       td.stopTimeoutMonitoring()
+
        // Stop the timer wheel first
        if err := td.wheelTimer.Stop(); err != nil {
                td.logger.Info("failed to stop timer wheel", "error", err)
@@ -431,3 +455,176 @@ func (td *TimerDispatcher) SetMetricsTaskDispatcher(dispatcher interface{}) {
        // This method is for interface compatibility with communication layer
        // The TimerDispatcher itself implements MetricsTaskDispatcher
 }
+
+// startTimeoutMonitoring starts the timeout monitoring goroutine
+func (td *TimerDispatcher) startTimeoutMonitoring() {
+       td.timeoutCheckTicker = time.NewTicker(td.timeoutCheckInterval)
+
+       go func() {
+               defer td.timeoutCheckTicker.Stop()
+
+               for {
+                       select {
+                       case <-td.timeoutCheckTicker.C:
+                               td.checkTimeouts()
+                       case <-td.stopTimeoutMonitor:
+                               td.logger.Info("timeout monitoring stopped")
+                               return
+                       }
+               }
+       }()
+
+       td.logger.Info("timeout monitoring started", "interval", 
td.timeoutCheckInterval)
+}
+
+// stopTimeoutMonitoring stops the timeout monitoring
+func (td *TimerDispatcher) stopTimeoutMonitoring() {
+       if td.timeoutCheckTicker != nil {
+               close(td.stopTimeoutMonitor)
+               td.timeoutCheckTicker.Stop()
+       }
+}
+
+// checkTimeouts checks for timed out metrics collection tasks
+func (td *TimerDispatcher) checkTimeouts() {
+       now := time.Now()
+       var timeoutKeys []string
+
+       td.timeoutMonitor.Range(func(key, value interface{}) bool {
+               timeoutKey := key.(string)
+               timeoutInfo := value.(*MetricsTimeoutInfo)
+
+               // Check if task has timed out
+               if now.Sub(timeoutInfo.StartTime) > timeoutInfo.MaxDuration {
+                       timeoutKeys = append(timeoutKeys, timeoutKey)
+
+                       td.logger.Info("metrics collection task timed out",
+                               "key", timeoutKey,
+                               "metrics", timeoutInfo.Metrics.Name,
+                               "duration", now.Sub(timeoutInfo.StartTime),
+                               "maxDuration", timeoutInfo.MaxDuration,
+                               "retryCount", timeoutInfo.RetryCount)
+               }
+
+               return true
+       })
+
+       // Handle timed out tasks
+       for _, timeoutKey := range timeoutKeys {
+               td.handleTimeout(timeoutKey)
+       }
+}
+
+// handleTimeout handles a timed out metrics collection task
+func (td *TimerDispatcher) handleTimeout(timeoutKey string) {
+       value, exists := td.timeoutMonitor.Load(timeoutKey)
+       if !exists {
+               return
+       }
+
+       timeoutInfo := value.(*MetricsTimeoutInfo)
+
+       // Remove from timeout monitor
+       td.timeoutMonitor.Delete(timeoutKey)
+
+       // Check if we should retry
+       if timeoutInfo.RetryCount < timeoutInfo.MaxRetries {
+               td.logger.Info("retrying timed out metrics collection task",
+                       "key", timeoutKey,
+                       "metrics", timeoutInfo.Metrics.Name,
+                       "retryCount", timeoutInfo.RetryCount+1,
+                       "maxRetries", timeoutInfo.MaxRetries)
+
+               // Retry the task
+               td.retryMetricsCollection(timeoutInfo)
+       } else {
+               td.logger.Info("metrics collection task exceeded max retries",
+                       "key", timeoutKey,
+                       "metrics", timeoutInfo.Metrics.Name,
+                       "retryCount", timeoutInfo.RetryCount,
+                       "maxRetries", timeoutInfo.MaxRetries)
+
+               // TODO: Notify failure to appropriate handlers
+               // This could involve sending a failure response or triggering alerts
+       }
+}
+
+// retryMetricsCollection retries a timed-out metrics collection task
+func (td *TimerDispatcher) retryMetricsCollection(timeoutInfo *MetricsTimeoutInfo) {
+       // Create new timeout info with incremented retry count
+       newTimeoutInfo := &MetricsTimeoutInfo{
+               StartTime:   time.Now(),
+               Metrics:     timeoutInfo.Metrics,
+               Timeout:     timeoutInfo.Timeout,
+               MaxDuration: timeoutInfo.MaxDuration,
+               RetryCount:  timeoutInfo.RetryCount + 1,
+               MaxRetries:  timeoutInfo.MaxRetries,
+       }
+
+       // Generate retry key
+       retryKey := fmt.Sprintf("%s-retry-%d",
+               td.generateTimeoutKey(timeoutInfo.Metrics, timeoutInfo.Timeout),
+               newTimeoutInfo.RetryCount)
+
+       // Add to timeout monitor
+       td.timeoutMonitor.Store(retryKey, newTimeoutInfo)
+
+       // Dispatch the retry task with exponential backoff (5s, 10s, 20s, ...)
+       retryDelay := time.Duration(1<<uint(timeoutInfo.RetryCount)) * 5 * time.Second
+       time.AfterFunc(retryDelay, func() {
+               if td.metricsDispatcher != nil {
+                       if err := td.metricsDispatcher.DispatchMetricsTask(timeoutInfo.Timeout); err != nil {
+                               td.logger.Error(err, "failed to retry metrics collection task",
+                                       "key", retryKey,
+                                       "metrics", timeoutInfo.Metrics.Name)
+                               // Remove from monitor if retry dispatch fails
+                               td.timeoutMonitor.Delete(retryKey)
+                       }
+               }
+       })
+}
+
+// AddMetricsTimeout adds a metrics collection task to timeout monitoring
+func (td *TimerDispatcher) AddMetricsTimeout(metrics *jobtypes.Metrics, timeout *jobtypes.Timeout) {
+       // Calculate max duration based on metrics timeout or default
+       maxDuration := 120 * time.Second // default 2 minutes
+       if metrics.Timeout != "" {
+               if duration, err := time.ParseDuration(metrics.Timeout); err == nil {
+                       maxDuration = duration
+               }
+       }
+
+       timeoutInfo := &MetricsTimeoutInfo{
+               StartTime:   time.Now(),
+               Metrics:     metrics,
+               Timeout:     timeout,
+               MaxDuration: maxDuration,
+               RetryCount:  0,
+               MaxRetries:  3, // default max retries
+       }
+
+       key := td.generateTimeoutKey(metrics, timeout)
+       td.timeoutMonitor.Store(key, timeoutInfo)
+
+       td.logger.Info("added metrics timeout monitoring",
+               "key", key,
+               "metrics", metrics.Name,
+               "maxDuration", maxDuration,
+               "maxRetries", timeoutInfo.MaxRetries)
+}
+
+// RemoveMetricsTimeout removes a metrics collection task from timeout monitoring
+func (td *TimerDispatcher) RemoveMetricsTimeout(metrics *jobtypes.Metrics, timeout *jobtypes.Timeout) {
+       key := td.generateTimeoutKey(metrics, timeout)
+       td.timeoutMonitor.Delete(key)
+
+       td.logger.Info("removed metrics timeout monitoring",
+               "key", key,
+               "metrics", metrics.Name)
+}
+
+// generateTimeoutKey generates a key for timeout monitoring.
+// The key must be deterministic so that AddMetricsTimeout and RemoveMetricsTimeout
+// resolve to the same entry; a timestamp-based key would make removal impossible.
+func (td *TimerDispatcher) generateTimeoutKey(metrics *jobtypes.Metrics, timeout *jobtypes.Timeout) string {
+       return fmt.Sprintf("%s-%s", metrics.Name, metrics.Protocol)
+}
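
Putting the new timeout API together, the intended call pattern is roughly the sketch below: arm the monitor before dispatching, disarm it when a response comes back, and let checkTimeouts retry anything that never reports. `td`, `metrics`, and `timeout` are assumed to be in scope, and `DispatchMetricsTask` is the dispatcher method referenced in the retry path above:

```go
// Arm timeout tracking before the task runs.
td.AddMetricsTimeout(metrics, timeout)

// Dispatch the task; if dispatch itself fails, stop tracking right away.
if err := td.DispatchMetricsTask(timeout); err != nil {
	td.RemoveMetricsTimeout(metrics, timeout)
	return err
}

// Later, in the response-handling path, disarm the monitor so that
// checkTimeouts only retries tasks that never reported back.
td.RemoveMetricsTimeout(metrics, timeout)
```
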
diff --git a/pkg/collector/config/config.go b/pkg/collector/config/config.go
deleted file mode 100644
index 7b1270d..0000000
--- a/pkg/collector/config/config.go
+++ /dev/null
@@ -1,118 +0,0 @@
-package config
-
-import (
-       "context"
-       "errors"
-       "os"
-
-       "gopkg.in/yaml.v3"
-       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/server"
-
-       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger"
-       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types"
-)
-
-const (
-       DefaultHertzBeatCollectorName = "hertzbeat-collector"
-)
-
-type HookFunc func(c context.Context, server *server.CollectorServer) error
-
-type Loader struct {
-       cfgPath string
-       logger  logger.Logger
-       cancel  context.CancelFunc
-       server  *server.CollectorServer
-
-       hook HookFunc
-
-       // todo file watcher
-       // watcher *fsnotify.Watcher
-}
-
-func New(cfgPath string, server *server.CollectorServer, f HookFunc) *Loader {
-
-       return &Loader{
-               cfgPath: cfgPath,
-               server:  server,
-               logger:  server.Logger.WithName("collector-config-loader"),
-               hook:    f,
-       }
-}
-
-func (ld *Loader) LoadConfig() (*types.CollectorConfig, error) {
-
-       ld.runHook()
-
-       if ld.cfgPath == "" {
-               ld.logger.Info("collector-config-loader: path is empty")
-               return nil, errors.New("collector-config-loader: path is empty")
-       }
-
-       if _, err := os.Stat(ld.cfgPath); os.IsNotExist(err) {
-               ld.logger.Error(err, "collector-config-loader: file not exist", "path", ld.cfgPath)
-               return nil, err
-       }
-
-       file, err := os.Open(ld.cfgPath)
-       if err != nil {
-               return nil, err
-       }
-       defer func(file *os.File) {
-               err := file.Close()
-               if err != nil {
-                       ld.logger.Error(err, "close config file failed")
-               }
-       }(file)
-
-       var cfg types.CollectorConfig
-       decoder := yaml.NewDecoder(file)
-       if err := decoder.Decode(&cfg); err != nil {
-               ld.logger.Error(err, "decode config file failed")
-               return nil, err
-       }
-
-       return &cfg, nil
-}
-
-func (ld *Loader) ValidateConfig(cfg *types.CollectorConfig) error {
-
-       if cfg == nil {
-               ld.logger.Sugar().Debug("collector-config-loader is nil")
-               return errors.New("collector-config-loader is nil")
-       }
-
-       // other check
-       if cfg.Collector.Info.IP == "" {
-               ld.logger.Sugar().Debug("collector-config-loader ip is empty")
-               return errors.New("collector-config-loader ip is empty")
-       }
-
-       if cfg.Collector.Info.Port == "" {
-               ld.logger.Sugar().Debug("collector-config-loader port is empty")
-               return errors.New("collector-config-loader port is empty")
-       }
-
-       if cfg.Collector.Info.Name == "" {
-               ld.logger.Sugar().Debug("collector-config-loader: name is 
empty")
-               cfg.Collector.Info.Name = DefaultHertzBeatCollectorName
-       }
-
-       return nil
-}
-
-func (r *Loader) runHook() {
-
-       if r.hook == nil {
-               return
-       }
-
-       r.logger.Info("running hook")
-       c, cancel := context.WithCancel(context.TODO())
-       r.cancel = cancel
-       go func(ctx context.Context) {
-               if err := r.hook(ctx, r.server); err != nil {
-                       r.logger.Error(err, "hook error")
-               }
-       }(c)
-}
diff --git a/pkg/collector/config/config_test.go b/pkg/collector/config/config_test.go
deleted file mode 100644
index b16fc79..0000000
--- a/pkg/collector/config/config_test.go
+++ /dev/null
@@ -1,45 +0,0 @@
-package config
-
-import (
-       "os"
-       "testing"
-)
-
-func TestLoadConfig(t *testing.T) {
-
-       content := `{
-        "collector": {
-            "version": "1.0.0",
-            "ip": "127.0.0.1"
-        },
-        "dispatcher": {}
-    }`
-       dumpfile, err := os.CreateTemp("", "collector_config_*.json")
-       if err != nil {
-               t.Fatalf("failed to create temp file: %v", err)
-       }
-       defer func(name string) {
-               err := os.Remove(name)
-               if err != nil {
-                       t.Logf("failed to remove temp file: %v", err)
-               }
-       }(dumpfile.Name())
-       if _, err := dumpfile.Write([]byte(content)); err != nil {
-               t.Fatalf("failed to write to temp file: %v", err)
-       }
-       err = dumpfile.Close()
-       if err != nil {
-               return
-       }
-
-       cfg, err := LoadConfig(dumpfile.Name())
-       if err != nil {
-               t.Fatalf("LoadConfig failed: %v", err)
-       }
-       if cfg.Collector.Version != "1.0.0" {
-               t.Errorf("expected version '1.0.0', got '%s'", 
cfg.Collector.Version)
-       }
-       if cfg.Collector.IP != "127.0.0.1" {
-               t.Errorf("expected ip '127.0.0.1', got '%s'", cfg.Collector.IP)
-       }
-}
diff --git a/pkg/collector/registry.go b/pkg/collector/registry.go
index 64af30f..7f8331c 100644
--- a/pkg/collector/registry.go
+++ b/pkg/collector/registry.go
@@ -18,23 +18,40 @@
 package collector
 
 import (
-       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/basic/database"
+       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/registry"
        "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger"
+
+       // Import all collectors so their init() functions trigger auto-registration
+       _ "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/basic/database"
 )
 
-// RegisterBuiltinCollectors registers all built-in collectors with the service
-// This function should be called during application initialization to avoid circular dependencies
+// RegisterBuiltinCollectors automatically registers all built-in collectors.
+// It uses the new auto-registration mechanism, so there is no need to add each collector by hand.
 func RegisterBuiltinCollectors(service *CollectService, logger logger.Logger) {
-       // Register JDBC collector
-       jdbcCollector := database.NewJDBCCollector(logger)
-       service.RegisterCollector(jdbcCollector.SupportProtocol(), jdbcCollector)
+       // Create all enabled collectors from the registry center
+       collectors, err := registry.GetGlobalCenter().CreateCollectors(logger)
+       if err != nil {
+               logger.Error(err, "failed to create collectors")
+               return
+       }
+
+       // Register them with the CollectService
+       for protocol, collector := range collectors {
+               service.RegisterCollector(protocol, collector)
+               logger.Info("collector registered with service", "protocol", protocol)
+       }
+}
 
-       // TODO: Register other built-in collectors here
-       // httpCollector := http.NewHTTPCollector(logger)
-       // service.RegisterCollector(httpCollector.SupportProtocol(), httpCollector)
+// RegisterBuiltinCollectorsLegacy is the old manual registration path (deprecated).
+// It is kept for compatibility; prefer RegisterBuiltinCollectors.
+//
+// Deprecated: use RegisterBuiltinCollectors instead, which registers all collectors automatically.
+func RegisterBuiltinCollectorsLegacy(service *CollectService, logger logger.Logger) {
+       logger.Info("⚠️  using the deprecated manual registration path; switch to auto-registration")
 
-       // sshCollector := ssh.NewSSHCollector(logger)
-       // service.RegisterCollector(sshCollector.SupportProtocol(), sshCollector)
+       // The original manual registration logic can be kept here for compatibility:
+       // jdbcCollector := database.NewJDBCCollector(logger)
+       // service.RegisterCollector(jdbcCollector.SupportProtocol(), jdbcCollector)
+}
 }
 
 // NewCollectServiceWithBuiltins creates a new collect service and registers all built-in collectors
diff --git a/pkg/collector/registry/registry_center.go b/pkg/collector/registry/registry_center.go
new file mode 100644
index 0000000..0cc8a4e
--- /dev/null
+++ b/pkg/collector/registry/registry_center.go
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package registry
+
+import (
+       "fmt"
+       "sync"
+
+       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/basic"
+       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger"
+)
+
+// CollectorFactory is the factory function type for collectors
+type CollectorFactory func(logger logger.Logger) basic.AbstractCollector
+
+// CollectorRegistration holds the registration info for a collector
+type CollectorRegistration struct {
+       Protocol string
+       Factory  CollectorFactory
+       Enabled  bool
+       Priority int // priority; lower numbers mean higher priority
+}
+
+// RegistryCenter is the collector registry center
+type RegistryCenter struct {
+       registrations map[string]*CollectorRegistration
+       mutex         sync.RWMutex
+}
+
+// Global registry center instance
+var globalCenter = &RegistryCenter{
+       registrations: make(map[string]*CollectorRegistration),
+}
+
+// GetGlobalCenter returns the global registry center
+func GetGlobalCenter() *RegistryCenter {
+       return globalCenter
+}
+
+// RegisterFactory registers a collector factory function
+func (rc *RegistryCenter) RegisterFactory(protocol string, factory CollectorFactory, options ...RegistrationOption) {
+       rc.mutex.Lock()
+       defer rc.mutex.Unlock()
+
+       registration := &CollectorRegistration{
+               Protocol: protocol,
+               Factory:  factory,
+               Enabled:  true, // enabled by default
+               Priority: 100,  // default priority
+       }
+
+       // Apply the registration options
+       for _, option := range options {
+               option(registration)
+       }
+
+       rc.registrations[protocol] = registration
+}
+
+// RegistrationOption configures a CollectorRegistration
+type RegistrationOption func(*CollectorRegistration)
+
+// WithDisabled disables the collector at registration time
+func WithDisabled() RegistrationOption {
+       return func(reg *CollectorRegistration) {
+               reg.Enabled = false
+       }
+}
+
+// WithPriority sets the registration priority
+func WithPriority(priority int) RegistrationOption {
+       return func(reg *CollectorRegistration) {
+               reg.Priority = priority
+       }
+}
+
+// GetRegistrations returns all registrations
+func (rc *RegistryCenter) GetRegistrations() map[string]*CollectorRegistration {
+       rc.mutex.RLock()
+       defer rc.mutex.RUnlock()
+
+       // Return copies to prevent external mutation
+       result := make(map[string]*CollectorRegistration)
+       for k, v := range rc.registrations {
+               result[k] = &CollectorRegistration{
+                       Protocol: v.Protocol,
+                       Factory:  v.Factory,
+                       Enabled:  v.Enabled,
+                       Priority: v.Priority,
+               }
+       }
+       return result
+}
+
+// GetRegistration returns the registration for a specific protocol
+func (rc *RegistryCenter) GetRegistration(protocol string) (*CollectorRegistration, bool) {
+       rc.mutex.RLock()
+       defer rc.mutex.RUnlock()
+
+       reg, exists := rc.registrations[protocol]
+       if !exists {
+               return nil, false
+       }
+
+       return &CollectorRegistration{
+               Protocol: reg.Protocol,
+               Factory:  reg.Factory,
+               Enabled:  reg.Enabled,
+               Priority: reg.Priority,
+       }, true
+}
+
+// SetEnabled sets the enabled state of a protocol
+func (rc *RegistryCenter) SetEnabled(protocol string, enabled bool) bool {
+       rc.mutex.Lock()
+       defer rc.mutex.Unlock()
+
+       if reg, exists := rc.registrations[protocol]; exists {
+               reg.Enabled = enabled
+               return true
+       }
+       return false
+}
+
+// GetEnabledProtocols returns the list of enabled protocols
+func (rc *RegistryCenter) GetEnabledProtocols() []string {
+       rc.mutex.RLock()
+       defer rc.mutex.RUnlock()
+
+       var enabled []string
+       for protocol, reg := range rc.registrations {
+               if reg.Enabled {
+                       enabled = append(enabled, protocol)
+               }
+       }
+       return enabled
+}
+
+// GetAllProtocols returns the list of all registered protocols
+func (rc *RegistryCenter) GetAllProtocols() []string {
+       rc.mutex.RLock()
+       defer rc.mutex.RUnlock()
+
+       protocols := make([]string, 0, len(rc.registrations))
+       for protocol := range rc.registrations {
+               protocols = append(protocols, protocol)
+       }
+       return protocols
+}
+
+// CreateCollectors creates instances of all enabled collectors
+func (rc *RegistryCenter) CreateCollectors(logger logger.Logger) (map[string]basic.AbstractCollector, error) {
+       registrations := rc.GetRegistrations()
+
+       // Collect the enabled registrations, then sort them by priority
+       sortedRegs := make([]*CollectorRegistration, 0)
+       for _, reg := range registrations {
+               if reg.Enabled {
+                       sortedRegs = append(sortedRegs, reg)
+               }
+       }
+
+       // Sort by ascending priority (lower number = higher priority)
+       sort.Slice(sortedRegs, func(i, j int) bool {
+               return sortedRegs[i].Priority < sortedRegs[j].Priority
+       })
+
+       collectors := make(map[string]basic.AbstractCollector)
+
+       for _, reg := range sortedRegs {
+               collector := reg.Factory(logger.WithName("collector-" + reg.Protocol))
+               if collector == nil {
+                       logger.Error(fmt.Errorf("factory returned nil"), "failed to create collector", "protocol", reg.Protocol)
+                       continue
+               }
+
+               // Verify the protocol matches
+               if collector.SupportProtocol() != reg.Protocol {
+                       logger.Error(fmt.Errorf("protocol mismatch"), "registered protocol differs from the collector's actual protocol",
+                               "expected", reg.Protocol,
+                               "actual", collector.SupportProtocol())
+                       continue
+               }
+
+               collectors[reg.Protocol] = collector
+               logger.Info("采集器创建成功",
+                       "protocol", reg.Protocol,
+                       "priority", reg.Priority)
+       }
+
+       logger.Info("采集器创建完成",
+               "成功", len(collectors),
+               "总数", len(sortedRegs))
+
+       return collectors, nil
+}
+
+// Global convenience functions
+
+// RegisterCollectorFactory registers a collector factory with the global registry center
+func RegisterCollectorFactory(protocol string, factory CollectorFactory, options ...RegistrationOption) {
+       globalCenter.RegisterFactory(protocol, factory, options...)
+}
+
+// GetRegisteredProtocols returns all protocols known to the global registry center
+func GetRegisteredProtocols() []string {
+       return globalCenter.GetAllProtocols()
+}
+
+// GetEnabledProtocols returns the enabled protocols in the global registry center
+func GetEnabledProtocols() []string {
+       return globalCenter.GetEnabledProtocols()
+}
+
+// EnableProtocol enables a protocol in the global registry center
+func EnableProtocol(protocol string, logger logger.Logger) bool {
+       if globalCenter.SetEnabled(protocol, true) {
+               logger.Info("protocol enabled", "protocol", protocol)
+               return true
+       }
+       logger.Info("protocol not registered, cannot enable", "protocol", protocol)
+       return false
+}
+
+// DisableProtocol disables a protocol in the global registry center
+func DisableProtocol(protocol string, logger logger.Logger) bool {
+       if globalCenter.SetEnabled(protocol, false) {
+               logger.Info("protocol disabled", "protocol", protocol)
+               return true
+       }
+       logger.Info("protocol not registered, cannot disable", "protocol", protocol)
+       return false
+}
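
This registry center is what the blank import in pkg/collector/registry.go relies on: each collector package hands over a factory from its init(). The jdbc_auto_register.go added by this commit is not shown in this diff; the sketch below only illustrates the expected shape, and the protocol string and priority value are assumptions:

```go
package database

import (
	"hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/basic"
	"hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/registry"
	"hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger"
)

// init runs when this package is blank-imported and registers the
// JDBC collector factory with the global registry center.
func init() {
	registry.RegisterCollectorFactory("jdbc", // assumed protocol name
		func(l logger.Logger) basic.AbstractCollector {
			return NewJDBCCollector(l)
		},
		registry.WithPriority(10), // assumed priority; lower runs first
	)
}
```
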
diff --git a/pkg/collector/server/server.go b/pkg/collector/server/server.go
deleted file mode 100644
index d1f0de7..0000000
--- a/pkg/collector/server/server.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package server
-
-import (
-       "context"
-       "os"
-
-       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/job"
-       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/transport"
-       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger"
-       "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types"
-)
-
-const (
-       DefaultHertzBeatCollectorVersion = "0.0.1-DEV"
-)
-
-type Run interface {
-       Start(ctx context.Context) error
-       Close() error
-}
-
-// CollectorServer HertzBeat Collector Server
-type CollectorServer struct {
-       Version string
-       Logger  logger.Logger
-
-       job       *job.Server
-       transport *transport.Server
-}
-
-func NewCollectorServer(version string) *CollectorServer {
-
-       if version == "" {
-               version = DefaultHertzBeatCollectorVersion
-       }
-
-       return &CollectorServer{
-               Version: version,
-               Logger:  logger.DefaultLogger(os.Stdout, types.LogLevelDebug),
-       }
-}
-
-func (s *CollectorServer) Start(ctx context.Context) error {
-
-       s.Logger.Info("hi, starting collector server...")
-
-       // start job server
-       s.job = job.NewServer(s.Logger.WithName("job"))
-
-       // init and start transport server
-       s.transport = transport.NewServer(s.Logger.WithName("transport"))
-
-       // Wait until done
-       <-ctx.Done()
-
-       return nil
-}
-
-func (s *CollectorServer) Validate() error {
-
-       return nil
-}
-
-// Close Shutdown the server hook
-func (s *CollectorServer) Close() error {
-
-       s.Logger.Info("collector server shutting down... bye!")
-       return nil
-}
diff --git a/pkg/collector/server/server_test.go b/pkg/collector/server/server_test.go
deleted file mode 100644
index abb4e43..0000000
--- a/pkg/collector/server/server_test.go
+++ /dev/null
@@ -1 +0,0 @@
-package server
diff --git a/pkg/types/job/job_types.go b/pkg/types/job/job_types.go
index ef58676..50ee83d 100644
--- a/pkg/types/job/job_types.go
+++ b/pkg/types/job/job_types.go
@@ -32,3 +32,20 @@ type Job struct {
        PriorMetrics     []Metrics            `json:"-"`
        ResponseDataTemp []MetricsData        `json:"-"`
 }
+
+// GetNextCollectMetrics returns the metrics that should be collected next
+// This is a simplified version - in the full implementation this would handle
+// metric priorities, dependencies, and collection levels
+func (j *Job) GetNextCollectMetrics() []*Metrics {
+       result := make([]*Metrics, 0, len(j.Metrics))
+       for i := range j.Metrics {
+               result = append(result, &j.Metrics[i])
+       }
+       return result
+}
+
+// ConstructPriorMetrics prepares prior metrics for collection dependencies
+func (j *Job) ConstructPriorMetrics() {
+       j.PriorMetrics = make([]Metrics, len(j.Metrics))
+       copy(j.PriorMetrics, j.Metrics)
+}
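
A short sketch of how a dispatcher might consume these helpers; `dispatch` is a hypothetical stand-in for the real per-metrics dispatch call:

```go
// Snapshot the metrics for dependency handling, then walk the next
// batch. Each *Metrics aliases an element of job.Metrics, so any
// mutation made during collection is visible on the job itself.
job.ConstructPriorMetrics()
for _, m := range job.GetNextCollectMetrics() {
	dispatch(m) // hypothetical per-metrics dispatch hook
}
```
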


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
